Posted to dev@spark.apache.org by Renyi Xiong <re...@gmail.com> on 2015/12/09 21:45:45 UTC

DStream not initialized SparkException

Hi,

I hit the following exception when the driver program tried to recover from a checkpoint. The logic appears to rely on zeroTime being set, which doesn't seem to happen here. Am I missing something, or is this a bug in 1.4.1?

org.apache.spark.SparkException: org.apache.spark.streaming.api.csharp.CSharpTransformed2DStream@161f0d27 has not been initialized
        at org.apache.spark.streaming.dstream.DStream.isTimeValid(DStream.scala:321)
        at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:342)
        at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:342)
        at scala.Option.orElse(Option.scala:257)
        at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:339)
        at org.apache.spark.streaming.dstream.ForEachDStream.generateJob(ForEachDStream.scala:38)
        at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:120)
        at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:120)
        at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
        at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
        at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251)
        at scala.collection.AbstractTraversable.flatMap(Traversable.scala:105)
        at org.apache.spark.streaming.DStreamGraph.generateJobs(DStreamGraph.scala:120)
        at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$restart$4.apply(JobGenerator.scala:227)
        at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$restart$4.apply(JobGenerator.scala:222)
        at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
        at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
        at org.apache.spark.streaming.scheduler.JobGenerator.restart(JobGenerator.scala:222)
        at org.apache.spark.streaming.scheduler.JobGenerator.start(JobGenerator.scala:92)
        at org.apache.spark.streaming.scheduler.JobScheduler.start(JobScheduler.scala:73)
        at org.apache.spark.streaming.StreamingContext.liftedTree1$1(StreamingContext.scala:596)
        at org.apache.spark.streaming.StreamingContext.start(StreamingContext.scala:594)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:606)
        at org.apache.spark.api.csharp.CSharpBackendHandler.handleMethodCall(CSharpBackendHandler.scala:145)
        at org.apache.spark.api.csharp.CSharpBackendHandler.channelRead0(CSharpBackendHandler.scala:90)
        at org.apache.spark.api.csharp.CSharpBackendHandler.channelRead0(CSharpBackendHandler.scala:25)
        at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
        at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
        at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:163)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
        at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:787)
        at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:130)
        at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
        at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
        at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
        at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
        at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:116)
        at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:137)
        at java.lang.Thread.run(Thread.java:724)
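
For context, the frame that throws is DStream.isTimeValid (DStream.scala:321 above). Paraphrasing the guard from the 1.4 source - a sketch from memory, not a verbatim quote - it fails fast whenever zeroTime was never set on the stream:

// Paraphrased sketch of the guard in DStream.scala (Spark 1.4.x); the names
// match the source, but the body is reconstructed from memory.
private[streaming] def isInitialized = (zeroTime != null)

private[streaming] def isTimeValid(time: Time): Boolean = {
  if (!isInitialized) {
    // zeroTime is set by initialize(), which the DStreamGraph calls on
    // every stream registered in the graph before the context starts.
    throw new SparkException(this + " has not been initialized")
  } else if (time <= zeroTime || !(time - zeroTime).isMultipleOf(slideDuration)) {
    false
  } else {
    true
  }
}

So the exception means initialize() was never called on this particular stream.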

Re: DStream not initialized SparkException

Posted by Renyi Xiong <re...@gmail.com>.
Never mind - one of my peers corrected the driver program for me: all DStream operations need to be defined inside the function passed to the getOrCreate API.
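
Roughly, the corrected driver looks like this - a minimal sketch, with all of the DStream graph construction moved inside the function passed to StreamingContext.getOrCreate (broker list, topic, and checkpoint path are placeholders, as in the program below):

import kafka.serializer.DefaultDecoder

import org.apache.spark.SparkConf
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka._

object ScalaSML {
  // Builds the full DStream graph; getOrCreate calls this only when there is
  // no checkpoint to recover from. On recovery, the graph (including the
  // foreachRDD below) is restored from the checkpoint instead.
  def createContext(checkpointPath: String): StreamingContext = {
    val sparkConf = new SparkConf().setAppName("ScalaSML")
    val context = new StreamingContext(sparkConf, Seconds(60))
    context.checkpoint(checkpointPath)

    val kafkaParams = Map(
      "metadata.broker.list" -> "...", // placeholder
      "auto.offset.reset" -> "largest")
    val topics = Set("topic") // placeholder

    val ds = KafkaUtils.createDirectStream[Array[Byte], Array[Byte],
      DefaultDecoder, DefaultDecoder](context, kafkaParams, topics)
    ds.foreachRDD((rdd, time) =>
      println("Time: " + time + " Count: " + rdd.count()))

    context
  }

  def main(args: Array[String]) {
    val checkpointPath = "hdfs://.../checkpoint/ScalaSML/HK2"
    val ssc = StreamingContext.getOrCreate(
      checkpointPath, () => createContext(checkpointPath))
    ssc.start()
    ssc.awaitTermination()
  }
}

With this shape, a restart recovers the DirectKafkaInputDStream from the checkpoint already initialized, instead of creating a fresh, unregistered stream outside the recovered graph.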


Re: DStream not initialized SparkException

Posted by Renyi Xiong <re...@gmail.com>.
The following Scala program throws the same exception. I know people are running streaming jobs against Kafka, so I must be missing something. Any idea why?

package org.apache.spark.streaming.api.csharp

import kafka.serializer.DefaultDecoder

import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka._
import org.apache.spark.SparkConf

object ScalaSML {
  def main(args: Array[String]) {

    val checkpointPath =
      "hdfs://SparkMasterVIP.AdsOISCP-Sandbox-Ch1d.CH1D.ap.gbl/checkpoint/ScalaSML/HK2"
    val sparkConf = new SparkConf().setAppName("ScalaSML")
    val ssc = StreamingContext.getOrCreate(checkpointPath, () => {
      val context = new StreamingContext(sparkConf, Seconds(60))
      context.checkpoint(checkpointPath)
      context
    })

    val kafkaParams = Map("metadata.broker.list" -> "...",
      "auto.offset.reset" -> "largest")

    val topics = Set("topic")

    // NOTE: the DStream is created outside the getOrCreate factory function,
    // which is what breaks checkpoint recovery (see the fix in the reply above).
    val ds = KafkaUtils.createDirectStream[Array[Byte], Array[Byte],
      DefaultDecoder, DefaultDecoder](ssc, kafkaParams, topics)
    ds.foreachRDD((rdd, time) =>
      println("Time: " + time + " Count: " + rdd.count()))

    ssc.start()
    ssc.awaitTermination()
  }
}

15/12/09 15:22:43 ERROR StreamingContext: Error starting the context, marking it as stopped
org.apache.spark.SparkException: org.apache.spark.streaming.kafka.DirectKafkaInputDStream@15ce2c0 has not been initialized
        at org.apache.spark.streaming.dstream.DStream.isTimeValid(DStream.scala:321)
        at org.apache.spark.streaming.dstream.InputDStream.isTimeValid(InputDStream.scala:83)
        at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:342)
        at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:342)
        at scala.Option.orElse(Option.scala:257)
        at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:339)
        at org.apache.spark.streaming.dstream.ForEachDStream.generateJob(ForEachDStream.scala:38)
        at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:120)
        at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:120)
        at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
        at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
        at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251)
        at scala.collection.AbstractTraversable.flatMap(Traversable.scala:105)
        at org.apache.spark.streaming.DStreamGraph.generateJobs(DStreamGraph.scala:120)
        at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$restart$4.apply(JobGenerator.scala:227)
        at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$restart$4.apply(JobGenerator.scala:222)
        at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
        at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
        at org.apache.spark.streaming.scheduler.JobGenerator.restart(JobGenerator.scala:222)
        at org.apache.spark.streaming.scheduler.JobGenerator.start(JobGenerator.scala:92)
        at org.apache.spark.streaming.scheduler.JobScheduler.start(JobScheduler.scala:73)
        at org.apache.spark.streaming.StreamingContext.liftedTree1$1(StreamingContext.scala:593)
        at org.apache.spark.streaming.StreamingContext.start(StreamingContext.scala:591)
        at org.apache.spark.streaming.api.csharp.ScalaSML$.main(ScalaSML.scala:48)
        at org.apache.spark.streaming.api.csharp.ScalaSML.main(ScalaSML.scala)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:606)
        at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:665)
        at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:170)
        at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:193)
        at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:112)
        at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
