Posted to issues@spark.apache.org by "Hyukjin Kwon (JIRA)" <ji...@apache.org> on 2019/05/21 04:25:17 UTC

[jira] [Updated] (SPARK-7960) Serialization problem when multiple receivers are specified in a loop

     [ https://issues.apache.org/jira/browse/SPARK-7960?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Hyukjin Kwon updated SPARK-7960:
--------------------------------
    Labels: bulk-closed  (was: )

> Serialization problem when multiple receivers are specified in a loop
> ---------------------------------------------------------------------
>
>                 Key: SPARK-7960
>                 URL: https://issues.apache.org/jira/browse/SPARK-7960
>             Project: Spark
>          Issue Type: Bug
>          Components: DStreams
>    Affects Versions: 1.3.1, 1.4.0
>            Reporter: Nishkam Ravi
>            Priority: Major
>              Labels: bulk-closed
>
> The following code works fine:
>      val lines_array: Array[ReceiverInputDStream[String]] = new Array[ReceiverInputDStream[String]](4)
>      lines_array(0) = ssc.actorStream[String](
>        Props(new SampleActorReceiver[String]("akka.tcp://test@%s:%s/user/FeederActor".format(
>          host, port.toInt))), "SampleReceiver0")
>      lines_array(1) = ssc.actorStream[String](
>        Props(new SampleActorReceiver[String]("akka.tcp://test@%s:%s/user/FeederActor".format(
>          host, port.toInt+1))), "SampleReceiver1")
>      lines_array(2) = ssc.actorStream[String](
>        Props(new SampleActorReceiver[String]("akka.tcp://test@%s:%s/user/FeederActor".format(
>          host, port.toInt+2))), "SampleReceiver2")
>      lines_array(3) = ssc.actorStream[String](
>        Props(new SampleActorReceiver[String]("akka.tcp://test@%s:%s/user/FeederActor".format(
>          host, port.toInt+3))), "SampleReceiver3")
> The same code fails when the receivers are created in a loop:
>      val lines_array: Array[ReceiverInputDStream[String]] = new Array[ReceiverInputDStream[String]](4)
>      for (i <- 0 to 3) {
>        lines_array(i) = ssc.actorStream[String](
>          Props(new SampleActorReceiver[String]("akka.tcp://test@%s:%s/user/FeederActor".format(
>            host, port.toInt + i))), "SampleReceiver" + i.toString)
>      }
> Exception stack trace:
> java.io.NotSerializableException: Object of org.apache.spark.streaming.dstream.PluggableInputDStream is being serialized possibly as a part of closure of an RDD operation. This is because the DStream object is being referred to from within the closure. Please rewrite the RDD operation inside this DStream to avoid this. This has been enforced to avoid bloating of Spark tasks with unnecessary objects.
>         at org.apache.spark.streaming.dstream.DStream$$anonfun$writeObject$1.apply$mcV$sp(DStream.scala:421)
>         at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1231)
>         at org.apache.spark.streaming.dstream.DStream.writeObject(DStream.scala:408)
>         at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>         at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>         at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>         at java.lang.reflect.Method.invoke(Method.java:606)
>         at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:988)
>         at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1495)
>         at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
>         at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
>         at java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1377)
>         at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1173)
>         at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
>         at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
>         at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
>         at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
>         at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
>         at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
>         at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
>         at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
>         at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
>         at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
>         at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
>         at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
>         at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347)
>         at scala.collection.immutable.$colon$colon.writeObject(List.scala:379)
>         at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>         at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>         at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>         at java.lang.reflect.Method.invoke(Method.java:606)
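
One plausible mechanism behind this difference can be sketched outside Spark with plain Java serialization. Everything below (`FakeReceiver`, `ClosureCaptureDemo`) is hypothetical scaffolding, not Spark API: a closure built inside a loop that indexes into an outer array captures the whole array of non-serializable objects, whereas copying what the closure needs into a local `val` first keeps the capture minimal and serializable.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Stand-in for a non-serializable receiver (hypothetical; plays the role
// of PluggableInputDStream in the report -- note it is NOT Serializable).
class FakeReceiver

object ClosureCaptureDemo {
  // Returns true if the object survives Java serialization.
  def serializable(obj: AnyRef): Boolean =
    try {
      new ObjectOutputStream(new ByteArrayOutputStream()).writeObject(obj)
      true
    } catch {
      case _: NotSerializableException => false
    }

  def run(): (Boolean, Boolean) = {
    val receivers = new Array[FakeReceiver](4)
    for (i <- 0 to 3) receivers(i) = new FakeReceiver

    // Built in a loop, this closure refers to receivers(i), so it captures
    // the entire array of non-serializable objects.
    var badClosure: () => Int = null
    for (i <- 0 to 3) badClosure = () => receivers(i).hashCode

    // Copying the needed value into a local val first means the closure
    // captures only that serializable value, not the array.
    var goodClosure: () => String = null
    for (i <- 0 to 3) {
      val name = "SampleReceiver" + i
      goodClosure = () => name
    }

    (serializable(badClosure), serializable(goodClosure))
  }

  def main(args: Array[String]): Unit = {
    val (bad, good) = run()
    println(s"loop closure serializable: $bad")
    println(s"local-val closure serializable: $good")
  }
}
```

If this is the mechanism at work, the fix suggested by the exception message applies directly: avoid referring to the DStream array (or any DStream) from inside closures passed to RDD operations, binding any needed per-iteration values to locals instead.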



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org