Posted to issues@spark.apache.org by "mohan gaddam (JIRA)" <ji...@apache.org> on 2014/09/19 09:20:34 UTC
[jira] [Created] (SPARK-3601) Kryo NPE for output operations on Avro complex Objects even after registering.
mohan gaddam created SPARK-3601:
-----------------------------------
Summary: Kryo NPE for output operations on Avro complex Objects even after registering.
Key: SPARK-3601
URL: https://issues.apache.org/jira/browse/SPARK-3601
Project: Spark
Issue Type: Bug
Components: Spark Core
Affects Versions: 1.0.0
Environment: local, standalone cluster
Reporter: mohan gaddam
The Kryo serializer works well when Avro objects contain simple data, but when the same Avro object contains complex data (such as unions or arrays), Kryo fails during output operations. Map transformations work fine.
Note that I have registered all the Avro-generated classes with Kryo.
I'm using Java as the programming language.
When a complex message is processed, a NullPointerException is thrown, with the following stack trace:
====================================
ERROR scheduler.JobScheduler: Error running job streaming job 1411043845000 ms.0
org.apache.spark.SparkException: Job aborted due to stage failure: Exception while getting task result: com.esotericsoftware.kryo.KryoException: java.lang.NullPointerException
Serialization trace:
value (xyz.Datum)
data (xyz.ResourceMessage)
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1185)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1174)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1173)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1173)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:688)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:688)
at scala.Option.foreach(Option.scala:236)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:688)
at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1391)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
at akka.actor.ActorCell.invoke(ActorCell.scala:456)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
at akka.dispatch.Mailbox.run(Mailbox.scala:219)
at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
In the above exception, Datum and ResourceMessage are project-specific classes generated by Avro from the following AVDL snippet:
======================
record KeyValueObject {
union{boolean, int, long, float, double, bytes, string} name;
union {boolean, int, long, float, double, bytes, string, array<union{boolean, int, long, float, double, bytes, string, KeyValueObject}>, KeyValueObject} value;
}
record Datum {
union {boolean, int, long, float, double, bytes, string, array<union{boolean, int, long, float, double, bytes, string, KeyValueObject}>, KeyValueObject} value;
}
record ResourceMessage {
string version;
string sequence;
string resourceGUID;
string GWID;
string GWTimestamp;
union {Datum, array<Datum>} data;
}
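As far as we understand the default Avro specific-compiler mapping, a multi-branch union such as `value` is declared as `java.lang.Object` in the generated Java class, with arrays held as `java.util.List`, `bytes` as `java.nio.ByteBuffer`, and `string` as `CharSequence`. The stdlib-only sketch below shows which union branch a given runtime value falls into. `DatumUnionBranches.branchOf` is a hypothetical helper and the nested `KeyValueObject` is a hand-written stand-in, not the Avro-generated class or any Avro/Kryo API.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.List;

final class DatumUnionBranches {
    // Hypothetical stand-in for the Avro-generated KeyValueObject record.
    static final class KeyValueObject {
        final Object name;   // union{boolean, int, long, float, double, bytes, string}
        final Object value;  // same union as Datum.value

        KeyValueObject(Object name, Object value) {
            this.name = name;
            this.value = value;
        }
    }

    // Reports which branch of the Datum.value union a runtime value occupies,
    // assuming the default specific-compiler Java type mapping.
    static String branchOf(Object v) {
        if (v == null) return "null (not a legal branch of this union)";
        if (v instanceof Boolean) return "boolean";
        if (v instanceof Integer) return "int";
        if (v instanceof Long) return "long";
        if (v instanceof Float) return "float";
        if (v instanceof Double) return "double";
        if (v instanceof ByteBuffer) return "bytes";
        if (v instanceof CharSequence) return "string";
        if (v instanceof List) return "array";
        if (v instanceof KeyValueObject) return "KeyValueObject";
        return "unknown: " + v.getClass().getName();
    }

    public static void main(String[] args) {
        // Shaped like sample 1 (simple) and sample 2 (complex) data.value.
        Object simple = "30";
        Object complex = Arrays.asList(
            new KeyValueObject("Temperature", "30"),
            new KeyValueObject("Location", Arrays.asList("+401213.1", "-0750015.1")));
        System.out.println(branchOf(simple));  // prints: string
        System.out.println(branchOf(complex)); // prints: array
    }
}
```

The `null` case is included because this union has no `null` branch, so a null `value` would not be a legal Avro value.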
Avro message samples:
============================
1)
{"version": "01", "sequence": "00001", "resourceGUID": "001", "GWID": "002", "GWTimestamp": "1409823150737", "data": {"value": "30"}}
2)
{"version": "01", "sequence": "00001", "resource": "sensor-001", "controller": "002", "controllerTimestamp": "1411038710358", "data": {"value": [{"name": "Temperature", "value": "30"}, {"name": "Speed", "value": "60"}, {"name": "Location", "value": ["+401213.1", "-0750015.1"]}, {"name": "Timestamp", "value": "2014-09-09T08:15:25-05:00"}]}}
Both 1 and 2 adhere to the Avro schema, so the decoder is able to convert them into Avro objects in the Spark Streaming API.
The messages were pulled from a Kafka source and decoded using a Kafka decoder.
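Because `value` is typed as `Object`, the classes Kryo actually writes at runtime are whatever concrete types the decoder produced for each union branch. We believe these can include Avro's own implementation classes (for example `org.apache.avro.util.Utf8` for strings and `GenericData.Array` for arrays) rather than only the generated record classes. A diagnostic sketch, not a confirmed fix, is to walk a decoded message and list every concrete class it reaches, then compare that list with what was registered. `ReachableClasses` and its nested `KeyValueObject` stand-in are hypothetical; the demo uses plain Java lists and strings shaped like sample 2's `data.value`, not real decoded Avro objects.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

final class ReachableClasses {
    // Hypothetical stand-in for the Avro-generated KeyValueObject record.
    static final class KeyValueObject {
        final Object name;
        final Object value;

        KeyValueObject(Object name, Object value) {
            this.name = name;
            this.value = value;
        }
    }

    // Returns the sorted set of concrete class names reachable from a union value.
    static Set<String> collect(Object value) {
        Set<String> seen = new TreeSet<>();
        walk(value, seen);
        return seen;
    }

    // Records the runtime class of each node, then recurses into lists and records.
    private static void walk(Object v, Set<String> seen) {
        if (v == null) {
            seen.add("null");
            return;
        }
        seen.add(v.getClass().getName());
        if (v instanceof List) {
            for (Object element : (List<?>) v) {
                walk(element, seen);
            }
        } else if (v instanceof KeyValueObject) {
            KeyValueObject kv = (KeyValueObject) v;
            walk(kv.name, seen);
            walk(kv.value, seen);
        }
    }

    public static void main(String[] args) {
        // Mirrors the nesting of sample 2's "data.value" with plain Java types.
        List<Object> location = new ArrayList<>(Arrays.asList("+401213.1", "-0750015.1"));
        List<Object> value = Arrays.asList(
            new KeyValueObject("Temperature", "30"),
            new KeyValueObject("Location", location));
        for (String className : collect(value)) {
            System.out.println(className);
        }
    }
}
```

Compiled in the default package, the demo prints four names: `ReachableClasses$KeyValueObject`, `java.lang.String`, `java.util.ArrayList`, and `java.util.Arrays$ArrayList`. Run against a real decoded ResourceMessage, the set would instead show the list and string implementations the decoder actually used.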