Posted to issues@spark.apache.org by "Cheng Lian (JIRA)" <ji...@apache.org> on 2015/06/03 18:00:40 UTC

[jira] [Commented] (SPARK-4520) SparkSQL exception when reading certain columns from a parquet file

    [ https://issues.apache.org/jira/browse/SPARK-4520?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14571248#comment-14571248 ] 

Cheng Lian commented on SPARK-4520:
-----------------------------------

This issue is related to Parquet backwards compatibility. The {{_tuple}} postfix in the column name is a historical artifact of parquet-thrift. The related logic [still exists|https://github.com/apache/parquet-mr/blob/apache-parquet-1.7.0/parquet-thrift/src/main/java/org/apache/parquet/thrift/ThriftSchemaConvertVisitor.java#L114-L145] in the most recent Parquet 1.7.0 release.
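For reference, here is a sketch (in Scala, using parquet-mr's {{MessageTypeParser}}) of roughly the file schema parquet-thrift produces for the reporter's {{col_d}} field. The names and definition/repetition levels are reconstructed from the parquet-tools dump quoted below, so treat this as an approximation rather than the exact output of any particular parquet-thrift version:

{code}
import org.apache.parquet.schema.MessageTypeParser

// Approximate parquet-thrift layout for col_d: map<SomeExclusionCause, set<SomeId>>.
// The inner LIST group holds a repeated *primitive* named "value_tuple" (element
// name plus a "_tuple" suffix) instead of the standard three-level structure.
val legacyThriftLayout = MessageTypeParser.parseMessageType(
  """message SampleThriftObject {
    |  optional group col_d (MAP) {
    |    repeated group map (MAP_KEY_VALUE) {
    |      required binary key (UTF8);
    |      optional group value (LIST) {
    |        repeated binary value_tuple (UTF8);
    |      }
    |    }
    |  }
    |}""".stripMargin)
{code}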

The most recent Parquet format spec (not yet released as of this writing) handles this situation via its [LIST backwards compatibility rules|https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#backward-compatibility-rules]. IIRC, these rules have been implemented properly in parquet-avro at least; I'm not sure about the state of the other Parquet submodules.
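To illustrate what those rules have to reconcile, here is the standard three-level LIST layout from the spec next to the legacy two-level one sketched above; a reader implementing the backwards-compatibility rules should decode both into the same logical {{array<string>}}. Again, the schema text here is my own reconstruction, not copied from the spec:

{code}
import org.apache.parquet.schema.MessageTypeParser

// Standard layout: LIST group -> repeated group "list" -> optional "element".
// New writers should emit this form.
val standardListLayout = MessageTypeParser.parseMessageType(
  """message standard_example {
    |  optional group value (LIST) {
    |    repeated group list {
    |      optional binary element (UTF8);
    |    }
    |  }
    |}""".stripMargin)

// Legacy two-level layout (as in the parquet-thrift sketch above): the repeated
// field sits directly under the LIST group, so per the compatibility rules a
// reader must treat each repeated value as a list element, not as a nested record.
{code}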

SPARK-6774 aims to fix these issues for Spark SQL's Parquet support.

I was reviewing Parquet backwards-compatibility related issues and am leaving a comment here for future reference.

> SparkSQL exception when reading certain columns from a parquet file
> -------------------------------------------------------------------
>
>                 Key: SPARK-4520
>                 URL: https://issues.apache.org/jira/browse/SPARK-4520
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 1.2.0
>            Reporter: sadhan sood
>            Assignee: sadhan sood
>            Priority: Critical
>             Fix For: 1.3.0
>
>         Attachments: part-r-00000.parquet
>
>
> I am seeing Spark SQL throw an exception when reading selected columns from a Thrift-generated Parquet file, and also when caching them.
> On some further digging, I was able to narrow it down to at least one particular column type, map<string, set<string>>, as the cause of the issue. To reproduce it, I created a test Thrift file with a very basic schema and stored some sample data in a Parquet file:
> Test.thrift
> ===========
> {code}
> typedef binary SomeId
> enum SomeExclusionCause {
>   WHITELIST = 1,
>   HAS_PURCHASE = 2,
> }
> struct SampleThriftObject {
>   10: string col_a;
>   20: string col_b;
>   30: string col_c;
>   40: optional map<SomeExclusionCause, set<SomeId>> col_d;
> }
> {code}
> =============
> And loading the data in Spark through a SchemaRDD:
> {code}
> import org.apache.spark.sql.SchemaRDD
> val sqlContext = new org.apache.spark.sql.SQLContext(sc);
> val parquetFile = "/path/to/generated/parquet/file"
> val parquetFileRDD = sqlContext.parquetFile(parquetFile)
> parquetFileRDD.printSchema
> root
>  |-- col_a: string (nullable = true)
>  |-- col_b: string (nullable = true)
>  |-- col_c: string (nullable = true)
>  |-- col_d: map (nullable = true)
>  |    |-- key: string
>  |    |-- value: array (valueContainsNull = true)
>  |    |    |-- element: string (containsNull = false)
> parquetFileRDD.registerTempTable("test")
> sqlContext.cacheTable("test")
> sqlContext.sql("select col_a from test").collect() <-- see the exception stack here 
> {code}
> {code}
> org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 1 times, most recent failure: Lost task 0.0 in stage 0.0 (TID 0, localhost): parquet.io.ParquetDecodingException: Can not read value at 0 in block -1 in file file:/tmp/xyz/part-r-00000.parquet
> 	at parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:213)
> 	at parquet.hadoop.ParquetRecordReader.nextKeyValue(ParquetRecordReader.java:204)
> 	at org.apache.spark.rdd.NewHadoopRDD$$anon$1.hasNext(NewHadoopRDD.scala:145)
> 	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
> 	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
> 	at scala.collection.Iterator$$anon$14.hasNext(Iterator.scala:388)
> 	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
> 	at scala.collection.Iterator$class.foreach(Iterator.scala:727)
> 	at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
> 	at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
> 	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
> 	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
> 	at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
> 	at scala.collection.AbstractIterator.to(Iterator.scala:1157)
> 	at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
> 	at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
> 	at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
> 	at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
> 	at org.apache.spark.rdd.RDD$$anonfun$16.apply(RDD.scala:780)
> 	at org.apache.spark.rdd.RDD$$anonfun$16.apply(RDD.scala:780)
> 	at org.apache.spark.SparkContext$$anonfun$runJob$3.apply(SparkContext.scala:1223)
> 	at org.apache.spark.SparkContext$$anonfun$runJob$3.apply(SparkContext.scala:1223)
> 	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
> 	at org.apache.spark.scheduler.Task.run(Task.scala:56)
> 	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:195)
> 	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> 	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> 	at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.ArrayIndexOutOfBoundsException: -1
> 	at java.util.ArrayList.elementData(ArrayList.java:418)
> 	at java.util.ArrayList.get(ArrayList.java:431)
> 	at parquet.io.GroupColumnIO.getLast(GroupColumnIO.java:95)
> 	at parquet.io.GroupColumnIO.getLast(GroupColumnIO.java:95)
> 	at parquet.io.PrimitiveColumnIO.getLast(PrimitiveColumnIO.java:80)
> 	at parquet.io.PrimitiveColumnIO.isLast(PrimitiveColumnIO.java:74)
> 	at parquet.io.RecordReaderImplementation.<init>(RecordReaderImplementation.java:282)
> 	at parquet.io.MessageColumnIO$1.visit(MessageColumnIO.java:131)
> 	at parquet.io.MessageColumnIO$1.visit(MessageColumnIO.java:96)
> 	at parquet.filter2.compat.FilterCompat$NoOpFilter.accept(FilterCompat.java:136)
> 	at parquet.io.MessageColumnIO.getRecordReader(MessageColumnIO.java:96)
> 	at parquet.hadoop.InternalParquetRecordReader.checkRead(InternalParquetRecordReader.java:126)
> 	at parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:193)
> 	... 27 more
> {code}
> If you take col_d out of the Thrift schema, the problem goes away. The problem also shows up when trying to read that particular column without caching the table first. The same file can be dumped/read using parquet-tools just fine. Here is the file dump from parquet-tools:
> {code}
> row group 0 
> --------------------------------------------------------------------------------
> col_a:           BINARY UNCOMPRESSED DO:0 FPO:4 SZ:89/89/1.00 VC:9 ENC [more]...
> col_b:           BINARY UNCOMPRESSED DO:0 FPO:93 SZ:89/89/1.00 VC:9 EN [more]...
> col_c:           BINARY UNCOMPRESSED DO:0 FPO:182 SZ:89/89/1.00 VC:9 E [more]...
> col_d:          
> .map:           
> ..key:           BINARY UNCOMPRESSED DO:0 FPO:271 SZ:29/29/1.00 VC:9 E [more]...
> ..value:        
> ...value_tuple:  BINARY UNCOMPRESSED DO:0 FPO:300 SZ:29/29/1.00 VC:9 E [more]...
>     col_a TV=9 RL=0 DL=1
>     ----------------------------------------------------------------------------
>     page 0:  DLE:RLE RLE:BIT_PACKED VLE:PLAIN SZ:60 VC:9
>     col_b TV=9 RL=0 DL=1
>     ----------------------------------------------------------------------------
>     page 0:  DLE:RLE RLE:BIT_PACKED VLE:PLAIN SZ:60 VC:9
>     col_c TV=9 RL=0 DL=1
>     ----------------------------------------------------------------------------
>     page 0:  DLE:RLE RLE:BIT_PACKED VLE:PLAIN SZ:60 VC:9
>     col_d.map.key TV=9 RL=1 DL=2
>     ----------------------------------------------------------------------------
>     page 0:  DLE:RLE RLE:RLE VLE:PLAIN SZ:12 VC:9
>     col_d.map.value.value_tuple TV=9 RL=2 DL=4
>     ----------------------------------------------------------------------------
>     page 0:  DLE:RLE RLE:RLE VLE:PLAIN SZ:12 VC:9
> BINARY col_a 
> --------------------------------------------------------------------------------
> *** row group 1 of 1, values 1 to 9 *** 
> value 1: R:1 D:1 V:a1
> value 2: R:1 D:1 V:a2
> value 3: R:1 D:1 V:a3
> value 4: R:1 D:1 V:a4
> value 5: R:1 D:1 V:a5
> value 6: R:1 D:1 V:a6
> value 7: R:1 D:1 V:a7
> value 8: R:1 D:1 V:a8
> value 9: R:1 D:1 V:a9
> BINARY col_b 
> --------------------------------------------------------------------------------
> *** row group 1 of 1, values 1 to 9 *** 
> value 1: R:1 D:1 V:b1
> value 2: R:1 D:1 V:b2
> value 3: R:1 D:1 V:b3
> value 4: R:1 D:1 V:b4
> value 5: R:1 D:1 V:b5
> value 6: R:1 D:1 V:b6
> value 7: R:1 D:1 V:b7
> value 8: R:1 D:1 V:b8
> value 9: R:1 D:1 V:b9
> BINARY col_c 
> --------------------------------------------------------------------------------
> *** row group 1 of 1, values 1 to 9 *** 
> value 1: R:1 D:1 V:c1
> value 2: R:1 D:1 V:c2
> value 3: R:1 D:1 V:c3
> value 4: R:1 D:1 V:c4
> value 5: R:1 D:1 V:c5
> value 6: R:1 D:1 V:c6
> value 7: R:1 D:1 V:c7
> value 8: R:1 D:1 V:c8
> value 9: R:1 D:1 V:c9
> BINARY col_d.map.key 
> --------------------------------------------------------------------------------
> *** row group 1 of 1, values 1 to 9 *** 
> value 1: R:0 D:0 V:<null>
> value 2: R:0 D:0 V:<null>
> value 3: R:0 D:0 V:<null>
> value 4: R:0 D:0 V:<null>
> value 5: R:0 D:0 V:<null>
> value 6: R:0 D:0 V:<null>
> value 7: R:0 D:0 V:<null>
> value 8: R:0 D:0 V:<null>
> value 9: R:0 D:0 V:<null>
> BINARY col_d.map.value.value_tuple 
> --------------------------------------------------------------------------------
> *** row group 1 of 1, values 1 to 9 *** 
> value 1: R:0 D:0 V:<null>
> value 2: R:0 D:0 V:<null>
> value 3: R:0 D:0 V:<null>
> value 4: R:0 D:0 V:<null>
> value 5: R:0 D:0 V:<null>
> value 6: R:0 D:0 V:<null>
> value 7: R:0 D:0 V:<null>
> value 8: R:0 D:0 V:<null>
> value 9: R:0 D:0 V:<null>
> {code}


