Posted to user@spark.apache.org by Paolo Patierno <pp...@live.com> on 2016/06/29 08:19:04 UTC
Job aborted due to not serializable exception
Hi,
following the socketStream[T] function implementation from the official Spark GitHub repo:
def socketStream[T](
    hostname: String,
    port: Int,
    converter: JFunction[InputStream, java.lang.Iterable[T]],
    storageLevel: StorageLevel)
  : JavaReceiverInputDStream[T] = {
  def fn: (InputStream) => Iterator[T] = (x: InputStream) => converter.call(x).iterator().asScala
  implicit val cmt: ClassTag[T] =
    implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]]
  ssc.socketStream(hostname, port, fn, storageLevel)
}
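For reference, calling socketStream from Java typically looks something like this (a minimal sketch, assuming jssc is an already created JavaStreamingContext; the host, port and converter body are only illustrative):

import java.io.BufferedReader;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

import org.apache.spark.api.java.function.Function;
import org.apache.spark.storage.StorageLevel;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;

// Sketch of a Java call site: the converter turns the raw socket
// InputStream into an Iterable of records (here, text lines).
JavaReceiverInputDStream<String> lines = jssc.socketStream(
    "localhost",
    9999,
    new Function<InputStream, Iterable<String>>() {
      @Override
      public Iterable<String> call(InputStream in) throws Exception {
        BufferedReader reader =
            new BufferedReader(new InputStreamReader(in, StandardCharsets.UTF_8));
        List<String> result = new ArrayList<>();
        String line;
        while ((line = reader.readLine()) != null) {
          result.add(line);
        }
        return result;
      }
    },
    StorageLevel.MEMORY_AND_DISK_SER_2());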
I'm implementing a custom receiver that works great when used from Scala.
I'm trying to use it from Java, and the createStream method in MyReceiverUtils.scala is the following:
def createStream[T](
    jssc: JavaStreamingContext,
    host: String,
    port: Int,
    address: String,
    messageConverter: Function[Message, Option[T]],
    storageLevel: StorageLevel
  ): JavaReceiverInputDStream[T] = {
  def fn: (Message) => Option[T] = (x: Message) => messageConverter.call(x)
  implicit val cmt: ClassTag[T] =
    implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]]
  new MyInputDStream(jssc.ssc, host, port, address, fn, storageLevel)
}
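The Java test calls createStream roughly along these lines (a simplified sketch, assuming createStream is reachable as a static method from Java; host, port, address and the conversion body are only illustrative):

import org.apache.spark.api.java.function.Function;
import org.apache.spark.storage.StorageLevel;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;

import scala.Option;

// Import for the Message type omitted: it depends on the actual
// messaging library behind MyInputDStream.

// NB: an anonymous inner class like this keeps a hidden reference to
// its enclosing instance, i.e. the JavaMyReceiverStreamSuite itself.
Function<Message, Option<String>> messageConverter =
    new Function<Message, Option<String>>() {
      @Override
      public Option<String> call(Message message) throws Exception {
        // Illustrative conversion; the real body depends on Message.
        return Option.apply(message.toString());
      }
    };

JavaReceiverInputDStream<String> stream = MyReceiverUtils.createStream(
    jssc, "localhost", 5672, "myAddress", messageConverter,
    StorageLevel.MEMORY_AND_DISK_SER_2());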
Trying to use it, I receive:
org.apache.spark.SparkException: Job aborted due to stage failure: Failed to serialize task 465, not attempting to retry it. Exception during serialization: java.io.NotSerializableException: org.apache.spark.streaming.amqp.JavaMyReceiverStreamSuite
If I replace the fn definition with something simpler, like (x: Message) => None for example, the error goes away.
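For what it's worth, a converter defined as a standalone top-level class (a sketch with illustrative names) carries no hidden reference to the enclosing suite, so it can serialize on its own:

import org.apache.spark.api.java.function.Function;

import scala.Option;

// A top-level converter class has no synthetic field pointing at the
// test suite, so serializing it does not drag the suite along.
// Spark's Function interface already extends java.io.Serializable.
public class MessageToStringConverter implements Function<Message, Option<String>> {

  @Override
  public Option<String> call(Message message) throws Exception {
    // Illustrative conversion; adapt to the real Message API.
    return Option.apply(message.toString());
  }
}

An instance of such a class could then be passed to createStream in place of the anonymous one.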
Still, why is the call on messageConverter producing this problem?
Thanks,
Paolo