Posted to user@spark.apache.org by Nicholas Chammas <ni...@gmail.com> on 2014/12/16 03:24:09 UTC

Re: NotSerializableException in Spark Streaming

This still seems to be broken. In 1.1.1, it errors immediately on this line
(from the repro script quoted below):

liveTweets.map(t => noop(t)).print()

The stack trace is:

org.apache.spark.SparkException: Task not serializable
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166)
    at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158)
    at org.apache.spark.SparkContext.clean(SparkContext.scala:1264)
    at org.apache.spark.streaming.dstream.DStream.map(DStream.scala:438)
    at $iwC$$iwC$$iwC$$iwC.<init>(<console>:27)
    at $iwC$$iwC$$iwC.<init>(<console>:32)
    at $iwC$$iwC.<init>(<console>:34)
    at $iwC.<init>(<console>:36)
    at <init>(<console>:38)
    at .<init>(<console>:42)
    at .<clinit>(<console>)
    at .<init>(<console>:7)
    at .<clinit>(<console>)
    at $print(<console>)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:789)
    at org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1062)
    at org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:615)
    at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:646)
    at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:610)
    at org.apache.spark.repl.SparkILoop.reallyInterpret$1(SparkILoop.scala:823)
    at org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:868)
    at org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:780)
    at org.apache.spark.repl.SparkILoop.processLine$1(SparkILoop.scala:625)
    at org.apache.spark.repl.SparkILoop.innerLoop$1(SparkILoop.scala:633)
    at org.apache.spark.repl.SparkILoop.loop(SparkILoop.scala:638)
    at org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply$mcZ$sp(SparkILoop.scala:963)
    at org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply(SparkILoop.scala:911)
    at org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply(SparkILoop.scala:911)
    at scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135)
    at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:911)
    at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:1006)
    at org.apache.spark.repl.Main$.main(Main.scala:31)
    at org.apache.spark.repl.Main.main(Main.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:329)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.io.NotSerializableException: org.apache.spark.streaming.StreamingContext
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1183)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
    at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347)
    at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:42)
    at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:73)
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:164)
    ... 43 more
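
The repeated writeObject0/defaultWriteFields frames suggest the serializer
is walking a chain of nested objects from the closure down to the
StreamingContext. Below is a minimal sketch of one plausible mechanism,
using plain Java serialization and no Spark. FakeContext, Wrapper,
SerializationDemo and canSerialize are hypothetical names made up for this
illustration; Wrapper only stands in for the shell's line wrappers and is
not how the REPL is actually implemented.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Hypothetical stand-in for a non-serializable object such as StreamingContext.
class FakeContext

// Hypothetical stand-in for a REPL line wrapper: serializable itself, but it
// holds a non-serializable field next to the function definitions.
class Wrapper extends Serializable {
  val ctx = new FakeContext

  // A method: calling it from a closure needs a reference to this Wrapper.
  def noopDef(a: Any): Any = a

  // Closure that calls the method, so it captures `this` (and with it, ctx).
  val viaDef: Any => Any = (t: Any) => noopDef(t)

  // Function value whose body touches nothing in Wrapper.
  val noopVal: Any => Any = (a: Any) => a
}

object SerializationDemo {
  // Returns true if Java serialization of `obj` succeeds.
  def canSerialize(obj: AnyRef): Boolean =
    try {
      val out = new ObjectOutputStream(new ByteArrayOutputStream())
      out.writeObject(obj)
      out.close()
      true
    } catch {
      case _: NotSerializableException => false
    }

  def main(args: Array[String]): Unit = {
    val w = new Wrapper
    println(s"closure calling def serializes: ${canSerialize(w.viaDef)}")
    println(s"function val serializes:        ${canSerialize(w.noopVal)}")
  }
}
```

Assuming Scala 2.12 or later, the first check should report false and the
second true, which would be consistent with the val-instead-of-def
workaround suggested earlier in this thread. Whether this is the actual
root cause of the exception on 1.1.1 is not established here.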

TD, any troubleshooting tips?

Nick

On Thu Jul 24 2014 at 8:55:07 PM Nicholas Chammas <
nicholas.chammas@gmail.com> wrote:

> Yep, here goes!
>
> Here are my environment vitals:
>
>    - Spark 1.0.0
>    - EC2 cluster with 1 slave spun up using spark-ec2
>    - twitter4j 3.0.3
>    - spark-shell called with --jars argument to load
>    spark-streaming-twitter_2.10-1.0.0.jar as well as all the twitter4j
>    jars.
>
> Now, while I’m in the Spark shell, I enter the following:
>
> import twitter4j.auth.{Authorization, OAuthAuthorization}
> import twitter4j.conf.ConfigurationBuilder
> import org.apache.spark.streaming.{Seconds, Minutes, StreamingContext}
> import org.apache.spark.streaming.twitter.TwitterUtils
> def getAuth(): Option[Authorization] = {
>
>   System.setProperty("twitter4j.oauth.consumerKey", "consumerKey")
>   System.setProperty("twitter4j.oauth.consumerSecret", "consumerSecret")
>   System.setProperty("twitter4j.oauth.accessToken", "accessToken")
>   System.setProperty("twitter4j.oauth.accessTokenSecret", "accessTokenSecret")
>
>   Some(new OAuthAuthorization(new ConfigurationBuilder().build()))
>
> }
> def noop(a: Any): Any = {
>   a
> }
> val ssc = new StreamingContext(sc, Seconds(5))
> val liveTweetObjects = TwitterUtils.createStream(ssc, getAuth())
> val liveTweets = liveTweetObjects.map(_.getText)
>
> liveTweets.map(t => noop(t)).print()
>
> ssc.start()
>
> So basically, I’m just printing Tweets as-is, but first I’m mapping them
> to themselves via noop(). The Tweets will start to flow just fine for a
> minute or so, and then, this:
>
> 14/07/24 23:13:30 ERROR JobScheduler: Error running job streaming job 1406243610000 ms.0
> org.apache.spark.SparkException: Job aborted due to stage failure: Task not serializable: java.io.NotSerializableException: org.apache.spark.streaming.StreamingContext
>     at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1033)
>     at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1017)
>     at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1015)
>     at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>     at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>     at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1015)
>     at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitMissingTasks(DAGScheduler.scala:770)
>     at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:713)
>     at org.apache.spark.scheduler.DAGScheduler.handleJobSubmitted(DAGScheduler.scala:697)
>     at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1176)
>     at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
>     at akka.actor.ActorCell.invoke(ActorCell.scala:456)
>     at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
>     at akka.dispatch.Mailbox.run(Mailbox.scala:219)
>     at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
>     at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>     at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>     at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>     at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>
> The time-to-first-error is variable.
>
> This is the simplest repro I can show at this time. Doing more complex
> things with liveTweets that involve a KMeansModel, for example, will be
> interrupted quicker by this java.io.NotSerializableException. I don’t
> know if the root cause is the same, but the error certainly is.
>
> By the way, trying to reproduce this on 1.0.1 doesn’t raise the same
> error, but I can’t dig deeper to make sure this is really resolved (e.g. by
> trying more complex things that need data) due to SPARK-2471
> <https://issues.apache.org/jira/browse/SPARK-2471>. I see that that issue
> has been resolved, so I’ll try this whole process again using the latest
> from master and see how it goes.
>
> Nick
>
>
> On Tue, Jul 15, 2014 at 5:58 PM, Tathagata Das <
> tathagata.das1565@gmail.com> wrote:
>
>> I am very curious though. Can you post a concise code example which we can
>> run to reproduce this problem?
>>
>> TD
>>
>>
>> On Tue, Jul 15, 2014 at 2:54 PM, Tathagata Das <
>> tathagata.das1565@gmail.com> wrote:
>>
>>> I am not entirely sure off the top of my head. But a possible workaround
>>> (it usually works) is to define the function as a val instead of a def.
>>> For example,
>>>
>>> def func(i: Int): Boolean = { true }
>>>
>>> can be written as
>>>
>>> val func = (i: Int) => { true }
>>>
>>> Hope this helps for now.
>>>
>>> TD
>>>
>>>
>>> On Tue, Jul 15, 2014 at 9:21 AM, Nicholas Chammas <
>>> nicholas.chammas@gmail.com> wrote:
>>>
>>>> Hey Diana,
>>>>
>>>> Did you ever figure this out?
>>>>
>>>> I’m running into the same exception, except in my case the function I’m
>>>> calling is a KMeans model.predict().
>>>>
>>>> In regular Spark it works, and Spark Streaming without the call to
>>>> model.predict() also works, but when put together I get this
>>>> serialization exception. I’m on 1.0.0.
>>>>
>>>> Nick
>>>>
>>>>
>>>>
>>>> On Thu, May 8, 2014 at 6:37 AM, Diana Carroll <dc...@cloudera.com>
>>>> wrote:
>>>>
>>>>> Hey all, trying to set up a pretty simple streaming app and getting
>>>>> some weird behavior.
>>>>>
>>>>> First, a non-streaming job that works fine: I'm trying to pull out
>>>>> lines of a log file that match a regex, for which I've set up a function:
>>>>>
>>>>> // Returns the first KBDOC-<digits> match in s, or null if there is none.
>>>>> def getRequestDoc(s: String): String =
>>>>>   "KBDOC-[0-9]*".r.findFirstIn(s).orNull
>>>>> val logs = sc.textFile(logfiles)
>>>>> logs.map(getRequestDoc).take(10)
>>>>>
>>>>> That works, but I want to run that on the same data, but streaming, so
>>>>> I tried this:
>>>>>
>>>>> val logs = ssc.socketTextStream("localhost",4444)
>>>>> logs.map(getRequestDoc).print()
>>>>> ssc.start()
>>>>>
>>>>> From this code, I get:
>>>>> 14/05/08 03:32:08 ERROR JobScheduler: Error running job streaming job 1399545128000 ms.0
>>>>> org.apache.spark.SparkException: Job aborted: Task not serializable: java.io.NotSerializableException: org.apache.spark.streaming.StreamingContext
>>>>>
>>>>>
>>>>> But if I do the map function inline instead of calling a separate
>>>>> function, it works:
>>>>>
>>>>> logs.map("KBDOC-[0-9]*".r.findFirstIn(_).orNull).print()
>>>>>
>>>>> So why is it able to serialize my little function in regular spark,
>>>>> but not in streaming?
>>>>>
>>>>> Thanks,
>>>>> Diana
>>>>>
>>>>>
>>>>>
>>>>
>>>
>>
>