Posted to issues@spark.apache.org by "Josh Rosen (JIRA)" <ji...@apache.org> on 2015/06/23 05:22:01 UTC
[jira] [Commented] (SPARK-8553) Resuming Checkpointed QueueStream Fails
[ https://issues.apache.org/jira/browse/SPARK-8553?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14597092#comment-14597092 ]
Josh Rosen commented on SPARK-8553:
-----------------------------------
If you look at the code that throws this error, you'll notice that it's only hit in cases where RDD._sc == null, so were it not for the SPARK-5063 check, your code would have died with a NullPointerException instead:
{code}
private def sc: SparkContext = {
  if (_sc == null) {
    throw new SparkException(
      "RDD transformations and actions can only be invoked by the driver, not inside of other " +
      "transformations; for example, rdd1.map(x => rdd2.values.count() * x) is invalid because " +
      "the values transformation and count action cannot be performed inside of the rdd1.map " +
      "transformation. For more information, see SPARK-5063.")
  }
  _sc
}
{code}
I'm inclined to think that this is caused by a bug, but I'm not familiar enough with streaming internals to be able to quickly pinpoint what's causing the masked NPE.
ping [~tdas] [~zsxwing], any ideas on this one?
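To make the "masked NPE" point concrete, here is a minimal Python analogy of the guard quoted above. It is not Spark code: the class DriverOnlyHandle and its error message are hypothetical, and copy.deepcopy merely stands in for a serialize/deserialize round trip (it goes through the same __getstate__ hook that pickle uses). The sketch assumes the context reference is dropped whenever the object is copied this way, which is how a null _sc can show up on a restored object even though the original was built on the driver.

```python
import copy


class DriverOnlyHandle:
    """Hypothetical stand-in for an object that holds a driver-side context."""

    def __init__(self, context):
        self._sc = context

    def __getstate__(self):
        # Assumption for illustration: the context reference is not carried
        # along when the object is serialized, so the copy ends up with None.
        state = self.__dict__.copy()
        state["_sc"] = None
        return state

    @property
    def sc(self):
        # Same shape as the guard in the comment: fail with a descriptive
        # error rather than letting a later attribute access blow up.
        if self._sc is None:
            raise RuntimeError("context is not available on this copy")
        return self._sc


original = DriverOnlyHandle(context="driver-context")
restored = copy.deepcopy(original)  # stands in for a checkpoint round trip

print(original.sc)
try:
    restored.sc
except RuntimeError as err:
    print("guard triggered:", err)
```

The descriptive error only tells you that the reference is missing, not why; in this sketch the cause is the round trip, not a transformation nested inside another one.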
> Resuming Checkpointed QueueStream Fails
> ---------------------------------------
>
> Key: SPARK-8553
> URL: https://issues.apache.org/jira/browse/SPARK-8553
> Project: Spark
> Issue Type: Bug
> Components: PySpark, Streaming
> Affects Versions: 1.4.0
> Reporter: Shaanan Cohney
>
> After using a QueueStream within a checkpointed StreamingContext, when the context is resumed the following error is triggered:
> {code}
> 15/06/23 02:33:09 WARN QueueInputDStream: isTimeValid called with 1434987594000 ms where as last valid time is 1434987678000 ms
> 15/06/23 02:33:09 ERROR StreamingContext: Error starting the context, marking it as stopped
> org.apache.spark.SparkException: RDD transformations and actions can only be invoked by the driver, not inside of other transformations; for example, rdd1.map(x => rdd2.values.count() * x) is invalid because the values transformation and count action cannot be performed inside of the rdd1.map transformation. For more information, see SPARK-5063.
> at org.apache.spark.rdd.RDD.org$apache$spark$rdd$RDD$$sc(RDD.scala:87)
> at org.apache.spark.rdd.RDD.persist(RDD.scala:162)
> at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$apply$8.apply(DStream.scala:357)
> at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$apply$8.apply(DStream.scala:354)
> at scala.Option.foreach(Option.scala:236)
> at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:354)
> at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:342)
> at scala.Option.orElse(Option.scala:257)
> at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:339)
> at org.apache.spark.streaming.api.python.PythonTransformedDStream.compute(PythonDStream.scala:195)
> at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:350)
> at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:350)
> at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
> at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:349)
> at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:349)
> at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:399)
> at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:344)
> at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:342)
> at scala.Option.orElse(Option.scala:257)
> at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:339)
> at org.apache.spark.streaming.api.python.PythonStateDStream.compute(PythonDStream.scala:242)
> at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:350)
> at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:350)
> at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
> at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:349)
> at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:349)
> at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:399)
> at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:344)
> at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:342)
> at scala.Option.orElse(Option.scala:257)
> at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:339)
> at org.apache.spark.streaming.api.python.PythonStateDStream.compute(PythonDStream.scala:241)
> at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:350)
> at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:350)
> at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
> at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:349)
> at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:349)
> at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:399)
> at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:344)
> at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:342)
> at scala.Option.orElse(Option.scala:257)
> at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:339)
> at org.apache.spark.streaming.dstream.ForEachDStream.generateJob(ForEachDStream.scala:38)
> at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:120)
> at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:120)
> at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
> at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
> at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
> at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251)
> at scala.collection.AbstractTraversable.flatMap(Traversable.scala:105)
> at org.apache.spark.streaming.DStreamGraph.generateJobs(DStreamGraph.scala:120)
> at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$restart$4.apply(JobGenerator.scala:227)
> at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$restart$4.apply(JobGenerator.scala:222)
> at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
> at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
> at org.apache.spark.streaming.scheduler.JobGenerator.restart(JobGenerator.scala:222)
> at org.apache.spark.streaming.scheduler.JobGenerator.start(JobGenerator.scala:92)
> at org.apache.spark.streaming.scheduler.JobScheduler.start(JobScheduler.scala:73)
> at org.apache.spark.streaming.StreamingContext.liftedTree1$1(StreamingContext.scala:588)
> at org.apache.spark.streaming.StreamingContext.start(StreamingContext.scala:586)
> at org.apache.spark.streaming.api.java.JavaStreamingContext.start(JavaStreamingContext.scala:610)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:606)
> at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
> at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:379)
> at py4j.Gateway.invoke(Gateway.java:259)
> at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
> at py4j.commands.CallCommand.execute(CallCommand.java:79)
> at py4j.GatewayConnection.run(GatewayConnection.java:207)
> at java.lang.Thread.run(Thread.java:745)
> Traceback (most recent call last):
> File "factor.py", line 74, in <module>
> main()
> File "factor.py", line 60, in main
> purged_filename = sieving.run(sc, parameters, poly_filename=poly_filename)
> File "/home/ubuntu/spark_apps/sieving.py", line 403, in run
> (rels, outfiles) = run_sieving(sc, parameters, poly, poly_filename, fb_paths, rels_found, rels_wanted)
> File "/home/ubuntu/spark_apps/sieving.py", line 250, in run_sieving
> ssc.start()
> File "/home/ubuntu/spark/python/pyspark/streaming/context.py", line 185, in start
> self._jssc.start()
> File "/usr/local/lib/python3.4/dist-packages/py4j/java_gateway.py", line 538, in __call__
> self.target_id, self.name)
> File "/usr/local/lib/python3.4/dist-packages/py4j/protocol.py", line 300, in
> {code}
> The code triggering the error is Python3 running on Spark Standalone:
> {code}
> ssc = StreamingContext.getOrCreate(s3n_path, make_ssc)
> ....
> p_batches = [ssc.sparkContext.parallelize(batch) for batch in task_batches]
> sieving_tasks = ssc.queueStream(p_batches)
> relations = sieving_tasks.map(lambda s: run_sieving_command(s, poly, poly_path, fb_paths))
> countsState = relations.updateStateByKey(update_state)
> countsState.foreachRDD(gen_finals)
> ssc.checkpoint(s3n_path)
> ....
> ssc.start()
> ....
> def update_state(count, counts):
>     if counts is None:
>         counts = []
>     print(count)
>     counts.append(count)
>     return counts
> def gen_finals(rdd):
>     for (link, rank) in rdd.collect():
>         acc = 0
>         for l in rank:
>             acc += sum(l)
>         run_sieving.counts.append(acc)
>         run_sieving.out_files.add(link)
> {code}
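For readers unfamiliar with updateStateByKey, the following pure-Python sketch imitates how the update function above would be driven, without needing a cluster. The simulate helper and the sample batches are hypothetical and are not part of the report; the only Spark behaviour assumed is that the update function receives the list of new values for a key plus the previous state (None on first sight), and that it is also called for keys that have state but no new values in a batch.

```python
def update_state(count, counts):
    # Same logic as the reporter's function, minus the print:
    # 'count' is the list of new values, 'counts' the previous state.
    if counts is None:
        counts = []
    counts.append(count)
    return counts


def simulate(batches, update):
    """Hypothetical driver loop: apply 'update' per key, batch by batch."""
    states = {}
    for batch in batches:
        grouped = {}
        for key, value in batch:
            grouped.setdefault(key, []).append(value)
        # Visit keys with new data as well as keys that only have old state.
        for key in sorted(set(grouped) | set(states)):
            new_state = update(grouped.get(key, []), states.get(key))
            if new_state is None:
                states.pop(key, None)  # returning None drops the key
            else:
                states[key] = new_state
    return states


# Made-up (key, value) pairs purely for illustration.
batches = [[("a", 1), ("a", 2)], [("a", 3), ("b", 4)]]
final_states = simulate(batches, update_state)
totals = {key: sum(sum(part) for part in state)
          for key, state in final_states.items()}
print(final_states)
print(totals)
```

Under these assumptions the state for each key is a list of per-batch lists, which is why gen_finals sums each inner list before accumulating.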