Posted to issues@spark.apache.org by "Hyukjin Kwon (JIRA)" <ji...@apache.org> on 2017/01/25 15:27:26 UTC

[jira] [Resolved] (SPARK-13316) "SparkException: DStream has not been initialized" when restoring StreamingContext from checkpoint and the dstream is created afterwards

     [ https://issues.apache.org/jira/browse/SPARK-13316?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Hyukjin Kwon resolved SPARK-13316.
----------------------------------
    Resolution: Not A Problem

I tried to reproduce this as follows:

{code}
nc -lk 9999
{code}

{code}
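import org.apache.spark.streaming.{Duration, Seconds, StreamingContext}

// `conf` is an existing SparkConf; `Utils` is Spark's internal org.apache.spark.util.Utils.
val checkpointDir = Utils.createTempDir().toString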
def createStreamingContext(): StreamingContext = {
  val ssc = new StreamingContext(conf, Duration(1000))
  ssc.checkpoint(checkpointDir)
  ssc
}
val ssc = StreamingContext.getOrCreate(checkpointDir, createStreamingContext)

val socketStream = ssc.socketTextStream("localhost", 9999)
socketStream.checkpoint(Seconds(1))
socketStream.foreachRDD(rdd => rdd.collect().foreach(println))
ssc.start()
ssc.awaitTermination()
{code}

Apparently, this was fixed in https://github.com/apache/spark/commit/4a5558ca9921ce89b3996e9ead13b07123fc7a2d without a JIRA.

I am resolving this.

> "SparkException: DStream has not been initialized" when restoring StreamingContext from checkpoint and the dstream is created afterwards
> ----------------------------------------------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-13316
>                 URL: https://issues.apache.org/jira/browse/SPARK-13316
>             Project: Spark
>          Issue Type: Bug
>          Components: DStreams
>            Reporter: Jacek Laskowski
>            Priority: Minor
>
> I faced the issue today, but [it was already reported on SO|http://stackoverflow.com/q/35090180/1305344] a couple of days ago. The cause is that a DStream is registered after a StreamingContext has been recreated from a checkpoint.
> It _appears_ that DStreams must not be registered after a StreamingContext has been recreated from a checkpoint. This is *not* obvious at first.
> The code:
> {code}
> def createStreamingContext(): StreamingContext = {
>     val ssc = new StreamingContext(sparkConf, Duration(1000))
>     ssc.checkpoint(checkpointDir)
>     ssc
> }
> val ssc = StreamingContext.getOrCreate(checkpointDir, createStreamingContext)
> val socketStream = ssc.socketTextStream(...)
> socketStream.checkpoint(Seconds(1))
> socketStream.foreachRDD(...)
> {code}
> At the very least, this should be described in the docs, and/or checked in the code when the streaming computation starts.
> The exception is as follows:
> {code}
> org.apache.spark.SparkException: org.apache.spark.streaming.dstream.ConstantInputDStream@724797ab has not been initialized
>   at org.apache.spark.streaming.dstream.DStream.isTimeValid(DStream.scala:311)
>   at org.apache.spark.streaming.dstream.InputDStream.isTimeValid(InputDStream.scala:89)
>   at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:332)
>   at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:332)
>   at scala.Option.orElse(Option.scala:289)
>   at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:329)
>   at org.apache.spark.streaming.dstream.ForEachDStream.generateJob(ForEachDStream.scala:48)
>   at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:117)
>   at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:116)
>   at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:252)
>   at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:252)
>   at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>   at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
>   at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:252)
>   at scala.collection.AbstractTraversable.flatMap(Traversable.scala:104)
>   at org.apache.spark.streaming.DStreamGraph.generateJobs(DStreamGraph.scala:116)
>   at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$restart$4.apply(JobGenerator.scala:233)
>   at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$restart$4.apply(JobGenerator.scala:228)
>   at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
>   at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
>   at org.apache.spark.streaming.scheduler.JobGenerator.restart(JobGenerator.scala:228)
>   at org.apache.spark.streaming.scheduler.JobGenerator.start(JobGenerator.scala:97)
>   at org.apache.spark.streaming.scheduler.JobScheduler.start(JobScheduler.scala:83)
>   at org.apache.spark.streaming.StreamingContext$$anonfun$liftedTree1$1$1.apply$mcV$sp(StreamingContext.scala:589)
>   at org.apache.spark.streaming.StreamingContext$$anonfun$liftedTree1$1$1.apply(StreamingContext.scala:585)
>   at org.apache.spark.streaming.StreamingContext$$anonfun$liftedTree1$1$1.apply(StreamingContext.scala:585)
>   at ... run in separate thread using org.apache.spark.util.ThreadUtils ... ()
>   at org.apache.spark.streaming.StreamingContext.liftedTree1$1(StreamingContext.scala:585)
>   at org.apache.spark.streaming.StreamingContext.start(StreamingContext.scala:579)
>   ... 43 elided
> {code}
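For readers on a version where the exception still occurs, the recovery pattern shown in the Spark Streaming programming guide avoids the situation described above: define every DStream inside the factory passed to StreamingContext.getOrCreate, so the streams belong to the checkpointed graph instead of being registered after recovery. The following is a minimal sketch of that pattern, not the change made in the commit linked above. The object name CheckpointRecoverySketch, the local[2] master, the socket source on localhost:9999 (mirroring the repro above) and the default path /tmp/spark-13316-checkpoint are illustrative assumptions.

{code}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Hypothetical example object; names and paths are illustrative only.
object CheckpointRecoverySketch {

  // Builds a brand-new context AND wires up every DStream. getOrCreate only calls
  // this when no checkpoint exists; on recovery, the DStream graph (including the
  // streams defined here) is restored from the checkpoint instead.
  def createStreamingContext(checkpointDir: String, conf: SparkConf): StreamingContext = {
    val ssc = new StreamingContext(conf, Seconds(1))
    ssc.checkpoint(checkpointDir)

    // All DStream definitions live inside the factory, not after getOrCreate returns.
    val socketStream = ssc.socketTextStream("localhost", 9999)
    socketStream.checkpoint(Seconds(1))
    socketStream.foreachRDD(rdd => rdd.collect().foreach(println))
    ssc
  }

  def main(args: Array[String]): Unit = {
    // Hypothetical default path; it must be the same across restarts for recovery to happen.
    val checkpointDir = if (args.nonEmpty) args(0) else "/tmp/spark-13316-checkpoint"
    // local[2]: the socket receiver occupies one core, batch processing needs another.
    val conf = new SparkConf().setMaster("local[2]").setAppName("SPARK-13316-sketch")

    val ssc = StreamingContext.getOrCreate(checkpointDir,
      () => createStreamingContext(checkpointDir, conf))
    // No new DStreams are registered here, after the context may have been recovered.
    ssc.start()
    ssc.awaitTermination()
  }
}
{code}

To exercise recovery, start nc -lk 9999, run the program, stop it, and run it again with the same checkpoint directory; on the second run getOrCreate rebuilds the context and its DStreams from the checkpoint rather than calling the factory.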



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)