Posted to user@spark.apache.org by Nicholas Chammas <ni...@gmail.com> on 2014/12/16 03:24:09 UTC
Re: NotSerializableException in Spark Streaming
This still seems to be broken. In Spark 1.1.1, it errors immediately on this line
(from the repro script quoted below):
liveTweets.map(t => noop(t)).print()
The stack trace is:
org.apache.spark.SparkException: Task not serializable
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166)
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158)
at org.apache.spark.SparkContext.clean(SparkContext.scala:1264)
at org.apache.spark.streaming.dstream.DStream.map(DStream.scala:438)
at $iwC$$iwC$$iwC$$iwC.<init>(<console>:27)
at $iwC$$iwC$$iwC.<init>(<console>:32)
at $iwC$$iwC.<init>(<console>:34)
at $iwC.<init>(<console>:36)
at <init>(<console>:38)
at .<init>(<console>:42)
at .<clinit>(<console>)
at .<init>(<console>:7)
at .<clinit>(<console>)
at $print(<console>)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:789)
at org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1062)
at org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:615)
at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:646)
at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:610)
at org.apache.spark.repl.SparkILoop.reallyInterpret$1(SparkILoop.scala:823)
at org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:868)
at org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:780)
at org.apache.spark.repl.SparkILoop.processLine$1(SparkILoop.scala:625)
at org.apache.spark.repl.SparkILoop.innerLoop$1(SparkILoop.scala:633)
at org.apache.spark.repl.SparkILoop.loop(SparkILoop.scala:638)
at org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply$mcZ$sp(SparkILoop.scala:963)
at org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply(SparkILoop.scala:911)
at org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply(SparkILoop.scala:911)
at scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135)
at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:911)
at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:1006)
at org.apache.spark.repl.Main$.main(Main.scala:31)
at org.apache.spark.repl.Main.main(Main.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:329)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.io.NotSerializableException:
org.apache.spark.streaming.StreamingContext
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1183)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347)
at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:42)
at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:73)
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:164)
... 43 more
TD, any troubleshooting tips?
Nick
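For anyone trying to pin down the mechanism: this failure mode can be reproduced without Spark at all. A Scala lambda that calls a def on an enclosing instance captures that instance, and Java serialization then tries to drag the whole object graph along; in the shell, the generated REPL line wrapper ($iwC in the trace above) transitively holds the StreamingContext. A minimal sketch, using a hypothetical plain non-serializable class as a stand-in for StreamingContext:

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Hypothetical stand-in for StreamingContext (or a REPL line wrapper
// holding one): an ordinary class that is not Serializable.
class Context {
  def noop(a: Any): Any = a
}

object CaptureDemo {
  // Returns true if obj survives Java serialization.
  def serializes(obj: AnyRef): Boolean =
    try {
      new ObjectOutputStream(new ByteArrayOutputStream()).writeObject(obj)
      true
    } catch {
      case _: NotSerializableException => false
    }

  // Calling ctx.noop inside the lambda makes the closure capture ctx,
  // so serializing the closure tries (and fails) to serialize Context.
  def capturing(): Any => Any = {
    val ctx = new Context
    a => ctx.noop(a)
  }

  // A self-contained function value captures nothing and serializes fine.
  def standalone(): Any => Any = a => a
}
```

In the Spark shell the same capture happens implicitly: a def at the top level lives on a generated wrapper object, and referencing it from inside a map pulls the wrapper (and, through it, ssc) into the closure.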
On Thu Jul 24 2014 at 8:55:07 PM Nicholas Chammas <
nicholas.chammas@gmail.com> wrote:
> Yep, here goes!
>
> Here are my environment vitals:
>
> - Spark 1.0.0
> - EC2 cluster with 1 slave spun up using spark-ec2
> - twitter4j 3.0.3
> - spark-shell called with --jars argument to load
> spark-streaming-twitter_2.10-1.0.0.jar as well as all the twitter4j
> jars.
>
> Now, while I’m in the Spark shell, I enter the following:
>
> import twitter4j.auth.{Authorization, OAuthAuthorization}
> import twitter4j.conf.ConfigurationBuilder
> import org.apache.spark.streaming.{Seconds, Minutes, StreamingContext}
> import org.apache.spark.streaming.twitter.TwitterUtils
> def getAuth(): Option[Authorization] = {
>
> System.setProperty("twitter4j.oauth.consumerKey", "consumerKey")
> System.setProperty("twitter4j.oauth.consumerSecret", "consumerSecret")
> System.setProperty("twitter4j.oauth.accessToken", "accessToken")
> System.setProperty("twitter4j.oauth.accessTokenSecret", "accessTokenSecret")
>
> Some(new OAuthAuthorization(new ConfigurationBuilder().build()))
>
> }
> def noop(a: Any): Any = {
> a
> }
> val ssc = new StreamingContext(sc, Seconds(5))
> val liveTweetObjects = TwitterUtils.createStream(ssc, getAuth())
> val liveTweets = liveTweetObjects.map(_.getText)
>
> liveTweets.map(t => noop(t)).print()
>
> ssc.start()
>
> So basically, I’m just printing Tweets as-is, but first I’m mapping them
> to themselves via noop(). The Tweets will start to flow just fine for a
> minute or so, and then, this:
>
> 14/07/24 23:13:30 ERROR JobScheduler: Error running job streaming job 1406243610000 ms.0
> org.apache.spark.SparkException: Job aborted due to stage failure: Task not serializable: java.io.NotSerializableException: org.apache.spark.streaming.StreamingContext
> at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1033)
> at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1017)
> at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1015)
> at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
> at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1015)
> at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitMissingTasks(DAGScheduler.scala:770)
> at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:713)
> at org.apache.spark.scheduler.DAGScheduler.handleJobSubmitted(DAGScheduler.scala:697)
> at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1176)
> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
> at akka.actor.ActorCell.invoke(ActorCell.scala:456)
> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
> at akka.dispatch.Mailbox.run(Mailbox.scala:219)
> at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
> at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>
> The time-to-first-error is variable.
>
> This is the simplest repro I can show at this time. Doing more complex
> things with liveTweets that involve a KMeansModel, for example, will be
> interrupted more quickly by this java.io.NotSerializableException. I don’t
> know if the root cause is the same, but the error certainly is.
>
> By the way, trying to reproduce this on 1.0.1 doesn’t raise the same
> error, but I can’t dig deeper to make sure this is really resolved (e.g. by
> trying more complex things that need data) due to SPARK-2471
> <https://issues.apache.org/jira/browse/SPARK-2471>. I see that that issue
> has been resolved, so I’ll try this whole process again using the latest
> from master and see how it goes.
>
> Nick
>
>
> On Tue, Jul 15, 2014 at 5:58 PM, Tathagata Das <
> tathagata.das1565@gmail.com> wrote:
>
>> I am very curious though. Can you post a concise code example which we can
>> run to reproduce this problem?
>>
>> TD
>>
>>
>> On Tue, Jul 15, 2014 at 2:54 PM, Tathagata Das <
>> tathagata.das1565@gmail.com> wrote:
>>
>>> I am not entirely sure off the top of my head. But a possible workaround
>>> (it usually works) is to define the function as a val instead of a def. For
>>> example:
>>>
>>> def func(i: Int): Boolean = { true }
>>>
>>> can be written as
>>>
>>> val func = (i: Int) => { true }
>>>
>>> Hope this helps for now.
>>>
>>> TD
>>>
>>>
>>> On Tue, Jul 15, 2014 at 9:21 AM, Nicholas Chammas <
>>> nicholas.chammas@gmail.com> wrote:
>>>
>>>> Hey Diana,
>>>>
>>>> Did you ever figure this out?
>>>>
>>>> I’m running into the same exception, except in my case the function I’m
>>>> calling is a KMeans model.predict().
>>>>
>>>> In regular Spark it works, and Spark Streaming without the call to
>>>> model.predict() also works, but when put together I get this
>>>> serialization exception. I’m on 1.0.0.
>>>>
>>>> Nick
>>>>
>>>>
>>>>
>>>> On Thu, May 8, 2014 at 6:37 AM, Diana Carroll <dc...@cloudera.com>
>>>> wrote:
>>>>
>>>>> Hey all, trying to set up a pretty simple streaming app and getting
>>>>> some weird behavior.
>>>>>
>>>>> First, a non-streaming job that works fine: I'm trying to pull out
>>>>> lines of a log file that match a regex, for which I've set up a function:
>>>>>
>>>>> def getRequestDoc(s: String): String = {
>>>>>   "KBDOC-[0-9]*".r.findFirstIn(s).orNull
>>>>> }
>>>>> val logs = sc.textFile(logfiles)
>>>>> logs.map(getRequestDoc).take(10)
>>>>>
>>>>> That works, but I want to run that on the same data, but streaming, so
>>>>> I tried this:
>>>>>
>>>>> val logs = ssc.socketTextStream("localhost",4444)
>>>>> logs.map(getRequestDoc).print()
>>>>> ssc.start()
>>>>>
>>>>> From this code, I get:
>>>>> 14/05/08 03:32:08 ERROR JobScheduler: Error running job streaming job
>>>>> 1399545128000 ms.0
>>>>> org.apache.spark.SparkException: Job aborted: Task not serializable:
>>>>> java.io.NotSerializableException:
>>>>> org.apache.spark.streaming.StreamingContext
>>>>>
>>>>>
>>>>> But if I do the map function inline instead of calling a separate
>>>>> function, it works:
>>>>>
>>>>> logs.map("KBDOC-[0-9]*".r.findFirstIn(_).orNull).print()
>>>>>
>>>>> So why is it able to serialize my little function in regular Spark,
>>>>> but not in streaming?
>>>>>
>>>>> Thanks,
>>>>> Diana
>>>>>
>>>>>
>>>>>
>>>>
>>>
>>
>
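One workaround sketch for Diana's case (hypothetical names; this is the usual object-based approach, not necessarily the only fix): moving the helper into a standalone serializable object means the streaming closure references that object rather than the shell's line wrapper, which transitively holds the StreamingContext.

```scala
// Hypothetical helper object: defining getRequestDoc on a top-level
// serializable object keeps the map closure from capturing the REPL
// line wrapper (and, through it, the non-serializable StreamingContext).
object LogHelpers extends Serializable {
  def getRequestDoc(s: String): String =
    "KBDOC-[0-9]*".r.findFirstIn(s).orNull
}

// In the streaming shell session this would then be used as (not
// runnable here without a cluster and an ssc in scope):
//   val logs = ssc.socketTextStream("localhost", 4444)
//   logs.map(LogHelpers.getRequestDoc).print()
//   ssc.start()
```

The regex logic itself is unchanged from the original script; only where the function lives differs.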