Posted to user@spark.apache.org by zenglong chen <cz...@gmail.com> on 2019/07/30 03:02:28 UTC

Spark checkpoint problem with the Python API

Hi,
    My code is below:

from pyspark.conf import SparkConf
from pyspark.context import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

def test(record_list):
    # mapPartitions hands test() an iterator over one partition's records;
    # materialize it once so printing does not exhaust what is returned.
    records = list(record_list)
    print(records)
    return records

def functionToCreateContext():
    conf = SparkConf().setAppName("model_event") \
        .setMaster("spark://172.22.9.181:7077") \
        .set("spark.executor.memory", '6g') \
        .set("spark.executor.cores", '8') \
        .set("spark.deploy.defaultCores", '8') \
        .set("spark.cores.max", '16') \
        .set("spark.streaming.kafka.maxRatePerPartition", 1) \
        .set("spark.streaming.blockInterval", 1) \
        .set("spark.default.parallelism", 8) \
        .set("spark.driver.host", '172.22.9.181')

    sc = SparkContext(conf=conf)
    ssc = StreamingContext(sc, 5)
    ssc.checkpoint("/spark/checkpoints/model_event_spark")
    return ssc

if __name__ == '__main__':
    ssc = StreamingContext.getOrCreate("/spark/checkpoints/model_event_spark",
                                       functionToCreateContext)
    record_dstream = KafkaUtils.createDirectStream(
        ssc,
        topics=["installmentdb_t_bill"],
        kafkaParams={"bootstrap.servers": "xxx:9092",
                     "auto.offset.reset": "smallest"})

    record_dstream.checkpoint(5).mapPartitions(test).pprint()
    ssc.start()
    ssc.awaitTermination()


When the script starts for the first time (so functionToCreateContext is actually called), it works well.

But on the second start, when the context is recovered from the checkpoint directory, it fails with:

2019-07-30 02:48:50,290 ERROR streaming.StreamingContext: Error starting the context, marking it as stopped
org.apache.spark.SparkException: org.apache.spark.streaming.api.python.PythonTransformedDStream@319b7bed has not been initialized
	at org.apache.spark.streaming.dstream.DStream.isTimeValid(DStream.scala:313)
	at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:334)
	at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:334)
	at scala.Option.orElse(Option.scala:289)
	at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:331)
	at org.apache.spark.streaming.dstream.ForEachDStream.generateJob(ForEachDStream.scala:48)
	at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:122)
	at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:121)
	at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
	at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
	at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
	at scala.collection.AbstractTraversable.flatMap(Traversable.scala:104)
	at org.apache.spark.streaming.DStreamGraph.generateJobs(DStreamGraph.scala:121)
	at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$restart$4.apply(JobGenerator.scala:234)
	at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$restart$4.apply(JobGenerator.scala:229)
	at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
	at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
	at org.apache.spark.streaming.scheduler.JobGenerator.restart(JobGenerator.scala:229)
	at org.apache.spark.streaming.scheduler.JobGenerator.start(JobGenerator.scala:98)
	at org.apache.spark.streaming.scheduler.JobScheduler.start(JobScheduler.scala:103)
	at org.apache.spark.streaming.StreamingContext$$anonfun$liftedTree1$1$1.apply$mcV$sp(StreamingContext.scala:583)
	at org.apache.spark.streaming.StreamingContext$$anonfun$liftedTree1$1$1.apply(StreamingContext.scala:578)
	at org.apache.spark.streaming.StreamingContext$$anonfun$liftedTree1$1$1.apply(StreamingContext.scala:578)
	at ... run in separate thread using org.apache.spark.util.ThreadUtils ... ()
	at org.apache.spark.streaming.StreamingContext.liftedTree1$1(StreamingContext.scala:578)
	at org.apache.spark.streaming.StreamingContext.start(StreamingContext.scala:572)
	at org.apache.spark.streaming.api.java.JavaStreamingContext.start(JavaStreamingContext.scala:556)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:748)

What is wrong with my script?
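
For reference, the checkpointing example in the Spark Streaming programming guide builds the entire DStream graph inside the function passed to getOrCreate, rather than after it, so that the recovered context already contains every transformation. Below is a sketch of my job restructured that way (untested; create_context and CHECKPOINT_DIR are just names I introduced, and the SparkConf options are abbreviated). Is this the pattern I should be following?

CHECKPOINT_DIR = "/spark/checkpoints/model_event_spark"

def create_context():
    # Everything, including the Kafka stream and all transformations,
    # is set up here so getOrCreate() can recover the full graph.
    sc = SparkContext(conf=SparkConf().setAppName("model_event"))  # plus the same .set(...) options as above
    ssc = StreamingContext(sc, 5)
    ssc.checkpoint(CHECKPOINT_DIR)
    stream = KafkaUtils.createDirectStream(
        ssc,
        topics=["installmentdb_t_bill"],
        kafkaParams={"bootstrap.servers": "xxx:9092",
                     "auto.offset.reset": "smallest"})
    stream.checkpoint(5).mapPartitions(test).pprint()
    return ssc

ssc = StreamingContext.getOrCreate(CHECKPOINT_DIR, create_context)
ssc.start()
ssc.awaitTermination()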