Posted to user@spark.apache.org by Nicholas Chammas <ni...@gmail.com> on 2014/07/15 18:21:39 UTC

Re: NotSerializableException in Spark Streaming

Hey Diana,

Did you ever figure this out?

I’m running into the same exception, except in my case the function I’m
calling is a KMeans model.predict().

In regular Spark it works, and Spark Streaming without the call to
model.predict() also works, but when put together I get this serialization
exception. I’m on 1.0.0.

Nick


On Thu, May 8, 2014 at 6:37 AM, Diana Carroll <dc...@cloudera.com> wrote:

> Hey all, trying to set up a pretty simple streaming app and getting some
> weird behavior.
>
> First, a non-streaming job that works fine:  I'm trying to pull out lines
> of a log file that match a regex, for which I've set up a function:
>
> def getRequestDoc(s: String): String =
>   "KBDOC-[0-9]*".r.findFirstIn(s).orNull
> val logs = sc.textFile(logfiles)
> logs.map(getRequestDoc).take(10)
>
> That works, but I want to run that on the same data, but streaming, so I
> tried this:
>
> val logs = ssc.socketTextStream("localhost",4444)
> logs.map(getRequestDoc).print()
> ssc.start()
>
> From this code, I get:
> 14/05/08 03:32:08 ERROR JobScheduler: Error running job streaming job
> 1399545128000 ms.0
> org.apache.spark.SparkException: Job aborted: Task not serializable:
> java.io.NotSerializableException:
> org.apache.spark.streaming.StreamingContext
>
>
> But if I do the map function inline instead of calling a separate
> function, it works:
>
> logs.map("KBDOC-[0-9]*".r.findFirstIn(_).orNull).print()
>
> So why is it able to serialize my little function in regular spark, but
> not in streaming?
>
> Thanks,
> Diana
>
>
>

Re: NotSerializableException in Spark Streaming

Posted by Nicholas Chammas <ni...@gmail.com>.
This still seems to be broken. In 1.1.1, it errors immediately on this line
(from the above repro script):

liveTweets.map(t => noop(t)).print()

The stack trace is:

org.apache.spark.SparkException: Task not serializable
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166)
    at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158)
    at org.apache.spark.SparkContext.clean(SparkContext.scala:1264)
    at org.apache.spark.streaming.dstream.DStream.map(DStream.scala:438)
    at $iwC$$iwC$$iwC$$iwC.<init>(<console>:27)
    at $iwC$$iwC$$iwC.<init>(<console>:32)
    at $iwC$$iwC.<init>(<console>:34)
    at $iwC.<init>(<console>:36)
    at <init>(<console>:38)
    at .<init>(<console>:42)
    at .<clinit>(<console>)
    at .<init>(<console>:7)
    at .<clinit>(<console>)
    at $print(<console>)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:789)
    at org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1062)
    at org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:615)
    at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:646)
    at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:610)
    at org.apache.spark.repl.SparkILoop.reallyInterpret$1(SparkILoop.scala:823)
    at org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:868)
    at org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:780)
    at org.apache.spark.repl.SparkILoop.processLine$1(SparkILoop.scala:625)
    at org.apache.spark.repl.SparkILoop.innerLoop$1(SparkILoop.scala:633)
    at org.apache.spark.repl.SparkILoop.loop(SparkILoop.scala:638)
    at org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply$mcZ$sp(SparkILoop.scala:963)
    at org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply(SparkILoop.scala:911)
    at org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply(SparkILoop.scala:911)
    at scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135)
    at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:911)
    at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:1006)
    at org.apache.spark.repl.Main$.main(Main.scala:31)
    at org.apache.spark.repl.Main.main(Main.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:329)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.io.NotSerializableException: org.apache.spark.streaming.StreamingContext
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1183)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
    at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347)
    at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:42)
    at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:73)
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:164)
    ... 43 more
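
The repeated defaultWriteFields frames in the trace suggest that Java
serialization is walking from the closure through the objects that enclose
it until it reaches the StreamingContext. The following is a minimal sketch
of that effect in plain Scala, without Spark. FakeContext, Session and
SerializationDemo are hypothetical names made up for illustration:
FakeContext stands in for the non-serializable StreamingContext, and Session
stands in for the wrapper the shell generates around typed-in lines (the
$iwC classes in the trace). That the shell behaves exactly like this is an
assumption, not something confirmed in this thread.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Hypothetical stand-in for StreamingContext: deliberately NOT Serializable.
class FakeContext

// Hypothetical stand-in for the wrapper the shell builds around typed-in lines.
class Session extends Serializable {
  val ctx = new FakeContext

  // A method: calling it requires a reference to the enclosing Session.
  def noopDef(a: Any): Any = a

  // A function value: it refers to nothing outside itself.
  val noopVal: Any => Any = (a: Any) => a

  // Calls a method of Session, so the closure holds a reference to the Session.
  val viaDef: Any => Any = t => noopDef(t)
}

object SerializationDemo {
  // Tries plain Java serialization on an object and reports whether it succeeded.
  def isSerializable(obj: AnyRef): Boolean =
    try {
      val out = new ObjectOutputStream(new ByteArrayOutputStream())
      try out.writeObject(obj) finally out.close()
      true
    } catch {
      case _: NotSerializableException => false
    }

  def main(args: Array[String]): Unit = {
    val s = new Session
    println(s"closure over def: ${isSerializable(s.viaDef)}")
    println(s"function val:     ${isSerializable(s.noopVal)}")
  }
}
```

If the assumption holds, passing a function value directly avoids dragging
the wrapper along, while wrapping it again as t => noopDef(t) does not.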

TD, any troubleshooting tips?

Nick

On Thu Jul 24 2014 at 8:55:07 PM Nicholas Chammas <
nicholas.chammas@gmail.com> wrote:

> Yep, here goes!
>
> Here are my environment vitals:
>
>    - Spark 1.0.0
>    - EC2 cluster with 1 slave spun up using spark-ec2
>    - twitter4j 3.0.3
>    - spark-shell called with --jars argument to load
>    spark-streaming-twitter_2.10-1.0.0.jar as well as all the twitter4j
>    jars.
>
> Now, while I’m in the Spark shell, I enter the following:
>
> import twitter4j.auth.{Authorization, OAuthAuthorization}
> import twitter4j.conf.ConfigurationBuilder
> import org.apache.spark.streaming.{Seconds, Minutes, StreamingContext}
> import org.apache.spark.streaming.twitter.TwitterUtils
> def getAuth(): Option[Authorization] = {
>
>   System.setProperty("twitter4j.oauth.consumerKey", "consumerKey")
>   System.setProperty("twitter4j.oauth.consumerSecret", "consumerSecret")
>   System.setProperty("twitter4j.oauth.accessToken", "accessToken")
>   System.setProperty("twitter4j.oauth.accessTokenSecret", "accessTokenSecret")
>
>   Some(new OAuthAuthorization(new ConfigurationBuilder().build()))
>
> }
> def noop(a: Any): Any = {
>   a
> }
> val ssc = new StreamingContext(sc, Seconds(5))
> val liveTweetObjects = TwitterUtils.createStream(ssc, getAuth())
> val liveTweets = liveTweetObjects.map(_.getText)
>
> liveTweets.map(t => noop(t)).print()
>
> ssc.start()
>
> So basically, I’m just printing Tweets as-is, but first I’m mapping them
> to themselves via noop(). The Tweets will start to flow just fine for a
> minute or so, and then, this:
>
> 14/07/24 23:13:30 ERROR JobScheduler: Error running job streaming job 1406243610000 ms.0
> org.apache.spark.SparkException: Job aborted due to stage failure: Task not serializable: java.io.NotSerializableException: org.apache.spark.streaming.StreamingContext
>     at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1033)
>     at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1017)
>     at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1015)
>     at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>     at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>     at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1015)
>     at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitMissingTasks(DAGScheduler.scala:770)
>     at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:713)
>     at org.apache.spark.scheduler.DAGScheduler.handleJobSubmitted(DAGScheduler.scala:697)
>     at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1176)
>     at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
>     at akka.actor.ActorCell.invoke(ActorCell.scala:456)
>     at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
>     at akka.dispatch.Mailbox.run(Mailbox.scala:219)
>     at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
>     at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>     at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>     at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>     at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>
> The time-to-first-error is variable.
>
> This is the simplest repro I can show at this time. Doing more complex
> things with liveTweets that involve a KMeansModel, for example, will be
> interrupted more quickly by this java.io.NotSerializableException. I don’t
> know if the root cause is the same, but the error certainly is.
>
> By the way, trying to reproduce this on 1.0.1 doesn’t raise the same
> error, but I can’t dig deeper to make sure this is really resolved (e.g. by
> trying more complex things that need data) due to SPARK-2471
> <https://issues.apache.org/jira/browse/SPARK-2471>. I see that that issue
> has been resolved, so I’ll try this whole process again using the latest
> from master and see how it goes.
>
> Nick
>
>
> On Tue, Jul 15, 2014 at 5:58 PM, Tathagata Das <
> tathagata.das1565@gmail.com> wrote:
>
>> I am very curious though. Can you post a concise code example which we can
>> run to reproduce this problem?
>>
>> TD
>>
>>
>> On Tue, Jul 15, 2014 at 2:54 PM, Tathagata Das <
>> tathagata.das1565@gmail.com> wrote:
>>
>>> I am not entirely sure off the top of my head. But a possible workaround
>>> (it usually works) is to define the function as a val instead of a def.
>>> For example
>>>
>>> def func(i: Int): Boolean = { true }
>>>
>>> can be written as
>>>
>>> val func = (i: Int) => { true }
>>>
>>> Hope this helps for now.
>>>
>>> TD
>>>
>>>


Re: NotSerializableException in Spark Streaming

Posted by Tathagata Das <ta...@gmail.com>.
I am very curious though. Can you post a concise code example which we can
run to reproduce this problem?

TD



Re: NotSerializableException in Spark Streaming

Posted by Tathagata Das <ta...@gmail.com>.
I am not entirely sure off the top of my head. But a possible workaround (it
usually works) is to define the function as a val instead of a def. For example

def func(i: Int): Boolean = { true }

can be written as

val func = (i: Int) => { true }
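
Applied to the getRequestDoc function from the original post, the workaround
might look like the sketch below. It uses a plain Seq in place of the DStream
so that it runs without Spark. ValWorkaroundDemo and sampleLines are made-up
names, and whether this avoids the exception in any given Spark version is
not confirmed here.

```scala
object ValWorkaroundDemo {
  // A function value instead of a def, following the suggested workaround.
  val getRequestDoc: String => String =
    (s: String) => "KBDOC-[0-9]*".r.findFirstIn(s).orNull

  // Hypothetical sample lines standing in for the incoming log stream.
  val sampleLines: Seq[String] = Seq("GET /KBDOC-00123.html", "GET /index.html")

  def main(args: Array[String]): Unit =
    // With Spark, the same value would be passed as logs.map(getRequestDoc).
    sampleLines.map(getRequestDoc).foreach(println)
}
```

The function value is passed to map directly. Wrapping it in another lambda,
such as s => getRequestDoc(s), would reintroduce a reference to whatever
object holds the val.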

Hope this helps for now.

TD

