Posted to user@spark.apache.org by "Jalal, Jamal" <ja...@sap.com> on 2017/05/23 05:04:47 UTC

[Stateful Spark Streaming] Issues on Restart of job from checkpoint in v1.6.2 - Initial State RDD for MapWithState not attached to Spark Context

Hi,

We are hitting an issue with the initial state RDD when we try to restart and recover a job from its checkpoint on version 1.6.2. I've attached the error stack to this mail, and I've tried to reproduce the issue with the attached sample streaming application.

The error occurs because a DStream operation refers to an external RDD (the initial state RDD in our case) that is not attached to the SparkContext when the context is recreated from the checkpoint. If we comment out the initial state RDD, restarting the job from the checkpoint works successfully.
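
To make this concrete, here is a minimal sketch of the pattern in our sample application (the key/value types, the mapping function body, and the contents of initialStateRDD are placeholders, not our actual code):

import org.apache.spark.streaming.{State, StateSpec, StreamingContext}
import org.apache.spark.streaming.dstream.DStream

// Running count per key; same shape as mappingFunc1 in the snippet further below.
def mappingFunc1(key: String, value: Option[Long], state: State[Long]): (String, Long) = {
  val sum = value.getOrElse(0L) + state.getOption.getOrElse(0L)
  state.update(sum)
  (key, sum)
}

def buildStatefulStream(ssc: StreamingContext,
                        rawKafkaDirectStream: DStream[(String, Long)]) = {
  // The initial state RDD is created against the current SparkContext...
  val initialStateRDD = ssc.sparkContext.parallelize(Seq(("key1", 0L), ("key2", 0L)))

  // ...and captured inside the StateSpec. When the context is recreated from
  // the checkpoint, the deserialized DStream graph still references this RDD,
  // which belongs to the old, stopped SparkContext.
  rawKafkaDirectStream.mapWithState(
    StateSpec.function(mappingFunc1 _).initialState(initialStateRDD))
}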

The exception message reads:
   "RDD transformations and actions can only be invoked by the driver, not inside of other transformations; for example, rdd1.map(x => rdd2.values.count() * x) is invalid because the values transformation and count action cannot be performed inside of the rdd1.map transformation."

In fact, the actual error comes from referring to an external RDD that is not derived from the streaming source. This is made explicit in newer versions of Spark (2.1.0), as described in the ticket https://issues.apache.org/jira/browse/SPARK-13758

On Spark 2.1.0, recovery from the checkpoint works once we checkpoint the stateful DStreams explicitly, in addition to checkpointing the streaming context itself, as shown below. Doing the same back on version 1.6.2, however, did not help.

// In the real job the first StateSpec also carries .initialState(initialStateRDD),
// as in the sketch above.
val statefulStream1 = rawKafkaDirectStream.mapWithState(StateSpec.function(mappingFunc1 _))
val statefulStream2 = statefulStream1.mapWithState(StateSpec.function(mappingFunc2 _))

statefulStream2.foreachRDD { rdd =>
  rdd.foreach(msg => println("In foreach: " + msg))
  println("=========================================")
}

// Checkpoint each stateful stream explicitly, in addition to the
// StreamingContext checkpoint; recovery failed without this even on 2.1.0.
statefulStream1.checkpoint(Seconds(10))
statefulStream2.checkpoint(Seconds(10))
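
For completeness, the job is (re)started with the standard getOrCreate pattern; this is a sketch, with checkpointDir and createContext standing in for our actual checkpoint path and context-creation function:

import org.apache.spark.streaming.StreamingContext

// Recover the context from the checkpoint if one exists,
// otherwise build a fresh one via createContext().
val checkpointDir = "/tmp/streaming-checkpoint"  // placeholder path
val ssc = StreamingContext.getOrCreate(checkpointDir, () => createContext(checkpointDir))
ssc.start()
ssc.awaitTermination()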

Is this a known issue in v1.6.2 with restarting a Spark Streaming job from a checkpoint when an initial RDD for stateful streaming is included?
Can you help with resolving this issue on Spark 1.6.2, and let me know if you have any other suggestions?

Best Regards,
Jamal