Posted to user@spark.apache.org by Arbi Akhina <ar...@gmail.com> on 2015/07/23 18:02:08 UTC

ERROR TaskResultGetter: Exception while getting task result when reading avro files that contain arrays

Hi,

I'm trying to read an Avro file into a Spark RDD, but the job fails with
an exception while getting the task result.

The Avro schema file (myschema.avsc) has the following content:
{
  "type" : "record",
  "name" : "sample_schema",
  "namespace" : "com.adomik.avro",
  "fields" : [ {
    "name" : "username",
    "type" : "string",
    "doc"  : "Name of the user account"
  }, {
    "name" : "events",
    "type" : {
      "type" : "array",
      "items" : {
        "name":"Event",
        "type":"record",
        "fields":[
          {"name":"action", "type":"string"}, {"name":"value",
"type":"long"}
        ]
      }
    },
    "doc"  : "The content of the user's Events message"
  } ],
  "doc:" : "A basic schema for storing Events messages"
}

I create the Avro file from the following JSON file (data.json) using
avro-tools:
{"username":"miguno","events": [{"action":"signed", "value": 1}, {"action":
"loged", "value":1}] }
{"username":"blizzard","events": [{"action":"logout", "value": 2},
{"action": "visited", "value":3}] }

$ java -jar avro-tools-1.7.7.jar fromjson --schema-file myschema.avsc data.json > data.avro

I can read the generated Avro file back correctly with avro-tools:
$ java -jar avro-tools-1.7.7.jar tojson data.avro

However, I get an exception when I try to read the generated Avro file
into a Spark RDD from the spark-shell:

> import org.apache.avro.mapred.AvroInputFormat
> import org.apache.avro.mapred.AvroWrapper
> import org.apache.hadoop.io.NullWritable
> import org.apache.hadoop.io.Text
> import org.apache.avro.generic.GenericRecord

> val input = "/home/arbi/avro/data.avro"
> val rdd = sc.hadoopFile(
  input,
  classOf[AvroInputFormat[GenericRecord]],
  classOf[AvroWrapper[GenericRecord]],
  classOf[NullWritable]
)

Then, when I call an action on the RDD (e.g. rdd.first), I see the following exception:

15/07/23 14:30:48 ERROR TaskResultGetter: Exception while getting task result
com.esotericsoftware.kryo.KryoException: java.lang.NullPointerException
Serialization trace:
values (org.apache.avro.generic.GenericData$Record)
datum (org.apache.avro.mapred.AvroWrapper)
    at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:626)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:221)
    at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:648)
    at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:605)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:221)
    at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)
    at com.twitter.chill.Tuple2Serializer.read(TupleSerializers.scala:41)
    at com.twitter.chill.Tuple2Serializer.read(TupleSerializers.scala:33)
    at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)
    at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:338)
    at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:293)
    at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)
    at org.apache.spark.serializer.KryoSerializerInstance.deserialize(KryoSerializer.scala:173)
    at org.apache.spark.scheduler.DirectTaskResult.value(TaskResult.scala:79)
    at org.apache.spark.scheduler.TaskSetManager.handleSuccessfulTask(TaskSetManager.scala:621)
    at org.apache.spark.scheduler.TaskSchedulerImpl.handleSuccessfulTask(TaskSchedulerImpl.scala:379)
    at org.apache.spark.scheduler.TaskResultGetter$$anon$2$$anonfun$run$1.apply$mcV$sp(TaskResultGetter.scala:82)
    at org.apache.spark.scheduler.TaskResultGetter$$anon$2$$anonfun$run$1.apply(TaskResultGetter.scala:51)
    at org.apache.spark.scheduler.TaskResultGetter$$anon$2$$anonfun$run$1.apply(TaskResultGetter.scala:51)
    at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1618)
    at org.apache.spark.scheduler.TaskResultGetter$$anon$2.run(TaskResultGetter.scala:50)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.NullPointerException
    at org.apache.avro.generic.GenericData$Array.add(GenericData.java:200)
    at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:109)
    at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:18)
    at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)
    at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:338)
    at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:293)
    at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:648)
    at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:605)
    ... 23 more

org.apache.spark.SparkException: Job aborted due to stage failure: Exception
while getting task result: com.esotericsoftware.kryo.KryoException:
java.lang.NullPointerException
Serialization trace:
values (org.apache.avro.generic.GenericData$Record)
datum (org.apache.avro.mapred.AvroWrapper)
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1204)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1193)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1192)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1192)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693)
    at scala.Option.foreach(Option.scala:236)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:693)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1393)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1354)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)

Any idea what's causing this error? Does the presence of arrays in the Avro
schema cause problems when generating Spark RDDs?

Best regards,

Re: ERROR TaskResultGetter: Exception while getting task result when reading avro files that contain arrays

Posted by Akhil Das <ak...@sigmoidanalytics.com>.
It's a serialization error with the nested schema, I guess. You can look at
Twitter's chill-avro serializer library. Here are two discussions of the same
issue (illustrative sketches follow the links):

- https://issues.apache.org/jira/browse/SPARK-3447
-
http://apache-spark-user-list.1001560.n3.nabble.com/Kryo-fails-with-avro-having-Arrays-and-unions-but-succeeds-with-simple-avro-td14549.html
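
For example, here is a minimal registrator sketch, not from this thread: it
assumes chill-avro is on the classpath, and the class name AvroKryoRegistrator
and its package are made up for illustration. The idea is to register a
schema-aware serializer for the generic records, so Kryo stops falling back to
FieldSerializer, which is what produces the NPE in GenericData$Array.add:

  import java.io.File
  import com.esotericsoftware.kryo.Kryo
  import com.twitter.chill.avro.AvroSerializer
  import org.apache.avro.Schema
  import org.apache.avro.generic.GenericData
  import org.apache.spark.serializer.KryoRegistrator

  class AvroKryoRegistrator extends KryoRegistrator {
    override def registerClasses(kryo: Kryo): Unit = {
      // Parse the same schema that wrote data.avro; the file must be
      // readable on every node (or ship the schema as a string instead).
      val schema: Schema = new Schema.Parser().parse(new File("myschema.avsc"))
      kryo.register(classOf[GenericData.Record],
        AvroSerializer.GenericRecordSerializer[GenericData.Record](schema))
    }
  }

Then point Spark at it (Kryo is evidently already your serializer, judging by
the stack trace), e.g.:

  $ spark-shell --conf spark.kryo.registrator=com.example.AvroKryoRegistrator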

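Alternatively, a workaround sketch (also not from the thread): keep the Avro
objects from ever being sent back to the driver by copying their fields into
plain Scala types on the executors before calling an action:

  val events = rdd.map { case (wrapper, _) =>
    val record = wrapper.datum() // org.apache.avro.generic.GenericRecord
    // Plain strings serialize fine with Kryo's defaults.
    (record.get("username").toString, record.get("events").toString)
  }
  events.take(2).foreach(println)

This sidesteps the failure because the NPE only occurs when Kryo tries to
rebuild GenericData.Record/GenericData.Array instances from the task result.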
Thanks
Best Regards
