Posted to issues@spark.apache.org by "Hyukjin Kwon (JIRA)" <ji...@apache.org> on 2019/05/21 05:35:24 UTC

[jira] [Updated] (SPARK-3601) Kryo NPE for output operations on Avro complex Objects even after registering.

     [ https://issues.apache.org/jira/browse/SPARK-3601?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Hyukjin Kwon updated SPARK-3601:
--------------------------------
    Labels: bulk-closed  (was: )

> Kryo NPE for output operations on Avro complex Objects even after registering.
> ------------------------------------------------------------------------------
>
>                 Key: SPARK-3601
>                 URL: https://issues.apache.org/jira/browse/SPARK-3601
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 1.0.0
>         Environment: local, standalone cluster
>            Reporter: mohan gaddam
>            Priority: Major
>              Labels: bulk-closed
>
> The Kryo serializer works well when an Avro object holds simple data, but when the same Avro object holds complex data (such as unions or arrays), Kryo fails during output operations; map transformations work fine. Note that I have registered all of the Avro-generated classes with Kryo. I am using Java as the programming language.
> When a complex message is used, Kryo throws an NPE; the stack trace is as follows:
> ==================================================
> ERROR scheduler.JobScheduler: Error running job streaming job 1411043845000 ms.0 
> org.apache.spark.SparkException: Job aborted due to stage failure: Exception while getting task result: com.esotericsoftware.kryo.KryoException: java.lang.NullPointerException 
> Serialization trace: 
> value (xyz.Datum) 
> data (xyz.ResMsg) 
> at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1185) 
> at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1174) 
> at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1173) 
> at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) 
> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) 
> at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1173) 
> at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:688) 
> at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:688) 
> at scala.Option.foreach(Option.scala:236) 
> at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:688) 
> at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1391) 
> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498) 
> at akka.actor.ActorCell.invoke(ActorCell.scala:456) 
> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237) 
> at akka.dispatch.Mailbox.run(Mailbox.scala:219) 
> at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386) 
> at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) 
> at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) 
> at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) 
> at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) 
> In the above exception, Datum and ResMsg are project-specific classes generated by Avro from the following avdl snippet:
> ======================
> record KeyValueObject { 
>         union{boolean, int, long, float, double, bytes, string} name; 
>         union {boolean, int, long, float, double, bytes, string, array<union{boolean, int, long, float, double, bytes, string, KeyValueObject}>, KeyValueObject} value; 
> } 
> record Datum { 
>         union {boolean, int, long, float, double, bytes, string, array<union{boolean, int, long, float, double, bytes, string, KeyValueObject}>, KeyValueObject} value; 
> } 
> record ResMsg { 
>                 string version; 
>                 string sequence; 
>                 string resourceGUID; 
>                 string GWID; 
>                 string GWTimestamp; 
>                 union {Datum, array<Datum>} data; 
> }
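One likely reading of the NPE is that Kryo encountered a runtime type inside a union that was never registered: the avdl unions above compile down to plain `Object` fields in the generated Java classes, so the concrete classes only show up at runtime. The following stdlib-only sketch (a hypothetical POJO standing in for the Avro-generated `Datum`, no Kryo involved) shows which concrete classes a serializer actually sees:

```java
import java.util.ArrayList;
import java.util.List;

public class UnionRuntimeTypes {
    // Stand-in for the Avro-generated Datum: the avdl union compiles to Object.
    static class Datum {
        Object value;
    }

    public static void main(String[] args) {
        Datum d = new Datum();
        List<Object> values = new ArrayList<>();
        values.add("30");  // string branch of the union
        values.add(60L);   // long branch of the union
        d.value = values;  // array branch of the union

        // A serializer like Kryo sees these concrete runtime classes,
        // not the declared union type:
        System.out.println(d.value.getClass().getName());       // java.util.ArrayList
        System.out.println(values.get(0).getClass().getName()); // java.lang.String
        System.out.println(values.get(1).getClass().getName()); // java.lang.Long
    }
}
```

Registering only `Datum`, `ResMsg`, and `KeyValueObject` would therefore not cover container and leaf types such as these (and Avro-specific ones like `org.apache.avro.util.Utf8`) that appear inside the union fields at runtime.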
> Sample Avro messages are as follows:
> ============================
> 1)
> {"version": "01", "sequence": "00001", "resourceGUID": "001", "GWID": "002", "GWTimestamp": "1409823150737", "data": {"value": "30"}} 
> 2)
> {"version": "01", "sequence": "00001", "resource": "sensor-001", "controller": "002", "controllerTimestamp": "1411038710358", "data": {"value": [ {"name": "Temperature", "value": "30"}, {"name": "Speed", "value": "60"}, {"name": "Location", "value": ["+401213.1", "-0750015.1"]}, {"name": "Timestamp", "value": "2014-09-09T08:15:25-05:00"}]}}
> Both 1 and 2 adhere to the Avro schema, so the decoder is able to convert them into Avro objects in the Spark Streaming API.
> Note that the messages were pulled from a Kafka source and decoded using a Kafka decoder.
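For reference, Kryo is typically wired up in Spark with settings along these lines (a sketch only; the registrator class name is a hypothetical placeholder). Setting `spark.kryo.registrationRequired` to `true` makes Kryo fail fast with the name of any unregistered class, which can turn an opaque NPE like the one above into an actionable error message:

```properties
spark.serializer                 org.apache.spark.serializer.KryoSerializer
spark.kryo.registrator           com.example.MyAvroKryoRegistrator
spark.kryo.registrationRequired  true
```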



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org