Posted to issues@spark.apache.org by "Abbass Marouni (JIRA)" <ji...@apache.org> on 2015/10/30 10:46:28 UTC
[jira] [Updated] (SPARK-11422) Restarting a Spark Streaming Job from a checkpoint fails
[ https://issues.apache.org/jira/browse/SPARK-11422?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Abbass Marouni updated SPARK-11422:
-----------------------------------
Description:
A Spark Streaming job (with checkpointing enabled) that applies a simple transform operation to a JavaDStream, unioning it with a JavaRDD, runs correctly the first time:
{code}
import java.util.Arrays;
import java.util.List;
import java.util.PriorityQueue;
import java.util.Queue;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.api.java.JavaStreamingContextFactory;

public class Driver2 {

    public static final String CHKPTDIR = "/tmp/spark/checkPointDir";

    public static void main(String[] args) {
        JavaStreamingContextFactory factory = new JavaStreamingContextFactory() {
            @Override
            public JavaStreamingContext create() {
                try {
                    return createContext();
                } catch (Exception e) {
                    e.printStackTrace();
                }
                return null;
            }
        };
        JavaStreamingContext jssc = JavaStreamingContext.getOrCreate(CHKPTDIR, factory);
        // Start our streaming context and wait for it to "finish"
        jssc.start();
        // Wait for the job to finish
        jssc.awaitTermination();
    }

    public static class f2 implements Function<JavaRDD<String>, JavaRDD<String>> {
        private static final long serialVersionUID = 1L;
        private JavaRDD<String> ref_rdd;

        public f2(JavaRDD<String> ref_rdd) {
            this.ref_rdd = ref_rdd;
        }

        @Override
        public JavaRDD<String> call(JavaRDD<String> v1) throws Exception {
            return v1.union(ref_rdd);
        }
    }

    private static JavaStreamingContext createContext() throws Exception {
        final SparkConf conf = new SparkConf();
        conf.setMaster("local[*]");
        conf.setAppName("TEST APP");
        JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(5));
        jssc.checkpoint(CHKPTDIR); // set checkpoint directory

        List<String> list01 = Arrays.asList("A", "B", "C", "D", "E");
        JavaRDD<String> staticRDD = jssc.sparkContext().parallelize(list01);

        List<String> list02 = Arrays.asList("F", "G", "H", "I", "J");
        JavaRDD<String> streamRDD = jssc.sparkContext().parallelize(list02);
        Queue<JavaRDD<String>> queue = new PriorityQueue<JavaRDD<String>>();
        queue.add(streamRDD);

        // replay the same RDD over and over
        JavaDStream<String> inputStream = jssc.queueStream(queue, false, streamRDD);
        // Union the inputStream with the staticRDD
        JavaDStream<String> unionStream = inputStream.transform(new f2(staticRDD));
        unionStream.print();
        return jssc;
    }
}
{code}
The job fails with the following exception when it is stopped and restarted from the checkpoint:
{code}
org.apache.spark.SparkException: RDD transformations and actions can only be invoked by the driver, not inside of other transformations; for example, rdd1.map(x => rdd2.values.count() * x) is invalid because the values transformation and count action cannot be performed inside of the rdd1.map transformation. For more information, see SPARK-5063.
at org.apache.spark.rdd.RDD.org$apache$spark$rdd$RDD$$sc(RDD.scala:87)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:286)
at org.apache.spark.rdd.RDD.union(RDD.scala:502)
at org.apache.spark.api.java.JavaRDD.union(JavaRDD.scala:151)
at fr.marouni.Driver2$f2.call(Driver2.java:61)
at fr.marouni.Driver2$f2.call(Driver2.java:1)
at org.apache.spark.streaming.api.java.JavaDStreamLike$class.scalaTransform$1(JavaDStreamLike.scala:334)
at org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$transform$1.apply(JavaDStreamLike.scala:335)
at org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$transform$1.apply(JavaDStreamLike.scala:335)
at org.apache.spark.streaming.dstream.DStream$$anonfun$transform$1$$anonfun$apply$21.apply(DStream.scala:654)
at org.apache.spark.streaming.dstream.DStream$$anonfun$transform$1$$anonfun$apply$21.apply(DStream.scala:654)
at org.apache.spark.streaming.dstream.DStream$$anonfun$transform$2$$anonfun$5.apply(DStream.scala:668)
at org.apache.spark.streaming.dstream.DStream$$anonfun$transform$2$$anonfun$5.apply(DStream.scala:666)
at org.apache.spark.streaming.dstream.TransformedDStream.compute(TransformedDStream.scala:41)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:350)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:350)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:349)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:349)
at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:399)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:344)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:342)
at scala.Option.orElse(Option.scala:257)
at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:339)
at org.apache.spark.streaming.dstream.ForEachDStream.generateJob(ForEachDStream.scala:38)
at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:120)
at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:120)
at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251)
at scala.collection.AbstractTraversable.flatMap(Traversable.scala:105)
at org.apache.spark.streaming.DStreamGraph.generateJobs(DStreamGraph.scala:120)
at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$restart$4.apply(JobGenerator.scala:227)
at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$restart$4.apply(JobGenerator.scala:222)
at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
at org.apache.spark.streaming.scheduler.JobGenerator.restart(JobGenerator.scala:222)
at org.apache.spark.streaming.scheduler.JobGenerator.start(JobGenerator.scala:92)
at org.apache.spark.streaming.scheduler.JobScheduler.start(JobScheduler.scala:73)
at org.apache.spark.streaming.StreamingContext.liftedTree1$1(StreamingContext.scala:588)
at org.apache.spark.streaming.StreamingContext.start(StreamingContext.scala:586)
at org.apache.spark.streaming.api.java.JavaStreamingContext.start(JavaStreamingContext.scala:610)
at fr.marouni.Driver2.main(Driver2.java:39)
{code}
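The trace suggests the captured {{staticRDD}} is the problem: after recovery from the checkpoint, the deserialized {{f2}} closure still holds an RDD bound to the SparkContext of the first run, so calling {{union}} on it trips the SPARK-5063 guard. Below is a minimal sketch of a possible workaround, assuming the reference data can be carried as a plain serializable List and the reference RDD rebuilt from the live context inside the transform function (the class name {{f2Rebuilt}} and field {{refData}} are illustrative, not part of the attached code):
{code}
import java.util.List;

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;

// Hypothetical variant of f2: captures plain data instead of an RDD.
public static class f2Rebuilt implements Function<JavaRDD<String>, JavaRDD<String>> {
    private static final long serialVersionUID = 1L;
    // A List<String> survives checkpoint (de)serialization, whereas an
    // RDD reference pins the SparkContext of the first run.
    private final List<String> refData;

    public f2Rebuilt(List<String> refData) {
        this.refData = refData;
    }

    @Override
    public JavaRDD<String> call(JavaRDD<String> v1) throws Exception {
        // Rebuild the reference RDD from the batch RDD's live context,
        // so it is valid both on the first run and after a restart.
        JavaSparkContext jsc = JavaSparkContext.fromSparkContext(v1.context());
        return v1.union(jsc.parallelize(refData));
    }
}
{code}
With this variant, {{createContext()}} would pass {{list01}} straight to the transform, e.g. {{inputStream.transform(new f2Rebuilt(list01))}}, instead of parallelizing it up front into {{staticRDD}}.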
> Restarting a Spark Streaming Job from a checkpoint fails
> --------------------------------------------------------
>
> Key: SPARK-11422
> URL: https://issues.apache.org/jira/browse/SPARK-11422
> Project: Spark
> Issue Type: Bug
> Affects Versions: 1.3.1, 1.4.1, 1.5.1
> Reporter: Abbass Marouni
>
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)