Posted to issues@spark.apache.org by "Riccardo Vincelli (JIRA)" <ji...@apache.org> on 2018/01/04 09:01:00 UTC

[jira] [Commented] (SPARK-13316) "SparkException: DStream has not been initialized" when restoring StreamingContext from checkpoint and the dstream is created afterwards

    [ https://issues.apache.org/jira/browse/SPARK-13316?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16311036#comment-16311036 ] 

Riccardo Vincelli commented on SPARK-13316:
-------------------------------------------

Actually, I am not completely sure this has been cleared up.

I am on {{spark-streaming-kafka-0-10}} with {{Spark 2.1.0}}, implementing a very complex streaming pipeline. This pipeline lives in a number of abstractions outside the creating function passed to {{getOrCreate()}}. When started, it fails to write the checkpoint back with an NPE [here|https://github.com/apache/spark/blob/branch-2.1/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStreamCheckpointData.scala#L126], as if the {{DStream}}s had no context, even though the context is correctly created and activated.

Breaking all compositions down and moving all the code into this utility method seems to solve the issue, at least for part of the pipeline. Alternatives such as modularizing with objects imported in this method do not work either; they fail with an explicit initialization exception in the streams.

I am far from being a Spark expert, but my poor man's advice is to write your code inside that method from the start if checkpoint recovery is needed.
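
A minimal sketch of that advice, reusing the socket example from the issue description rather than my actual Kafka pipeline. The object name, checkpoint path, host and port are placeholders I made up for illustration, and I have only assumed, not verified, that this layout covers every case:

{code}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object CheckpointedApp {
  // Hypothetical values, for illustration only.
  val checkpointDir = "/tmp/spark-checkpoint"
  val host = "localhost"
  val port = 9999

  // Everything that defines the DStream graph lives inside this function.
  // It is only invoked when no usable checkpoint is found; otherwise the
  // context and its streams are restored from the checkpoint data.
  def createStreamingContext(): StreamingContext = {
    // The master URL is expected to come from spark-submit.
    val conf = new SparkConf().setAppName("CheckpointedApp")
    val ssc = new StreamingContext(conf, Seconds(1))
    ssc.checkpoint(checkpointDir)

    val lines = ssc.socketTextStream(host, port)
    lines.checkpoint(Seconds(1))
    lines.foreachRDD { rdd => println(s"Received ${rdd.count()} lines") }

    ssc
  }

  def main(args: Array[String]): Unit = {
    // No streams are created or registered after this call.
    val ssc = StreamingContext.getOrCreate(checkpointDir, createStreamingContext _)
    ssc.start()
    ssc.awaitTermination()
  }
}
{code}

The only difference from the snippet in the issue description is that the stream definitions moved from after {{getOrCreate()}} into the creating function.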

To the Spark team: if it is really so, this is very unclear in the documentation! It should be stated as mandatory, please.

So the question is: does one actually have to write the code inside the context creation handle, or are we missing something? Please enlighten us :)

Sorry for the long post, but for the sake of clarity and googleability, such discussions belong in the issue, not on the mailing list or SO.

Thank you all,

> "SparkException: DStream has not been initialized" when restoring StreamingContext from checkpoint and the dstream is created afterwards
> ----------------------------------------------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-13316
>                 URL: https://issues.apache.org/jira/browse/SPARK-13316
>             Project: Spark
>          Issue Type: Bug
>          Components: DStreams
>            Reporter: Jacek Laskowski
>            Priority: Minor
>
> I faced the issue today but [it was already reported on SO|http://stackoverflow.com/q/35090180/1305344] a couple of days ago and the reason is that a dstream is registered after a StreamingContext has been recreated from checkpoint.
> It _appears_ that...no dstreams must be registered after a StreamingContext has been recreated from checkpoint. It is *not* obvious at first.
> The code:
> {code}
> def createStreamingContext(): StreamingContext = {
>     val ssc = new StreamingContext(sparkConf, Duration(1000))
>     ssc.checkpoint(checkpointDir)
>     ssc
> }
> val ssc = StreamingContext.getOrCreate(checkpointDir, createStreamingContext)
> val socketStream = ssc.socketTextStream(...)
> socketStream.checkpoint(Seconds(1))
> socketStream.foreachRDD(...)
> {code}
> It should be described in docs at the very least and/or checked in the code when the streaming computation starts.
> The exception is as follows:
> {code}
> org.apache.spark.SparkException: org.apache.spark.streaming.dstream.ConstantInputDStream@724797ab has not been initialized
>   at org.apache.spark.streaming.dstream.DStream.isTimeValid(DStream.scala:311)
>   at org.apache.spark.streaming.dstream.InputDStream.isTimeValid(InputDStream.scala:89)
>   at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:332)
>   at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:332)
>   at scala.Option.orElse(Option.scala:289)
>   at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:329)
>   at org.apache.spark.streaming.dstream.ForEachDStream.generateJob(ForEachDStream.scala:48)
>   at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:117)
>   at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:116)
>   at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:252)
>   at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:252)
>   at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>   at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
>   at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:252)
>   at scala.collection.AbstractTraversable.flatMap(Traversable.scala:104)
>   at org.apache.spark.streaming.DStreamGraph.generateJobs(DStreamGraph.scala:116)
>   at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$restart$4.apply(JobGenerator.scala:233)
>   at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$restart$4.apply(JobGenerator.scala:228)
>   at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
>   at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
>   at org.apache.spark.streaming.scheduler.JobGenerator.restart(JobGenerator.scala:228)
>   at org.apache.spark.streaming.scheduler.JobGenerator.start(JobGenerator.scala:97)
>   at org.apache.spark.streaming.scheduler.JobScheduler.start(JobScheduler.scala:83)
>   at org.apache.spark.streaming.StreamingContext$$anonfun$liftedTree1$1$1.apply$mcV$sp(StreamingContext.scala:589)
>   at org.apache.spark.streaming.StreamingContext$$anonfun$liftedTree1$1$1.apply(StreamingContext.scala:585)
>   at org.apache.spark.streaming.StreamingContext$$anonfun$liftedTree1$1$1.apply(StreamingContext.scala:585)
>   at ... run in separate thread using org.apache.spark.util.ThreadUtils ... ()
>   at org.apache.spark.streaming.StreamingContext.liftedTree1$1(StreamingContext.scala:585)
>   at org.apache.spark.streaming.StreamingContext.start(StreamingContext.scala:579)
>   ... 43 elided
> {code}


