Posted to issues@spark.apache.org by "Kai-Michael Roesner (Jira)" <ji...@apache.org> on 2023/01/17 14:49:00 UTC

[jira] [Created] (SPARK-42102) Using checkpoints in Spark Structured Streaming with the foreachBatch sink

Kai-Michael Roesner created SPARK-42102:
-------------------------------------------

             Summary: Using checkpoints in Spark Structured Streaming with the foreachBatch sink
                 Key: SPARK-42102
                 URL: https://issues.apache.org/jira/browse/SPARK-42102
             Project: Spark
          Issue Type: Question
          Components: PySpark, Structured Streaming
    Affects Versions: 3.3.1
            Reporter: Kai-Michael Roesner


I want to build a fault-tolerant, recoverable Spark job (using Structured Streaming in PySpark) that reads a data stream from Kafka and uses the [{{foreachBatch}}|https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#foreachbatch] sink to implement a stateful transformation before writing the resulting data to the actual sink.

The basic structure of my Spark job is like this:
{code}
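from pyspark.sql import SparkSession
from pyspark.sql.functions import lit

# KAFKA_SQL, KAFKA_CLNT and KAFKA_OPTIONS are defined elsewhere (omitted here)
counter = 0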

def batch_handler(df, batch_id):
  global counter
  counter += 1
  df.withColumn('counter', lit(counter)).show(truncate=30)

spark = (SparkSession.builder
  .appName('test.stateful.checkpoint')
  .config('spark.jars.packages', f'{KAFKA_SQL},{KAFKA_CLNT}')
  .getOrCreate())

source = (spark.readStream
  .format('kafka')
  .options(**KAFKA_OPTIONS)
  .option('subscribe', 'topic-spark-stateful')
  .option('startingOffsets', 'earliest')
  .option('includeHeaders', 'true')
  .load())

(source
  .selectExpr('CAST(value AS STRING) AS data', 'CAST(timestamp AS STRING) AS time')
  .writeStream
  .option('checkpointLocation', './checkpoints/stateful')
  .foreachBatch(batch_handler)
  .start()
  .awaitTermination())
{code}

where the simplified {{batch_handler}} function is a stand-in for the stateful transformation and the writer to the actual data sink. Also, for simplicity, I am using a local folder as the checkpoint location.

This works fine as far as checkpointing of Kafka offsets is concerned. But how can I include the state of my custom batch handler ({{counter}} in my simplified example) in the checkpoints such that the job can pick up where it left off after a crash?
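
To make the question concrete, here is a minimal sketch of a manual workaround, not something the Spark checkpoint does: the handler persists its own state next to the checkpoint, keyed by {{batch_id}}, so that a batch replayed after a crash sees the same counter value as before. The file path and the helper names ({{load_state}}, {{save_state}}, {{next_state}}) are hypothetical, and a local file only works here because the {{foreachBatch}} function runs on the driver; a real job would presumably need durable shared storage.

```python
import json
import os
import tempfile

# Hypothetical location for the handler's own state (not managed by Spark)
STATE_FILE = './checkpoints/stateful-handler/state.json'

def load_state(path=STATE_FILE):
    """Return the last saved state, or a fresh one if none exists yet."""
    try:
        with open(path) as f:
            return json.load(f)
    except FileNotFoundError:
        return {'batch_id': -1, 'counter': 0}

def save_state(state, path=STATE_FILE):
    """Write the state atomically: temp file in the same directory, then rename."""
    directory = os.path.dirname(path) or '.'
    os.makedirs(directory, exist_ok=True)
    fd, tmp = tempfile.mkstemp(dir=directory)
    with os.fdopen(fd, 'w') as f:
        json.dump(state, f)
    os.replace(tmp, path)

def next_state(state, batch_id):
    """Advance the counter once per batch; a replayed batch_id leaves it unchanged."""
    if batch_id <= state['batch_id']:
        return state
    return {'batch_id': batch_id, 'counter': state['counter'] + 1}

def batch_handler(df, batch_id):
    # Imported here so the helpers above can be used without Spark installed
    from pyspark.sql.functions import lit
    state = next_state(load_state(), batch_id)
    df.withColumn('counter', lit(state['counter'])).show(truncate=30)
    # Saved only after the batch was handled; a crash before this line
    # makes the replayed batch recompute the same counter value.
    save_state(state)
```

This relies on the documented behaviour that {{foreachBatch}} gives at-least-once guarantees and that {{batch_id}} can be used to deduplicate replayed batches. It still leaves the state outside the Spark checkpoint, which is what the question is about.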

The [Spark Structured Streaming Guide|https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#recovering-from-failures-with-checkpointing] doesn't say anything on the topic. With the [{{foreach}}|https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#foreach] sink I can pass a custom row handler object, but this seems to support only {{open}}, {{process}}, and {{close}} methods.
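
For reference, a minimal sketch of such a row handler object, using the {{open(partition_id, epoch_id)}}, {{process(row)}} and {{close(error)}} methods described in the guide. The class name {{RowHandler}} and the {{rows_seen}} attribute are made up for illustration. Any state kept on the object lives only for one partition and epoch on an executor, and there is no hook to restore it from, or export it to, a checkpoint.

```python
class RowHandler:
    """Illustrative foreach handler; the name and the row counter are hypothetical."""

    def open(self, partition_id, epoch_id):
        # Called once per partition and epoch; returning True means
        # "go ahead and process the rows of this partition".
        self.partition_id = partition_id
        self.epoch_id = epoch_id
        self.rows_seen = 0
        return True

    def process(self, row):
        # Called for every row of the partition.
        self.rows_seen += 1

    def close(self, error):
        # Called at the end of the partition; error is None on success.
        if error is None:
            print(f'epoch {self.epoch_id}, partition {self.partition_id}: '
                  f'{self.rows_seen} rows')
```

It would be attached with {{.writeStream.foreach(RowHandler())}} in place of the {{foreachBatch(batch_handler)}} call above.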

Would it make sense to create a "Request" or even "Feature" ticket to enhance this with methods for restoring state from a checkpoint and exporting state to support checkpointing?

PS: I have posted this on [Stack Overflow|https://stackoverflow.com/questions/74864425], too. If anyone cares to answer or comment, I'd be happy to upvote their post.


