You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@spark.apache.org by Tiago Albineli Motta <ti...@gmail.com> on 2015/10/22 15:22:58 UTC

Re: Error in starting Spark Streaming Context

I can't say what is happening, but I have a similar problem here.

While for you the source is:

org.apache.spark.streaming.dstream.WindowedDStream@532d0784 has not been
initialized


For me it is:

org.apache.spark.SparkException:
org.apache.spark.streaming.dstream.MapPartitionedDStream@7a2d07cc has
not been initialized


Here, the problem started after I changed my main class to use another
class to execute the stream.


Before:


object TopStream {

 //everything here

}


After:


object TopStream {

   // call new TopStream.process( ... )

}


class TopStream extends Serializable {

}





Tiago Albineli Motta
Desenvolvedor de Software - Globo.com
ICQ: 32107100
http://programandosemcafeina.blogspot.com

On Wed, Jul 29, 2015 at 12:59 PM, Sadaf <sa...@platalytics.com> wrote:

> Hi
>
> I am new to Spark Streaming and writing a code for twitter connector. when
> i
> run this code more than one time, it gives the following exception. I have
> to create a new hdfs directory for checkpointing each time to make it run
> successfully and moreover it doesn't get stopped.
>
> ERROR StreamingContext: Error starting the context, marking it as stopped
>     org.apache.spark.SparkException:
> org.apache.spark.streaming.dstream.WindowedDStream@532d0784 has not been
> initialized
>     at
> org.apache.spark.streaming.dstream.DStream.isTimeValid(DStream.scala:321)
>     at
>
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:342)
>     at
>
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:342)
>     at scala.Option.orElse(Option.scala:257)
>     at
> org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:339)
>     at
>
> org.apache.spark.streaming.dstream.ForEachDStream.generateJob(ForEachDStream.scala:38)
>     at
>
> org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:120)
>     at
>
> org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:120)
>     at
>
> scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
>     at
>
> scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
>     at
>
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>     at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>     at
> scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251)
>     at scala.collection.AbstractTraversable.flatMap(Traversable.scala:105)
>     at
>
> org.apache.spark.streaming.DStreamGraph.generateJobs(DStreamGraph.scala:120)
>     at
>
> org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$restart$4.apply(JobGenerator.scala:227)
>     at
>
> org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$restart$4.apply(JobGenerator.scala:222)
>     at
>
> scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
>     at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
>     at
>
> org.apache.spark.streaming.scheduler.JobGenerator.restart(JobGenerator.scala:222)
>     at
>
> org.apache.spark.streaming.scheduler.JobGenerator.start(JobGenerator.scala:92)
>     at
>
> org.apache.spark.streaming.scheduler.JobScheduler.start(JobScheduler.scala:73)
>     at
>
> org.apache.spark.streaming.StreamingContext.liftedTree1$1(StreamingContext.scala:588)
>     at
>
> org.apache.spark.streaming.StreamingContext.start(StreamingContext.scala:586)
>     at twitter.streamingSpark$.twitterConnector(App.scala:38)
>     at twitter.streamingSpark$.main(App.scala:26)
>     at twitter.streamingSpark.main(App.scala)
>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>     at
>
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>     at
>
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.lang.reflect.Method.invoke(Method.java:606)
>     at
>
> org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:664)
>     at
> org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:169)
>     at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:192)
>     at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:111)
>     at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
>
> The relevant code is
>
>  def twitterConnector() :Unit =
>  {
>      val atwitter=managingCredentials()
>
>    val ssc=StreamingContext.getOrCreate("hdfsDirectory",()=> {
> managingContext() })
>    fetchTweets(ssc, atwitter )
>
>    ssc.start()             // Start the computation
>    ssc.awaitTermination()
>
>    }
>
>    def managingContext():StreamingContext =
>   {
>    //making spark context
>    val conf = new
> SparkConf().setMaster("local[*]").setAppName("twitterConnector")
>    val ssc = new StreamingContext(conf, Seconds(1))
>    val sqlContext = new org.apache.spark.sql.SQLContext(ssc.sparkContext)
>    import sqlContext.implicits._
>
>    //checkpointing
>    ssc.checkpoint("hdfsDirectory")
>    ssc
>    }
>     def fetchTweets (ssc : StreamingContext , atwitter :
> Option[twitter4j.auth.Authorization]) : Unit = {
>
>
>    val tweets
> =TwitterUtils.createStream(ssc,atwitter,Nil,StorageLevel.MEMORY_AND_DISK_2)
>    val twt = tweets.window(Seconds(10),Seconds(10))
>   //checkpoint duration
>   /twt.checkpoint(new Duration(1000))
>
>    //processing
>    case class Tweet(createdAt:Long, text:String)
>    twt.map(status=>
>    Tweet(status.getCreatedAt().getTime()/1000, status.getText())
>    )
>    twt.print()
>   }
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Error-in-starting-Spark-Streaming-Context-tp24063.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> ---------------------------------------------------------------------
> To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
> For additional commands, e-mail: user-help@spark.apache.org
>
>

Re: Error in starting Spark Streaming Context

Posted by Tiago Albineli Motta <ti...@gmail.com>.
Solved!

The problem has nothing to do with the class/object refactoring. But in the
process of this refactoring I made a change that is similar to your code.

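Before this refactoring, I set up the DStream processing inside the function
that I passed to StreamingContext.getOrCreate. Afterwards, I started setting
up the DStream processing on the context returned by
StreamingContext.getOrCreate. When the context is recovered from an existing
checkpoint, that function is not called and the DStream graph is restored from
the checkpoint, so DStreams defined outside of it are never initialized.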

So you should call *fetchTweets* inside *managingContext*.

That worked for me.

Tiago





Tiago Albineli Motta
Desenvolvedor de Software - Globo.com
ICQ: 32107100
http://programandosemcafeina.blogspot.com

On Thu, Oct 22, 2015 at 11:22 AM, Tiago Albineli Motta <ti...@gmail.com>
wrote:

> Can't say  what is happening, and I have a similar problem here.
>
> While for you the source is:
>
> org.apache.spark.streaming.dstream.WindowedDStream@532d0784 has not been
> initialized
>
>
> For me is:
>
> org.apache.spark.SparkException: org.apache.spark.streaming.dstream.MapPartitionedDStream@7a2d07cc has not been initialized
>
>
> Here, the problem started after I change my main class to use another class to execute the stream.
>
>
> Before:
>
>
> object TopStream {
>
>  //everything here
>
> }
>
>
> After
>
>
> object TopStream {
>
>    // call new TopStream.process( ... )
>
> }
>
>
> class TopStream extends Serializable {
>
> }
>
>
>
>
>
> Tiago Albineli Motta
> Desenvolvedor de Software - Globo.com
> ICQ: 32107100
> http://programandosemcafeina.blogspot.com
>
> On Wed, Jul 29, 2015 at 12:59 PM, Sadaf <sa...@platalytics.com> wrote:
>
>> Hi
>>
>> I am new to Spark Streaming and writing a code for twitter connector.
>> when i
>> run this code more than one time, it gives the following exception. I have
>> to create a new hdfs directory for checkpointing each time to make it run
>> successfully and moreover it doesn't get stopped.
>>
>> ERROR StreamingContext: Error starting the context, marking it as stopped
>>     org.apache.spark.SparkException:
>> org.apache.spark.streaming.dstream.WindowedDStream@532d0784 has not been
>> initialized
>>     at
>> org.apache.spark.streaming.dstream.DStream.isTimeValid(DStream.scala:321)
>>     at
>>
>> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:342)
>>     at
>>
>> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:342)
>>     at scala.Option.orElse(Option.scala:257)
>>     at
>> org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:339)
>>     at
>>
>> org.apache.spark.streaming.dstream.ForEachDStream.generateJob(ForEachDStream.scala:38)
>>     at
>>
>> org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:120)
>>     at
>>
>> org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:120)
>>     at
>>
>> scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
>>     at
>>
>> scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
>>     at
>>
>> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>>     at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>>     at
>> scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251)
>>     at scala.collection.AbstractTraversable.flatMap(Traversable.scala:105)
>>     at
>>
>> org.apache.spark.streaming.DStreamGraph.generateJobs(DStreamGraph.scala:120)
>>     at
>>
>> org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$restart$4.apply(JobGenerator.scala:227)
>>     at
>>
>> org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$restart$4.apply(JobGenerator.scala:222)
>>     at
>>
>> scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
>>     at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
>>     at
>>
>> org.apache.spark.streaming.scheduler.JobGenerator.restart(JobGenerator.scala:222)
>>     at
>>
>> org.apache.spark.streaming.scheduler.JobGenerator.start(JobGenerator.scala:92)
>>     at
>>
>> org.apache.spark.streaming.scheduler.JobScheduler.start(JobScheduler.scala:73)
>>     at
>>
>> org.apache.spark.streaming.StreamingContext.liftedTree1$1(StreamingContext.scala:588)
>>     at
>>
>> org.apache.spark.streaming.StreamingContext.start(StreamingContext.scala:586)
>>     at twitter.streamingSpark$.twitterConnector(App.scala:38)
>>     at twitter.streamingSpark$.main(App.scala:26)
>>     at twitter.streamingSpark.main(App.scala)
>>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>     at
>>
>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>>     at
>>
>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>     at java.lang.reflect.Method.invoke(Method.java:606)
>>     at
>>
>> org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:664)
>>     at
>> org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:169)
>>     at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:192)
>>     at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:111)
>>     at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
>>
>> The relevant code is
>>
>>  def twitterConnector() :Unit =
>>  {
>>      val atwitter=managingCredentials()
>>
>>    val ssc=StreamingContext.getOrCreate("hdfsDirectory",()=> {
>> managingContext() })
>>    fetchTweets(ssc, atwitter )
>>
>>    ssc.start()             // Start the computation
>>    ssc.awaitTermination()
>>
>>    }
>>
>>    def managingContext():StreamingContext =
>>   {
>>    //making spark context
>>    val conf = new
>> SparkConf().setMaster("local[*]").setAppName("twitterConnector")
>>    val ssc = new StreamingContext(conf, Seconds(1))
>>    val sqlContext = new org.apache.spark.sql.SQLContext(ssc.sparkContext)
>>    import sqlContext.implicits._
>>
>>    //checkpointing
>>    ssc.checkpoint("hdfsDirectory")
>>    ssc
>>    }
>>     def fetchTweets (ssc : StreamingContext , atwitter :
>> Option[twitter4j.auth.Authorization]) : Unit = {
>>
>>
>>    val tweets
>>
>> =TwitterUtils.createStream(ssc,atwitter,Nil,StorageLevel.MEMORY_AND_DISK_2)
>>    val twt = tweets.window(Seconds(10),Seconds(10))
>>   //checkpoint duration
>>   /twt.checkpoint(new Duration(1000))
>>
>>    //processing
>>    case class Tweet(createdAt:Long, text:String)
>>    twt.map(status=>
>>    Tweet(status.getCreatedAt().getTime()/1000, status.getText())
>>    )
>>    twt.print()
>>   }
>>
>>
>>
>> --
>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/Error-in-starting-Spark-Streaming-Context-tp24063.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>
>> ---------------------------------------------------------------------
>> To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
>> For additional commands, e-mail: user-help@spark.apache.org
>>
>>
>