You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@spark.apache.org by "Hyukjin Kwon (JIRA)" <ji...@apache.org> on 2019/05/21 05:37:41 UTC

[jira] [Resolved] (SPARK-4868) Twitter DStream.map() throws "Task not serializable"

     [ https://issues.apache.org/jira/browse/SPARK-4868?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Hyukjin Kwon resolved SPARK-4868.
---------------------------------
    Resolution: Incomplete

> Twitter DStream.map() throws "Task not serializable"
> ----------------------------------------------------
>
>                 Key: SPARK-4868
>                 URL: https://issues.apache.org/jira/browse/SPARK-4868
>             Project: Spark
>          Issue Type: Bug
>          Components: DStreams, Spark Shell
>    Affects Versions: 1.1.1, 1.2.0
>         Environment: * Spark 1.1.1 or 1.2.0
> * EC2 cluster with 1 slave spun up using {{spark-ec2}}
> * twitter4j 3.0.3
> * {{spark-shell}} called with {{--jars}} argument to load {{spark-streaming-twitter_2.10-1.0.0.jar}} as well as all the twitter4j jars.
>            Reporter: Nicholas Chammas
>            Priority: Minor
>              Labels: bulk-closed
>
> _(Continuing the discussion [started here on the Spark user list|http://apache-spark-user-list.1001560.n3.nabble.com/NotSerializableException-in-Spark-Streaming-td5725.html].)_
> The following Spark Streaming code throws a serialization exception I do not understand.
> {code}
> import twitter4j.auth.{Authorization, OAuthAuthorization}
> import twitter4j.conf.ConfigurationBuilder 
> import org.apache.spark.streaming.{Seconds, Minutes, StreamingContext}
> import org.apache.spark.streaming.twitter.TwitterUtils
> def getAuth(): Option[Authorization] = {
>   System.setProperty("twitter4j.oauth.consumerKey", "consumerKey")
>   System.setProperty("twitter4j.oauth.consumerSecret", "consumerSecret")
>   System.setProperty("twitter4j.oauth.accessToken", "accessToken") 
>   System.setProperty("twitter4j.oauth.accessTokenSecret", "accessTokenSecret")
>   Some(new OAuthAuthorization(new ConfigurationBuilder().build()))
> } 
> def noop(a: Any): Any = {
>   a
> }
> val ssc = new StreamingContext(sc, Seconds(5))
> val liveTweetObjects = TwitterUtils.createStream(ssc, getAuth())
> val liveTweets = liveTweetObjects.map(_.getText)
> liveTweets.map(t => noop(t)).print()  // exception here
> ssc.start()
> {code}
> So before I even start the StreamingContext, I get the following stack trace:
> {code}
> scala> liveTweets.map(t => noop(t)).print()
> org.apache.spark.SparkException: Task not serializable
> 	at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166)
> 	at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158)
> 	at org.apache.spark.SparkContext.clean(SparkContext.scala:1264)
> 	at org.apache.spark.streaming.dstream.DStream.map(DStream.scala:438)
> 	at $iwC$$iwC$$iwC$$iwC.<init>(<console>:27)
> 	at $iwC$$iwC$$iwC.<init>(<console>:32)
> 	at $iwC$$iwC.<init>(<console>:34)
> 	at $iwC.<init>(<console>:36)
> 	at <init>(<console>:38)
> 	at .<init>(<console>:42)
> 	at .<clinit>(<console>)
> 	at .<init>(<console>:7)
> 	at .<clinit>(<console>)
> 	at $print(<console>)
> 	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> 	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
> 	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> 	at java.lang.reflect.Method.invoke(Method.java:606)
> 	at org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:789)
> 	at org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1062)
> 	at org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:615)
> 	at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:646)
> 	at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:610)
> 	at org.apache.spark.repl.SparkILoop.reallyInterpret$1(SparkILoop.scala:823)
> 	at org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:868)
> 	at org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:780)
> 	at org.apache.spark.repl.SparkILoop.processLine$1(SparkILoop.scala:625)
> 	at org.apache.spark.repl.SparkILoop.innerLoop$1(SparkILoop.scala:633)
> 	at org.apache.spark.repl.SparkILoop.loop(SparkILoop.scala:638)
> 	at org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply$mcZ$sp(SparkILoop.scala:963)
> 	at org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply(SparkILoop.scala:911)
> 	at org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply(SparkILoop.scala:911)
> 	at scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135)
> 	at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:911)
> 	at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:1006)
> 	at org.apache.spark.repl.Main$.main(Main.scala:31)
> 	at org.apache.spark.repl.Main.main(Main.scala)
> 	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> 	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
> 	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> 	at java.lang.reflect.Method.invoke(Method.java:606)
> 	at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:329)
> 	at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75)
> 	at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
> Caused by: java.io.NotSerializableException: org.apache.spark.streaming.StreamingContext
> 	at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1183)
> 	at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
> 	at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
> 	at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
> 	at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
> 	at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
> 	at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
> 	at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
> 	at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
> 	at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
> 	at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
> 	at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
> 	at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
> 	at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
> 	at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
> 	at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
> 	at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
> 	at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
> 	at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
> 	at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
> 	at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
> 	at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
> 	at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
> 	at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
> 	at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
> 	at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
> 	at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
> 	at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
> 	at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
> 	at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
> 	at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
> 	at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
> 	at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
> 	at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347)
> 	at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:42)
> 	at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:73)
> 	at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:164)
> 	... 43 more
> {code}
> What I'm really trying to do is use Spark Streaming via the interactive shell to filter Tweets using a trained KMeans model. I got errors trying that, and I boiled it down to this repro.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org