You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@spark.apache.org by "Abbass Marouni (JIRA)" <ji...@apache.org> on 2015/10/30 10:46:28 UTC

[jira] [Updated] (SPARK-11422) Restarting a Spark Streaming Job from a checkpoint fails

     [ https://issues.apache.org/jira/browse/SPARK-11422?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Abbass Marouni updated SPARK-11422:
-----------------------------------
    Description: 
A Spark Streaming job (with checkpointing enabled) that does a simple transform operation on a JavaDStream by unioning it with a JavaRDD can be executed correctly the first time : 
{code}
import java.util.Arrays;
import java.util.List;
import java.util.PriorityQueue;
import java.util.Queue;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.api.java.JavaStreamingContextFactory;

public class Driver2 {

    public static final String CHKPTDIR = "/tmp/spark/checkPointDir";

    public static void main(String[] args) {

        JavaStreamingContextFactory factory = new JavaStreamingContextFactory() {

            @Override
            public JavaStreamingContext create() {
                try {
                    return createContext();
                } catch (Exception e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }
                return null;
            }
        };

        JavaStreamingContext jssc = JavaStreamingContext.getOrCreate(CHKPTDIR, factory);

        // Start our streaming context and wait for it to "finish"
        jssc.start();

        // Wait for the job to finish
        jssc.awaitTermination();

    }

    public static class f2 implements Function<JavaRDD<String>, JavaRDD<String>> {

        /**
         * 
         */
        private static final long serialVersionUID = 1L;

        private JavaRDD<String>   ref_rdd;

        public f2(JavaRDD<String> ref_rdd) {
            this.ref_rdd = ref_rdd;
        }

        @Override
        public JavaRDD<String> call(JavaRDD<String> v1) throws Exception {
            return v1.union(ref_rdd);
        }
    }

    private static JavaStreamingContext createContext() throws java.lang.Exception {

        final SparkConf conf = new SparkConf();
        conf.setMaster("local[*]");
        conf.setAppName("TEST APP");

        JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(5));
        jssc.checkpoint(CHKPTDIR); // set checkpoint directory

        List<String> list01 = Arrays.asList("A", "B", "C", "D", "E");
        JavaRDD<String> staticRDD = jssc.sparkContext().parallelize(list01);

        List<String> list02 = Arrays.asList("F", "G", "H", "I", "J");
        JavaRDD<String> streamRDD = jssc.sparkContext().parallelize(list02);
        Queue<JavaRDD<String>> queue = new PriorityQueue<JavaRDD<String>>();
        queue.add(streamRDD);
        // replay the same RDD over and over
        JavaDStream<String> inputStream = jssc.queueStream(queue, false, streamRDD);

        // Union the inputStream with the staticRDD
        JavaDStream<String> unionStream = inputStream.transform(new f2(staticRDD));
        unionStream.print();

        return jssc;
    }

}
{code}

The job fails with the following exception when stopped and restarted again :
{code}
org.apache.spark.SparkException: RDD transformations and actions can only be invoked by the driver, not inside of other transformations; for example, rdd1.map(x => rdd2.values.count() * x) is invalid because the values transformation and count action cannot be performed inside of the rdd1.map transformation. For more information, see SPARK-5063.
	at org.apache.spark.rdd.RDD.org$apache$spark$rdd$RDD$$sc(RDD.scala:87)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:286)
	at org.apache.spark.rdd.RDD.union(RDD.scala:502)
	at org.apache.spark.api.java.JavaRDD.union(JavaRDD.scala:151)
	at fr.marouni.Driver2$f2.call(Driver2.java:61)
	at fr.marouni.Driver2$f2.call(Driver2.java:1)
	at org.apache.spark.streaming.api.java.JavaDStreamLike$class.scalaTransform$1(JavaDStreamLike.scala:334)
	at org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$transform$1.apply(JavaDStreamLike.scala:335)
	at org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$transform$1.apply(JavaDStreamLike.scala:335)
	at org.apache.spark.streaming.dstream.DStream$$anonfun$transform$1$$anonfun$apply$21.apply(DStream.scala:654)
	at org.apache.spark.streaming.dstream.DStream$$anonfun$transform$1$$anonfun$apply$21.apply(DStream.scala:654)
	at org.apache.spark.streaming.dstream.DStream$$anonfun$transform$2$$anonfun$5.apply(DStream.scala:668)
	at org.apache.spark.streaming.dstream.DStream$$anonfun$transform$2$$anonfun$5.apply(DStream.scala:666)
	at org.apache.spark.streaming.dstream.TransformedDStream.compute(TransformedDStream.scala:41)
	at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:350)
	at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:350)
	at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
	at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:349)
	at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:349)
	at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:399)
	at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:344)
	at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:342)
	at scala.Option.orElse(Option.scala:257)
	at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:339)
	at org.apache.spark.streaming.dstream.ForEachDStream.generateJob(ForEachDStream.scala:38)
	at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:120)
	at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:120)
	at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
	at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
	at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251)
	at scala.collection.AbstractTraversable.flatMap(Traversable.scala:105)
	at org.apache.spark.streaming.DStreamGraph.generateJobs(DStreamGraph.scala:120)
	at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$restart$4.apply(JobGenerator.scala:227)
	at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$restart$4.apply(JobGenerator.scala:222)
	at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
	at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
	at org.apache.spark.streaming.scheduler.JobGenerator.restart(JobGenerator.scala:222)
	at org.apache.spark.streaming.scheduler.JobGenerator.start(JobGenerator.scala:92)
	at org.apache.spark.streaming.scheduler.JobScheduler.start(JobScheduler.scala:73)
	at org.apache.spark.streaming.StreamingContext.liftedTree1$1(StreamingContext.scala:588)
	at org.apache.spark.streaming.StreamingContext.start(StreamingContext.scala:586)
	at org.apache.spark.streaming.api.java.JavaStreamingContext.start(JavaStreamingContext.scala:610)
	at fr.marouni.Driver2.main(Driver2.java:39)
{code}

  was:
A Spark Streaming job (with checkpointing enabled) that does a simple transform operation on a JavaDStream by unioning it with a JavaRDD can be executed correctly the first time : 
{code}
import java.util.Arrays;
import java.util.List;
import java.util.PriorityQueue;
import java.util.Queue;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.api.java.JavaStreamingContextFactory;

public class Driver2 {

    public static final String CHKPTDIR = "/tmp/spark/checkPointDir";

    public static void main(String[] args) {

        JavaStreamingContextFactory factory = new JavaStreamingContextFactory() {

            @Override
            public JavaStreamingContext create() {
                try {
                    return createContext();
                } catch (Exception e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }
                return null;
            }
        };

        JavaStreamingContext jssc = JavaStreamingContext.getOrCreate(CHKPTDIR, factory);

        // Start our streaming context and wait for it to "finish"
        jssc.start();

        // Wait for the job to finish
        jssc.awaitTermination();

    }

    public static class f2 implements Function<JavaRDD<String>, JavaRDD<String>> {

        /**
         * 
         */
        private static final long serialVersionUID = 1L;

        private JavaRDD<String>   ref_rdd;

        public f2(JavaRDD<String> ref_rdd) {
            this.ref_rdd = ref_rdd;
        }

        @Override
        public JavaRDD<String> call(JavaRDD<String> v1) throws Exception {
            return v1.union(ref_rdd);
        }
    }

    private static JavaStreamingContext createContext() throws java.lang.Exception {

        final SparkConf conf = new SparkConf();
        conf.setMaster("local[*]");
        conf.setAppName("TEST APP");

        JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(5));
        jssc.checkpoint(CHKPTDIR); // set checkpoint directory

        List<String> list01 = Arrays.asList("A", "B", "C", "D", "E");
        JavaRDD<String> staticRDD = jssc.sparkContext().parallelize(list01);

        List<String> list02 = Arrays.asList("F", "G", "H", "I", "J");
        JavaRDD<String> streamRDD = jssc.sparkContext().parallelize(list02);
        Queue<JavaRDD<String>> queue = new PriorityQueue<JavaRDD<String>>();
        queue.add(streamRDD);
        // replay the same RDD over and over
        JavaDStream<String> inputStream = jssc.queueStream(queue, false, streamRDD);

        // Union the inputStream with the staticRDD
        JavaDStream<String> unionStream = inputStream.transform(new f2(staticRDD));
        unionStream.print();

        return jssc;
    }

}
{code}

The job fails with the following exception when stopped and restarted again :
{code}
org.apache.spark.SparkException: RDD transformations and actions can only be invoked by the driver, not inside of other transformations; for example, rdd1.map(x => rdd2.values.count() * x) is invalid because the values transformation and count action cannot be performed inside of the rdd1.map transformation. For more information, see SPARK-5063.
	at org.apache.spark.rdd.RDD.org$apache$spark$rdd$RDD$$sc(RDD.scala:87)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:286)
	at org.apache.spark.rdd.RDD.union(RDD.scala:502)
	at org.apache.spark.api.java.JavaRDD.union(JavaRDD.scala:151)
	at fr.marouni.Driver2$f2.call(Driver2.java:61)
	at fr.marouni.Driver2$f2.call(Driver2.java:1)
	at org.apache.spark.streaming.api.java.JavaDStreamLike$class.scalaTransform$1(JavaDStreamLike.scala:334)
	at org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$transform$1.apply(JavaDStreamLike.scala:335)
	at org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$transform$1.apply(JavaDStreamLike.scala:335)
	at org.apache.spark.streaming.dstream.DStream$$anonfun$transform$1$$anonfun$apply$21.apply(DStream.scala:654)
	at org.apache.spark.streaming.dstream.DStream$$anonfun$transform$1$$anonfun$apply$21.apply(DStream.scala:654)
	at org.apache.spark.streaming.dstream.DStream$$anonfun$transform$2$$anonfun$5.apply(DStream.scala:668)
	at org.apache.spark.streaming.dstream.DStream$$anonfun$transform$2$$anonfun$5.apply(DStream.scala:666)
	at org.apache.spark.streaming.dstream.TransformedDStream.compute(TransformedDStream.scala:41)
	at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:350)
	at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:350)
	at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
	at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:349)
	at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:349)
	at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:399)
	at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:344)
	at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:342)
	at scala.Option.orElse(Option.scala:257)
	at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:339)
	at org.apache.spark.streaming.dstream.ForEachDStream.generateJob(ForEachDStream.scala:38)
	at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:120)
	at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:120)
	at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
	at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
	at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251)
	at scala.collection.AbstractTraversable.flatMap(Traversable.scala:105)
	at org.apache.spark.streaming.DStreamGraph.generateJobs(DStreamGraph.scala:120)
	at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$restart$4.apply(JobGenerator.scala:227)
	at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$restart$4.apply(JobGenerator.scala:222)
	at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
	at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
	at org.apache.spark.streaming.scheduler.JobGenerator.restart(JobGenerator.scala:222)
	at org.apache.spark.streaming.scheduler.JobGenerator.start(JobGenerator.scala:92)
	at org.apache.spark.streaming.scheduler.JobScheduler.start(JobScheduler.scala:73)
	at org.apache.spark.streaming.StreamingContext.liftedTree1$1(StreamingContext.scala:588)
	at org.apache.spark.streaming.StreamingContext.start(StreamingContext.scala:586)
	at org.apache.spark.streaming.api.java.JavaStreamingContext.start(JavaStreamingContext.scala:610)
	at fr.marouni.Driver2.main(Driver2.java:39)
{code}

Please check the attached Java code of the job.


> Restarting a Spark Streaming Job from a checkpoint fails
> --------------------------------------------------------
>
>                 Key: SPARK-11422
>                 URL: https://issues.apache.org/jira/browse/SPARK-11422
>             Project: Spark
>          Issue Type: Bug
>    Affects Versions: 1.3.1, 1.4.1, 1.5.1
>            Reporter: Abbass Marouni
>
> A Spark Streaming job (with checkpointing enabled) that does a simple transform operation on a JavaDStream by unioning it with a JavaRDD can be executed correctly the first time : 
> {code}
> import java.util.Arrays;
> import java.util.List;
> import java.util.PriorityQueue;
> import java.util.Queue;
> import org.apache.spark.SparkConf;
> import org.apache.spark.api.java.JavaRDD;
> import org.apache.spark.api.java.function.Function;
> import org.apache.spark.streaming.Durations;
> import org.apache.spark.streaming.api.java.JavaDStream;
> import org.apache.spark.streaming.api.java.JavaStreamingContext;
> import org.apache.spark.streaming.api.java.JavaStreamingContextFactory;
> public class Driver2 {
>     public static final String CHKPTDIR = "/tmp/spark/checkPointDir";
>     public static void main(String[] args) {
>         JavaStreamingContextFactory factory = new JavaStreamingContextFactory() {
>             @Override
>             public JavaStreamingContext create() {
>                 try {
>                     return createContext();
>                 } catch (Exception e) {
>                     // TODO Auto-generated catch block
>                     e.printStackTrace();
>                 }
>                 return null;
>             }
>         };
>         JavaStreamingContext jssc = JavaStreamingContext.getOrCreate(CHKPTDIR, factory);
>         // Start our streaming context and wait for it to "finish"
>         jssc.start();
>         // Wait for the job to finish
>         jssc.awaitTermination();
>     }
>     public static class f2 implements Function<JavaRDD<String>, JavaRDD<String>> {
>         /**
>          * 
>          */
>         private static final long serialVersionUID = 1L;
>         private JavaRDD<String>   ref_rdd;
>         public f2(JavaRDD<String> ref_rdd) {
>             this.ref_rdd = ref_rdd;
>         }
>         @Override
>         public JavaRDD<String> call(JavaRDD<String> v1) throws Exception {
>             return v1.union(ref_rdd);
>         }
>     }
>     private static JavaStreamingContext createContext() throws java.lang.Exception {
>         final SparkConf conf = new SparkConf();
>         conf.setMaster("local[*]");
>         conf.setAppName("TEST APP");
>         JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(5));
>         jssc.checkpoint(CHKPTDIR); // set checkpoint directory
>         List<String> list01 = Arrays.asList("A", "B", "C", "D", "E");
>         JavaRDD<String> staticRDD = jssc.sparkContext().parallelize(list01);
>         List<String> list02 = Arrays.asList("F", "G", "H", "I", "J");
>         JavaRDD<String> streamRDD = jssc.sparkContext().parallelize(list02);
>         Queue<JavaRDD<String>> queue = new PriorityQueue<JavaRDD<String>>();
>         queue.add(streamRDD);
>         // replay the same RDD over and over
>         JavaDStream<String> inputStream = jssc.queueStream(queue, false, streamRDD);
>         // Union the inputStream with the staticRDD
>         JavaDStream<String> unionStream = inputStream.transform(new f2(staticRDD));
>         unionStream.print();
>         return jssc;
>     }
> }
> {code}
> The job fails with the following exception when stopped and restarted again :
> {code}
> org.apache.spark.SparkException: RDD transformations and actions can only be invoked by the driver, not inside of other transformations; for example, rdd1.map(x => rdd2.values.count() * x) is invalid because the values transformation and count action cannot be performed inside of the rdd1.map transformation. For more information, see SPARK-5063.
> 	at org.apache.spark.rdd.RDD.org$apache$spark$rdd$RDD$$sc(RDD.scala:87)
> 	at org.apache.spark.rdd.RDD.withScope(RDD.scala:286)
> 	at org.apache.spark.rdd.RDD.union(RDD.scala:502)
> 	at org.apache.spark.api.java.JavaRDD.union(JavaRDD.scala:151)
> 	at fr.marouni.Driver2$f2.call(Driver2.java:61)
> 	at fr.marouni.Driver2$f2.call(Driver2.java:1)
> 	at org.apache.spark.streaming.api.java.JavaDStreamLike$class.scalaTransform$1(JavaDStreamLike.scala:334)
> 	at org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$transform$1.apply(JavaDStreamLike.scala:335)
> 	at org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$transform$1.apply(JavaDStreamLike.scala:335)
> 	at org.apache.spark.streaming.dstream.DStream$$anonfun$transform$1$$anonfun$apply$21.apply(DStream.scala:654)
> 	at org.apache.spark.streaming.dstream.DStream$$anonfun$transform$1$$anonfun$apply$21.apply(DStream.scala:654)
> 	at org.apache.spark.streaming.dstream.DStream$$anonfun$transform$2$$anonfun$5.apply(DStream.scala:668)
> 	at org.apache.spark.streaming.dstream.DStream$$anonfun$transform$2$$anonfun$5.apply(DStream.scala:666)
> 	at org.apache.spark.streaming.dstream.TransformedDStream.compute(TransformedDStream.scala:41)
> 	at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:350)
> 	at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:350)
> 	at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
> 	at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:349)
> 	at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:349)
> 	at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:399)
> 	at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:344)
> 	at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:342)
> 	at scala.Option.orElse(Option.scala:257)
> 	at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:339)
> 	at org.apache.spark.streaming.dstream.ForEachDStream.generateJob(ForEachDStream.scala:38)
> 	at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:120)
> 	at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:120)
> 	at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
> 	at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
> 	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
> 	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
> 	at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251)
> 	at scala.collection.AbstractTraversable.flatMap(Traversable.scala:105)
> 	at org.apache.spark.streaming.DStreamGraph.generateJobs(DStreamGraph.scala:120)
> 	at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$restart$4.apply(JobGenerator.scala:227)
> 	at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$restart$4.apply(JobGenerator.scala:222)
> 	at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
> 	at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
> 	at org.apache.spark.streaming.scheduler.JobGenerator.restart(JobGenerator.scala:222)
> 	at org.apache.spark.streaming.scheduler.JobGenerator.start(JobGenerator.scala:92)
> 	at org.apache.spark.streaming.scheduler.JobScheduler.start(JobScheduler.scala:73)
> 	at org.apache.spark.streaming.StreamingContext.liftedTree1$1(StreamingContext.scala:588)
> 	at org.apache.spark.streaming.StreamingContext.start(StreamingContext.scala:586)
> 	at org.apache.spark.streaming.api.java.JavaStreamingContext.start(JavaStreamingContext.scala:610)
> 	at fr.marouni.Driver2.main(Driver2.java:39)
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org