Posted to user@spark.apache.org by Pramod Biligiri <pr...@gmail.com> on 2014/09/23 23:25:43 UTC

Spark Code to read RCFiles

Hi,
I'm trying to read some data in RCFiles using Spark, but can't seem to find
a suitable example anywhere. Currently I've written the following bit of
code that lets me count() the no. of records, but when I try to do a
collect() or a map(), it fails with a ConcurrentModificationException. I'm
running Spark 1.0.1 on a Hadoop YARN cluster:

import org.apache.hadoop.hive.ql.io.RCFileInputFormat
val file = sc.hadoopFile("/hdfs/path/to/file",
  classOf[org.apache.hadoop.hive.ql.io.RCFileInputFormat[org.apache.hadoop.io.LongWritable, org.apache.hadoop.hive.serde2.columnar.BytesRefArrayWritable]],
  classOf[org.apache.hadoop.io.LongWritable],
  classOf[org.apache.hadoop.hive.serde2.columnar.BytesRefArrayWritable])
file.collect()

org.apache.spark.SparkException: Job aborted due to stage failure: Task 10.0:6 failed 4 times, most recent failure: Exception failure in TID 395 on host (redacted): com.esotericsoftware.kryo.KryoException: java.util.ConcurrentModificationException
Serialization trace:
classes (sun.misc.Launcher$AppClassLoader)
parent (org.apache.spark.repl.ExecutorClassLoader)
classLoader (org.apache.hadoop.mapred.JobConf)
conf (org.apache.hadoop.io.compress.GzipCodec)
codec (org.apache.hadoop.hive.ql.io.RCFile$ValueBuffer)
this$0 (org.apache.hadoop.hive.ql.io.RCFile$ValueBuffer$LazyDecompressionCallbackImpl)
lazyDecompressObj (org.apache.hadoop.hive.serde2.columnar.BytesRefWritable)
bytesRefWritables (org.apache.hadoop.hive.serde2.columnar.BytesRefArrayWritable)
        com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:585)
        com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
        com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
        com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
        com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
        com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
        com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
        com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
        com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
        com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
        com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
        com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
        com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
        com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
        com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
        com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
        com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
        com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
        com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
        com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
        com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568)
        com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.write(DefaultArraySerializers.java:318)
        com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.write(DefaultArraySerializers.java:293)
        com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
        com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
        com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
        com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568)
        com.twitter.chill.Tuple2Serializer.write(TupleSerializers.scala:38)
        com.twitter.chill.Tuple2Serializer.write(TupleSerializers.scala:34)
        com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568)
        com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.write(DefaultArraySerializers.java:318)
        com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.write(DefaultArraySerializers.java:293)
        com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568)
        org.apache.spark.serializer.KryoSerializerInstance.serialize(KryoSerializer.scala:141)
        org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:193)
        java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        java.lang.Thread.run(Thread.java:722)
Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1046)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1030)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1028)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1028)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:635)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:635)
at scala.Option.foreach(Option.scala:236)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:635)
at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1231)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
at akka.actor.ActorCell.invoke(ActorCell.scala:456)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
at akka.dispatch.Mailbox.run(Mailbox.scala:219)
at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)


Pramod

Re: Spark Code to read RCFiles

Posted by Pramod Biligiri <pr...@gmail.com>.
Hi,
I remember seeing a similar performance problem with Apache Shark last year
when compared to Hive, though that was in a company-specific port of the
code that I unfortunately no longer have access to. The problem then was
reflection-based class creation in the critical path of reading each
record. Make sure the code path for each parse() doesn't do something like
that.

I would also check whether lines like
"getFieldObjectInspector().asInstanceOf[StringObjectInspector]" appear in
Hive's own code path as well; if not, they are likely to slow down the
parsing when they run for every record.
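
To make that concrete, here is a hedged sketch (mine, not something from
Hive or the Spark docs) of hoisting the SerDe and ObjectInspector setup out
of the per-record path with mapPartitions. It reuses the column names/types
from the quoted parse() below; "raw" stands for the
RDD[(LongWritable, BytesRefArrayWritable)] returned by hadoopFile, and
Record is the poster's own class:

import java.util.Properties
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hive.serde2.columnar.{BytesRefArrayWritable, ColumnarSerDe}
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector
import org.apache.hadoop.hive.serde2.objectinspector.primitive.{IntObjectInspector, StringObjectInspector}

val records = raw.mapPartitions { iter =>
  // Build the SerDe and inspectors once per partition, not once per record.
  val tbl = new Properties()
  tbl.setProperty("serialization.format", "9")
  tbl.setProperty("columns", "time,id,name,application")
  tbl.setProperty("columns.types", "string:int:string:string")
  tbl.setProperty("serialization.null.format", "NULL")
  val serDe = new ColumnarSerDe()
  serDe.initialize(new Configuration(), tbl)
  val soi = serDe.getObjectInspector.asInstanceOf[StructObjectInspector]
  val refs = soi.getAllStructFieldRefs

  iter.map { case (_, braw) =>
    val row = serDe.deserialize(braw)
    val time = refs.get(0).getFieldObjectInspector.asInstanceOf[StringObjectInspector]
      .getPrimitiveJavaObject(soi.getStructFieldData(row, refs.get(0)))
    val id = refs.get(1).getFieldObjectInspector.asInstanceOf[IntObjectInspector]
      .get(soi.getStructFieldData(row, refs.get(1)))
    val name = refs.get(2).getFieldObjectInspector.asInstanceOf[StringObjectInspector]
      .getPrimitiveJavaObject(soi.getStructFieldData(row, refs.get(2)))
    val app = refs.get(3).getFieldObjectInspector.asInstanceOf[StringObjectInspector]
      .getPrimitiveJavaObject(soi.getStructFieldData(row, refs.get(3)))
    new Record(time, id, name, app)
  }
}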

Pramod

On Fri, Apr 17, 2015 at 12:25 PM, gle <gl...@gmail.com> wrote:

> Hi,
>
> I'm new to Spark and am working on a proof of concept.  I'm using Spark
> 1.3.0 and running in local mode.
>
> I can read and parse an RCFile using Spark however the performance is not
> as
> good as I hoped.
> I'm testing using ~800k rows and it is taking about 30 mins to process.
>
> Is there a better way to load and process a RCFile?  The only reference to
> RCFile in 'Learning Spark' is in the SparkSQL chapter.  Is using SparkSQL
> for RCFiles the recommendation and I should avoid using Spark core
> functionality for RCFiles?
>
> I'm using the following code to build RDD[Record]
>
>     val records: RDD[Record] = sc.hadoopFile(rcFile,
>
> classOf[RCFileInputFormat[LongWritable, BytesRefArrayWritable]],
>                                                   classOf[LongWritable],
>
> classOf[BytesRefArrayWritable])
>                                                   .map(x =>  (
>                                                      x._1.get, parse( x._2
> )
>                                                     )
>                                                   ).map(pair => pair._2)
> the function parse is defined as:
>
>   def parse(braw: BytesRefArrayWritable ): Record = {
>     val serDe = new ColumnarSerDe()
>     var tbl: Properties = new Properties();
>     tbl.setProperty("serialization.format", "9")
>     tbl.setProperty("columns", "time,id,name,application")
>     tbl.setProperty("columns.types", "string:int:string:string")
>     tbl.setProperty("serialization.null.format", "NULL");
>     serDe.initialize(new Configuration(), tbl);
>
>     val oi = serDe.getObjectInspector()
>     val soi: StructObjectInspector = oi.asInstanceOf[StructObjectInspector]
>     val fieldRefs: Buffer[_ <:StructField]  =
> soi.getAllStructFieldRefs().asScala
>     val row = serDe.deserialize(braw)
>
>     val timeRec = soi.getStructFieldData(row, fieldRefs(0))
>     val idRec = soi.getStructFieldData(row, fieldRefs(1))
>     val nameRec = soi.getStructFieldData(row, fieldRefs(2))
>     val applicationRec = soi.getStructFieldData(row, fieldRefs(3))
>
>     var timeOI =
> fieldRefs(0).getFieldObjectInspector().asInstanceOf[StringObjectInspector];
>     var time = timeOI.getPrimitiveJavaObject(timeRec);
>     var idOI =
> fieldRefs(1).getFieldObjectInspector().asInstanceOf[IntObjectInspector];
>     var id = idOI.get(idRec);
>     var nameOI =
> fieldRefs(2).getFieldObjectInspector().asInstanceOf[StringObjectInspector];
>     var name = nameOI.getPrimitiveJavaObject(nameRec);
>     var appOI =
> fieldRefs(3).getFieldObjectInspector().asInstanceOf[StringObjectInspector];
>     var app = appOI.getPrimitiveJavaObject(applicationRec);
>
>     new Record(time, id, name, app)
>   }
>
>
> Thanks in advance,
> Glenda
>
>
>

Re: Spark Code to read RCFiles

Posted by gle <gl...@gmail.com>.
Hi,

I'm new to Spark and am working on a proof of concept.  I'm using Spark
1.3.0 and running in local mode.

I can read and parse an RCFile using Spark, but the performance is not as
good as I hoped: processing ~800k rows takes about 30 minutes.

Is there a better way to load and process an RCFile?  The only reference to
RCFile in 'Learning Spark' is in the SparkSQL chapter.  Is SparkSQL the
recommended way to work with RCFiles, meaning I should avoid Spark core
functionality for them?

I'm using the following code to build RDD[Record]

    val records: RDD[Record] = sc.hadoopFile(rcFile,
        classOf[RCFileInputFormat[LongWritable, BytesRefArrayWritable]],
        classOf[LongWritable],
        classOf[BytesRefArrayWritable])
      .map(x => (x._1.get, parse(x._2)))
      .map(pair => pair._2)
the function parse is defined as:

  def parse(braw: BytesRefArrayWritable): Record = {
    val serDe = new ColumnarSerDe()
    var tbl: Properties = new Properties()
    tbl.setProperty("serialization.format", "9")
    tbl.setProperty("columns", "time,id,name,application")
    tbl.setProperty("columns.types", "string:int:string:string")
    tbl.setProperty("serialization.null.format", "NULL")
    serDe.initialize(new Configuration(), tbl)

    val oi = serDe.getObjectInspector()
    val soi: StructObjectInspector = oi.asInstanceOf[StructObjectInspector]
    val fieldRefs: Buffer[_ <: StructField] = soi.getAllStructFieldRefs().asScala
    val row = serDe.deserialize(braw)

    val timeRec = soi.getStructFieldData(row, fieldRefs(0))
    val idRec = soi.getStructFieldData(row, fieldRefs(1))
    val nameRec = soi.getStructFieldData(row, fieldRefs(2))
    val applicationRec = soi.getStructFieldData(row, fieldRefs(3))

    var timeOI = fieldRefs(0).getFieldObjectInspector().asInstanceOf[StringObjectInspector]
    var time = timeOI.getPrimitiveJavaObject(timeRec)
    var idOI = fieldRefs(1).getFieldObjectInspector().asInstanceOf[IntObjectInspector]
    var id = idOI.get(idRec)
    var nameOI = fieldRefs(2).getFieldObjectInspector().asInstanceOf[StringObjectInspector]
    var name = nameOI.getPrimitiveJavaObject(nameRec)
    var appOI = fieldRefs(3).getFieldObjectInspector().asInstanceOf[StringObjectInspector]
    var app = appOI.getPrimitiveJavaObject(applicationRec)

    new Record(time, id, name, app)
  }


Thanks in advance,
Glenda



--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Code-to-read-RCFiles-tp14934p22545.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
For additional commands, e-mail: user-help@spark.apache.org


Re: Spark Code to read RCFiles

Posted by cem <ca...@gmail.com>.
I used the following code as an example to deserialize
BytesRefArrayWritable.

http://www.massapi.com/source/hive-0.5.0-dev/src/ql/src/test/org/apache/hadoop/hive/ql/io/TestRCFile.java.html
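
For what it's worth, the core of that pattern as I understand it, sketched
here in hedged form rather than copied from the linked test (it assumes the
columns are plain UTF-8 text): each RCFile value is a BytesRefArrayWritable
holding one BytesRefWritable per column, and the bytes can be copied out and
decoded directly.

import org.apache.hadoop.io.Text
import org.apache.hadoop.hive.serde2.columnar.BytesRefArrayWritable

// braw is one record's value as handed out by RCFileInputFormat
def columnsAsStrings(braw: BytesRefArrayWritable): Seq[String] =
  (0 until braw.size).map { i =>
    val ref = braw.get(i)
    // getBytesCopy materializes the (lazily decompressed) column bytes
    if (ref.getLength == 0) "" else Text.decode(ref.getBytesCopy)
  }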

Best Regards,
Cem.



On Wed, Sep 24, 2014 at 1:34 PM, Pramod Biligiri <pr...@gmail.com>
wrote:

> I'm afraid SparkSQL isn't an option for my use case, so I need to use the
> Spark API itself.
> I turned off Kryo, and I'm getting a NullPointerException now:
>
> scala> val ref = file.take(1)(0)._2
> ref: org.apache.hadoop.hive.serde2.columnar.BytesRefArrayWritable =
> org.apache.hadoop.hive.serde2.columnar.BytesRefArrayWritable@9753e
>
> scala> ref.size
> res7: Int = 79  // *This matches the no. of columns that I know exist in
> that RC record*
>
> scala> ref.get(0)
> java.lang.NullPointerException
> at
> org.apache.hadoop.hive.serde2.columnar.BytesRefWritable.toString(BytesRefWritable.java:194)
> at
> scala.runtime.ScalaRunTime$.scala$runtime$ScalaRunTime$$inner$1(ScalaRunTime.scala:324)
> at scala.runtime.ScalaRunTime$.stringOf(ScalaRunTime.scala:329)
> at scala.runtime.ScalaRunTime$.replStringOf(ScalaRunTime.scala:337)
> at .<init>(<console>:10)
> at .<clinit>(<console>)
> at $print(<console>)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:601)
> at
> org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:788)
>
> Pramod
>
> On Wed, Sep 24, 2014 at 7:38 AM, cem <ca...@gmail.com> wrote:
>
>>
>> I was able to read RC files with the following line:
>>
>>
>> val file: RDD[(LongWritable, BytesRefArrayWritable)] =
>> sc.hadoopFile("hdfs://****day=2014-08-10/hour=00/",
>> classOf[RCFileInputFormat[LongWritable, BytesRefArrayWritable]],
>> classOf[LongWritable], classOf[BytesRefArrayWritable],500)
>>
>> Try with disabling  kryo serializer.
>>
>> Best Regards,
>> Cem Cayiroglu
>>
>> On Tue, Sep 23, 2014 at 7:23 PM, Matei Zaharia <ma...@gmail.com>
>> wrote:
>>
>>> Is your file managed by Hive (and thus present in a Hive metastore)? In
>>> that case, Spark SQL (
>>> https://spark.apache.org/docs/latest/sql-programming-guide.html) is the
>>> easiest way.
>>>
>>> Matei
>>>
>>> On September 23, 2014 at 2:26:10 PM, Pramod Biligiri (
>>> pramodbiligiri@gmail.com) wrote:
>>>
>>> Hi,
>>> I'm trying to read some data in RCFiles using Spark, but can't seem to
>>> find a suitable example anywhere. Currently I've written the following bit
>>> of code that lets me count() the no. of records, but when I try to do a
>>> collect() or a map(), it fails with a ConcurrentModificationException. I'm
>>> running Spark 1.0.1 on a Hadoop YARN cluster:
>>>
>>>  import org.apache.hadoop.hive.ql.io.RCFileInputFormat;
>>> val file = sc.hadoopFile("/hdfs/path/to/file",
>>>
>>> classOf[org.apache.hadoop.hive.ql.io.RCFileInputFormat[org.apache.hadoop.io.LongWritable,org.apache.hadoop.hive.serde2.columnar.BytesRefArrayWritable]],
>>> classOf[org.apache.hadoop.io.LongWritable],
>>>  classOf[org.apache.hadoop.hive.serde2.columnar.BytesRefArrayWritable]
>>> )
>>>  file.collect()
>>>
>>> [stack trace snipped; identical to the one in the original message above]
>>>
>>>
>>> Pramod
>>>
>>>
>>>
>>
>

Re: Spark Code to read RCFiles

Posted by Pramod Biligiri <pr...@gmail.com>.
I'm afraid SparkSQL isn't an option for my use case, so I need to use the
Spark API itself.
I turned off Kryo, and I'm getting a NullPointerException now:

scala> val ref = file.take(1)(0)._2
ref: org.apache.hadoop.hive.serde2.columnar.BytesRefArrayWritable =
org.apache.hadoop.hive.serde2.columnar.BytesRefArrayWritable@9753e

scala> ref.size
res7: Int = 79  // *This matches the no. of columns that I know exist in
that RC record*

scala> ref.get(0)
java.lang.NullPointerException
at
org.apache.hadoop.hive.serde2.columnar.BytesRefWritable.toString(BytesRefWritable.java:194)
at
scala.runtime.ScalaRunTime$.scala$runtime$ScalaRunTime$$inner$1(ScalaRunTime.scala:324)
at scala.runtime.ScalaRunTime$.stringOf(ScalaRunTime.scala:329)
at scala.runtime.ScalaRunTime$.replStringOf(ScalaRunTime.scala:337)
at .<init>(<console>:10)
at .<clinit>(<console>)
at $print(<console>)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:601)
at org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:788)
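
My guess at what's happening, offered as a hedged sketch rather than a
diagnosis: the BytesRefArrayWritable that reaches the driver still points at
RCFile's lazy-decompression state, which doesn't survive outside the record
reader, so toString() on a column hits a null when the REPL tries to print
it. Decoding the columns on the executors, inside the transformation and
before any take()/collect(), would sidestep both this and the earlier Kryo
failure. The snippet assumes plain text columns:

import org.apache.hadoop.io.Text

val rows = file.map { case (_, braw) =>
  (0 until braw.size).map { i =>
    val ref = braw.get(i)
    // copy the bytes out of the reused, lazily decompressed buffer
    if (ref.getLength == 0) "" else Text.decode(ref.getBytesCopy)
  }
}
rows.take(1).foreach(cols => println(cols.mkString("\t")))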

Pramod

On Wed, Sep 24, 2014 at 7:38 AM, cem <ca...@gmail.com> wrote:

>
> I was able to read RC files with the following line:
>
>
> val file: RDD[(LongWritable, BytesRefArrayWritable)] =
> sc.hadoopFile("hdfs://****day=2014-08-10/hour=00/",
> classOf[RCFileInputFormat[LongWritable, BytesRefArrayWritable]],
> classOf[LongWritable], classOf[BytesRefArrayWritable],500)
>
> Try with disabling  kryo serializer.
>
> Best Regards,
> Cem Cayiroglu
>
> On Tue, Sep 23, 2014 at 7:23 PM, Matei Zaharia <ma...@gmail.com>
> wrote:
>
>> Is your file managed by Hive (and thus present in a Hive metastore)? In
>> that case, Spark SQL (
>> https://spark.apache.org/docs/latest/sql-programming-guide.html) is the
>> easiest way.
>>
>> Matei
>>
>> On September 23, 2014 at 2:26:10 PM, Pramod Biligiri (
>> pramodbiligiri@gmail.com) wrote:
>>
>> Hi,
>> I'm trying to read some data in RCFiles using Spark, but can't seem to
>> find a suitable example anywhere. Currently I've written the following bit
>> of code that lets me count() the no. of records, but when I try to do a
>> collect() or a map(), it fails with a ConcurrentModificationException. I'm
>> running Spark 1.0.1 on a Hadoop YARN cluster:
>>
>>  import org.apache.hadoop.hive.ql.io.RCFileInputFormat;
>> val file = sc.hadoopFile("/hdfs/path/to/file",
>>
>> classOf[org.apache.hadoop.hive.ql.io.RCFileInputFormat[org.apache.hadoop.io.LongWritable,org.apache.hadoop.hive.serde2.columnar.BytesRefArrayWritable]],
>> classOf[org.apache.hadoop.io.LongWritable],
>>  classOf[org.apache.hadoop.hive.serde2.columnar.BytesRefArrayWritable]
>> )
>>  file.collect()
>>
>> [stack trace snipped; identical to the one in the original message above]
>>
>>
>> Pramod
>>
>>
>>
>

Re: Spark Code to read RCFiles

Posted by cem <ca...@gmail.com>.
I was able to read RC files with the following line:

val file: RDD[(LongWritable, BytesRefArrayWritable)] =
  sc.hadoopFile("hdfs://****day=2014-08-10/hour=00/",
    classOf[RCFileInputFormat[LongWritable, BytesRefArrayWritable]],
    classOf[LongWritable], classOf[BytesRefArrayWritable], 500)

Try disabling the Kryo serializer.
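
In case it helps, a hedged sketch of what "disabling Kryo" typically means:
point spark.serializer back at Java serialization (the Spark default) when
building the context, or set the same key in spark-defaults.conf. The app
name below is just a placeholder:

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("rcfile-read")  // placeholder
  .set("spark.serializer", "org.apache.spark.serializer.JavaSerializer")
val sc = new SparkContext(conf)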

Best Regards,
Cem Cayiroglu

On Tue, Sep 23, 2014 at 7:23 PM, Matei Zaharia <ma...@gmail.com>
wrote:

> Is your file managed by Hive (and thus present in a Hive metastore)? In
> that case, Spark SQL (
> https://spark.apache.org/docs/latest/sql-programming-guide.html) is the
> easiest way.
>
> Matei
>
> On September 23, 2014 at 2:26:10 PM, Pramod Biligiri (
> pramodbiligiri@gmail.com) wrote:
>
> Hi,
> I'm trying to read some data in RCFiles using Spark, but can't seem to
> find a suitable example anywhere. Currently I've written the following bit
> of code that lets me count() the no. of records, but when I try to do a
> collect() or a map(), it fails with a ConcurrentModificationException. I'm
> running Spark 1.0.1 on a Hadoop YARN cluster:
>
>  import org.apache.hadoop.hive.ql.io.RCFileInputFormat;
> val file = sc.hadoopFile("/hdfs/path/to/file",
>
> classOf[org.apache.hadoop.hive.ql.io.RCFileInputFormat[org.apache.hadoop.io.LongWritable,org.apache.hadoop.hive.serde2.columnar.BytesRefArrayWritable]],
> classOf[org.apache.hadoop.io.LongWritable],
>  classOf[org.apache.hadoop.hive.serde2.columnar.BytesRefArrayWritable]
> )
>  file.collect()
>
> [stack trace snipped; identical to the one in the original message above]
>
>
> Pramod
>
>
>

Re: Spark Code to read RCFiles

Posted by Matei Zaharia <ma...@gmail.com>.
Is your file managed by Hive (and thus present in a Hive metastore)? In that case, Spark SQL (https://spark.apache.org/docs/latest/sql-programming-guide.html) is the easiest way.
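
A hedged sketch of that route, assuming a Spark build with Hive support and
that the RCFile sits behind a metastore table (the table and column names
below are made up):

import org.apache.spark.sql.hive.HiveContext

val hiveCtx = new HiveContext(sc)
val rows = hiveCtx.sql("SELECT time, id, name, application FROM my_rc_table")
rows.take(5).foreach(println)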

Matei

On September 23, 2014 at 2:26:10 PM, Pramod Biligiri (pramodbiligiri@gmail.com) wrote:

Hi,
I'm trying to read some data in RCFiles using Spark, but can't seem to find a suitable example anywhere. Currently I've written the following bit of code that lets me count() the no. of records, but when I try to do a collect() or a map(), it fails with a ConcurrentModificationException. I'm running Spark 1.0.1 on a Hadoop YARN cluster:

import org.apache.hadoop.hive.ql.io.RCFileInputFormat;
val file = sc.hadoopFile("/hdfs/path/to/file",
classOf[org.apache.hadoop.hive.ql.io.RCFileInputFormat[org.apache.hadoop.io.LongWritable,org.apache.hadoop.hive.serde2.columnar.BytesRefArrayWritable]],
classOf[org.apache.hadoop.io.LongWritable], 
classOf[org.apache.hadoop.hive.serde2.columnar.BytesRefArrayWritable]
)
file.collect()

[stack trace snipped; identical to the one in the original message above]


Pramod