Posted to issues@spark.apache.org by "Nicholas Chammas (JIRA)" <ji...@apache.org> on 2014/12/17 04:15:13 UTC

[jira] [Created] (SPARK-4868) Twitter DStream.map() throws "Task not serializable"

Nicholas Chammas created SPARK-4868:
---------------------------------------

             Summary: Twitter DStream.map() throws "Task not serializable"
                 Key: SPARK-4868
                 URL: https://issues.apache.org/jira/browse/SPARK-4868
             Project: Spark
          Issue Type: Bug
          Components: Spark Shell, Streaming
    Affects Versions: 1.1.1
         Environment: * Spark 1.1.1
* EC2 cluster with 1 slave spun up using {{spark-ec2}}
* twitter4j 3.0.3
* {{spark-shell}} called with {{--jars}} argument to load {{spark-streaming-twitter_2.10-1.0.0.jar}} as well as all the twitter4j jars.
            Reporter: Nicholas Chammas
            Priority: Minor


_(Continuing the discussion [started here on the Spark user list|http://apache-spark-user-list.1001560.n3.nabble.com/NotSerializableException-in-Spark-Streaming-td5725.html].)_

The following Spark Streaming code, run in {{spark-shell}}, throws a "Task not serializable" exception that I do not understand.

{code}
import twitter4j.auth.{Authorization, OAuthAuthorization}
import twitter4j.conf.ConfigurationBuilder
import org.apache.spark.streaming.{Seconds, Minutes, StreamingContext}
import org.apache.spark.streaming.twitter.TwitterUtils

// Build twitter4j credentials from system properties (placeholder values).
def getAuth(): Option[Authorization] = {
  System.setProperty("twitter4j.oauth.consumerKey", "consumerKey")
  System.setProperty("twitter4j.oauth.consumerSecret", "consumerSecret")
  System.setProperty("twitter4j.oauth.accessToken", "accessToken")
  System.setProperty("twitter4j.oauth.accessTokenSecret", "accessTokenSecret")

  Some(new OAuthAuthorization(new ConfigurationBuilder().build()))
}

// Identity function defined in the shell; calling it inside map() triggers the error.
def noop(a: Any): Any = {
  a
}

val ssc = new StreamingContext(sc, Seconds(5))
val liveTweetObjects = TwitterUtils.createStream(ssc, getAuth())
val liveTweets = liveTweetObjects.map(_.getText)

liveTweets.map(t => noop(t)).print()  // exception here

ssc.start()
{code}

The exception is thrown as soon as the {{map()}} call is evaluated, before I even start the StreamingContext. This is the stack trace:

{code}
scala> liveTweets.map(t => noop(t)).print()
org.apache.spark.SparkException: Task not serializable
	at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166)
	at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158)
	at org.apache.spark.SparkContext.clean(SparkContext.scala:1264)
	at org.apache.spark.streaming.dstream.DStream.map(DStream.scala:438)
	at $iwC$$iwC$$iwC$$iwC.<init>(<console>:27)
	at $iwC$$iwC$$iwC.<init>(<console>:32)
	at $iwC$$iwC.<init>(<console>:34)
	at $iwC.<init>(<console>:36)
	at <init>(<console>:38)
	at .<init>(<console>:42)
	at .<clinit>(<console>)
	at .<init>(<console>:7)
	at .<clinit>(<console>)
	at $print(<console>)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:606)
	at org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:789)
	at org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1062)
	at org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:615)
	at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:646)
	at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:610)
	at org.apache.spark.repl.SparkILoop.reallyInterpret$1(SparkILoop.scala:823)
	at org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:868)
	at org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:780)
	at org.apache.spark.repl.SparkILoop.processLine$1(SparkILoop.scala:625)
	at org.apache.spark.repl.SparkILoop.innerLoop$1(SparkILoop.scala:633)
	at org.apache.spark.repl.SparkILoop.loop(SparkILoop.scala:638)
	at org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply$mcZ$sp(SparkILoop.scala:963)
	at org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply(SparkILoop.scala:911)
	at org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply(SparkILoop.scala:911)
	at scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135)
	at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:911)
	at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:1006)
	at org.apache.spark.repl.Main$.main(Main.scala:31)
	at org.apache.spark.repl.Main.main(Main.scala)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:606)
	at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:329)
	at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75)
	at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.io.NotSerializableException: org.apache.spark.streaming.StreamingContext
	at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1183)
	at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
	at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
	at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
	at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
	at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
	at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
	at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
	at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
	at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
	at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
	at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
	at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
	at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
	at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
	at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
	at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
	at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
	at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
	at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
	at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
	at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
	at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
	at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
	at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
	at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
	at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
	at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
	at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
	at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
	at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
	at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
	at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
	at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347)
	at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:42)
	at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:73)
	at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:164)
	... 43 more
{code}
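
One plausible reading of the trace above (an assumption, not something confirmed in this report): the shell compiles each entered line into a wrapper object (the {{$iwC}} frames), so {{t => noop(t)}} captures the wrapper that defines {{noop}}, and serializing that wrapper eventually reaches the {{StreamingContext}} held by {{ssc}}. The sketch below reproduces that capture pattern in plain Scala, without Spark. {{FakeContext}}, {{ShellLine}}, {{ShellLineTransient}}, {{Helpers}} and {{ClosureDemo}} are hypothetical names invented for illustration.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Hypothetical stand-in for StreamingContext: deliberately NOT Serializable.
class FakeContext

// Hypothetical stand-in for a shell-generated wrapper object.
// The wrapper is Serializable, but it holds a non-serializable field.
class ShellLine extends Serializable {
  val ctx = new FakeContext
  def noop(a: Any): Any = a
  // noop is an instance method, so this lambda captures `this` (and with it ctx).
  def capturing: String => Any = t => noop(t)
}

// Same wrapper, but Java serialization skips the @transient field.
class ShellLineTransient extends Serializable {
  @transient val ctx = new FakeContext
  def noop(a: Any): Any = a
  def capturing: String => Any = t => noop(t)
}

// A helper that lives in a standalone object, so using it captures no wrapper.
object Helpers {
  def noop(a: Any): Any = a
}

object ClosureDemo {
  // This lambda refers only to the Helpers object and captures nothing.
  val standalone: String => Any = t => Helpers.noop(t)

  // Returns true if Java serialization of `obj` succeeds.
  def serializes(obj: AnyRef): Boolean =
    try {
      val out = new ObjectOutputStream(new ByteArrayOutputStream())
      out.writeObject(obj)
      out.close()
      true
    } catch {
      case _: NotSerializableException => false
    }

  def main(args: Array[String]): Unit = {
    println("captures wrapper with plain field:     " + serializes(new ShellLine().capturing))
    println("captures wrapper with transient field: " + serializes(new ShellLineTransient().capturing))
    println("captures nothing:                      " + serializes(standalone))
  }
}
```

If this reading is right, marking the context {{@transient}} or moving helper functions into a standalone object might avoid the capture. Neither has been verified against the shell session above.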

What I'm really trying to do is use Spark Streaming in the interactive shell to filter tweets with a trained KMeans model. That attempt failed with errors, and I reduced it to the repro above.
