Posted to issues@spark.apache.org by "Kai-Michael Roesner (Jira)" <ji...@apache.org> on 2023/01/17 14:49:00 UTC
[jira] [Created] (SPARK-42102) Using checkpoints in Spark Structured Streaming with the foreachBatch sink
Kai-Michael Roesner created SPARK-42102:
-------------------------------------------
Summary: Using checkpoints in Spark Structured Streaming with the foreachBatch sink
Key: SPARK-42102
URL: https://issues.apache.org/jira/browse/SPARK-42102
Project: Spark
Issue Type: Question
Components: PySpark, Structured Streaming
Affects Versions: 3.3.1
Reporter: Kai-Michael Roesner
I want to build a fault-tolerant, recoverable Spark job (using Structured Streaming in PySpark) that reads a data stream from Kafka and uses the [{{foreachBatch}}|https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#foreachbatch] sink to implement a stateful transformation before writing the resulting data to the actual sink.
The basic structure of my Spark job looks like this:
{code}
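from pyspark.sql import SparkSession
from pyspark.sql.functions import lit

# KAFKA_SQL, KAFKA_CLNT (package coordinates) and KAFKA_OPTIONS
# (Kafka connection options) are defined elsewhere in the job.

# Stand-in for the handler's state: it lives only in driver memory
counter = 0

def batch_handler(df, batch_id):
    global counter
    counter += 1
    df.withColumn('counter', lit(counter)).show(truncate=30)

spark = (SparkSession.builder
         .appName('test.stateful.checkpoint')
         .config('spark.jars.packages', f'{KAFKA_SQL},{KAFKA_CLNT}')
         .getOrCreate())

# 'earliest' only applies to a fresh query; on restart the
# offsets are taken from the checkpoint instead
source = (spark.readStream
          .format('kafka')
          .options(**KAFKA_OPTIONS)
          .option('subscribe', 'topic-spark-stateful')
          .option('startingOffsets', 'earliest')
          .option('includeHeaders', 'true')
          .load())

(source
 .selectExpr('CAST(value AS STRING) AS data', 'CAST(timestamp AS STRING) AS time')
 .writeStream
 .option('checkpointLocation', './checkpoints/stateful')
 .foreachBatch(batch_handler)
 .start()
 .awaitTermination())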
{code}
where the simplified {{batch_handler}} function is a stand-in for the stateful transformation plus the writer to the actual data sink. For simplicity, I am also using a local folder as the checkpoint location.
This works fine as far as checkpointing of Kafka offsets is concerned. But how can I include the state of my custom batch handler ({{counter}} in my simplified example) in the checkpoints so that the job can pick up where it left off after a crash?
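One possible workaround, sketched under assumptions: Spark's checkpoint records source offsets and completed batches, not Python variables on the driver, so the handler would have to persist its own state. The sketch below stores that state together with the last applied {{batch_id}} in a JSON file. {{BatchStateStore}}, its methods and the file path are hypothetical names, not part of Spark. Skipping already-applied batch IDs follows the programming guide's note that the {{batch_id}} passed to {{foreachBatch}} can be used to deduplicate output when a batch is replayed.

```python
import json
import os
import tempfile


class BatchStateStore:
    """Hypothetical helper (not a Spark API): keeps the foreachBatch handler's
    state plus the last applied batch_id in a JSON file, so a restarted job
    can resume where it left off."""

    def __init__(self, path, initial_state):
        self.path = path
        os.makedirs(os.path.dirname(os.path.abspath(path)), exist_ok=True)
        if os.path.exists(path):
            # Restart: reload whatever the previous run committed
            with open(path) as f:
                saved = json.load(f)
            self.last_batch_id = saved["last_batch_id"]
            self.state = saved["state"]
        else:
            # First run: no batch has been applied yet
            self.last_batch_id = -1
            self.state = initial_state

    def already_applied(self, batch_id):
        # After a failure Spark may re-run a batch with the same batch_id
        return batch_id <= self.last_batch_id

    def commit(self, batch_id, new_state):
        # Write to a temp file in the same directory, then rename it over the
        # old file, so a crash never leaves a half-written state file behind
        directory = os.path.dirname(os.path.abspath(self.path))
        fd, tmp_path = tempfile.mkstemp(dir=directory, suffix=".tmp")
        with os.fdopen(fd, "w") as f:
            json.dump({"last_batch_id": batch_id, "state": new_state}, f)
        os.replace(tmp_path, self.path)
        self.last_batch_id = batch_id
        self.state = new_state
```

To wire it in, {{batch_handler}} would return early when {{store.already_applied(batch_id)}} is true; otherwise it would compute the next counter from {{store.state}}, write the batch to the sink, and only then call {{store.commit(batch_id, ...)}}. A crash between the sink write and {{commit}} still replays that batch, so the sink write itself should be idempotent per {{batch_id}}. The state file also has to be kept in step with the Spark checkpoint: deleting one without the other breaks the {{batch_id}} bookkeeping.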
The [Spark Structured Streaming Guide|https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#recovering-from-failures-with-checkpointing] doesn't cover this. With the [{{foreach}}|https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#foreach] sink I can pass a custom row handler object, but it seems to support only the {{open}}, {{process}}, and {{close}} methods.
Would it make sense to open a "Request" or even a "Feature" ticket to extend these sinks with methods for restoring state from a checkpoint and exporting state for checkpointing?
PS: I have also posted this on [Stack Overflow|https://stackoverflow.com/questions/74864425]. If anyone cares to answer or comment there, I'd be happy to upvote their post.