Posted to issues@spark.apache.org by "yufei ding (Jira)" <ji...@apache.org> on 2021/11/26 08:20:00 UTC

[jira] [Commented] (SPARK-36240) Graceful termination of Spark Structured Streaming queries

    [ https://issues.apache.org/jira/browse/SPARK-36240?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17449458#comment-17449458 ] 

yufei ding commented on SPARK-36240:
------------------------------------

In which version of Spark is this feature expected to be available?

> Graceful termination of Spark Structured Streaming queries
> ----------------------------------------------------------
>
>                 Key: SPARK-36240
>                 URL: https://issues.apache.org/jira/browse/SPARK-36240
>             Project: Spark
>          Issue Type: New Feature
>          Components: Structured Streaming
>    Affects Versions: 3.1.2
>         Environment:  
>  
>            Reporter: Zoltán Zvara
>            Priority: Major
>
> Spark Streaming provides a way to gracefully stop the streaming application using the configuration parameter {{spark.streaming.stopGracefullyOnShutdown}}. The configuration states:
> {quote}If {{true}}, Spark shuts down the {{StreamingContext}} gracefully on JVM shutdown rather than immediately.
> {quote}
> This effectively stops the job generation (see {{JobGenerator}} of Spark Streaming) and lets the current {{Job}} (corresponding to a micro-batch) be finished instead of canceling the active job itself.
> Some applications may require graceful stopping so that their output would remain consistent - an output that is written out halfway poses a lot of problems for applications that would require "exactly-once" semantics.
> There is no support in Structured Streaming to gracefully stop queries/streaming applications.
> Naive solutions found on the web propose checking whether the queries are active using {{query.isActive}}, or checking the query state directly, and then attempting to call {{stop()}} at the right time. In Structured Streaming, with the current implementation, {{stop()}} cancels the job group, which may lead to inconsistent output because the result still depends on the timing of the cancellation.
> _Proposed solution:_
> Strictly speaking, in the context of the micro-batch execution model, a {{StreamingQuery}} that we want to stop gracefully would be implemented by {{MicroBatchExecution}}. The motivation is similar to that of the Streaming Context's gracefulness: stop the "job generation" and then wait for any active job to finish, instead of canceling the jobs.
> The micro-batch scheduling is managed by a {{ProcessingTimeExecutor}} of the {{MicroBatchExecution}} class.
>  
> {code:java}
> private val triggerExecutor = trigger match {
>   case t: ProcessingTimeTrigger => ProcessingTimeExecutor(t, triggerClock)
>   case OneTimeTrigger => OneTimeExecutor()
>   case _ => throw new IllegalStateException(s"Unknown type of trigger: $trigger")
> }
> {code}
> The following while-true loop is run by the job generation mechanism. The {{triggerHandler}} is a callback function that generates the micro-batches.
> {code:java}
> override def execute(triggerHandler: () => Boolean): Unit = {
>   while (true) {
>     val triggerTimeMs = clock.getTimeMillis
>     val nextTriggerTimeMs = nextBatchTime(triggerTimeMs)
>     val terminated = !triggerHandler()
>     if (intervalMs > 0) {
>       val batchElapsedTimeMs = clock.getTimeMillis - triggerTimeMs
>       if (batchElapsedTimeMs > intervalMs) {
>         notifyBatchFallingBehind(batchElapsedTimeMs)
>       }
>       if (terminated) {
>         return
>       }
>       clock.waitTillTime(nextTriggerTimeMs)
>     } else {
>       if (terminated) {
>         return
>       }
>     }
>   }
> }
> {code}
> Here, a {{gracefulStop()}} call on the query could essentially signal {{ProcessingTimeExecutor}} to stop triggering new batches.
> Then another mechanism is required that would wait until any current job is finished. After that, it would call {{stop()}}, and then the {{SparkSession}} may be stopped as well.
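
The proposal quoted above could be sketched roughly as follows. This is a minimal, Spark-free illustration and not Spark's actual {{ProcessingTimeExecutor}}: the class name GracefulTriggerExecutor, the gracefulStop(timeoutMs) signature, and the use of two latches are assumptions made for this example. It also simplifies the scheduling by waiting out the remainder of the interval instead of aligning to nextBatchTime.

```scala
import java.util.concurrent.{CountDownLatch, TimeUnit}

// Hypothetical sketch only; names and structure are not part of Spark.
final class GracefulTriggerExecutor(intervalMs: Long) {
  // Counted down once a graceful stop has been requested.
  private val stopSignal = new CountDownLatch(1)
  // Counted down once the trigger loop has exited.
  private val loopExited = new CountDownLatch(1)

  /** Runs triggerHandler until it returns false or a graceful stop is requested. */
  def execute(triggerHandler: () => Boolean): Unit = {
    try {
      var active = true
      // The stop flag is only checked between batches, so a batch
      // that is already running is never interrupted.
      while (active && stopSignal.getCount > 0) {
        val startNs = System.nanoTime()
        active = triggerHandler()
        val elapsedMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNs)
        val remainingMs = intervalMs - elapsedMs
        if (active && remainingMs > 0) {
          // Wait for the next trigger, but wake up early on a stop request.
          stopSignal.await(remainingMs, TimeUnit.MILLISECONDS)
        }
      }
    } finally {
      loopExited.countDown()
    }
  }

  /**
   * Stops the generation of new batches and waits up to timeoutMs for the
   * batch in flight to finish. Returns true if the loop exited in time.
   */
  def gracefulStop(timeoutMs: Long): Boolean = {
    stopSignal.countDown()
    loopExited.await(timeoutMs, TimeUnit.MILLISECONDS)
  }
}
```

In this sketch, a caller would run execute on the query's own thread and call gracefulStop from a shutdown hook or a control thread. Once gracefulStop returns true, no batch is in flight, so calling {{stop()}} on the query and then stopping the {{SparkSession}} would no longer cancel a running job.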



--
This message was sent by Atlassian Jira
(v8.20.1#820001)
