You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@spark.apache.org by zenglong chen <cz...@gmail.com> on 2019/07/30 03:02:28 UTC
Spark checkpoint problem for python api
Hi,
My code is below:
from pyspark.conf import SparkConf
from pyspark.context import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
def test(record_list):
print(list(record_list))
return record_list
def functionToCreateContext():
conf = SparkConf().setAppName("model_event").setMaster("spark://172.22.9.181:7077")
\
.set("spark.executor.memory", '6g') \
.set("spark.executor.cores", '8') \
.set("spark.deploy.defaultCores", '8') \
.set("spark.cores.max", '16') \
.set("spark.streaming.kafka.maxRatePerPartition", 1) \
.set("spark.streaming.blockInterval", 1) \
.set("spark.default.parallelism", 8) \
.set("spark.driver.host", '172.22.9.181') \
sc = SparkContext(conf=conf)
ssc = StreamingContext(sc, 5)
ssc.checkpoint("/spark/checkpoints/model_event_spark")
return ssc
if __name__ == '__main__':
ssc = StreamingContext.getOrCreate("/spark/checkpoints/model_event_spark",
functionToCreateContext)
record_dstream =
KafkaUtils.createDirectStream(ssc,topics=["installmentdb_t_bill"],
kafkaParams={"bootstrap.servers":"xxx:9092",
"auto.offset.reset":"smallest",
},
)
record_dstream.checkpoint(5).mapPartitions(test).pprint()
ssc.start()
ssc.awaitTermination()
When the scripts starts at the first time,it work well.
But second time started from checkpointDirectory,it has problem like:
2019-07-30 02:48:50,290 ERROR streaming.StreamingContext: Error
starting the context, marking it as stopped
org.apache.spark.SparkException:
org.apache.spark.streaming.api.python.PythonTransformedDStream@319b7bed
has not been initialized
at org.apache.spark.streaming.dstream.DStream.isTimeValid(DStream.scala:313)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:334)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:334)
at scala.Option.orElse(Option.scala:289)
at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:331)
at org.apache.spark.streaming.dstream.ForEachDStream.generateJob(ForEachDStream.scala:48)
at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:122)
at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:121)
at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
at scala.collection.AbstractTraversable.flatMap(Traversable.scala:104)
at org.apache.spark.streaming.DStreamGraph.generateJobs(DStreamGraph.scala:121)
at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$restart$4.apply(JobGenerator.scala:234)
at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$restart$4.apply(JobGenerator.scala:229)
at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
at org.apache.spark.streaming.scheduler.JobGenerator.restart(JobGenerator.scala:229)
at org.apache.spark.streaming.scheduler.JobGenerator.start(JobGenerator.scala:98)
at org.apache.spark.streaming.scheduler.JobScheduler.start(JobScheduler.scala:103)
at org.apache.spark.streaming.StreamingContext$$anonfun$liftedTree1$1$1.apply$mcV$sp(StreamingContext.scala:583)
at org.apache.spark.streaming.StreamingContext$$anonfun$liftedTree1$1$1.apply(StreamingContext.scala:578)
at org.apache.spark.streaming.StreamingContext$$anonfun$liftedTree1$1$1.apply(StreamingContext.scala:578)
at ... run in separate thread using org.apache.spark.util.ThreadUtils ... ()
at org.apache.spark.streaming.StreamingContext.liftedTree1$1(StreamingContext.scala:578)
at org.apache.spark.streaming.StreamingContext.start(StreamingContext.scala:572)
at org.apache.spark.streaming.api.java.JavaStreamingContext.start(JavaStreamingContext.scala:556)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:282)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.lang.Thread.run(Thread.java:748)
What is wrong with my script?