You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@spark.apache.org by "Prada Souvanlasy (JIRA)" <ji...@apache.org> on 2015/10/30 16:08:27 UTC
[jira] [Commented] (SPARK-11422) Spark Streaming job which does a
transformation involving a DStream and a RDD cannot be restarted from a
checkpoint
[ https://issues.apache.org/jira/browse/SPARK-11422?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14982672#comment-14982672 ]
Prada Souvanlasy commented on SPARK-11422:
------------------------------------------
Hello, thanks for your quick answer.
Therefore, we think that doing transformations involving data coming from a stream and a 'static' dataset (such as a join to enrich the incoming data stream with relevant information from the static dataset) is a common use-case we will have to handle.
We might have done it the wrong way regarding checkpointing, so what would you recommend to properly address this use-case?
IMHO, an example in the documentation showing how to deal with this kind of use-case with Spark Streaming (while enabling checkpointing) would be highly beneficial to a lot of users.
> Spark Streaming job which does a transformation involving a DStream and a RDD cannot be restarted from a checkpoint
> -------------------------------------------------------------------------------------------------------------------
>
> Key: SPARK-11422
> URL: https://issues.apache.org/jira/browse/SPARK-11422
> Project: Spark
> Issue Type: Bug
> Affects Versions: 1.3.1, 1.4.1, 1.5.1
> Reporter: Abbass Marouni
>
> A Spark Streaming job (with checkpointing enabled) that does a simple transform operation on a JavaDStream by unioning it with a JavaRDD can be executed correctly the first time :
> {code}
> import java.util.Arrays;
> import java.util.List;
> import java.util.PriorityQueue;
> import java.util.Queue;
> import org.apache.spark.SparkConf;
> import org.apache.spark.api.java.JavaRDD;
> import org.apache.spark.api.java.function.Function;
> import org.apache.spark.streaming.Durations;
> import org.apache.spark.streaming.api.java.JavaDStream;
> import org.apache.spark.streaming.api.java.JavaStreamingContext;
> import org.apache.spark.streaming.api.java.JavaStreamingContextFactory;
> public class Driver2 {
> public static final String CHKPTDIR = "/tmp/spark/checkPointDir";
> public static void main(String[] args) {
> JavaStreamingContextFactory factory = new JavaStreamingContextFactory() {
> @Override
> public JavaStreamingContext create() {
> try {
> return createContext();
> } catch (Exception e) {
> // TODO Auto-generated catch block
> e.printStackTrace();
> }
> return null;
> }
> };
> JavaStreamingContext jssc = JavaStreamingContext.getOrCreate(CHKPTDIR, factory);
> // Start our streaming context and wait for it to "finish"
> jssc.start();
> // Wait for the job to finish
> jssc.awaitTermination();
> }
> public static class f2 implements Function<JavaRDD<String>, JavaRDD<String>> {
> /**
> *
> */
> private static final long serialVersionUID = 1L;
> private JavaRDD<String> ref_rdd;
> public f2(JavaRDD<String> ref_rdd) {
> this.ref_rdd = ref_rdd;
> }
> @Override
> public JavaRDD<String> call(JavaRDD<String> v1) throws Exception {
> return v1.union(ref_rdd);
> }
> }
> private static JavaStreamingContext createContext() throws java.lang.Exception {
> final SparkConf conf = new SparkConf();
> conf.setMaster("local[*]");
> conf.setAppName("TEST APP");
> JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(5));
> jssc.checkpoint(CHKPTDIR); // set checkpoint directory
> List<String> list01 = Arrays.asList("A", "B", "C", "D", "E");
> JavaRDD<String> staticRDD = jssc.sparkContext().parallelize(list01);
> List<String> list02 = Arrays.asList("F", "G", "H", "I", "J");
> JavaRDD<String> streamRDD = jssc.sparkContext().parallelize(list02);
> Queue<JavaRDD<String>> queue = new PriorityQueue<JavaRDD<String>>();
> queue.add(streamRDD);
> // replay the same RDD over and over
> JavaDStream<String> inputStream = jssc.queueStream(queue, false, streamRDD);
> // Union the inputStream with the staticRDD
> JavaDStream<String> unionStream = inputStream.transform(new f2(staticRDD));
> unionStream.print();
> return jssc;
> }
> }
> {code}
> The job fails with the following exception when stopped and restarted again :
> {code}
> org.apache.spark.SparkException: RDD transformations and actions can only be invoked by the driver, not inside of other transformations; for example, rdd1.map(x => rdd2.values.count() * x) is invalid because the values transformation and count action cannot be performed inside of the rdd1.map transformation. For more information, see SPARK-5063.
> at org.apache.spark.rdd.RDD.org$apache$spark$rdd$RDD$$sc(RDD.scala:87)
> at org.apache.spark.rdd.RDD.withScope(RDD.scala:286)
> at org.apache.spark.rdd.RDD.union(RDD.scala:502)
> at org.apache.spark.api.java.JavaRDD.union(JavaRDD.scala:151)
> at fr.marouni.Driver2$f2.call(Driver2.java:61)
> at fr.marouni.Driver2$f2.call(Driver2.java:1)
> at org.apache.spark.streaming.api.java.JavaDStreamLike$class.scalaTransform$1(JavaDStreamLike.scala:334)
> at org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$transform$1.apply(JavaDStreamLike.scala:335)
> at org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$transform$1.apply(JavaDStreamLike.scala:335)
> at org.apache.spark.streaming.dstream.DStream$$anonfun$transform$1$$anonfun$apply$21.apply(DStream.scala:654)
> at org.apache.spark.streaming.dstream.DStream$$anonfun$transform$1$$anonfun$apply$21.apply(DStream.scala:654)
> at org.apache.spark.streaming.dstream.DStream$$anonfun$transform$2$$anonfun$5.apply(DStream.scala:668)
> at org.apache.spark.streaming.dstream.DStream$$anonfun$transform$2$$anonfun$5.apply(DStream.scala:666)
> at org.apache.spark.streaming.dstream.TransformedDStream.compute(TransformedDStream.scala:41)
> at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:350)
> at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:350)
> at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
> at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:349)
> at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:349)
> at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:399)
> at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:344)
> at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:342)
> at scala.Option.orElse(Option.scala:257)
> at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:339)
> at org.apache.spark.streaming.dstream.ForEachDStream.generateJob(ForEachDStream.scala:38)
> at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:120)
> at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:120)
> at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
> at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
> at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
> at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251)
> at scala.collection.AbstractTraversable.flatMap(Traversable.scala:105)
> at org.apache.spark.streaming.DStreamGraph.generateJobs(DStreamGraph.scala:120)
> at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$restart$4.apply(JobGenerator.scala:227)
> at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$restart$4.apply(JobGenerator.scala:222)
> at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
> at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
> at org.apache.spark.streaming.scheduler.JobGenerator.restart(JobGenerator.scala:222)
> at org.apache.spark.streaming.scheduler.JobGenerator.start(JobGenerator.scala:92)
> at org.apache.spark.streaming.scheduler.JobScheduler.start(JobScheduler.scala:73)
> at org.apache.spark.streaming.StreamingContext.liftedTree1$1(StreamingContext.scala:588)
> at org.apache.spark.streaming.StreamingContext.start(StreamingContext.scala:586)
> at org.apache.spark.streaming.api.java.JavaStreamingContext.start(JavaStreamingContext.scala:610)
> at fr.marouni.Driver2.main(Driver2.java:39)
> {code}
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org