Posted to issues@spark.apache.org by "Hari Shreedharan (JIRA)" <ji...@apache.org> on 2014/05/11 00:00:59 UTC

[jira] [Created] (SPARK-1785) Streaming requires receivers to be serializable

Hari Shreedharan created SPARK-1785:
---------------------------------------

             Summary: Streaming requires receivers to be serializable
                 Key: SPARK-1785
                 URL: https://issues.apache.org/jira/browse/SPARK-1785
             Project: Spark
          Issue Type: Bug
          Components: Streaming
    Affects Versions: 0.9.0
            Reporter: Hari Shreedharan


When the ReceiverTracker starts the receivers, it creates a temporary RDD to send the receivers over to the workers. They are then started on the workers using the startReceivers method.
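
For context, here is a minimal sketch of that pattern (simplified and hypothetical, not the actual ReceiverTracker source). The point is that the receiver objects are the data of the temporary RDD, so each one passes through Spark's task serializer on the way to the workers:
{code}
import org.apache.spark.SparkContext
import org.apache.spark.streaming.receiver.Receiver

// Simplified sketch of the ReceiverTracker pattern, not actual Spark code.
def startReceiversSketch(sc: SparkContext, receivers: Seq[Receiver[_]]): Unit = {
  // The receivers are the *data* of this temporary RDD, so each one is
  // serialized as part of the partition contents when the job is shipped.
  val tempRDD = sc.makeRDD(receivers, receivers.size)
  tempRDD.foreachPartition { iter =>
    // Runs on a worker after deserialization.
    iter.foreach(_.onStart())
  }
}
{code}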

Looks like this means that the receivers really have to be serializable. In the case of the Flume receiver, the Avro IPC components are not serializable, causing an error that looks like this (a sketch of the failing pattern follows the trace):
{code}
Exception in thread "Thread-46" org.apache.spark.SparkException: Job aborted due to stage failure: Task not serializable: java.io.NotSerializableException: org.apache.avro.ipc.specific.SpecificResponder
	- field (class "org.apache.spark.streaming.flume.FlumeReceiver", name: "responder", type: "class org.apache.avro.ipc.specific.SpecificResponder")
	- object (class "org.apache.spark.streaming.flume.FlumeReceiver", org.apache.spark.streaming.flume.FlumeReceiver@5e6bbb36)
	- element of array (index: 0)
	- array (class "[Lorg.apache.spark.streaming.receiver.Receiver;", size: 1)
	- field (class "scala.collection.mutable.WrappedArray$ofRef", name: "array", type: "class [Ljava.lang.Object;")
	- object (class "scala.collection.mutable.WrappedArray$ofRef", WrappedArray(org.apache.spark.streaming.flume.FlumeReceiver@5e6bbb36))
	- field (class "org.apache.spark.rdd.ParallelCollectionPartition", name: "values", type: "interface scala.collection.Seq")
	- custom writeObject data (class "org.apache.spark.rdd.ParallelCollectionPartition")
	- object (class "org.apache.spark.rdd.ParallelCollectionPartition", org.apache.spark.rdd.ParallelCollectionPartition@691)
	- writeExternal data
	- root object (class "org.apache.spark.scheduler.ResultTask", ResultTask(0, 0))
	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1033)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1017)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1015)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1015)
	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitMissingTasks(DAGScheduler.scala:770)
	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:713)
	at org.apache.spark.scheduler.DAGScheduler.handleJobSubmitted(DAGScheduler.scala:697)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1176)
	at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
	at akka.actor.ActorCell.invoke(ActorCell.scala:456)
	at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
	at akka.dispatch.Mailbox.run(Mailbox.scala:219)
	at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
	at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
	at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
	at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
	at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
{code}
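
To make the failing shape concrete, here is a hypothetical receiver (not the actual FlumeReceiver source) that holds a non-serializable Avro object in a field, like the responder field named in the trace:
{code}
import org.apache.avro.ipc.specific.SpecificResponder
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver

// Hypothetical receiver illustrating the failing shape.
class AvroBackedReceiver(val responder: SpecificResponder)
  extends Receiver[Array[Byte]](StorageLevel.MEMORY_AND_DISK) {

  // Receiver itself extends Serializable, but SpecificResponder does not
  // implement java.io.Serializable, so Java serialization dies on the
  // responder field, which is exactly the field named in the trace above.
  def onStart(): Unit = { /* start the Avro server using responder */ }
  def onStop(): Unit = { /* shut the server down */ }
}
{code}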

A way out of this is to send only the class name (or the .class object) to the workers in the tempRDD and have the workers instantiate and start the receiver.
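
Roughly (an illustrative sketch only; the names are made up and it assumes a no-arg constructor):
{code}
import org.apache.spark.SparkContext
import org.apache.spark.streaming.receiver.Receiver

// Illustrative sketch of the proposed fix, not a real Spark API: ship
// plain class names (Strings are trivially serializable) instead of
// receiver instances, and build each receiver on the worker so the
// object itself never has to cross the wire.
def startReceiversByName(sc: SparkContext, receiverClassNames: Seq[String]): Unit = {
  val tempRDD = sc.makeRDD(receiverClassNames, receiverClassNames.size)
  tempRDD.foreachPartition { iter =>
    iter.foreach { className =>
      // Assumes a no-arg constructor for simplicity; real receivers would
      // need their constructor arguments shipped as well.
      val receiver = Class.forName(className).newInstance().asInstanceOf[Receiver[_]]
      receiver.onStart()
    }
  }
}
{code}

The constructor arguments would still need to reach the workers somehow, but those are typically plain configuration values that serialize without trouble.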

My analysis may be wrong, but if it makes sense, I will submit a PR to fix this.



--
This message was sent by Atlassian JIRA
(v6.2#6252)