Posted to issues@spark.apache.org by "Shixiong Zhu (JIRA)" <ji...@apache.org> on 2015/06/23 06:33:01 UTC

[jira] [Comment Edited] (SPARK-8553) Resuming Checkpointed QueueStream Fails

    [ https://issues.apache.org/jira/browse/SPARK-8553?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14597147#comment-14597147 ] 

Shixiong Zhu edited comment on SPARK-8553 at 6/23/15 4:32 AM:
--------------------------------------------------------------

`queueStream` doesn't support the checkpoint feature because Streaming cannot recover RDDs.

[~tdas] I think we should add docs for `queueStream` to explain the checkpoint issue.
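
One possible workaround, sketched under assumptions rather than taken from the reporter's application: a file-based input stream can be restored from a checkpoint, so the batches could be written as files into a monitored directory and read with `textFileStream` instead of `queueStream`. The helper `write_batches`, the `/tmp/...` paths, the app name, and the 10-second batch interval are all hypothetical. The DStream graph is built inside the setup function passed to `StreamingContext.getOrCreate`, so that it is part of what gets checkpointed.

```python
import os
import tempfile


def write_batches(batches, directory):
    """Write each batch as one text file, one record per line (hypothetical helper).

    Each file is first written under a hidden temporary name and then renamed,
    so a directory-monitoring stream does not see a partially written file.
    """
    os.makedirs(directory, exist_ok=True)
    paths = []
    for i, batch in enumerate(batches):
        final_path = os.path.join(directory, "batch-%05d.txt" % i)
        fd, tmp_path = tempfile.mkstemp(dir=directory, prefix=".tmp-")
        with os.fdopen(fd, "w") as f:
            for record in batch:
                f.write("%s\n" % record)
        os.rename(tmp_path, final_path)
        paths.append(final_path)
    return paths


def make_ssc(checkpoint_dir="/tmp/checkpoint", input_dir="/tmp/batches"):
    """Setup function for StreamingContext.getOrCreate (paths are assumptions)."""
    # pyspark is imported lazily so write_batches can be used without Spark.
    from pyspark import SparkContext
    from pyspark.streaming import StreamingContext

    sc = SparkContext(appName="checkpointed-file-stream")
    ssc = StreamingContext(sc, 10)  # 10-second batches, an arbitrary choice
    lines = ssc.textFileStream(input_dir)  # file-based source instead of queueStream
    lines.count().pprint()
    ssc.checkpoint(checkpoint_dir)
    return ssc
```

With Spark available, the context would be obtained with `StreamingContext.getOrCreate("/tmp/checkpoint", make_ssc)` and then started; `write_batches` would be called after `ssc.start()`, since a file stream is only expected to pick up files that appear while it is running.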


was (Author: zsxwing):
`queueStream` doesn't support the checkpoint feature because Streaming cannot recover RDDs.

[~tdas] I think we should add docs for `queueStream`

> Resuming Checkpointed QueueStream Fails
> ---------------------------------------
>
>                 Key: SPARK-8553
>                 URL: https://issues.apache.org/jira/browse/SPARK-8553
>             Project: Spark
>          Issue Type: Bug
>          Components: PySpark, Streaming
>    Affects Versions: 1.4.0
>            Reporter: Shaanan Cohney
>
> After using a QueueStream within a checkpointed StreamingContext, when the context is resumed the following error is triggered:
> {code}
> 15/06/23 02:33:09 WARN QueueInputDStream: isTimeValid called with 1434987594000 ms where as last valid time is 1434987678000 ms
> 15/06/23 02:33:09 ERROR StreamingContext: Error starting the context, marking it as stopped
> org.apache.spark.SparkException: RDD transformations and actions can only be invoked by the driver, not inside of other transformations; for example, rdd1.map(x => rdd2.values.count() * x) is invalid because the values transformation and count action cannot be performed inside of the rdd1.map transformation. For more information, see SPARK-5063.
> 	at org.apache.spark.rdd.RDD.org$apache$spark$rdd$RDD$$sc(RDD.scala:87)
> 	at org.apache.spark.rdd.RDD.persist(RDD.scala:162)
> 	at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$apply$8.apply(DStream.scala:357)
> 	at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$apply$8.apply(DStream.scala:354)
> 	at scala.Option.foreach(Option.scala:236)
> 	at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:354)
> 	at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:342)
> 	at scala.Option.orElse(Option.scala:257)
> 	at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:339)
> 	at org.apache.spark.streaming.api.python.PythonTransformedDStream.compute(PythonDStream.scala:195)
> 	at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:350)
> 	at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:350)
> 	at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
> 	at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:349)
> 	at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:349)
> 	at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:399)
> 	at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:344)
> 	at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:342)
> 	at scala.Option.orElse(Option.scala:257)
> 	at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:339)
> 	at org.apache.spark.streaming.api.python.PythonStateDStream.compute(PythonDStream.scala:242)
> 	at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:350)
> 	at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:350)
> 	at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
> 	at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:349)
> 	at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:349)
> 	at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:399)
> 	at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:344)
> 	at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:342)
> 	at scala.Option.orElse(Option.scala:257)
> 	at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:339)
> 	at org.apache.spark.streaming.api.python.PythonStateDStream.compute(PythonDStream.scala:241)
> 	at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:350)
> 	at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:350)
> 	at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
> 	at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:349)
> 	at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:349)
> 	at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:399)
> 	at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:344)
> 	at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:342)
> 	at scala.Option.orElse(Option.scala:257)
> 	at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:339)
> 	at org.apache.spark.streaming.dstream.ForEachDStream.generateJob(ForEachDStream.scala:38)
> 	at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:120)
> 	at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:120)
> 	at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
> 	at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
> 	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
> 	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
> 	at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251)
> 	at scala.collection.AbstractTraversable.flatMap(Traversable.scala:105)
> 	at org.apache.spark.streaming.DStreamGraph.generateJobs(DStreamGraph.scala:120)
> 	at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$restart$4.apply(JobGenerator.scala:227)
> 	at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$restart$4.apply(JobGenerator.scala:222)
> 	at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
> 	at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
> 	at org.apache.spark.streaming.scheduler.JobGenerator.restart(JobGenerator.scala:222)
> 	at org.apache.spark.streaming.scheduler.JobGenerator.start(JobGenerator.scala:92)
> 	at org.apache.spark.streaming.scheduler.JobScheduler.start(JobScheduler.scala:73)
> 	at org.apache.spark.streaming.StreamingContext.liftedTree1$1(StreamingContext.scala:588)
> 	at org.apache.spark.streaming.StreamingContext.start(StreamingContext.scala:586)
> 	at org.apache.spark.streaming.api.java.JavaStreamingContext.start(JavaStreamingContext.scala:610)
> 	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> 	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
> 	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> 	at java.lang.reflect.Method.invoke(Method.java:606)
> 	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
> 	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:379)
> 	at py4j.Gateway.invoke(Gateway.java:259)
> 	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
> 	at py4j.commands.CallCommand.execute(CallCommand.java:79)
> 	at py4j.GatewayConnection.run(GatewayConnection.java:207)
> 	at java.lang.Thread.run(Thread.java:745)
> Traceback (most recent call last):
>   File "factor.py", line 74, in <module>
>     main()
>   File "factor.py", line 60, in main
>     purged_filename = sieving.run(sc, parameters, poly_filename=poly_filename)
>   File "/home/ubuntu/spark_apps/sieving.py", line 403, in run
>     (rels, outfiles) = run_sieving(sc, parameters, poly, poly_filename, fb_paths, rels_found, rels_wanted)
>   File "/home/ubuntu/spark_apps/sieving.py", line 250, in run_sieving
>     ssc.start()
>   File "/home/ubuntu/spark/python/pyspark/streaming/context.py", line 185, in start
>     self._jssc.start()
>   File "/usr/local/lib/python3.4/dist-packages/py4j/java_gateway.py", line 538, in __call__
>     self.target_id, self.name)
>   File "/usr/local/lib/python3.4/dist-packages/py4j/protocol.py", line 300, in 
> {code}
> The code triggering the error is Python 3 running on Spark Standalone:
> {code}
> ssc = StreamingContext.getOrCreate(s3n_path, make_ssc)
> ....
> p_batches = [ssc.sparkContext.parallelize(batch) for batch in task_batches]
> sieving_tasks = ssc.queueStream(p_batches)
> relations = sieving_tasks.map(lambda s: run_sieving_command(s, poly, poly_path, fb_paths))
> countsState = relations.updateStateByKey(update_state)
> countsState.foreachRDD(gen_finals)
> ssc.checkpoint(s3n_path)
> ....
> ssc.start()
> ....
> def update_state(count, counts):
>     if counts is None:
>         counts = []
>     print(count)
>     counts.append(count)
>     return counts
> def gen_finals(rdd):
>     for (link, rank) in rdd.collect():
>         acc = 0
>         for l in rank:
>             acc += sum(l)
>         run_sieving.counts.append(acc)
>         run_sieving.out_files.add(link)
> {code}
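
To make the shape of the state in the quoted code easier to follow, here is a plain-Python approximation of how `updateStateByKey` calls the update function: once per key per batch, with the list of new values and the previous state (or `None`). It does not use Spark. `simulate_update_state_by_key` and the sample batches are hypothetical, the values are assumed to be numeric, and `update_state` is a copy of the reporter's logic with renamed arguments and without the `print` call.

```python
def update_state(new_values, state):
    # Same logic as the quoted update_state, minus the print call.
    if state is None:
        state = []
    state.append(new_values)
    return state


def simulate_update_state_by_key(batches, update_func):
    """Approximate updateStateByKey over a list of batches of (key, value) pairs."""
    state = {}
    for batch in batches:
        # Group this batch's values by key.
        grouped = {}
        for key, value in batch:
            grouped.setdefault(key, []).append(value)
        new_state = {}
        # Keys with existing state are updated even when they have no new values.
        for key in set(state) | set(grouped):
            updated = update_func(grouped.get(key, []), state.get(key))
            if updated is not None:  # returning None drops the key
                new_state[key] = updated
        state = new_state
    return state


batches = [[("a", 1), ("a", 2), ("b", 5)], [("a", 3)]]
final = simulate_update_state_by_key(batches, update_state)
# Mirrors the nested summation in gen_finals: the state is a list of lists.
totals = {key: sum(sum(values) for values in state) for key, state in final.items()}
print(final)
print(totals)
```

Under these assumptions the state for each key is a list with one inner list per batch, which is why `gen_finals` sums over two levels.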


