Posted to user@spark.apache.org by Sadhan Sood <sa...@gmail.com> on 2014/11/14 22:28:38 UTC

SparkSQL exception on cached parquet table

While testing SparkSQL on a bunch of parquet files (basically used to be a
partition for one of our hive tables), I encountered this error:

import org.apache.spark.sql.SchemaRDD
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;

val sqlContext = new org.apache.spark.sql.SQLContext(sc)

val parquetFileRDD = sqlContext.parquetFile(parquetFile)
parquetFileRDD.registerTempTable("xyz_20141109")
sqlContext.sql("SELECT count(*)  FROM xyz_20141109").collect() <-- works
fine
sqlContext.cacheTable("xyz_20141109")
sqlContext.sql("SELECT count(*)  FROM xyz_20141109").collect() <-- fails
with an exception

parquet.io.ParquetDecodingException: Can not read value at 0 in block -1 in file hdfs://xxxxxxxx::9000/event_logs/xyz/20141109/part-00009359b87ae-a949-3ded-ac3e-3a6bda3a4f3a-r-00009.lzo.parquet
        at parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:213)
        at parquet.hadoop.ParquetRecordReader.nextKeyValue(ParquetRecordReader.java:204)
        at org.apache.spark.rdd.NewHadoopRDD$$anon$1.hasNext(NewHadoopRDD.scala:145)
        at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
        at scala.collection.Iterator$$anon$14.hasNext(Iterator.scala:388)
        at org.apache.spark.sql.columnar.InMemoryRelation$$anonfun$3$$anon$1.hasNext(InMemoryColumnarTableScan.scala:136)
        at org.apache.spark.storage.MemoryStore.unrollSafely(MemoryStore.scala:248)
        at org.apache.spark.CacheManager.putInBlockManager(CacheManager.scala:163)
        at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:70)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:228)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
        at org.apache.spark.scheduler.Task.run(Task.scala:56)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:195)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)

Caused by: java.lang.ArrayIndexOutOfBoundsException
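
For reference, here is the same session written out as a self-contained, commented
sketch (Spark 1.2-era SQLContext API; the HDFS path below is a placeholder, not the
real one). The comments summarize what the replies further down the thread establish
about when the Parquet file actually gets scanned:

import org.apache.spark.sql.SQLContext

val sqlContext = new SQLContext(sc)

// Placeholder location; the real data lives under hdfs://.../event_logs/xyz/20141109/
val parquetFile = "hdfs://namenode:9000/event_logs/xyz/20141109"

val parquetFileRDD = sqlContext.parquetFile(parquetFile)
parquetFileRDD.registerTempTable("xyz_20141109")

// Works: COUNT(*) over the uncached Parquet table does not require decoding
// every column, so the problematic data is never touched.
sqlContext.sql("SELECT count(*) FROM xyz_20141109").collect()

// cacheTable is lazy -- it only marks the table for caching; nothing is read yet.
sqlContext.cacheTable("xyz_20141109")

// Fails: the first action after cacheTable builds the in-memory columnar cache,
// which forces a full scan of the file, and the ParquetDecodingException
// (caused by the ArrayIndexOutOfBoundsException) surfaces here.
sqlContext.sql("SELECT count(*) FROM xyz_20141109").collect()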

Re: SparkSQL exception on cached parquet table

Posted by sadhan <sa...@gmail.com>.
Hi Cheng,

Thanks for your response. Here is the stack trace from yarn logs:

Caused by: java.lang.ArrayIndexOutOfBoundsException: -1
        at java.util.ArrayList.elementData(ArrayList.java:418)
        at java.util.ArrayList.get(ArrayList.java:431)
        at parquet.io.GroupColumnIO.getLast(GroupColumnIO.java:95)
        at parquet.io.GroupColumnIO.getLast(GroupColumnIO.java:95)
        at parquet.io.PrimitiveColumnIO.getLast(PrimitiveColumnIO.java:80)
        at parquet.io.PrimitiveColumnIO.isLast(PrimitiveColumnIO.java:74)
        at parquet.io.RecordReaderImplementation.<init>(RecordReaderImplementation.java:282)
        at parquet.io.MessageColumnIO$1.visit(MessageColumnIO.java:131)
        at parquet.io.MessageColumnIO$1.visit(MessageColumnIO.java:96)
        at parquet.filter2.compat.FilterCompat$NoOpFilter.accept(FilterCompat.java:136)
        at parquet.io.MessageColumnIO.getRecordReader(MessageColumnIO.java:96)
        at parquet.hadoop.InternalParquetRecordReader.checkRead(InternalParquetRecordReader.java:126)
        at parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:193)
        ... 26 more



Re: SparkSQL exception on cached parquet table

Posted by Sadhan Sood <sa...@gmail.com>.
Thanks Michael, opened this https://issues.apache.org/jira/browse/SPARK-4520

On Thu, Nov 20, 2014 at 2:59 PM, Michael Armbrust <mi...@databricks.com>
wrote:

> Can you open a JIRA?
>
> On Thu, Nov 20, 2014 at 10:39 AM, Sadhan Sood <sa...@gmail.com>
> wrote:
>
>> I am running on master, pulled yesterday I believe but saw the same issue
>> with 1.2.0
>>
>> On Thu, Nov 20, 2014 at 1:37 PM, Michael Armbrust <michael@databricks.com
>> > wrote:
>>
>>> Which version are you running on again?
>>>
>>> On Thu, Nov 20, 2014 at 8:17 AM, Sadhan Sood <sa...@gmail.com>
>>> wrote:
>>>
>>>> Also attaching the parquet file if anyone wants to take a further look.
>>>>
>>>> On Thu, Nov 20, 2014 at 8:54 AM, Sadhan Sood <sa...@gmail.com>
>>>> wrote:
>>>>
>>>>> So, I am seeing this issue with Spark SQL throwing an exception when
>>>>> trying to read selective columns from a thrift parquet file and also when
>>>>> caching them. On some further digging, I was able to narrow it down to at
>>>>> least one particular column type, map<string, set<string>>, as the cause.
>>>>> To reproduce this I created a test thrift file with a very basic schema and
>>>>> stored some sample data in a parquet file:
>>>>>
>>>>> Test.thrift
>>>>> ===========
>>>>> typedef binary SomeId
>>>>>
>>>>> enum SomeExclusionCause {
>>>>>   WHITELIST = 1,
>>>>>   HAS_PURCHASE = 2,
>>>>> }
>>>>>
>>>>> struct SampleThriftObject {
>>>>>   10: string col_a;
>>>>>   20: string col_b;
>>>>>   30: string col_c;
>>>>>   40: optional map<SomeExclusionCause, set<SomeId>> col_d;
>>>>> }
>>>>> =============
>>>>>
>>>>> And loading the data in spark through schemaRDD:
>>>>>
>>>>> import org.apache.spark.sql.SchemaRDD
>>>>> val sqlContext = new org.apache.spark.sql.SQLContext(sc);
>>>>> val parquetFile = "/path/to/generated/parquet/file"
>>>>> val parquetFileRDD = sqlContext.parquetFile(parquetFile)
>>>>> parquetFileRDD.printSchema
>>>>> root
>>>>>  |-- col_a: string (nullable = true)
>>>>>  |-- col_b: string (nullable = true)
>>>>>  |-- col_c: string (nullable = true)
>>>>>  |-- col_d: map (nullable = true)
>>>>>  |    |-- key: string
>>>>>  |    |-- value: array (valueContainsNull = true)
>>>>>  |    |    |-- element: string (containsNull = false)
>>>>>
>>>>> parquetFileRDD.registerTempTable("test")
>>>>> sqlContext.cacheTable("test")
>>>>> sqlContext.sql("select col_a from test").collect() <-- see the
>>>>> exception stack here
>>>>>
>>>>> org.apache.spark.SparkException: Job aborted due to stage failure:
>>>>> Task 0 in stage 0.0 failed 1 times, most recent failure: Lost task 0.0 in
>>>>> stage 0.0 (TID 0, localhost): parquet.io.ParquetDecodingException: Can not
>>>>> read value at 0 in block -1 in file file:/tmp/xyz/part-r-00000.parquet
>>>>> at
>>>>> parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:213)
>>>>> at
>>>>> parquet.hadoop.ParquetRecordReader.nextKeyValue(ParquetRecordReader.java:204)
>>>>> at
>>>>> org.apache.spark.rdd.NewHadoopRDD$$anon$1.hasNext(NewHadoopRDD.scala:145)
>>>>> at
>>>>> org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
>>>>> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>>>>> at scala.collection.Iterator$$anon$14.hasNext(Iterator.scala:388)
>>>>> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>>>>> at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>>>>> at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
>>>>> at
>>>>> scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
>>>>> at
>>>>> scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
>>>>> at
>>>>> scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
>>>>> at scala.collection.TraversableOnce$class.to
>>>>> (TraversableOnce.scala:273)
>>>>> at scala.collection.AbstractIterator.to(Iterator.scala:1157)
>>>>> at
>>>>> scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
>>>>> at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
>>>>> at
>>>>> scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
>>>>> at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
>>>>> at org.apache.spark.rdd.RDD$$anonfun$16.apply(RDD.scala:780)
>>>>> at org.apache.spark.rdd.RDD$$anonfun$16.apply(RDD.scala:780)
>>>>> at
>>>>> org.apache.spark.SparkContext$$anonfun$runJob$3.apply(SparkContext.scala:1223)
>>>>> at
>>>>> org.apache.spark.SparkContext$$anonfun$runJob$3.apply(SparkContext.scala:1223)
>>>>> at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
>>>>> at org.apache.spark.scheduler.Task.run(Task.scala:56)
>>>>> at
>>>>> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:195)
>>>>> at
>>>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>>>>> at
>>>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>>>>> at java.lang.Thread.run(Thread.java:745)
>>>>>
>>>>> Caused by: java.lang.ArrayIndexOutOfBoundsException: -1
>>>>> at java.util.ArrayList.elementData(ArrayList.java:418)
>>>>> at java.util.ArrayList.get(ArrayList.java:431)
>>>>> at parquet.io.GroupColumnIO.getLast(GroupColumnIO.java:95)
>>>>> at parquet.io.GroupColumnIO.getLast(GroupColumnIO.java:95)
>>>>> at parquet.io.PrimitiveColumnIO.getLast(PrimitiveColumnIO.java:80)
>>>>> at parquet.io.PrimitiveColumnIO.isLast(PrimitiveColumnIO.java:74)
>>>>> at
>>>>> parquet.io.RecordReaderImplementation.<init>(RecordReaderImplementation.java:282)
>>>>> at parquet.io.MessageColumnIO$1.visit(MessageColumnIO.java:131)
>>>>> at parquet.io.MessageColumnIO$1.visit(MessageColumnIO.java:96)
>>>>> at
>>>>> parquet.filter2.compat.FilterCompat$NoOpFilter.accept(FilterCompat.java:136)
>>>>> at parquet.io.MessageColumnIO.getRecordReader(MessageColumnIO.java:96)
>>>>> at
>>>>> parquet.hadoop.InternalParquetRecordReader.checkRead(InternalParquetRecordReader.java:126)
>>>>> at
>>>>> parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:193)
>>>>> ... 27 more
>>>>>
>>>>> If you take out the col_d from the thrift file, the problem goes away.
>>>>> The problem also shows up when trying to read the particular column without
>>>>> caching the table first. The same file can be dumped/read using
>>>>> parquet-tools just fine. Here is the file dump using parquet-tools:
>>>>>
>>>>> row group 0
>>>>> --------------------------------------------------------------------------------
>>>>> col_a:           BINARY UNCOMPRESSED DO:0 FPO:4 SZ:89/89/1.00 VC:9 ENC [more]...
>>>>> col_b:           BINARY UNCOMPRESSED DO:0 FPO:93 SZ:89/89/1.00 VC:9 EN [more]...
>>>>> col_c:           BINARY UNCOMPRESSED DO:0 FPO:182 SZ:89/89/1.00 VC:9 E [more]...
>>>>> col_d:
>>>>> .map:
>>>>> ..key:           BINARY UNCOMPRESSED DO:0 FPO:271 SZ:29/29/1.00 VC:9 E [more]...
>>>>> ..value:
>>>>> ...value_tuple:  BINARY UNCOMPRESSED DO:0 FPO:300 SZ:29/29/1.00 VC:9 E [more]...
>>>>>
>>>>>     col_a TV=9 RL=0 DL=1
>>>>>     ----------------------------------------------------------------------------
>>>>>     page 0:  DLE:RLE RLE:BIT_PACKED VLE:PLAIN SZ:60 VC:9
>>>>>
>>>>>     col_b TV=9 RL=0 DL=1
>>>>>     ----------------------------------------------------------------------------
>>>>>     page 0:  DLE:RLE RLE:BIT_PACKED VLE:PLAIN SZ:60 VC:9
>>>>>
>>>>>     col_c TV=9 RL=0 DL=1
>>>>>     ----------------------------------------------------------------------------
>>>>>     page 0:  DLE:RLE RLE:BIT_PACKED VLE:PLAIN SZ:60 VC:9
>>>>>
>>>>>     col_d.map.key TV=9 RL=1 DL=2
>>>>>     ----------------------------------------------------------------------------
>>>>>     page 0:  DLE:RLE RLE:RLE VLE:PLAIN SZ:12 VC:9
>>>>>
>>>>>     col_d.map.value.value_tuple TV=9 RL=2 DL=4
>>>>>     ----------------------------------------------------------------------------
>>>>>     page 0:  DLE:RLE RLE:RLE VLE:PLAIN SZ:12 VC:9
>>>>>
>>>>> BINARY col_a
>>>>> --------------------------------------------------------------------------------
>>>>> *** row group 1 of 1, values 1 to 9 ***
>>>>> value 1: R:1 D:1 V:a1
>>>>> value 2: R:1 D:1 V:a2
>>>>> value 3: R:1 D:1 V:a3
>>>>> value 4: R:1 D:1 V:a4
>>>>> value 5: R:1 D:1 V:a5
>>>>> value 6: R:1 D:1 V:a6
>>>>> value 7: R:1 D:1 V:a7
>>>>> value 8: R:1 D:1 V:a8
>>>>> value 9: R:1 D:1 V:a9
>>>>>
>>>>> BINARY col_b
>>>>> --------------------------------------------------------------------------------
>>>>> *** row group 1 of 1, values 1 to 9 ***
>>>>> value 1: R:1 D:1 V:b1
>>>>> value 2: R:1 D:1 V:b2
>>>>> value 3: R:1 D:1 V:b3
>>>>> value 4: R:1 D:1 V:b4
>>>>> value 5: R:1 D:1 V:b5
>>>>> value 6: R:1 D:1 V:b6
>>>>> value 7: R:1 D:1 V:b7
>>>>> value 8: R:1 D:1 V:b8
>>>>> value 9: R:1 D:1 V:b9
>>>>>
>>>>> BINARY col_c
>>>>> --------------------------------------------------------------------------------
>>>>> *** row group 1 of 1, values 1 to 9 ***
>>>>> value 1: R:1 D:1 V:c1
>>>>> value 2: R:1 D:1 V:c2
>>>>> value 3: R:1 D:1 V:c3
>>>>> value 4: R:1 D:1 V:c4
>>>>> value 5: R:1 D:1 V:c5
>>>>> value 6: R:1 D:1 V:c6
>>>>> value 7: R:1 D:1 V:c7
>>>>> value 8: R:1 D:1 V:c8
>>>>> value 9: R:1 D:1 V:c9
>>>>>
>>>>> BINARY col_d.map.key
>>>>> --------------------------------------------------------------------------------
>>>>> *** row group 1 of 1, values 1 to 9 ***
>>>>> value 1: R:0 D:0 V:<null>
>>>>> value 2: R:0 D:0 V:<null>
>>>>> value 3: R:0 D:0 V:<null>
>>>>> value 4: R:0 D:0 V:<null>
>>>>> value 5: R:0 D:0 V:<null>
>>>>> value 6: R:0 D:0 V:<null>
>>>>> value 7: R:0 D:0 V:<null>
>>>>> value 8: R:0 D:0 V:<null>
>>>>> value 9: R:0 D:0 V:<null>
>>>>>
>>>>> BINARY col_d.map.value.value_tuple
>>>>> --------------------------------------------------------------------------------
>>>>> *** row group 1 of 1, values 1 to 9 ***
>>>>> value 1: R:0 D:0 V:<null>
>>>>> value 2: R:0 D:0 V:<null>
>>>>> value 3: R:0 D:0 V:<null>
>>>>> value 4: R:0 D:0 V:<null>
>>>>> value 5: R:0 D:0 V:<null>
>>>>> value 6: R:0 D:0 V:<null>
>>>>> value 7: R:0 D:0 V:<null>
>>>>> value 8: R:0 D:0 V:<null>
>>>>> value 9: R:0 D:0 V:<null>
>>>>>
>>>>>
>>>>> I am happy to provide more information but any help is appreciated.
>>>>>
>>>>>
>>>>> On Sun, Nov 16, 2014 at 7:40 PM, Sadhan Sood <sa...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Hi Cheng,
>>>>>>
>>>>>> I tried reading the parquet file (on which we were getting the
>>>>>> exception) through parquet-tools and it is able to dump the file and I can
>>>>>> read the metadata, etc. I also loaded the file through hive table and can
>>>>>> run a table scan query on it as well. Let me know if I can do more to help
>>>>>> resolve the problem, I'll run it through a debugger and see if I can get
>>>>>> more information on it in the meantime.
>>>>>>
>>>>>> Thanks,
>>>>>> Sadhan
>>>>>>
>>>>>> On Sun, Nov 16, 2014 at 4:35 AM, Cheng Lian <li...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>>  (Forgot to cc user mail list)
>>>>>>>
>>>>>>>
>>>>>>> On 11/16/14 4:59 PM, Cheng Lian wrote:
>>>>>>>
>>>>>>> Hey Sadhan,
>>>>>>>
>>>>>>>  Thanks for the additional information, this is helpful. It seems that
>>>>>>> some Parquet internal contract was broken, but I'm not sure whether it's
>>>>>>> caused by Spark SQL or Parquet, or even whether the Parquet file itself was
>>>>>>> damaged somehow. I'm investigating this. In the meantime, would you mind
>>>>>>> helping to narrow down the problem by trying to scan exactly the same
>>>>>>> Parquet file with some other systems (e.g. Hive or Impala)? If other
>>>>>>> systems work, then there must be something wrong with Spark SQL.
>>>>>>>
>>>>>>>  Cheng
>>>>>>>
>>>>>>> On Sun, Nov 16, 2014 at 1:19 PM, Sadhan Sood <sa...@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Hi Cheng,
>>>>>>>>
>>>>>>>>  Thanks for your response. Here is the stack trace from yarn logs:
>>>>>>>>
>>>>>>>>  Caused by: java.lang.ArrayIndexOutOfBoundsException: -1
>>>>>>>>         at java.util.ArrayList.elementData(ArrayList.java:418)
>>>>>>>>         at java.util.ArrayList.get(ArrayList.java:431)
>>>>>>>>         at parquet.io.GroupColumnIO.getLast(GroupColumnIO.java:95)
>>>>>>>>         at parquet.io.GroupColumnIO.getLast(GroupColumnIO.java:95)
>>>>>>>>         at parquet.io.PrimitiveColumnIO.getLast(PrimitiveColumnIO.java:80)
>>>>>>>>         at parquet.io.PrimitiveColumnIO.isLast(PrimitiveColumnIO.java:74)
>>>>>>>>         at parquet.io.RecordReaderImplementation.<init>(RecordReaderImplementation.java:282)
>>>>>>>>         at parquet.io.MessageColumnIO$1.visit(MessageColumnIO.java:131)
>>>>>>>>         at parquet.io.MessageColumnIO$1.visit(MessageColumnIO.java:96)
>>>>>>>>         at parquet.filter2.compat.FilterCompat$NoOpFilter.accept(FilterCompat.java:136)
>>>>>>>>         at parquet.io.MessageColumnIO.getRecordReader(MessageColumnIO.java:96)
>>>>>>>>         at parquet.hadoop.InternalParquetRecordReader.checkRead(InternalParquetRecordReader.java:126)
>>>>>>>>         at parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:193)
>>>>>>>>         ... 26 more
>>>>>>>>
>>>>>>>>
>>>>>>>> On Sat, Nov 15, 2014 at 9:28 AM, Cheng Lian <li...@gmail.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>>  Hi Sadhan,
>>>>>>>>>
>>>>>>>>> Could you please provide the stack trace of the
>>>>>>>>> ArrayIndexOutOfBoundsException (if any)? The reason why the first
>>>>>>>>> query succeeds is that Spark SQL doesn’t bother reading all data from the
>>>>>>>>> table to give COUNT(*). In the second case, however, the whole
>>>>>>>>> table is asked to be cached lazily via the cacheTable call, thus
>>>>>>>>> it’s scanned to build the in-memory columnar cache. Then something went wrong
>>>>>>>>> while scanning this LZO-compressed Parquet file. But unfortunately the
>>>>>>>>> stack trace at hand doesn’t indicate the root cause.
>>>>>>>>>
>>>>>>>>> Cheng
>>>>>>>>>
>>>>>>>>> On 11/15/14 5:28 AM, Sadhan Sood wrote:
>>>>>>>>>
>>>>>>>>> While testing SparkSQL on a bunch of parquet files (basically used
>>>>>>>>> to be a partition for one of our hive tables), I encountered this error:
>>>>>>>>>
>>>>>>>>>  import org.apache.spark.sql.SchemaRDD
>>>>>>>>> import org.apache.hadoop.fs.FileSystem;
>>>>>>>>> import org.apache.hadoop.conf.Configuration;
>>>>>>>>> import org.apache.hadoop.fs.Path;
>>>>>>>>>
>>>>>>>>>  val sqlContext = new org.apache.spark.sql.SQLContext(sc)
>>>>>>>>>
>>>>>>>>>  val parquetFileRDD = sqlContext.parquetFile(parquetFile)
>>>>>>>>> parquetFileRDD.registerTempTable("xyz_20141109")
>>>>>>>>> sqlContext.sql("SELECT count(*)  FROM xyz_20141109").collect() <--
>>>>>>>>> works fine
>>>>>>>>> sqlContext.cacheTable("xyz_20141109")
>>>>>>>>> sqlContext.sql("SELECT count(*)  FROM xyz_20141109").collect() <--
>>>>>>>>> fails with an exception
>>>>>>>>>
>>>>>>>>>   parquet.io.ParquetDecodingException: Can not read value at 0 in
>>>>>>>>> block -1 in file
>>>>>>>>> hdfs://xxxxxxxx::9000/event_logs/xyz/20141109/part-00009359b87ae-a949-3ded-ac3e-3a6bda3a4f3a-r-00009.lzo.parquet
>>>>>>>>>
>>>>>>>>>         at
>>>>>>>>> parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:213)
>>>>>>>>>
>>>>>>>>>         at
>>>>>>>>> parquet.hadoop.ParquetRecordReader.nextKeyValue(ParquetRecordReader.java:204)
>>>>>>>>>
>>>>>>>>>         at
>>>>>>>>> org.apache.spark.rdd.NewHadoopRDD$anon$1.hasNext(NewHadoopRDD.scala:145)
>>>>>>>>>
>>>>>>>>>         at
>>>>>>>>> org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
>>>>>>>>>
>>>>>>>>>         at
>>>>>>>>> scala.collection.Iterator$anon$11.hasNext(Iterator.scala:327)
>>>>>>>>>
>>>>>>>>>         at
>>>>>>>>> scala.collection.Iterator$anon$14.hasNext(Iterator.scala:388)
>>>>>>>>>
>>>>>>>>>         at
>>>>>>>>> org.apache.spark.sql.columnar.InMemoryRelation$anonfun$3$anon$1.hasNext(InMemoryColumnarTableScan.scala:136)
>>>>>>>>>
>>>>>>>>>         at
>>>>>>>>> org.apache.spark.storage.MemoryStore.unrollSafely(MemoryStore.scala:248)
>>>>>>>>>
>>>>>>>>>         at
>>>>>>>>> org.apache.spark.CacheManager.putInBlockManager(CacheManager.scala:163)
>>>>>>>>>
>>>>>>>>>         at
>>>>>>>>> org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:70)
>>>>>>>>>
>>>>>>>>>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:228)
>>>>>>>>>
>>>>>>>>>         at
>>>>>>>>> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
>>>>>>>>>
>>>>>>>>>         at
>>>>>>>>> org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
>>>>>>>>>
>>>>>>>>>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
>>>>>>>>>
>>>>>>>>>         at
>>>>>>>>> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
>>>>>>>>>
>>>>>>>>>         at
>>>>>>>>> org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
>>>>>>>>>
>>>>>>>>>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
>>>>>>>>>
>>>>>>>>>         at
>>>>>>>>> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
>>>>>>>>>
>>>>>>>>>         at
>>>>>>>>> org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
>>>>>>>>>
>>>>>>>>>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
>>>>>>>>>
>>>>>>>>>         at
>>>>>>>>> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
>>>>>>>>>
>>>>>>>>>         at
>>>>>>>>> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
>>>>>>>>>
>>>>>>>>>         at org.apache.spark.scheduler.Task.run(Task.scala:56)
>>>>>>>>>
>>>>>>>>>         at
>>>>>>>>> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:195)
>>>>>>>>>
>>>>>>>>>         at
>>>>>>>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>>>>>>>>>
>>>>>>>>>         at
>>>>>>>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>>>>>>>>>
>>>>>>>>>         at java.lang.Thread.run(Thread.java:745)
>>>>>>>>>
>>>>>>>>> Caused by: java.lang.ArrayIndexOutOfBoundsException
>>>>>>>>>
>>>>>>>>>   ​
>>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>>
>>>>
>>>
>>>
>>
>
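
Cheng's suggestion in the quoted thread above (scan exactly the same Parquet file
with some other system such as Hive) and Sadhan's report that a Hive table scan
works can both be exercised from the same spark-shell via HiveContext. A minimal
sketch, assuming a Spark build with Hive support and an external Hive table that
already points at the same Parquet files (the table name below is a placeholder):

import org.apache.spark.sql.hive.HiveContext

val hiveContext = new HiveContext(sc)

// Cross-check through Hive: if these scans succeed while sqlContext.parquetFile
// fails, the file itself is probably intact and the problem is likely on the
// Spark SQL side, which is what the thread concludes (hence SPARK-4520).
hiveContext.sql("SELECT count(*) FROM xyz_20141109_hive").collect()

// A SELECT * touches every column for the returned rows, matching the
// "table scan query" Sadhan mentions running successfully through Hive.
hiveContext.sql("SELECT * FROM xyz_20141109_hive LIMIT 10").collect()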

Re: SparkSQL exception on cached parquet table

Posted by Michael Armbrust <mi...@databricks.com>.
Can you open a JIRA?


Re: SparkSQL exception on cached parquet table

Posted by Sadhan Sood <sa...@gmail.com>.
I am running on master, pulled yesterday I believe but saw the same issue
with 1.2.0


Re: SparkSQL exception on cached parquet table

Posted by Michael Armbrust <mi...@databricks.com>.
Which version are you running on again?
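
(For reference, the running version is easy to confirm from the shell itself; sc.version is a standard SparkContext method, and the sample output in the comment is only an illustration:)

sc.version   // e.g. res0: String = 1.1.0 on a 1.1 deployment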

On Thu, Nov 20, 2014 at 8:17 AM, Sadhan Sood <sa...@gmail.com> wrote:

> Also attaching the parquet file if anyone wants to take a further look.
>
> On Thu, Nov 20, 2014 at 8:54 AM, Sadhan Sood <sa...@gmail.com>
> wrote:
>
>> So, I am seeing this issue with spark sql throwing an exception when
>> trying to read selective columns from a thrift parquet file and also when
>> caching them:
>> On some further digging, I was able to narrow it down to at-least one
>> particular column type: map<string, set<string>> to be causing this issue.
>> To reproduce this I created a test thrift file with a very basic schema and
>> stored some sample data in a parquet file:
>>
>> Test.thrift
>> ===========
>> typedef binary SomeId
>>
>> enum SomeExclusionCause {
>>   WHITELIST = 1,
>>   HAS_PURCHASE = 2,
>> }
>>
>> struct SampleThriftObject {
>>   10: string col_a;
>>   20: string col_b;
>>   30: string col_c;
>>   40: optional map<SomeExclusionCause, set<SomeId>> col_d;
>> }
>> =============
>>
>> And loading the data in spark through schemaRDD:
>>
>> import org.apache.spark.sql.SchemaRDD
>> val sqlContext = new org.apache.spark.sql.SQLContext(sc);
>> val parquetFile = "/path/to/generated/parquet/file"
>> val parquetFileRDD = sqlContext.parquetFile(parquetFile)
>> parquetFileRDD.printSchema
>> root
>>  |-- col_a: string (nullable = true)
>>  |-- col_b: string (nullable = true)
>>  |-- col_c: string (nullable = true)
>>  |-- col_d: map (nullable = true)
>>  |    |-- key: string
>>  |    |-- value: array (valueContainsNull = true)
>>  |    |    |-- element: string (containsNull = false)
>>
>> parquetFileRDD.registerTempTable("test")
>> sqlContext.cacheTable("test")
>> sqlContext.sql("select col_a from test").collect() <-- see the exception
>> stack here
>>
>> org.apache.spark.SparkException: Job aborted due to stage failure: Task 0
>> in stage 0.0 failed 1 times, most recent failure: Lost task 0.0 in stage
>> 0.0 (TID 0, localhost): parquet.io.ParquetDecodingException: Can not read
>> value at 0 in block -1 in file file:/tmp/xyz/part-r-00000.parquet
>> at
>> parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:213)
>> at
>> parquet.hadoop.ParquetRecordReader.nextKeyValue(ParquetRecordReader.java:204)
>> at
>> org.apache.spark.rdd.NewHadoopRDD$$anon$1.hasNext(NewHadoopRDD.scala:145)
>> at
>> org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
>> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>> at scala.collection.Iterator$$anon$14.hasNext(Iterator.scala:388)
>> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>> at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>> at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
>> at
>> scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
>> at
>> scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
>> at
>> scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
>> at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
>> at scala.collection.AbstractIterator.to(Iterator.scala:1157)
>> at
>> scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
>> at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
>> at
>> scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
>> at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
>> at org.apache.spark.rdd.RDD$$anonfun$16.apply(RDD.scala:780)
>> at org.apache.spark.rdd.RDD$$anonfun$16.apply(RDD.scala:780)
>> at
>> org.apache.spark.SparkContext$$anonfun$runJob$3.apply(SparkContext.scala:1223)
>> at
>> org.apache.spark.SparkContext$$anonfun$runJob$3.apply(SparkContext.scala:1223)
>> at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
>> at org.apache.spark.scheduler.Task.run(Task.scala:56)
>> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:195)
>> at
>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>> at
>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>> at java.lang.Thread.run(Thread.java:745)
>>
>> Caused by: java.lang.ArrayIndexOutOfBoundsException: -1
>> at java.util.ArrayList.elementData(ArrayList.java:418)
>> at java.util.ArrayList.get(ArrayList.java:431)
>> at parquet.io.GroupColumnIO.getLast(GroupColumnIO.java:95)
>> at parquet.io.GroupColumnIO.getLast(GroupColumnIO.java:95)
>> at parquet.io.PrimitiveColumnIO.getLast(PrimitiveColumnIO.java:80)
>> at parquet.io.PrimitiveColumnIO.isLast(PrimitiveColumnIO.java:74)
>> at
>> parquet.io.RecordReaderImplementation.<init>(RecordReaderImplementation.java:282)
>> at parquet.io.MessageColumnIO$1.visit(MessageColumnIO.java:131)
>> at parquet.io.MessageColumnIO$1.visit(MessageColumnIO.java:96)
>> at
>> parquet.filter2.compat.FilterCompat$NoOpFilter.accept(FilterCompat.java:136)
>> at parquet.io.MessageColumnIO.getRecordReader(MessageColumnIO.java:96)
>> at
>> parquet.hadoop.InternalParquetRecordReader.checkRead(InternalParquetRecordReader.java:126)
>> at
>> parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:193)
>> ... 27 more
>>
>> If you take out the col_d from the thrift file, the problem goes away.
>> The problem also shows up when trying to read the particular column without
>> caching the table first. The same file can be dumped/read using
>> parquet-tools just fine. Here is the file dump using parquet-tools:
>>
>> row group 0
>> --------------------------------------------------------------------------------
>> col_a:           BINARY UNCOMPRESSED DO:0 FPO:4 SZ:89/89/1.00 VC:9 ENC [more]...
>> col_b:           BINARY UNCOMPRESSED DO:0 FPO:93 SZ:89/89/1.00 VC:9 EN [more]...
>> col_c:           BINARY UNCOMPRESSED DO:0 FPO:182 SZ:89/89/1.00 VC:9 E [more]...
>> col_d:
>> .map:
>> ..key:           BINARY UNCOMPRESSED DO:0 FPO:271 SZ:29/29/1.00 VC:9 E [more]...
>> ..value:
>> ...value_tuple:  BINARY UNCOMPRESSED DO:0 FPO:300 SZ:29/29/1.00 VC:9 E [more]...
>>
>>     col_a TV=9 RL=0 DL=1
>>     ----------------------------------------------------------------------------
>>     page 0:  DLE:RLE RLE:BIT_PACKED VLE:PLAIN SZ:60 VC:9
>>
>>     col_b TV=9 RL=0 DL=1
>>     ----------------------------------------------------------------------------
>>     page 0:  DLE:RLE RLE:BIT_PACKED VLE:PLAIN SZ:60 VC:9
>>
>>     col_c TV=9 RL=0 DL=1
>>     ----------------------------------------------------------------------------
>>     page 0:  DLE:RLE RLE:BIT_PACKED VLE:PLAIN SZ:60 VC:9
>>
>>     col_d.map.key TV=9 RL=1 DL=2
>>     ----------------------------------------------------------------------------
>>     page 0:  DLE:RLE RLE:RLE VLE:PLAIN SZ:12 VC:9
>>
>>     col_d.map.value.value_tuple TV=9 RL=2 DL=4
>>     ----------------------------------------------------------------------------
>>     page 0:  DLE:RLE RLE:RLE VLE:PLAIN SZ:12 VC:9
>>
>> BINARY col_a
>> --------------------------------------------------------------------------------
>> *** row group 1 of 1, values 1 to 9 ***
>> value 1: R:1 D:1 V:a1
>> value 2: R:1 D:1 V:a2
>> value 3: R:1 D:1 V:a3
>> value 4: R:1 D:1 V:a4
>> value 5: R:1 D:1 V:a5
>> value 6: R:1 D:1 V:a6
>> value 7: R:1 D:1 V:a7
>> value 8: R:1 D:1 V:a8
>> value 9: R:1 D:1 V:a9
>>
>> BINARY col_b
>> --------------------------------------------------------------------------------
>> *** row group 1 of 1, values 1 to 9 ***
>> value 1: R:1 D:1 V:b1
>> value 2: R:1 D:1 V:b2
>> value 3: R:1 D:1 V:b3
>> value 4: R:1 D:1 V:b4
>> value 5: R:1 D:1 V:b5
>> value 6: R:1 D:1 V:b6
>> value 7: R:1 D:1 V:b7
>> value 8: R:1 D:1 V:b8
>> value 9: R:1 D:1 V:b9
>>
>> BINARY col_c
>> --------------------------------------------------------------------------------
>> *** row group 1 of 1, values 1 to 9 ***
>> value 1: R:1 D:1 V:c1
>> value 2: R:1 D:1 V:c2
>> value 3: R:1 D:1 V:c3
>> value 4: R:1 D:1 V:c4
>> value 5: R:1 D:1 V:c5
>> value 6: R:1 D:1 V:c6
>> value 7: R:1 D:1 V:c7
>> value 8: R:1 D:1 V:c8
>> value 9: R:1 D:1 V:c9
>>
>> BINARY col_d.map.key
>> --------------------------------------------------------------------------------
>> *** row group 1 of 1, values 1 to 9 ***
>> value 1: R:0 D:0 V:<null>
>> value 2: R:0 D:0 V:<null>
>> value 3: R:0 D:0 V:<null>
>> value 4: R:0 D:0 V:<null>
>> value 5: R:0 D:0 V:<null>
>> value 6: R:0 D:0 V:<null>
>> value 7: R:0 D:0 V:<null>
>> value 8: R:0 D:0 V:<null>
>> value 9: R:0 D:0 V:<null>
>>
>> BINARY col_d.map.value.value_tuple
>> --------------------------------------------------------------------------------
>> *** row group 1 of 1, values 1 to 9 ***
>> value 1: R:0 D:0 V:<null>
>> value 2: R:0 D:0 V:<null>
>> value 3: R:0 D:0 V:<null>
>> value 4: R:0 D:0 V:<null>
>> value 5: R:0 D:0 V:<null>
>> value 6: R:0 D:0 V:<null>
>> value 7: R:0 D:0 V:<null>
>> value 8: R:0 D:0 V:<null>
>> value 9: R:0 D:0 V:<null>
>>
>>
>> I am happy to provide more information but any help is appreciated.
>>
>>
>> On Sun, Nov 16, 2014 at 7:40 PM, Sadhan Sood <sa...@gmail.com>
>> wrote:
>>
>>> Hi Cheng,
>>>
>>> I tried reading the parquet file(on which we were getting the exception)
>>> through parquet-tools and it is able to dump the file and I can read the
>>> metadata, etc. I also loaded the file through hive table and can run a
>>> table scan query on it as well. Let me know if I can do more to help
>>> resolve the problem, I'll run it through a debugger and see if I can get
>>> more information on it in the meantime.
>>>
>>> Thanks,
>>> Sadhan
>>>
>>> On Sun, Nov 16, 2014 at 4:35 AM, Cheng Lian <li...@gmail.com>
>>> wrote:
>>>
>>>>  (Forgot to cc user mail list)
>>>>
>>>>
>>>> On 11/16/14 4:59 PM, Cheng Lian wrote:
>>>>
>>>> Hey Sadhan,
>>>>
>>>>  Thanks for the additional information, this is helpful. Seems that
>>>> some Parquet internal contract was broken, but I'm not sure whether it's
>>>> caused by Spark SQL or Parquet, or even maybe the Parquet file itself was
>>>> damaged somehow. I'm investigating this. In the meanwhile, would you mind
>>>> to help to narrow down the problem by trying to scan exactly the same
>>>> Parquet file with some other systems (e.g. Hive or Impala)? If other
>>>> systems work, then there must be something wrong with Spark SQL.
>>>>
>>>>  Cheng
>>>>
>>>> On Sun, Nov 16, 2014 at 1:19 PM, Sadhan Sood <sa...@gmail.com>
>>>> wrote:
>>>>
>>>>> Hi Cheng,
>>>>>
>>>>>  Thanks for your response. Here is the stack trace from yarn logs:
>>>>>
>>>>>  Caused by: java.lang.ArrayIndexOutOfBoundsException: -1
>>>>>         at java.util.ArrayList.elementData(ArrayList.java:418)
>>>>>         at java.util.ArrayList.get(ArrayList.java:431)
>>>>>         at parquet.io.GroupColumnIO.getLast(GroupColumnIO.java:95)
>>>>>         at parquet.io.GroupColumnIO.getLast(GroupColumnIO.java:95)
>>>>>         at parquet.io.PrimitiveColumnIO.getLast(PrimitiveColumnIO.java:80)
>>>>>         at parquet.io.PrimitiveColumnIO.isLast(PrimitiveColumnIO.java:74)
>>>>>         at parquet.io.RecordReaderImplementation.<init>(RecordReaderImplementation.java:282)
>>>>>         at parquet.io.MessageColumnIO$1.visit(MessageColumnIO.java:131)
>>>>>         at parquet.io.MessageColumnIO$1.visit(MessageColumnIO.java:96)
>>>>>         at parquet.filter2.compat.FilterCompat$NoOpFilter.accept(FilterCompat.java:136)
>>>>>         at parquet.io.MessageColumnIO.getRecordReader(MessageColumnIO.java:96)
>>>>>         at parquet.hadoop.InternalParquetRecordReader.checkRead(InternalParquetRecordReader.java:126)
>>>>>         at parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:193)
>>>>>         ... 26 more
>>>>>
>>>>>
>>>>> On Sat, Nov 15, 2014 at 9:28 AM, Cheng Lian <li...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>>  Hi Sadhan,
>>>>>>
>>>>>> Could you please provide the stack trace of the
>>>>>> ArrayIndexOutOfBoundsException (if any)? The reason why the first
>>>>>> query succeeds is that Spark SQL doesn’t bother reading all data from the
>>>>>> table to give COUNT(*). In the second case, however, the whole table
>>>>>> is asked to be cached lazily via the cacheTable call, thus it’s
>>>>>> scanned to build the in-memory columnar cache. Then thing went wrong while
>>>>>> scanning this LZO compressed Parquet file. But unfortunately the stack
>>>>>> trace at hand doesn’t indicate the root cause.
>>>>>>
>>>>>> Cheng
>>>>>>
>>>>>> On 11/15/14 5:28 AM, Sadhan Sood wrote:
>>>>>>
>>>>>> While testing SparkSQL on a bunch of parquet files (basically used to
>>>>>> be a partition for one of our hive tables), I encountered this error:
>>>>>>
>>>>>>  import org.apache.spark.sql.SchemaRDD
>>>>>> import org.apache.hadoop.fs.FileSystem;
>>>>>> import org.apache.hadoop.conf.Configuration;
>>>>>> import org.apache.hadoop.fs.Path;
>>>>>>
>>>>>>  val sqlContext = new org.apache.spark.sql.SQLContext(sc)
>>>>>>
>>>>>>  val parquetFileRDD = sqlContext.parquetFile(parquetFile)
>>>>>> parquetFileRDD.registerTempTable("xyz_20141109")
>>>>>> sqlContext.sql("SELECT count(*)  FROM xyz_20141109").collect() <--
>>>>>> works fine
>>>>>> sqlContext.cacheTable("xyz_20141109")
>>>>>> sqlContext.sql("SELECT count(*)  FROM xyz_20141109").collect() <--
>>>>>> fails with an exception
>>>>>>
>>>>>>   parquet.io.ParquetDecodingException: Can not read value at 0 in
>>>>>> block -1 in file
>>>>>> hdfs://xxxxxxxx::9000/event_logs/xyz/20141109/part-00009359b87ae-a949-3ded-ac3e-3a6bda3a4f3a-r-00009.lzo.parquet
>>>>>>
>>>>>>         at
>>>>>> parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:213)
>>>>>>
>>>>>>         at
>>>>>> parquet.hadoop.ParquetRecordReader.nextKeyValue(ParquetRecordReader.java:204)
>>>>>>
>>>>>>         at
>>>>>> org.apache.spark.rdd.NewHadoopRDD$$anon$1.hasNext(NewHadoopRDD.scala:145)
>>>>>>
>>>>>>         at
>>>>>> org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
>>>>>>
>>>>>>         at
>>>>>> scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>>>>>>
>>>>>>         at
>>>>>> scala.collection.Iterator$$anon$14.hasNext(Iterator.scala:388)
>>>>>>
>>>>>>         at
>>>>>> org.apache.spark.sql.columnar.InMemoryRelation$$anonfun$3$$anon$1.hasNext(InMemoryColumnarTableScan.scala:136)
>>>>>>
>>>>>>         at
>>>>>> org.apache.spark.storage.MemoryStore.unrollSafely(MemoryStore.scala:248)
>>>>>>
>>>>>>         at
>>>>>> org.apache.spark.CacheManager.putInBlockManager(CacheManager.scala:163)
>>>>>>
>>>>>>         at
>>>>>> org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:70)
>>>>>>
>>>>>>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:228)
>>>>>>
>>>>>>         at
>>>>>> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
>>>>>>
>>>>>>         at
>>>>>> org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
>>>>>>
>>>>>>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
>>>>>>
>>>>>>         at
>>>>>> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
>>>>>>
>>>>>>         at
>>>>>> org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
>>>>>>
>>>>>>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
>>>>>>
>>>>>>         at
>>>>>> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
>>>>>>
>>>>>>         at
>>>>>> org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
>>>>>>
>>>>>>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
>>>>>>
>>>>>>         at
>>>>>> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
>>>>>>
>>>>>>         at
>>>>>> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
>>>>>>
>>>>>>         at org.apache.spark.scheduler.Task.run(Task.scala:56)
>>>>>>
>>>>>>         at
>>>>>> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:195)
>>>>>>
>>>>>>         at
>>>>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>>>>>>
>>>>>>         at
>>>>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>>>>>>
>>>>>>         at java.lang.Thread.run(Thread.java:745)
>>>>>>
>>>>>> Caused by: java.lang.ArrayIndexOutOfBoundsException
>>>>>>
>>>>>>
>>>>>>
>>>>>
>>>>>
>>>>
>>>>
>>>
>>
>
>
> ---------------------------------------------------------------------
> To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
> For additional commands, e-mail: user-help@spark.apache.org
>

Re: SparkSQL exception on cached parquet table

Posted by Sadhan Sood <sa...@gmail.com>.
Also attaching the parquet file if anyone wants to take a further look.

On Thu, Nov 20, 2014 at 8:54 AM, Sadhan Sood <sa...@gmail.com> wrote:

> So, I am seeing this issue with spark sql throwing an exception when
> trying to read selective columns from a thrift parquet file and also when
> caching them:
> On some further digging, I was able to narrow it down to at-least one
> particular column type: map<string, set<string>> to be causing this issue.
> To reproduce this I created a test thrift file with a very basic schema and
> stored some sample data in a parquet file:
>
> Test.thrift
> ===========
> typedef binary SomeId
>
> enum SomeExclusionCause {
>   WHITELIST = 1,
>   HAS_PURCHASE = 2,
> }
>
> struct SampleThriftObject {
>   10: string col_a;
>   20: string col_b;
>   30: string col_c;
>   40: optional map<SomeExclusionCause, set<SomeId>> col_d;
> }
> =============
>
> And loading the data in spark through schemaRDD:
>
> import org.apache.spark.sql.SchemaRDD
> val sqlContext = new org.apache.spark.sql.SQLContext(sc);
> val parquetFile = "/path/to/generated/parquet/file"
> val parquetFileRDD = sqlContext.parquetFile(parquetFile)
> parquetFileRDD.printSchema
> root
>  |-- col_a: string (nullable = true)
>  |-- col_b: string (nullable = true)
>  |-- col_c: string (nullable = true)
>  |-- col_d: map (nullable = true)
>  |    |-- key: string
>  |    |-- value: array (valueContainsNull = true)
>  |    |    |-- element: string (containsNull = false)
>
> parquetFileRDD.registerTempTable("test")
> sqlContext.cacheTable("test")
> sqlContext.sql("select col_a from test").collect() <-- see the exception
> stack here
>
> org.apache.spark.SparkException: Job aborted due to stage failure: Task 0
> in stage 0.0 failed 1 times, most recent failure: Lost task 0.0 in stage
> 0.0 (TID 0, localhost): parquet.io.ParquetDecodingException: Can not read
> value at 0 in block -1 in file file:/tmp/xyz/part-r-00000.parquet
> at
> parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:213)
> at
> parquet.hadoop.ParquetRecordReader.nextKeyValue(ParquetRecordReader.java:204)
> at
> org.apache.spark.rdd.NewHadoopRDD$$anon$1.hasNext(NewHadoopRDD.scala:145)
> at
> org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
> at scala.collection.Iterator$$anon$14.hasNext(Iterator.scala:388)
> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
> at scala.collection.Iterator$class.foreach(Iterator.scala:727)
> at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
> at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
> at
> scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
> at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
> at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
> at scala.collection.AbstractIterator.to(Iterator.scala:1157)
> at
> scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
> at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
> at
> scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
> at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
> at org.apache.spark.rdd.RDD$$anonfun$16.apply(RDD.scala:780)
> at org.apache.spark.rdd.RDD$$anonfun$16.apply(RDD.scala:780)
> at
> org.apache.spark.SparkContext$$anonfun$runJob$3.apply(SparkContext.scala:1223)
> at
> org.apache.spark.SparkContext$$anonfun$runJob$3.apply(SparkContext.scala:1223)
> at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
> at org.apache.spark.scheduler.Task.run(Task.scala:56)
> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:195)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> at java.lang.Thread.run(Thread.java:745)
>
> Caused by: java.lang.ArrayIndexOutOfBoundsException: -1
> at java.util.ArrayList.elementData(ArrayList.java:418)
> at java.util.ArrayList.get(ArrayList.java:431)
> at parquet.io.GroupColumnIO.getLast(GroupColumnIO.java:95)
> at parquet.io.GroupColumnIO.getLast(GroupColumnIO.java:95)
> at parquet.io.PrimitiveColumnIO.getLast(PrimitiveColumnIO.java:80)
> at parquet.io.PrimitiveColumnIO.isLast(PrimitiveColumnIO.java:74)
> at
> parquet.io.RecordReaderImplementation.<init>(RecordReaderImplementation.java:282)
> at parquet.io.MessageColumnIO$1.visit(MessageColumnIO.java:131)
> at parquet.io.MessageColumnIO$1.visit(MessageColumnIO.java:96)
> at
> parquet.filter2.compat.FilterCompat$NoOpFilter.accept(FilterCompat.java:136)
> at parquet.io.MessageColumnIO.getRecordReader(MessageColumnIO.java:96)
> at
> parquet.hadoop.InternalParquetRecordReader.checkRead(InternalParquetRecordReader.java:126)
> at
> parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:193)
> ... 27 more
>
> If you take out the col_d from the thrift file, the problem goes away. The
> problem also shows up when trying to read the particular column without
> caching the table first. The same file can be dumped/read using
> parquet-tools just fine. Here is the file dump using parquet-tools:
>
> row group 0
> --------------------------------------------------------------------------------
> col_a:           BINARY UNCOMPRESSED DO:0 FPO:4 SZ:89/89/1.00 VC:9 ENC [more]...
> col_b:           BINARY UNCOMPRESSED DO:0 FPO:93 SZ:89/89/1.00 VC:9 EN [more]...
> col_c:           BINARY UNCOMPRESSED DO:0 FPO:182 SZ:89/89/1.00 VC:9 E [more]...
> col_d:
> .map:
> ..key:           BINARY UNCOMPRESSED DO:0 FPO:271 SZ:29/29/1.00 VC:9 E [more]...
> ..value:
> ...value_tuple:  BINARY UNCOMPRESSED DO:0 FPO:300 SZ:29/29/1.00 VC:9 E [more]...
>
>     col_a TV=9 RL=0 DL=1
>     ----------------------------------------------------------------------------
>     page 0:  DLE:RLE RLE:BIT_PACKED VLE:PLAIN SZ:60 VC:9
>
>     col_b TV=9 RL=0 DL=1
>     ----------------------------------------------------------------------------
>     page 0:  DLE:RLE RLE:BIT_PACKED VLE:PLAIN SZ:60 VC:9
>
>     col_c TV=9 RL=0 DL=1
>     ----------------------------------------------------------------------------
>     page 0:  DLE:RLE RLE:BIT_PACKED VLE:PLAIN SZ:60 VC:9
>
>     col_d.map.key TV=9 RL=1 DL=2
>     ----------------------------------------------------------------------------
>     page 0:  DLE:RLE RLE:RLE VLE:PLAIN SZ:12 VC:9
>
>     col_d.map.value.value_tuple TV=9 RL=2 DL=4
>     ----------------------------------------------------------------------------
>     page 0:  DLE:RLE RLE:RLE VLE:PLAIN SZ:12 VC:9
>
> BINARY col_a
> --------------------------------------------------------------------------------
> *** row group 1 of 1, values 1 to 9 ***
> value 1: R:1 D:1 V:a1
> value 2: R:1 D:1 V:a2
> value 3: R:1 D:1 V:a3
> value 4: R:1 D:1 V:a4
> value 5: R:1 D:1 V:a5
> value 6: R:1 D:1 V:a6
> value 7: R:1 D:1 V:a7
> value 8: R:1 D:1 V:a8
> value 9: R:1 D:1 V:a9
>
> BINARY col_b
> --------------------------------------------------------------------------------
> *** row group 1 of 1, values 1 to 9 ***
> value 1: R:1 D:1 V:b1
> value 2: R:1 D:1 V:b2
> value 3: R:1 D:1 V:b3
> value 4: R:1 D:1 V:b4
> value 5: R:1 D:1 V:b5
> value 6: R:1 D:1 V:b6
> value 7: R:1 D:1 V:b7
> value 8: R:1 D:1 V:b8
> value 9: R:1 D:1 V:b9
>
> BINARY col_c
> --------------------------------------------------------------------------------
> *** row group 1 of 1, values 1 to 9 ***
> value 1: R:1 D:1 V:c1
> value 2: R:1 D:1 V:c2
> value 3: R:1 D:1 V:c3
> value 4: R:1 D:1 V:c4
> value 5: R:1 D:1 V:c5
> value 6: R:1 D:1 V:c6
> value 7: R:1 D:1 V:c7
> value 8: R:1 D:1 V:c8
> value 9: R:1 D:1 V:c9
>
> BINARY col_d.map.key
> --------------------------------------------------------------------------------
> *** row group 1 of 1, values 1 to 9 ***
> value 1: R:0 D:0 V:<null>
> value 2: R:0 D:0 V:<null>
> value 3: R:0 D:0 V:<null>
> value 4: R:0 D:0 V:<null>
> value 5: R:0 D:0 V:<null>
> value 6: R:0 D:0 V:<null>
> value 7: R:0 D:0 V:<null>
> value 8: R:0 D:0 V:<null>
> value 9: R:0 D:0 V:<null>
>
> BINARY col_d.map.value.value_tuple
> --------------------------------------------------------------------------------
> *** row group 1 of 1, values 1 to 9 ***
> value 1: R:0 D:0 V:<null>
> value 2: R:0 D:0 V:<null>
> value 3: R:0 D:0 V:<null>
> value 4: R:0 D:0 V:<null>
> value 5: R:0 D:0 V:<null>
> value 6: R:0 D:0 V:<null>
> value 7: R:0 D:0 V:<null>
> value 8: R:0 D:0 V:<null>
> value 9: R:0 D:0 V:<null>
>
>
> I am happy to provide more information but any help is appreciated.
>
>
> On Sun, Nov 16, 2014 at 7:40 PM, Sadhan Sood <sa...@gmail.com>
> wrote:
>
>> Hi Cheng,
>>
>> I tried reading the parquet file(on which we were getting the exception)
>> through parquet-tools and it is able to dump the file and I can read the
>> metadata, etc. I also loaded the file through hive table and can run a
>> table scan query on it as well. Let me know if I can do more to help
>> resolve the problem, I'll run it through a debugger and see if I can get
>> more information on it in the meantime.
>>
>> Thanks,
>> Sadhan
>>
>> On Sun, Nov 16, 2014 at 4:35 AM, Cheng Lian <li...@gmail.com>
>> wrote:
>>
>>>  (Forgot to cc user mail list)
>>>
>>>
>>> On 11/16/14 4:59 PM, Cheng Lian wrote:
>>>
>>> Hey Sadhan,
>>>
>>>  Thanks for the additional information, this is helpful. Seems that
>>> some Parquet internal contract was broken, but I'm not sure whether it's
>>> caused by Spark SQL or Parquet, or even maybe the Parquet file itself was
>>> damaged somehow. I'm investigating this. In the meanwhile, would you mind
>>> to help to narrow down the problem by trying to scan exactly the same
>>> Parquet file with some other systems (e.g. Hive or Impala)? If other
>>> systems work, then there must be something wrong with Spark SQL.
>>>
>>>  Cheng
>>>
>>> On Sun, Nov 16, 2014 at 1:19 PM, Sadhan Sood <sa...@gmail.com>
>>> wrote:
>>>
>>>> Hi Cheng,
>>>>
>>>>  Thanks for your response. Here is the stack trace from yarn logs:
>>>>
>>>>  Caused by: java.lang.ArrayIndexOutOfBoundsException: -1
>>>>         at java.util.ArrayList.elementData(ArrayList.java:418)
>>>>         at java.util.ArrayList.get(ArrayList.java:431)
>>>>         at parquet.io.GroupColumnIO.getLast(GroupColumnIO.java:95)
>>>>         at parquet.io.GroupColumnIO.getLast(GroupColumnIO.java:95)
>>>>         at parquet.io.PrimitiveColumnIO.getLast(PrimitiveColumnIO.java:80)
>>>>         at parquet.io.PrimitiveColumnIO.isLast(PrimitiveColumnIO.java:74)
>>>>         at parquet.io.RecordReaderImplementation.<init>(RecordReaderImplementation.java:282)
>>>>         at parquet.io.MessageColumnIO$1.visit(MessageColumnIO.java:131)
>>>>         at parquet.io.MessageColumnIO$1.visit(MessageColumnIO.java:96)
>>>>         at parquet.filter2.compat.FilterCompat$NoOpFilter.accept(FilterCompat.java:136)
>>>>         at parquet.io.MessageColumnIO.getRecordReader(MessageColumnIO.java:96)
>>>>         at parquet.hadoop.InternalParquetRecordReader.checkRead(InternalParquetRecordReader.java:126)
>>>>         at parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:193)
>>>>         ... 26 more
>>>>
>>>>
>>>> On Sat, Nov 15, 2014 at 9:28 AM, Cheng Lian <li...@gmail.com>
>>>> wrote:
>>>>
>>>>>  Hi Sadhan,
>>>>>
>>>>> Could you please provide the stack trace of the
>>>>> ArrayIndexOutOfBoundsException (if any)? The reason why the first
>>>>> query succeeds is that Spark SQL doesn’t bother reading all data from the
>>>>> table to give COUNT(*). In the second case, however, the whole table
>>>>> is asked to be cached lazily via the cacheTable call, thus it’s
>>>>> scanned to build the in-memory columnar cache. Then thing went wrong while
>>>>> scanning this LZO compressed Parquet file. But unfortunately the stack
>>>>> trace at hand doesn’t indicate the root cause.
>>>>>
>>>>> Cheng
>>>>>
>>>>> On 11/15/14 5:28 AM, Sadhan Sood wrote:
>>>>>
>>>>> While testing SparkSQL on a bunch of parquet files (basically used to
>>>>> be a partition for one of our hive tables), I encountered this error:
>>>>>
>>>>>  import org.apache.spark.sql.SchemaRDD
>>>>> import org.apache.hadoop.fs.FileSystem;
>>>>> import org.apache.hadoop.conf.Configuration;
>>>>> import org.apache.hadoop.fs.Path;
>>>>>
>>>>>  val sqlContext = new org.apache.spark.sql.SQLContext(sc)
>>>>>
>>>>>  val parquetFileRDD = sqlContext.parquetFile(parquetFile)
>>>>> parquetFileRDD.registerTempTable("xyz_20141109")
>>>>> sqlContext.sql("SELECT count(*)  FROM xyz_20141109").collect() <--
>>>>> works fine
>>>>> sqlContext.cacheTable("xyz_20141109")
>>>>> sqlContext.sql("SELECT count(*)  FROM xyz_20141109").collect() <--
>>>>> fails with an exception
>>>>>
>>>>>   parquet.io.ParquetDecodingException: Can not read value at 0 in
>>>>> block -1 in file
>>>>> hdfs://xxxxxxxx::9000/event_logs/xyz/20141109/part-00009359b87ae-a949-3ded-ac3e-3a6bda3a4f3a-r-00009.lzo.parquet
>>>>>
>>>>>         at
>>>>> parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:213)
>>>>>
>>>>>         at
>>>>> parquet.hadoop.ParquetRecordReader.nextKeyValue(ParquetRecordReader.java:204)
>>>>>
>>>>>         at
>>>>> org.apache.spark.rdd.NewHadoopRDD$$anon$1.hasNext(NewHadoopRDD.scala:145)
>>>>>
>>>>>         at
>>>>> org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
>>>>>
>>>>>         at
>>>>> scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>>>>>
>>>>>         at
>>>>> scala.collection.Iterator$$anon$14.hasNext(Iterator.scala:388)
>>>>>
>>>>>         at
>>>>> org.apache.spark.sql.columnar.InMemoryRelation$$anonfun$3$$anon$1.hasNext(InMemoryColumnarTableScan.scala:136)
>>>>>
>>>>>         at
>>>>> org.apache.spark.storage.MemoryStore.unrollSafely(MemoryStore.scala:248)
>>>>>
>>>>>         at
>>>>> org.apache.spark.CacheManager.putInBlockManager(CacheManager.scala:163)
>>>>>
>>>>>         at
>>>>> org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:70)
>>>>>
>>>>>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:228)
>>>>>
>>>>>         at
>>>>> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
>>>>>
>>>>>         at
>>>>> org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
>>>>>
>>>>>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
>>>>>
>>>>>         at
>>>>> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
>>>>>
>>>>>         at
>>>>> org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
>>>>>
>>>>>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
>>>>>
>>>>>         at
>>>>> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
>>>>>
>>>>>         at
>>>>> org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
>>>>>
>>>>>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
>>>>>
>>>>>         at
>>>>> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
>>>>>
>>>>>         at
>>>>> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
>>>>>
>>>>>         at org.apache.spark.scheduler.Task.run(Task.scala:56)
>>>>>
>>>>>         at
>>>>> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:195)
>>>>>
>>>>>         at
>>>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>>>>>
>>>>>         at
>>>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>>>>>
>>>>>         at java.lang.Thread.run(Thread.java:745)
>>>>>
>>>>> Caused by: java.lang.ArrayIndexOutOfBoundsException
>>>>>
>>>>>
>>>>>
>>>>
>>>>
>>>
>>>
>>
>

Re: SparkSQL exception on cached parquet table

Posted by Sadhan Sood <sa...@gmail.com>.
So, I am seeing this issue with Spark SQL throwing an exception when trying
to read selective columns from a Thrift-generated Parquet file, and also when
caching the table.
On some further digging, I was able to narrow it down to at least one
particular column type, map<string, set<string>>, which appears to be causing
the issue. To reproduce this, I created a test Thrift file with a very basic
schema and stored some sample data in a Parquet file:

Test.thrift
===========
typedef binary SomeId

enum SomeExclusionCause {
  WHITELIST = 1,
  HAS_PURCHASE = 2,
}

struct SampleThriftObject {
  10: string col_a;
  20: string col_b;
  30: string col_c;
  40: optional map<SomeExclusionCause, set<SomeId>> col_d;
}
=============

And loading the data in Spark through a SchemaRDD:

import org.apache.spark.sql.SchemaRDD
val sqlContext = new org.apache.spark.sql.SQLContext(sc);
val parquetFile = "/path/to/generated/parquet/file"
val parquetFileRDD = sqlContext.parquetFile(parquetFile)
parquetFileRDD.printSchema
root
 |-- col_a: string (nullable = true)
 |-- col_b: string (nullable = true)
 |-- col_c: string (nullable = true)
 |-- col_d: map (nullable = true)
 |    |-- key: string
 |    |-- value: array (valueContainsNull = true)
 |    |    |-- element: string (containsNull = false)

parquetFileRDD.registerTempTable("test")
sqlContext.cacheTable("test")
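// note: cacheTable is lazy, so nothing is read at this point; the first query
// below is what builds the in-memory columnar cache, which is where the
// Parquet scan (and the exception) actually happens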
sqlContext.sql("select col_a from test").collect() <-- see the exception
stack here

org.apache.spark.SparkException: Job aborted due to stage failure: Task 0
in stage 0.0 failed 1 times, most recent failure: Lost task 0.0 in stage
0.0 (TID 0, localhost): parquet.io.ParquetDecodingException: Can not read
value at 0 in block -1 in file file:/tmp/xyz/part-r-00000.parquet
at
parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:213)
at
parquet.hadoop.ParquetRecordReader.nextKeyValue(ParquetRecordReader.java:204)
at org.apache.spark.rdd.NewHadoopRDD$$anon$1.hasNext(NewHadoopRDD.scala:145)
at
org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
at scala.collection.Iterator$$anon$14.hasNext(Iterator.scala:388)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
at scala.collection.AbstractIterator.to(Iterator.scala:1157)
at
scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
at org.apache.spark.rdd.RDD$$anonfun$16.apply(RDD.scala:780)
at org.apache.spark.rdd.RDD$$anonfun$16.apply(RDD.scala:780)
at
org.apache.spark.SparkContext$$anonfun$runJob$3.apply(SparkContext.scala:1223)
at
org.apache.spark.SparkContext$$anonfun$runJob$3.apply(SparkContext.scala:1223)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
at org.apache.spark.scheduler.Task.run(Task.scala:56)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:195)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)

Caused by: java.lang.ArrayIndexOutOfBoundsException: -1
at java.util.ArrayList.elementData(ArrayList.java:418)
at java.util.ArrayList.get(ArrayList.java:431)
at parquet.io.GroupColumnIO.getLast(GroupColumnIO.java:95)
at parquet.io.GroupColumnIO.getLast(GroupColumnIO.java:95)
at parquet.io.PrimitiveColumnIO.getLast(PrimitiveColumnIO.java:80)
at parquet.io.PrimitiveColumnIO.isLast(PrimitiveColumnIO.java:74)
at
parquet.io.RecordReaderImplementation.<init>(RecordReaderImplementation.java:282)
at parquet.io.MessageColumnIO$1.visit(MessageColumnIO.java:131)
at parquet.io.MessageColumnIO$1.visit(MessageColumnIO.java:96)
at
parquet.filter2.compat.FilterCompat$NoOpFilter.accept(FilterCompat.java:136)
at parquet.io.MessageColumnIO.getRecordReader(MessageColumnIO.java:96)
at
parquet.hadoop.InternalParquetRecordReader.checkRead(InternalParquetRecordReader.java:126)
at
parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:193)
... 27 more

If you take col_d out of the Thrift file, the problem goes away. The
problem also shows up when trying to read that particular column without
caching the table first. The same file can be dumped/read using
parquet-tools just fine. Here is the file dump using parquet-tools:

row group 0
--------------------------------------------------------------------------------
col_a:           BINARY UNCOMPRESSED DO:0 FPO:4 SZ:89/89/1.00 VC:9 ENC [more]...
col_b:           BINARY UNCOMPRESSED DO:0 FPO:93 SZ:89/89/1.00 VC:9 EN [more]...
col_c:           BINARY UNCOMPRESSED DO:0 FPO:182 SZ:89/89/1.00 VC:9 E [more]...
col_d:
.map:
..key:           BINARY UNCOMPRESSED DO:0 FPO:271 SZ:29/29/1.00 VC:9 E [more]...
..value:
...value_tuple:  BINARY UNCOMPRESSED DO:0 FPO:300 SZ:29/29/1.00 VC:9 E [more]...

    col_a TV=9 RL=0 DL=1
    ----------------------------------------------------------------------------
    page 0:  DLE:RLE RLE:BIT_PACKED VLE:PLAIN SZ:60 VC:9

    col_b TV=9 RL=0 DL=1
    ----------------------------------------------------------------------------
    page 0:  DLE:RLE RLE:BIT_PACKED VLE:PLAIN SZ:60 VC:9

    col_c TV=9 RL=0 DL=1
    ----------------------------------------------------------------------------
    page 0:  DLE:RLE RLE:BIT_PACKED VLE:PLAIN SZ:60 VC:9

    col_d.map.key TV=9 RL=1 DL=2
    ----------------------------------------------------------------------------
    page 0:  DLE:RLE RLE:RLE VLE:PLAIN SZ:12 VC:9

    col_d.map.value.value_tuple TV=9 RL=2 DL=4
    ----------------------------------------------------------------------------
    page 0:  DLE:RLE RLE:RLE VLE:PLAIN SZ:12 VC:9

BINARY col_a
--------------------------------------------------------------------------------
*** row group 1 of 1, values 1 to 9 ***
value 1: R:1 D:1 V:a1
value 2: R:1 D:1 V:a2
value 3: R:1 D:1 V:a3
value 4: R:1 D:1 V:a4
value 5: R:1 D:1 V:a5
value 6: R:1 D:1 V:a6
value 7: R:1 D:1 V:a7
value 8: R:1 D:1 V:a8
value 9: R:1 D:1 V:a9

BINARY col_b
--------------------------------------------------------------------------------
*** row group 1 of 1, values 1 to 9 ***
value 1: R:1 D:1 V:b1
value 2: R:1 D:1 V:b2
value 3: R:1 D:1 V:b3
value 4: R:1 D:1 V:b4
value 5: R:1 D:1 V:b5
value 6: R:1 D:1 V:b6
value 7: R:1 D:1 V:b7
value 8: R:1 D:1 V:b8
value 9: R:1 D:1 V:b9

BINARY col_c
--------------------------------------------------------------------------------
*** row group 1 of 1, values 1 to 9 ***
value 1: R:1 D:1 V:c1
value 2: R:1 D:1 V:c2
value 3: R:1 D:1 V:c3
value 4: R:1 D:1 V:c4
value 5: R:1 D:1 V:c5
value 6: R:1 D:1 V:c6
value 7: R:1 D:1 V:c7
value 8: R:1 D:1 V:c8
value 9: R:1 D:1 V:c9

BINARY col_d.map.key
--------------------------------------------------------------------------------
*** row group 1 of 1, values 1 to 9 ***
value 1: R:0 D:0 V:<null>
value 2: R:0 D:0 V:<null>
value 3: R:0 D:0 V:<null>
value 4: R:0 D:0 V:<null>
value 5: R:0 D:0 V:<null>
value 6: R:0 D:0 V:<null>
value 7: R:0 D:0 V:<null>
value 8: R:0 D:0 V:<null>
value 9: R:0 D:0 V:<null>

BINARY col_d.map.value.value_tuple
--------------------------------------------------------------------------------
*** row group 1 of 1, values 1 to 9 ***
value 1: R:0 D:0 V:<null>
value 2: R:0 D:0 V:<null>
value 3: R:0 D:0 V:<null>
value 4: R:0 D:0 V:<null>
value 5: R:0 D:0 V:<null>
value 6: R:0 D:0 V:<null>
value 7: R:0 D:0 V:<null>
value 8: R:0 D:0 V:<null>
value 9: R:0 D:0 V:<null>


I am happy to provide more information but any help is appreciated.
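
For anyone who wants to poke at this without the Thrift toolchain, here is a
minimal sketch of a cross-check, assuming the Spark 1.1-style SchemaRDD API
(createSchemaRDD implicit plus saveAsParquetFile); the Sample case class, the
/tmp path and the table name are made up for illustration. It writes a Parquet
file with the same shape from Spark SQL itself and pushes it through the same
cacheTable path:

// reusing the sqlContext defined above
import sqlContext.createSchemaRDD  // implicit RDD[case class] -> SchemaRDD

// stand-in for the generated Thrift object; col_d mirrors map<string, set<string>>
case class Sample(col_a: String, col_b: String, col_c: String,
                  col_d: Map[String, Seq[String]])

val sample = sc.parallelize(1 to 9).map(i =>
  Sample("a" + i, "b" + i, "c" + i, Map.empty[String, Seq[String]]))

sample.saveAsParquetFile("/tmp/sample_spark_written.parquet")  // made-up output path

val reRead = sqlContext.parquetFile("/tmp/sample_spark_written.parquet")
reRead.registerTempTable("sample_spark_written")
sqlContext.cacheTable("sample_spark_written")
sqlContext.sql("select col_a from sample_spark_written").collect()

If the Spark-written file reads back and caches cleanly while the
Thrift-written one does not, that would point at how parquet-thrift lays out
the nested map rather than at the in-memory columnar cache itself.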


On Sun, Nov 16, 2014 at 7:40 PM, Sadhan Sood <sa...@gmail.com> wrote:

> Hi Cheng,
>
> I tried reading the parquet file(on which we were getting the exception)
> through parquet-tools and it is able to dump the file and I can read the
> metadata, etc. I also loaded the file through hive table and can run a
> table scan query on it as well. Let me know if I can do more to help
> resolve the problem, I'll run it through a debugger and see if I can get
> more information on it in the meantime.
>
> Thanks,
> Sadhan
>
> On Sun, Nov 16, 2014 at 4:35 AM, Cheng Lian <li...@gmail.com> wrote:
>
>>  (Forgot to cc user mail list)
>>
>>
>> On 11/16/14 4:59 PM, Cheng Lian wrote:
>>
>> Hey Sadhan,
>>
>>  Thanks for the additional information, this is helpful. Seems that some
>> Parquet internal contract was broken, but I'm not sure whether it's caused
>> by Spark SQL or Parquet, or even maybe the Parquet file itself was damaged
>> somehow. I'm investigating this. In the meanwhile, would you mind to help
>> to narrow down the problem by trying to scan exactly the same Parquet file
>> with some other systems (e.g. Hive or Impala)? If other systems work, then
>> there must be something wrong with Spark SQL.
>>
>>  Cheng
>>
>> On Sun, Nov 16, 2014 at 1:19 PM, Sadhan Sood <sa...@gmail.com>
>> wrote:
>>
>>> Hi Cheng,
>>>
>>>  Thanks for your response. Here is the stack trace from yarn logs:
>>>
>>>  Caused by: java.lang.ArrayIndexOutOfBoundsException: -1
>>>         at java.util.ArrayList.elementData(ArrayList.java:418)
>>>         at java.util.ArrayList.get(ArrayList.java:431)
>>>         at parquet.io.GroupColumnIO.getLast(GroupColumnIO.java:95)
>>>         at parquet.io.GroupColumnIO.getLast(GroupColumnIO.java:95)
>>>         at parquet.io.PrimitiveColumnIO.getLast(PrimitiveColumnIO.java:80)
>>>         at parquet.io.PrimitiveColumnIO.isLast(PrimitiveColumnIO.java:74)
>>>         at parquet.io.RecordReaderImplementation.<init>(RecordReaderImplementation.java:282)
>>>         at parquet.io.MessageColumnIO$1.visit(MessageColumnIO.java:131)
>>>         at parquet.io.MessageColumnIO$1.visit(MessageColumnIO.java:96)
>>>         at parquet.filter2.compat.FilterCompat$NoOpFilter.accept(FilterCompat.java:136)
>>>         at parquet.io.MessageColumnIO.getRecordReader(MessageColumnIO.java:96)
>>>         at parquet.hadoop.InternalParquetRecordReader.checkRead(InternalParquetRecordReader.java:126)
>>>         at parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:193)
>>>         ... 26 more
>>>
>>>
>>> On Sat, Nov 15, 2014 at 9:28 AM, Cheng Lian <li...@gmail.com>
>>> wrote:
>>>
>>>>  Hi Sadhan,
>>>>
>>>> Could you please provide the stack trace of the
>>>> ArrayIndexOutOfBoundsException (if any)? The reason why the first
>>>> query succeeds is that Spark SQL doesn’t bother reading all data from the
>>>> table to give COUNT(*). In the second case, however, the whole table
>>>> is asked to be cached lazily via the cacheTable call, thus it’s
>>>> scanned to build the in-memory columnar cache. Then thing went wrong while
>>>> scanning this LZO compressed Parquet file. But unfortunately the stack
>>>> trace at hand doesn’t indicate the root cause.
>>>>
>>>> Cheng
>>>>
>>>> On 11/15/14 5:28 AM, Sadhan Sood wrote:
>>>>
>>>> While testing SparkSQL on a bunch of parquet files (basically used to
>>>> be a partition for one of our hive tables), I encountered this error:
>>>>
>>>>  import org.apache.spark.sql.SchemaRDD
>>>> import org.apache.hadoop.fs.FileSystem;
>>>> import org.apache.hadoop.conf.Configuration;
>>>> import org.apache.hadoop.fs.Path;
>>>>
>>>>  val sqlContext = new org.apache.spark.sql.SQLContext(sc)
>>>>
>>>>  val parquetFileRDD = sqlContext.parquetFile(parquetFile)
>>>> parquetFileRDD.registerTempTable("xyz_20141109")
>>>> sqlContext.sql("SELECT count(*)  FROM xyz_20141109").collect() <--
>>>> works fine
>>>> sqlContext.cacheTable("xyz_20141109")
>>>> sqlContext.sql("SELECT count(*)  FROM xyz_20141109").collect() <--
>>>> fails with an exception
>>>>
>>>>   parquet.io.ParquetDecodingException: Can not read value at 0 in
>>>> block -1 in file
>>>> hdfs://xxxxxxxx::9000/event_logs/xyz/20141109/part-00009359b87ae-a949-3ded-ac3e-3a6bda3a4f3a-r-00009.lzo.parquet
>>>>
>>>>         at
>>>> parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:213)
>>>>
>>>>         at
>>>> parquet.hadoop.ParquetRecordReader.nextKeyValue(ParquetRecordReader.java:204)
>>>>
>>>>         at
>>>> org.apache.spark.rdd.NewHadoopRDD$$anon$1.hasNext(NewHadoopRDD.scala:145)
>>>>
>>>>         at
>>>> org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
>>>>
>>>>         at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>>>>
>>>>         at scala.collection.Iterator$$anon$14.hasNext(Iterator.scala:388)
>>>>
>>>>         at
>>>> org.apache.spark.sql.columnar.InMemoryRelation$$anonfun$3$$anon$1.hasNext(InMemoryColumnarTableScan.scala:136)
>>>>
>>>>         at
>>>> org.apache.spark.storage.MemoryStore.unrollSafely(MemoryStore.scala:248)
>>>>
>>>>         at
>>>> org.apache.spark.CacheManager.putInBlockManager(CacheManager.scala:163)
>>>>
>>>>         at
>>>> org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:70)
>>>>
>>>>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:228)
>>>>
>>>>         at
>>>> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
>>>>
>>>>         at
>>>> org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
>>>>
>>>>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
>>>>
>>>>         at
>>>> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
>>>>
>>>>         at
>>>> org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
>>>>
>>>>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
>>>>
>>>>         at
>>>> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
>>>>
>>>>         at
>>>> org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
>>>>
>>>>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
>>>>
>>>>         at
>>>> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
>>>>
>>>>         at
>>>> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
>>>>
>>>>         at org.apache.spark.scheduler.Task.run(Task.scala:56)
>>>>
>>>>         at
>>>> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:195)
>>>>
>>>>         at
>>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>>>>
>>>>         at
>>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>>>>
>>>>         at java.lang.Thread.run(Thread.java:745)
>>>>
>>>> Caused by: java.lang.ArrayIndexOutOfBoundsException
>>>>
>>>>
>>>>
>>>
>>>
>>
>>
>

Re: SparkSQL exception on cached parquet table

Posted by Sadhan Sood <sa...@gmail.com>.
Hi Cheng,

I tried reading the Parquet file (the one on which we were getting the
exception) through parquet-tools, and it is able to dump the file and read
the metadata, etc. I also loaded the file through a Hive table and can run a
table scan query on it as well. Let me know if I can do more to help
resolve the problem; I'll run it through a debugger and see if I can get
more information in the meantime.

Thanks,
Sadhan

On Sun, Nov 16, 2014 at 4:35 AM, Cheng Lian <li...@gmail.com> wrote:

>  (Forgot to cc user mail list)
>
>
> On 11/16/14 4:59 PM, Cheng Lian wrote:
>
> Hey Sadhan,
>
>  Thanks for the additional information, this is helpful. Seems that some
> Parquet internal contract was broken, but I'm not sure whether it's caused
> by Spark SQL or Parquet, or even maybe the Parquet file itself was damaged
> somehow. I'm investigating this. In the meanwhile, would you mind to help
> to narrow down the problem by trying to scan exactly the same Parquet file
> with some other systems (e.g. Hive or Impala)? If other systems work, then
> there must be something wrong with Spark SQL.
>
>  Cheng
>
> On Sun, Nov 16, 2014 at 1:19 PM, Sadhan Sood <sa...@gmail.com>
> wrote:
>
>> Hi Cheng,
>>
>>  Thanks for your response. Here is the stack trace from yarn logs:
>>
>>  Caused by: java.lang.ArrayIndexOutOfBoundsException: -1
>>         at java.util.ArrayList.elementData(ArrayList.java:418)
>>         at java.util.ArrayList.get(ArrayList.java:431)
>>         at parquet.io.GroupColumnIO.getLast(GroupColumnIO.java:95)
>>         at parquet.io.GroupColumnIO.getLast(GroupColumnIO.java:95)
>>         at parquet.io.PrimitiveColumnIO.getLast(PrimitiveColumnIO.java:80)
>>         at parquet.io.PrimitiveColumnIO.isLast(PrimitiveColumnIO.java:74)
>>         at parquet.io.RecordReaderImplementation.<init>(RecordReaderImplementation.java:282)
>>         at parquet.io.MessageColumnIO$1.visit(MessageColumnIO.java:131)
>>         at parquet.io.MessageColumnIO$1.visit(MessageColumnIO.java:96)
>>         at parquet.filter2.compat.FilterCompat$NoOpFilter.accept(FilterCompat.java:136)
>>         at parquet.io.MessageColumnIO.getRecordReader(MessageColumnIO.java:96)
>>         at parquet.hadoop.InternalParquetRecordReader.checkRead(InternalParquetRecordReader.java:126)
>>         at parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:193)
>>         ... 26 more
>>
>>
>> On Sat, Nov 15, 2014 at 9:28 AM, Cheng Lian <li...@gmail.com>
>> wrote:
>>
>>>  Hi Sadhan,
>>>
>>> Could you please provide the stack trace of the
>>> ArrayIndexOutOfBoundsException (if any)? The reason why the first query
>>> succeeds is that Spark SQL doesn’t bother reading all data from the table
>>> to give COUNT(*). In the second case, however, the whole table is asked
>>> to be cached lazily via the cacheTable call, thus it’s scanned to build
>>> the in-memory columnar cache. Then thing went wrong while scanning this LZO
>>> compressed Parquet file. But unfortunately the stack trace at hand doesn’t
>>> indicate the root cause.
>>>
>>> Cheng
>>>
>>> On 11/15/14 5:28 AM, Sadhan Sood wrote:
>>>
>>> While testing SparkSQL on a bunch of parquet files (basically used to be
>>> a partition for one of our hive tables), I encountered this error:
>>>
>>>  import org.apache.spark.sql.SchemaRDD
>>> import org.apache.hadoop.fs.FileSystem;
>>> import org.apache.hadoop.conf.Configuration;
>>> import org.apache.hadoop.fs.Path;
>>>
>>>  val sqlContext = new org.apache.spark.sql.SQLContext(sc)
>>>
>>>  val parquetFileRDD = sqlContext.parquetFile(parquetFile)
>>> parquetFileRDD.registerTempTable("xyz_20141109")
>>> sqlContext.sql("SELECT count(*)  FROM xyz_20141109").collect() <-- works
>>> fine
>>> sqlContext.cacheTable("xyz_20141109")
>>> sqlContext.sql("SELECT count(*)  FROM xyz_20141109").collect() <-- fails
>>> with an exception
>>>
>>>   parquet.io.ParquetDecodingException: Can not read value at 0 in block
>>> -1 in file
>>> hdfs://xxxxxxxx::9000/event_logs/xyz/20141109/part-00009359b87ae-a949-3ded-ac3e-3a6bda3a4f3a-r-00009.lzo.parquet
>>>
>>>         at
>>> parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:213)
>>>
>>>         at
>>> parquet.hadoop.ParquetRecordReader.nextKeyValue(ParquetRecordReader.java:204)
>>>
>>>         at
>>> org.apache.spark.rdd.NewHadoopRDD$$anon$1.hasNext(NewHadoopRDD.scala:145)
>>>
>>>         at
>>> org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
>>>
>>>         at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>>>
>>>         at scala.collection.Iterator$$anon$14.hasNext(Iterator.scala:388)
>>>
>>>         at
>>> org.apache.spark.sql.columnar.InMemoryRelation$$anonfun$3$$anon$1.hasNext(InMemoryColumnarTableScan.scala:136)
>>>
>>>         at
>>> org.apache.spark.storage.MemoryStore.unrollSafely(MemoryStore.scala:248)
>>>
>>>         at
>>> org.apache.spark.CacheManager.putInBlockManager(CacheManager.scala:163)
>>>
>>>         at
>>> org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:70)
>>>
>>>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:228)
>>>
>>>         at
>>> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
>>>
>>>         at
>>> org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
>>>
>>>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
>>>
>>>         at
>>> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
>>>
>>>         at
>>> org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
>>>
>>>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
>>>
>>>         at
>>> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
>>>
>>>         at
>>> org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
>>>
>>>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
>>>
>>>         at
>>> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
>>>
>>>         at
>>> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
>>>
>>>         at org.apache.spark.scheduler.Task.run(Task.scala:56)
>>>
>>>         at
>>> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:195)
>>>
>>>         at
>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>>>
>>>         at
>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>>>
>>>         at java.lang.Thread.run(Thread.java:745)
>>>
>>> Caused by: java.lang.ArrayIndexOutOfBoundsException
>>>
>>>
>>>
>>
>>
>
>

Re: SparkSQL exception on cached parquet table

Posted by Cheng Lian <li...@gmail.com>.
(Forgot to cc user mail list)

On 11/16/14 4:59 PM, Cheng Lian wrote:
> Hey Sadhan,
>
> Thanks for the additional information, this is helpful. Seems that 
> some Parquet internal contract was broken, but I'm not sure whether 
> it's caused by Spark SQL or Parquet, or even maybe the Parquet file 
> itself was damaged somehow. I'm investigating this. In the meantime, 
> would you mind helping to narrow down the problem by trying to scan 
> exactly the same Parquet file with some other systems (e.g. Hive or 
> Impala)? If other systems work, then there must be something wrong 
> with Spark SQL.
>
> Cheng
>
> On Sun, Nov 16, 2014 at 1:19 PM, Sadhan Sood <sadhan.sood@gmail.com 
> <ma...@gmail.com>> wrote:
>
>     Hi Cheng,
>
>     Thanks for your response. Here is the stack trace from yarn logs:
>
>     Caused by: java.lang.ArrayIndexOutOfBoundsException: -1
>              at java.util.ArrayList.elementData(ArrayList.java:418)
>              at java.util.ArrayList.get(ArrayList.java:431)
>              at parquet.io.GroupColumnIO.getLast(GroupColumnIO.java:95)
>              at parquet.io.GroupColumnIO.getLast(GroupColumnIO.java:95)
>              at parquet.io.PrimitiveColumnIO.getLast(PrimitiveColumnIO.java:80)
>              at parquet.io.PrimitiveColumnIO.isLast(PrimitiveColumnIO.java:74)
>              at parquet.io.RecordReaderImplementation.<init>(RecordReaderImplementation.java:282)
>              at parquet.io.MessageColumnIO$1.visit(MessageColumnIO.java:131)
>              at parquet.io.MessageColumnIO$1.visit(MessageColumnIO.java:96)
>              at parquet.filter2.compat.FilterCompat$NoOpFilter.accept(FilterCompat.java:136)
>              at parquet.io.MessageColumnIO.getRecordReader(MessageColumnIO.java:96)
>              at parquet.hadoop.InternalParquetRecordReader.checkRead(InternalParquetRecordReader.java:126)
>              at parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:193)
>              ... 26 more
>
>
>     On Sat, Nov 15, 2014 at 9:28 AM, Cheng Lian <lian.cs.zju@gmail.com
>     <ma...@gmail.com>> wrote:
>
>         Hi Sadhan,
>
>         Could you please provide the stack trace of the
>         |ArrayIndexOutOfBoundsException| (if any)? The reason why the
>         first query succeeds is that Spark SQL doesn’t bother reading
>         all data from the table to give |COUNT(*)|. In the second
>         case, however, the whole table is asked to be cached lazily
>         via the |cacheTable| call, thus it’s scanned to build the
>         in-memory columnar cache. Then things went wrong while scanning
>         this LZO-compressed Parquet file. But unfortunately the stack
>         trace at hand doesn’t indicate the root cause.
>
>         Cheng
>
>         On 11/15/14 5:28 AM, Sadhan Sood wrote:
>
>>         While testing SparkSQL on a bunch of parquet files (basically
>>         used to be a partition for one of our hive tables), I
>>         encountered this error:
>>
>>         import org.apache.spark.sql.SchemaRDD
>>         import org.apache.hadoop.fs.FileSystem;
>>         import org.apache.hadoop.conf.Configuration;
>>         import org.apache.hadoop.fs.Path;
>>
>>         val sqlContext = new org.apache.spark.sql.SQLContext(sc)
>>
>>         val parquetFileRDD = sqlContext.parquetFile(parquetFile)
>>         parquetFileRDD.registerTempTable("xyz_20141109")
>>         sqlContext.sql("SELECT count(*)  FROM
>>         xyz_20141109").collect() <-- works fine
>>         sqlContext.cacheTable("xyz_20141109")
>>         sqlContext.sql("SELECT count(*)  FROM
>>         xyz_20141109").collect() <-- fails with an exception
>>
>>         parquet.io.ParquetDecodingException: Can not read value at 0
>>         in block -1 in file
>>         hdfs://xxxxxxxx::9000/event_logs/xyz/20141109/part-00009359b87ae-a949-3ded-ac3e-3a6bda3a4f3a-r-00009.lzo.parquet
>>
>>                 at
>>         parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:213)
>>
>>                 at
>>         parquet.hadoop.ParquetRecordReader.nextKeyValue(ParquetRecordReader.java:204)
>>
>>                 at
>>         org.apache.spark.rdd.NewHadoopRDD$$anon$1.hasNext(NewHadoopRDD.scala:145)
>>
>>                 at
>>         org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
>>
>>                 at
>>         scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>>
>>                 at
>>         scala.collection.Iterator$$anon$14.hasNext(Iterator.scala:388)
>>
>>                 at
>>         org.apache.spark.sql.columnar.InMemoryRelation$$anonfun$3$$anon$1.hasNext(InMemoryColumnarTableScan.scala:136)
>>
>>                 at
>>         org.apache.spark.storage.MemoryStore.unrollSafely(MemoryStore.scala:248)
>>
>>                 at
>>         org.apache.spark.CacheManager.putInBlockManager(CacheManager.scala:163)
>>
>>                 at
>>         org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:70)
>>
>>                 at org.apache.spark.rdd.RDD.iterator(RDD.scala:228)
>>
>>                 at
>>         org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
>>
>>                 at
>>         org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
>>
>>                 at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
>>
>>                 at
>>         org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
>>
>>                 at
>>         org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
>>
>>                 at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
>>
>>                 at
>>         org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
>>
>>                 at
>>         org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
>>
>>                 at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
>>
>>                 at
>>         org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
>>
>>                 at
>>         org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
>>
>>                 at org.apache.spark.scheduler.Task.run(Task.scala:56)
>>
>>                 at
>>         org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:195)
>>
>>                 at
>>         java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>>
>>                 at
>>         java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>>
>>                 at java.lang.Thread.run(Thread.java:745)
>>
>>         Caused by: java.lang.ArrayIndexOutOfBoundsException
>>
>>
>
>
>
>


Re: SparkSQL exception on cached parquet table

Posted by Cheng Lian <li...@gmail.com>.
Hi Sadhan,

Could you please provide the stack trace of the 
|ArrayIndexOutOfBoundsException| (if any)? The reason why the first 
query succeeds is that Spark SQL doesn’t bother reading all data from 
the table to give |COUNT(*)|. In the second case, however, the whole 
table is asked to be cached lazily via the |cacheTable| call, thus it’s 
scanned to build the in-memory columnar cache. Then things went wrong
while scanning this LZO-compressed Parquet file. But unfortunately the
stack trace at hand doesn’t indicate the root cause.

Cheng
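
To see this distinction in practice without involving the cache at all, the
metadata-only count can be followed by a query that has to materialize rows.
A rough sketch, reusing the table name from this thread and a placeholder
path for the Parquet directory (assuming the usual spark-shell with sc
defined):

    import org.apache.spark.sql.SQLContext

    val sqlContext = new SQLContext(sc)

    // Placeholder for the partition directory used earlier in the thread.
    val parquetDir = "hdfs:///event_logs/xyz/20141109"
    sqlContext.parquetFile(parquetDir).registerTempTable("xyz_20141109")

    // As noted above, COUNT(*) does not need to read all of the data, so it can
    // succeed even when full record assembly would fail.
    sqlContext.sql("SELECT count(*) FROM xyz_20141109").collect()

    // Selecting actual rows forces Parquet record assembly without cacheTable;
    // if this also fails, the problem lies in scanning the file itself rather
    // than in building the in-memory columnar cache.
    sqlContext.sql("SELECT * FROM xyz_20141109 LIMIT 10").collect().foreach(println)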

On 11/15/14 5:28 AM, Sadhan Sood wrote:

> While testing SparkSQL on a bunch of parquet files (basically used to 
> be a partition for one of our hive tables), I encountered this error:
>
> import org.apache.spark.sql.SchemaRDD
> import org.apache.hadoop.fs.FileSystem;
> import org.apache.hadoop.conf.Configuration;
> import org.apache.hadoop.fs.Path;
>
> val sqlContext = new org.apache.spark.sql.SQLContext(sc)
>
> val parquetFileRDD = sqlContext.parquetFile(parquetFile)
> parquetFileRDD.registerTempTable("xyz_20141109")
> sqlContext.sql("SELECT count(*)  FROM xyz_20141109").collect() <-- 
> works fine
> sqlContext.cacheTable("xyz_20141109")
> sqlContext.sql("SELECT count(*)  FROM xyz_20141109").collect() <-- 
> fails with an exception
>
> parquet.io.ParquetDecodingException: Can not read value at 0 in block 
> -1 in file 
> hdfs://xxxxxxxx::9000/event_logs/xyz/20141109/part-00009359b87ae-a949-3ded-ac3e-3a6bda3a4f3a-r-00009.lzo.parquet
>
>         at 
> parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:213)
>
>         at 
> parquet.hadoop.ParquetRecordReader.nextKeyValue(ParquetRecordReader.java:204)
>
>         at 
> org.apache.spark.rdd.NewHadoopRDD$$anon$1.hasNext(NewHadoopRDD.scala:145)
>
>         at 
> org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
>
>         at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>
>         at scala.collection.Iterator$$anon$14.hasNext(Iterator.scala:388)
>
>         at 
> org.apache.spark.sql.columnar.InMemoryRelation$$anonfun$3$$anon$1.hasNext(InMemoryColumnarTableScan.scala:136)
>
>         at 
> org.apache.spark.storage.MemoryStore.unrollSafely(MemoryStore.scala:248)
>
>         at 
> org.apache.spark.CacheManager.putInBlockManager(CacheManager.scala:163)
>
>         at 
> org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:70)
>
>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:228)
>
>         at 
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
>
>         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
>
>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
>
>         at 
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
>
>         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
>
>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
>
>         at 
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
>
>         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
>
>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
>
>         at 
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
>
>         at 
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
>
>         at org.apache.spark.scheduler.Task.run(Task.scala:56)
>
>         at 
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:195)
>
>         at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>
>         at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>
>         at java.lang.Thread.run(Thread.java:745)
>
> Caused by: java.lang.ArrayIndexOutOfBoundsException
>
>