Posted to dev@parquet.apache.org by "Costas Piliotis (JIRA)" <ji...@apache.org> on 2017/11/09 04:28:00 UTC

[jira] [Comment Edited] (PARQUET-1157) Parquet Write bug - parquet data unreadable by hive or presto or spark 2.1

    [ https://issues.apache.org/jira/browse/PARQUET-1157?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16245184#comment-16245184 ] 

Costas Piliotis edited comment on PARQUET-1157 at 11/9/17 4:27 AM:
-------------------------------------------------------------------

Attached is the full stack trace from running in Hive 2.1.0, which seems to be the most verbose and always leads to:

Caused by: java.lang.IllegalArgumentException: Reading past RLE/BitPacking stream.


was (Author: cpiliotis):
Attached is the full stack trace from running in Hive 2.1.1 which seems to be the most verbose and always leads toward 

Caused by: java.lang.IllegalArgumentException: Reading past RLE/BitPacking stream.

> Parquet Write bug - parquet data unreadable by hive or presto or spark 2.1
> --------------------------------------------------------------------------
>
>                 Key: PARQUET-1157
>                 URL: https://issues.apache.org/jira/browse/PARQUET-1157
>             Project: Parquet
>          Issue Type: Bug
>    Affects Versions: 1.8.1
>         Environment: parquet-avro
> spark 2.1
> hive 1.2
> hive 2.1.0
> presto 0.157
> presto 0.180
>            Reporter: Costas Piliotis
>         Attachments: log_106898428_1510201521.txt20171109-25172-1jt8dp2
>
>
> In our pipeline, a mapreduce job writes parquet data to s3, and a spark job then consolidates these files from our staging area into target tables, adding partitions and modifying tables as needed.
> We have implemented and are using parquet schema evolution.    
> The data written from our mapreduce task shows for this column the following metadata (written as parquet-avro):
> {code}
>   optional group playerpositions_ai (LIST) {
>     repeated int32 array;
>   }
> {code}
> However, when spark writes it out, the schema is converted. We have tried with the legacy parquet format both on and off.
> With the legacy format off:
> {code}
>   optional group playerpositions_ai (LIST) {
>     repeated group list {
>       optional int32 element;
>     }
>   }
> {code}
> and with the legacy format on:
> {code}
>   optional group playerpositions_ai (LIST) {
>     repeated group bag {
>       optional int32 array;
>     }
>   }
> {code}
> From what I've been reading in the spec the latter seems valid.
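> For reference, a minimal sketch of how the legacy toggle is typically set, assuming the switch in question is Spark's spark.sql.parquet.writeLegacyFormat option (the report above does not name it). The object name, application name, sample row, and output path are hypothetical:
> {code}
> import org.apache.spark.sql.SparkSession
>
> object LegacyFormatToggle {
>   def main(args: Array[String]): Unit = {
>     val spark = SparkSession.builder()
>       .appName("legacy-format-toggle") // hypothetical name
>       .master("local[*]")
>       .getOrCreate()
>     import spark.implicits._
>
>     // "true" selects the legacy list layout; "false" (Spark's default) selects the standard one.
>     spark.conf.set("spark.sql.parquet.writeLegacyFormat", "true")
>
>     // One made-up row with an integer array column, named after the column in this report.
>     val df = Seq(Seq(0, 1, 2)).toDF("playerpositions_ai")
>     df.write.mode("overwrite").parquet("/tmp/legacy_format_check") // hypothetical path
>
>     spark.stop()
>   }
> }
> {code}
> Dumping the schema of the resulting file with parquet-tools after each setting is one way to compare the two layouts.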
> Sporadically we see some array columns producing odd failures in this parquet format on read:
> {code}
> Query 20171108_224243_00083_ec9ww failed: com.facebook.presto.spi.PrestoException
> Can not read value at 28857 in block 0 in file s3://.....
> com.facebook.presto.hive.parquet.ParquetHiveRecordCursor.advanceNextPosition(ParquetHiveRecordCursor.java:232)
> com.facebook.presto.hive.HiveCoercionRecordCursor.advanceNextPosition(HiveCoercionRecordCursor.java:98)
> com.facebook.presto.hive.HiveRecordCursor.advanceNextPosition(HiveRecordCursor.java:179)
> com.facebook.presto.spi.RecordPageSource.getNextPage(RecordPageSource.java:99)
> com.facebook.presto.operator.TableScanOperator.getOutput(TableScanOperator.java:247)
> com.facebook.presto.operator.Driver.processInternal(Driver.java:378)
> com.facebook.presto.operator.Driver.processFor(Driver.java:301)
> com.facebook.presto.execution.SqlTaskExecution$DriverSplitRunner.processFor(SqlTaskExecution.java:622)
> com.facebook.presto.execution.TaskExecutor$PrioritizedSplitRunner.process(TaskExecutor.java:534)
> com.facebook.presto.execution.TaskExecutor$Runner.run(TaskExecutor.java:670)
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> java.lang.Thread.run(Thread.java:745)
> {code}
> And in spark reading this file:
> {code}
> java.lang.IllegalArgumentException: Reading past RLE/BitPacking stream.
> 	at org.apache.parquet.Preconditions.checkArgument(Preconditions.java:55)
> 	at org.apache.parquet.column.values.rle.RunLengthBitPackingHybridDecoder.readNext(RunLengthBitPackingHybridDecoder.java:82)
> 	at org.apache.parquet.column.values.rle.RunLengthBitPackingHybridDecoder.readInt(RunLengthBitPackingHybridDecoder.java:64)
> 	at org.apache.parquet.column.values.dictionary.DictionaryValuesReader.readInteger(DictionaryValuesReader.java:112)
> 	at org.apache.parquet.column.impl.ColumnReaderImpl$2$3.read(ColumnReaderImpl.java:243)
> 	at org.apache.parquet.column.impl.ColumnReaderImpl.readValue(ColumnReaderImpl.java:464)
> 	at org.apache.parquet.column.impl.ColumnReaderImpl.writeCurrentValueToConverter(ColumnReaderImpl.java:370)
> 	at org.apache.parquet.io.RecordReaderImplementation.read(RecordReaderImplementation.java:405)
> 	at org.apache.parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:218)
> 	at org.apache.parquet.hadoop.ParquetRecordReader.nextKeyValue(ParquetRecordReader.java:227)
> 	at org.apache.spark.sql.execution.datasources.RecordReaderIterator.hasNext(RecordReaderIterator.scala:39)
> 	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:125)
> 	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
> 	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
> 	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:377)
> 	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
> 	at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:149)
> 	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
> 	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
> 	at org.apache.spark.scheduler.Task.run(Task.scala:99)
> 	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:322)
> 	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
> 	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
> 	at java.lang.Thread.run(Thread.java:745)
> {code}
> I'm hopeful that this bug is related to PARQUET-511 and has already been fixed.
> For giggles I also took this parquet data and loaded it into Amazon Athena (which is basically presto anyway) in hopes that the corruption was on our end, but Athena throws the same error:
> {code}
> HIVE_CURSOR_ERROR: Can not read value at 28857 in block 0 in file 
> {code}
> The integer value isn't particularly interesting; it's a 0.
> The parquet write command we used in spark is not particularly interesting.  
> {code}
>       data.repartition(((data.count() / 10000000) + 1).toInt).write.format("parquet")
>         .mode("append")
>         .partitionBy(partitionColumns: _*)
>         .save(path)
> {code}
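> As a worked illustration of the partition count in the command above (a sketch only; partitionsFor is a hypothetical helper and the row counts are made up): count() returns a Long, so the division is integer division and the result is floor(rows / 10,000,000) + 1 partitions.
> {code}
> object PartitionCount {
>   // Mirrors ((data.count() / 10000000) + 1).toInt from the write command.
>   def partitionsFor(rowCount: Long): Int = ((rowCount / 10000000) + 1).toInt
>
>   def main(args: Array[String]): Unit = {
>     println(partitionsFor(9999999L))   // 1
>     println(partitionsFor(10000000L))  // 2
>     println(partitionsFor(25000000L))  // 3
>   }
> }
> {code}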
> Our vendor has not yet been successful in moving our libraries to parquet 1.9.   If this issue is related to PARQUET-511, I believe it should be resolved by our vendor, but I'm seeking clarification on whether that is in fact the case.
> My version of parquet tools on my desktop:
> * can totally dump the contents of that column without error
> * is on parquet 1.9
> At this point I'm stumped and I believe this to be a bug somewhere.   
> If this is a duplicate of PARQUET-511, cool, but if hive, presto, and spark are all struggling to read this file written out by spark, I'm inclined to believe the bug is in either spark or the parquet library itself.


