Posted to issues@spark.apache.org by "Sean Owen (JIRA)" <ji...@apache.org> on 2016/03/10 16:45:41 UTC

[jira] [Resolved] (SPARK-13758) Error message is misleading when RDD refer to null spark context

     [ https://issues.apache.org/jira/browse/SPARK-13758?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Sean Owen resolved SPARK-13758.
-------------------------------
       Resolution: Fixed
    Fix Version/s: 2.0.0

Issue resolved by pull request 11595
[https://github.com/apache/spark/pull/11595]

> Error message is misleading when RDD refer to null spark context
> ----------------------------------------------------------------
>
>                 Key: SPARK-13758
>                 URL: https://issues.apache.org/jira/browse/SPARK-13758
>             Project: Spark
>          Issue Type: Improvement
>          Components: Documentation, Spark Core, Streaming
>            Reporter: Mao, Wei
>            Priority: Trivial
>             Fix For: 2.0.0
>
>
> We have a recoverable Spark Streaming job with checkpointing enabled. It runs correctly the first time, but throws the following exception when it is restarted and recovers from the checkpoint.
> {noformat}
> org.apache.spark.SparkException: RDD transformations and actions can only be invoked by the driver, not inside of other transformations; for example, rdd1.map(x => rdd2.values.count() * x) is invalid because the values transformation and count action cannot be performed inside of the rdd1.map transformation. For more information, see SPARK-5063.
> 	at org.apache.spark.rdd.RDD.org$apache$spark$rdd$RDD$$sc(RDD.scala:87)
> 	at org.apache.spark.rdd.RDD.withScope(RDD.scala:352)
> 	at org.apache.spark.rdd.RDD.union(RDD.scala:565)
> 	at org.apache.spark.streaming.Repo$$anonfun$createContext$1.apply(Repo.scala:23)
> 	at org.apache.spark.streaming.Repo$$anonfun$createContext$1.apply(Repo.scala:19)
> 	at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:627)
> ...
> {noformat}
> The exception says that I invoked transformations and actions inside other transformations, but I did not. The real cause is that I used an external RDD in a DStream operation. The external RDD's data is not stored in the checkpoint, so during recovery the RDD's _sc field is initialized to null, which triggers the exception above. The error message is therefore misleading: it says nothing about the real issue.
> Here is the code to reproduce it.
> {code:java}
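> import org.apache.spark.SparkConf
> import org.apache.spark.rdd.RDD
> import org.apache.spark.streaming.{Seconds, StreamingContext}
>
> object Repo {
>   def createContext(ip: String, port: Int, checkpointDirectory: String): StreamingContext = {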
>     println("Creating new context")
>     val sparkConf = new SparkConf().setAppName("Repo").setMaster("local[2]")
>     val ssc = new StreamingContext(sparkConf, Seconds(2))
>     ssc.checkpoint(checkpointDirectory)
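>     // External RDD created outside the DStream operations; its data is not stored in the checkpoint.
>     var cached = ssc.sparkContext.parallelize(Seq("apple, banana"))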
>     val words = ssc.socketTextStream(ip, port).flatMap(_.split(" "))
>     words.foreachRDD((rdd: RDD[String]) => {
>       val res = rdd.map(word => (word, word.length)).collect()
>       println("words: " + res.mkString(", "))
>       cached = cached.union(rdd)
>       cached.checkpoint()
>       println("cached words: " + cached.collect.mkString(", "))
>     })
>     ssc
>   }
>   def main(args: Array[String]): Unit = {
>     val ip = "localhost"
>     val port = 9999
>     val dir = "/home/maowei/tmp"
>     val ssc = StreamingContext.getOrCreate(dir,
>       () => {
>         createContext(ip, port, dir)
>       })
>     ssc.start()
>     ssc.awaitTermination()
>   }
> }
> {code}
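
The mechanism described in the report (a context reference that comes back as null after recovery) can be sketched without Spark. The example below is a minimal illustration, assuming the context reference is held in a field that serialization skips. FakeContext, FakeRdd, and TransientDemo are hypothetical names invented for this sketch and are not Spark classes. An in-memory Java serialization round trip stands in for the checkpoint save and restore.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

// Hypothetical stand-in for SparkContext; deliberately not Serializable.
class FakeContext(val appName: String)

// Hypothetical stand-in for an RDD. The context reference is transient,
// so Java serialization skips it and the field is null after deserialization.
class FakeRdd(@transient private var _sc: FakeContext, val data: Seq[String])
    extends Serializable {

  // Guarded accessor: fail with a descriptive message instead of a bare
  // NullPointerException when the context is missing.
  def sc: FakeContext = {
    if (_sc == null) {
      throw new IllegalStateException(
        "This RDD lacks a context, e.g. because it was restored from serialized state")
    }
    _sc
  }
}

object TransientDemo {
  // Serialize and deserialize a value in memory (assumed analogue of a
  // checkpoint write followed by a recovery).
  def roundTrip[T <: AnyRef](value: T): T = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    out.writeObject(value)
    out.close()
    val in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray))
    try in.readObject().asInstanceOf[T] finally in.close()
  }

  def main(args: Array[String]): Unit = {
    val original = new FakeRdd(new FakeContext("Repo"), Seq("apple", "banana"))
    println(original.sc.appName) // the context is available before serialization

    val restored = roundTrip(original)
    println(restored.data.mkString(", ")) // non-transient data survives the round trip
    try restored.sc // the transient context did not survive, so the accessor throws
    catch { case e: IllegalStateException => println("Failed: " + e.getMessage) }
  }
}
```

In this sketch the restored object keeps its data but loses its context, so any operation that needs the context fails. A message that names the missing context points at the cause more directly than one about nested transformations.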


