You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@spark.apache.org by Paolo Patierno <pp...@live.com> on 2016/06/29 08:19:04 UTC

Job aborted due to not serializable exception

Hi, 
following the socketStream[T] function implementation from the official spark GitHub repo :

ef socketStream[T](
      
      
              hostname: String,
      
      
              port: Int,
      
      
              converter: JFunction[InputStream, java.lang.Iterable[T]],
      
      
              storageLevel: StorageLevel)
      
      
          : JavaReceiverInputDStream[T] = {
      
      
            def fn: (InputStream) => Iterator[T] = (x: InputStream) => converter.call(x).iterator().asScala
      
      
            implicit val cmt: ClassTag[T] =
      
      
              implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]]
      
      
            ssc.socketStream(hostname, port, fn, storageLevel)
      
      
          }

I'm implementing a custom receiver that works great with used in Scala.
I'm trying to use it from Java and the createStream in MyReceiverUtils.scala is the following :

def createStream[T](
      jssc: JavaStreamingContext,
      host: String,
      port: Int,
      address: String,
      messageConverter: Function[Message, Option[T]],
      storageLevel: StorageLevel
    ): JavaReceiverInputDStream[T] = {


    def fn: (Message) => Option[T] = (x: Message) => messageConverter.call(x)
    implicit val cmt: ClassTag[T] =
      implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]]
    new MyInputDStream(jssc.ssc, host, port, address, fn, storageLevel)
  }

Trying to use it I receive :

org.apache.spark.SparkException: Job aborted due to stage failure: Failed to serialize task 465, not attempting to retry it. Exception during serialization: java.io.NotSerializableException: org.apache.spark.streaming.amqp.JavaMyReceiverStreamSuite

If I change the fn definition with something simpler like (x: Message) => None for example, the error goes away. 

Why the call on messageConverter is producing this problem ?

Thanks,
Paolo