Posted to user@spark.apache.org by Juan Rodríguez Hortalá <ju...@gmail.com> on 2015/08/26 13:30:35 UTC

JobScheduler: Error generating jobs for time for custom InputDStream

Hi,

I've developed a ScalaCheck property for testing Spark Streaming
transformations. To do that I had to develop a custom InputDStream, which
is very similar to QueueInputDStream but has a method for adding new test
cases, which are objects of type Seq[Seq[A]], to the DStream. You can see
the code at
https://github.com/juanrh/sscheck/blob/32c2bff66aa5500182e0162a24ecca6d47707c42/src/main/scala/org/apache/spark/streaming/dstream/DynSeqQueueInputDStream.scala.
I have also developed a few properties that run in local mode:
https://github.com/juanrh/sscheck/blob/32c2bff66aa5500182e0162a24ecca6d47707c42/src/test/scala/es/ucm/fdi/sscheck/spark/streaming/ScalaCheckStreamingTest.scala.
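
For context, the core of the idea is roughly the following (a simplified
sketch with hypothetical names, not the actual sscheck code):

import scala.collection.mutable.Queue
import scala.reflect.ClassTag

import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{StreamingContext, Time}
import org.apache.spark.streaming.dstream.InputDStream

// Sketch: each test case is a Seq[Seq[A]], i.e. a sequence of batches.
// compute() dequeues one batch per batch interval and parallelizes it.
class TestCaseInputDStream[A: ClassTag](_ssc: StreamingContext)
  extends InputDStream[A](_ssc) {

  private val pendingBatches = Queue.empty[Seq[A]]

  def addTestCase(testCase: Seq[Seq[A]]): Unit = synchronized {
    pendingBatches ++= testCase
  }

  override def start(): Unit = {}
  override def stop(): Unit = {}

  override def compute(validTime: Time): Option[RDD[A]] = synchronized {
    if (pendingBatches.isEmpty) None // like QueueInputDStream: no batch, no RDD
    else Some(context.sparkContext.parallelize(pendingBatches.dequeue()))
  }
}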
The problem is that when the batch interval is too small and the machine
cannot complete the batches fast enough, I get the following exceptions in
the Spark log:

15/08/26 11:22:02 ERROR JobScheduler: Error generating jobs for time 1440580922500 ms
java.lang.NullPointerException
    at org.apache.spark.streaming.dstream.DStream$$anonfun$count$1$$anonfun$apply$18.apply(DStream.scala:587)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$count$1$$anonfun$apply$18.apply(DStream.scala:587)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$transform$1$$anonfun$apply$21.apply(DStream.scala:654)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$transform$1$$anonfun$apply$21.apply(DStream.scala:654)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$transform$2$$anonfun$5.apply(DStream.scala:668)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$transform$2$$anonfun$5.apply(DStream.scala:666)
    at org.apache.spark.streaming.dstream.TransformedDStream.compute(TransformedDStream.scala:41)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:350)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:350)
    at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:349)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:349)
    at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:399)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:344)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:342)
    at scala.Option.orElse(Option.scala:257)
    at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:339)
    at org.apache.spark.streaming.dstream.ShuffledDStream.compute(ShuffledDStream.scala:41)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:350)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:350)
    at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:349)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:349)
    at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:399)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:344)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:342)
    at scala.Option.orElse(Option.scala:257)
    at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:339)
    at org.apache.spark.streaming.dstream.MappedDStream.compute(MappedDStream.scala:35)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:350)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:350)
    at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:349)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:349)
    at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:399)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:344)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:342)
    at scala.Option.orElse(Option.scala:257)
    at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:339)
    at org.apache.spark.streaming.dstream.ForEachDStream.generateJob(ForEachDStream.scala:38)
    at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:120)
    at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:120)
    at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
    at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
    at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251)
    at scala.collection.AbstractTraversable.flatMap(Traversable.scala:105)
    at org.apache.spark.streaming.DStreamGraph.generateJobs(DStreamGraph.scala:120)
    at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$2.apply(JobGenerator.scala:243)
    at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$2.apply(JobGenerator.scala:241)
    at scala.util.Try$.apply(Try.scala:161)
    at org.apache.spark.streaming.scheduler.JobGenerator.generateJobs(JobGenerator.scala:241)
    at org.apache.spark.streaming.scheduler.JobGenerator.org$apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:177)
    at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:83)
    at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:82)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
15/08/26 11:22:02 ERROR JobScheduler: Error generating jobs for time 1440580922600 ms

Sometimes test cases finish correctly anyway when this happens, but I'm a
bit concerned and wanted to check that my custom InputDStream is ok. In a
previous topic
http://apache-spark-user-list.1001560.n3.nabble.com/NullPointerException-from-count-foreachRDD-Resolved-td2066.html
the suggested solution was to return Some of an empty RDD from compute()
when the batch is empty. But that solution doesn't work for me, because
when I do that the batches get mixed up (sometimes two consecutive batches
are fused into a single batch, leaving one of the batches empty), so the
integrity of the test case generated by ScalaCheck is not preserved.
Besides, QueueInputDStream returns None when there is no batch. I would
like to understand why Option[RDD[T]] is the return type of
DStream.compute(), and to check with the list whether my custom
InputDStream is ok.
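
For concreteness, the suggested workaround would change only the empty case
of the sketch above (hypothetical names again, not the actual sscheck code):

// Workaround from the linked thread: return an empty RDD instead of None
// when there is no pending data. In my tests this makes consecutive batches
// blur together, so I kept returning None instead.
override def compute(validTime: Time): Option[RDD[A]] = synchronized {
  if (pendingBatches.isEmpty) Some(context.sparkContext.emptyRDD[A])
  else Some(context.sparkContext.parallelize(pendingBatches.dequeue()))
}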

Thanks a lot for your help.

Greetings,

Juan

Re: JobScheduler: Error generating jobs for time for custom InputDStream

Posted by Juan Rodríguez Hortalá <ju...@gmail.com>.
Hi Shixiong,

Thanks for your answer. I will take a look at your suggestion; maybe my call
to SparkContext.parallelize doesn't work well when there are fewer records
to parallelize than partitions.
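
For reference, this is the kind of guard I have in mind (a sketch with
hypothetical names, not the actual sscheck code):

import scala.reflect.ClassTag

import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD

object BatchToRDD {
  // Never request more partitions than there are records, and map an empty
  // batch to an explicitly empty RDD instead of parallelizing it.
  def apply[A: ClassTag](sc: SparkContext, batch: Seq[A], numSlices: Int): RDD[A] =
    if (batch.isEmpty) sc.emptyRDD[A]
    else sc.parallelize(batch, math.min(numSlices, batch.size))
}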

Thanks a lot for your help

Greetings,

Juan

2015-09-24 2:04 GMT-07:00 Shixiong Zhu <zs...@gmail.com>:

> Looks like you return a "Some(null)" in "compute". If you don't want to
> create an RDD, it should return None. If you want to return an empty RDD,
> it should return "Some(sc.emptyRDD)".
>
> Best Regards,
> Shixiong Zhu
>
> 2015-09-15 2:51 GMT+08:00 Juan Rodríguez Hortalá <
> juan.rodriguez.hortala@gmail.com>:
>
>> [forwarded message and stack trace trimmed; identical to the first post
>> above]

Re: JobScheduler: Error generating jobs for time for custom InputDStream

Posted by Shixiong Zhu <zs...@gmail.com>.
Looks like you return a "Some(null)" in "compute". If you don't want to
create an RDD, it should return None. If you want to return an empty RDD,
it should return "Some(sc.emptyRDD)".

Best Regards,
Shixiong Zhu

2015-09-15 2:51 GMT+08:00 Juan Rodríguez Hortalá <
juan.rodriguez.hortala@gmail.com>:

> [forwarded message and stack trace trimmed; identical to the first post
> above]

Fwd: JobScheduler: Error generating jobs for time for custom InputDStream

Posted by Juan Rodríguez Hortalá <ju...@gmail.com>.
Hi,

I sent this message to the user list a few weeks ago with no luck, so I'm
forwarding it to the dev list in case someone could lend a hand with this.
Thanks a lot in advance.

[forwarded message trimmed; identical to the first post above, including the
full stack trace]