You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@spark.apache.org by "Nicholas Chammas (JIRA)" <ji...@apache.org> on 2014/12/17 04:15:13 UTC
[jira] [Created] (SPARK-4868) Twitter DStream.map() throws "Task
not serializable"
Nicholas Chammas created SPARK-4868:
---------------------------------------
Summary: Twitter DStream.map() throws "Task not serializable"
Key: SPARK-4868
URL: https://issues.apache.org/jira/browse/SPARK-4868
Project: Spark
Issue Type: Bug
Components: Spark Shell, Streaming
Affects Versions: 1.1.1
Environment: * Spark 1.1.1
* EC2 cluster with 1 slave spun up using {{spark-ec2}}
* twitter4j 3.0.3
* {{spark-shell}} called with {{--jars}} argument to load {{spark-streaming-twitter_2.10-1.0.0.jar}} as well as all the twitter4j jars.
Reporter: Nicholas Chammas
Priority: Minor
_(Continuing the discussion [started here on the Spark user list|http://apache-spark-user-list.1001560.n3.nabble.com/NotSerializableException-in-Spark-Streaming-td5725.html].)_
The following Spark Streaming code throws a serialization exception I do not understand.
{code}
import twitter4j.auth.{Authorization, OAuthAuthorization}
import twitter4j.conf.ConfigurationBuilder
import org.apache.spark.streaming.{Seconds, Minutes, StreamingContext}
import org.apache.spark.streaming.twitter.TwitterUtils
def getAuth(): Option[Authorization] = {
System.setProperty("twitter4j.oauth.consumerKey", "consumerKey")
System.setProperty("twitter4j.oauth.consumerSecret", "consumerSecret")
System.setProperty("twitter4j.oauth.accessToken", "accessToken")
System.setProperty("twitter4j.oauth.accessTokenSecret", "accessTokenSecret")
Some(new OAuthAuthorization(new ConfigurationBuilder().build()))
}
def noop(a: Any): Any = {
a
}
val ssc = new StreamingContext(sc, Seconds(5))
val liveTweetObjects = TwitterUtils.createStream(ssc, getAuth())
val liveTweets = liveTweetObjects.map(_.getText)
liveTweets.map(t => noop(t)).print() // exception here
ssc.start()
{code}
So before I even start the StreamingContext, I get the following stack trace:
{code}
scala> liveTweets.map(t => noop(t)).print()
org.apache.spark.SparkException: Task not serializable
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166)
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158)
at org.apache.spark.SparkContext.clean(SparkContext.scala:1264)
at org.apache.spark.streaming.dstream.DStream.map(DStream.scala:438)
at $iwC$$iwC$$iwC$$iwC.<init>(<console>:27)
at $iwC$$iwC$$iwC.<init>(<console>:32)
at $iwC$$iwC.<init>(<console>:34)
at $iwC.<init>(<console>:36)
at <init>(<console>:38)
at .<init>(<console>:42)
at .<clinit>(<console>)
at .<init>(<console>:7)
at .<clinit>(<console>)
at $print(<console>)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:789)
at org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1062)
at org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:615)
at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:646)
at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:610)
at org.apache.spark.repl.SparkILoop.reallyInterpret$1(SparkILoop.scala:823)
at org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:868)
at org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:780)
at org.apache.spark.repl.SparkILoop.processLine$1(SparkILoop.scala:625)
at org.apache.spark.repl.SparkILoop.innerLoop$1(SparkILoop.scala:633)
at org.apache.spark.repl.SparkILoop.loop(SparkILoop.scala:638)
at org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply$mcZ$sp(SparkILoop.scala:963)
at org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply(SparkILoop.scala:911)
at org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply(SparkILoop.scala:911)
at scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135)
at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:911)
at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:1006)
at org.apache.spark.repl.Main$.main(Main.scala:31)
at org.apache.spark.repl.Main.main(Main.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:329)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.io.NotSerializableException: org.apache.spark.streaming.StreamingContext
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1183)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347)
at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:42)
at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:73)
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:164)
... 43 more
{code}
What I'm really trying to do is use Spark Streaming via the interactive shell to filter Tweets using a trained KMeans model. I got errors trying that, and I boiled it down to this repro.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org