Posted to issues@spark.apache.org by "Sean Owen (JIRA)" <ji...@apache.org> on 2015/05/30 13:40:17 UTC

[jira] [Commented] (SPARK-7960) Serialization problem when multiple receivers are specified in a loop

    [ https://issues.apache.org/jira/browse/SPARK-7960?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14565956#comment-14565956 ] 

Sean Owen commented on SPARK-7960:
----------------------------------

Yeah, I've seen problems of this form, where a closure captures things you don't expect (or that it shouldn't) depending on how the code is written. The recent updates to the closure cleaner may already resolve this, so I think it's probably a duplicate of other issues rather than something specific to streaming.
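
For illustration only, here is a minimal sketch of that kind of unexpected capture, using plain Java serialization and no Spark at all. `Holder`, `capturing`, `clean` and `serializes` are hypothetical names invented for this example; `Holder` merely stands in for any non-serializable object. Two closures that compute the same thing differ in what they drag along, purely because of how they are written:

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

object ClosureCaptureSketch {
  // Hypothetical stand-in for a non-serializable object (deliberately NOT Serializable).
  class Holder(val offset: Int) {
    // Refers to the field `offset` directly, so the closure captures `this`,
    // i.e. the whole Holder instance.
    def capturing: Int => Int = x => x + offset

    // Copies the field into a local val first, so the closure captures only an Int.
    def clean: Int => Int = {
      val localOffset = offset
      x => x + localOffset
    }
  }

  // Returns true if Java serialization of `obj` succeeds,
  // false if it hits a NotSerializableException.
  def serializes(obj: AnyRef): Boolean =
    try {
      val out = new ObjectOutputStream(new ByteArrayOutputStream())
      out.writeObject(obj)
      out.close()
      true
    } catch {
      case _: NotSerializableException => false
    }

  def main(args: Array[String]): Unit = {
    val h = new Holder(10)
    println(s"capturing closure serializes: ${serializes(h.capturing)}")
    println(s"clean closure serializes:     ${serializes(h.clean)}")
  }
}
```

The first closure should fail to serialize because it holds a reference to the enclosing `Holder`, while the second should succeed. Whether the loop in this report trips over the same mechanism is an assumption, not something verified here.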

> Serialization problem when multiple receivers are specified in a loop
> ---------------------------------------------------------------------
>
>                 Key: SPARK-7960
>                 URL: https://issues.apache.org/jira/browse/SPARK-7960
>             Project: Spark
>          Issue Type: Bug
>          Components: Streaming
>    Affects Versions: 1.3.1, 1.4.0
>            Reporter: Nishkam Ravi
>
> The following code works fine:
>      var lines_array:Array[ReceiverInputDStream[String]] = new Array[ReceiverInputDStream[String]](4);
>      lines_array(0) = ssc.actorStream[String](
>        Props(new SampleActorReceiver[String]("akka.tcp://test@%s:%s/user/FeederActor".format(
>          host, port.toInt))), "SampleReceiver0")
>      lines_array(1) = ssc.actorStream[String](
>        Props(new SampleActorReceiver[String]("akka.tcp://test@%s:%s/user/FeederActor".format(
>          host, port.toInt+1))), "SampleReceiver1")
>      lines_array(2) = ssc.actorStream[String](
>        Props(new SampleActorReceiver[String]("akka.tcp://test@%s:%s/user/FeederActor".format(
>          host, port.toInt+2))), "SampleReceiver2")
>      lines_array(3) = ssc.actorStream[String](
>        Props(new SampleActorReceiver[String]("akka.tcp://test@%s:%s/user/FeederActor".format(
>          host, port.toInt+3))), "SampleReceiver3")
> The equivalent code fails when written as a loop:
>      var lines_array:Array[ReceiverInputDStream[String]] = new Array[ReceiverInputDStream[String]](4);
>      var i = 0;
>      for(i <- 0 to 3){
>        lines_array(i) = ssc.actorStream[String](
>          Props(new SampleActorReceiver[String]("akka.tcp://test@%s:%s/user/FeederActor".format(
>            host, port.toInt + i))), "SampleReceiver" + i.toString)
>      }
> Exception stack trace:
> java.io.NotSerializableException: Object of org.apache.spark.streaming.dstream.PluggableInputDStream is being serialized  possibly as a part of closure of an RDD operation. This is because  the DStream object is being referred to from within the closure.  Please rewrite the RDD operation inside this DStream to avoid this.  This has been enforced to avoid bloating of Spark tasks  with unnecessary objects.
>         at org.apache.spark.streaming.dstream.DStream$$anonfun$writeObject$1.apply$mcV$sp(DStream.scala:421)
>         at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1231)
>         at org.apache.spark.streaming.dstream.DStream.writeObject(DStream.scala:408)
>         at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>         at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>         at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>         at java.lang.reflect.Method.invoke(Method.java:606)
>         at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:988)
>         at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1495)
>         at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
>         at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
>         at java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1377)
>         at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1173)
>         at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
>         at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
>         at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
>         at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
>         at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
>         at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
>         at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
>         at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
>         at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
>         at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
>         at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
>         at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
>         at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347)
>         at scala.collection.immutable.$colon$colon.writeObject(List.scala:379)
>         at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>         at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>         at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>         at java.lang.reflect.Method.invoke(Method.java:606)
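
The `DStream.writeObject` frame at the top of the trace, reached through a `writeArray` frame, suggests that an array holding a DStream was pulled in while something else was being serialized. The following is a minimal sketch of that pattern without Spark, under that assumption. `GuardedStream`, `serializationError` and `demo` are hypothetical names; `GuardedStream` only imitates the idea of a `writeObject` hook that refuses to be written and is not the actual DStream implementation.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

object GuardedSerializationSketch {
  // Hypothetical stand-in for a DStream: it is Serializable, but its private
  // writeObject hook throws as soon as Java serialization tries to write it.
  class GuardedStream(val name: String) extends Serializable {
    private def writeObject(out: ObjectOutputStream): Unit =
      throw new NotSerializableException(s"$name is being serialized as part of a closure")
  }

  // Returns the exception message if serialization fails, None if it succeeds.
  def serializationError(obj: AnyRef): Option[String] =
    try {
      val out = new ObjectOutputStream(new ByteArrayOutputStream())
      out.writeObject(obj)
      out.close()
      None
    } catch {
      case e: NotSerializableException => Some(e.getMessage)
    }

  def demo(): Option[String] = {
    val streams = new Array[GuardedStream](2)
    // This closure captures the `streams` array itself, not just the index.
    val fill: Int => Unit = i => streams(i) = new GuardedStream("stream" + i)
    fill(0)
    // Serializing the closure walks into the captured array and reaches stream0.
    serializationError(fill)
  }

  def main(args: Array[String]): Unit =
    println(demo())
}
```

In this sketch the closure itself is harmless, but serializing it reaches the guarded object through the captured array, which is the shape of failure the message in the trace describes.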


