Posted to commits@spark.apache.org by zs...@apache.org on 2019/03/09 22:27:13 UTC
[spark] branch master updated: [SPARK-27111][SS] Fix a race that a continuous query may fail with InterruptedException
This is an automated email from the ASF dual-hosted git repository.
zsxwing pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 6e1c082 [SPARK-27111][SS] Fix a race that a continuous query may fail with InterruptedException
6e1c082 is described below
commit 6e1c0827ece1cdc615196e60cb11c76b917b8eeb
Author: Shixiong Zhu <zs...@gmail.com>
AuthorDate: Sat Mar 9 14:26:58 2019 -0800
[SPARK-27111][SS] Fix a race that a continuous query may fail with InterruptedException
## What changes were proposed in this pull request?
Before a Kafka consumer is assigned any partitions, its offsets will contain 0 partitions. However, `runContinuous` will still run and launch a Spark job with 0 partitions. In this case, there is a race: the epoch update thread may interrupt the query execution thread after `lastExecution.toRdd` returns, so either `epochEndpoint.askSync[Unit](StopContinuousExecutionWrites)` or the next `runContinuous` call gets interrupted unintentionally.
To handle this case, this PR makes the following changes:
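The core of the race is a quirk of Java's interrupt model: if the interrupt arrives after the "job" has already finished, the thread's interrupted status is simply left set and "sneaks" into whatever the thread does next. A minimal standalone sketch (not Spark code; the object and method names here are hypothetical, chosen only to mirror the scenario of a 0-partition job completing before the interrupt lands):

```scala
// Hypothetical sketch: a thread is interrupted only after its work has
// completed, so the interrupted status lingers until something clears it.
object InterruptRaceSketch {
  def run(): (Boolean, Boolean) = {
    // Simulate the epoch thread interrupting the query execution thread
    // "too late": the (empty) job already finished, so nothing observed it.
    Thread.currentThread().interrupt()

    // Thread.interrupted() reads AND clears the status, which is exactly
    // what the patch does at the end of the finally block.
    val wasInterrupted = Thread.interrupted()

    // After clearing, the flag no longer leaks into subsequent calls
    // (e.g. the next runContinuous in the real code).
    val stillInterrupted = Thread.currentThread().isInterrupted
    (wasInterrupted, stillInterrupted)
  }
}
```

Without the clearing step, the stale flag would make the next blocking call (an RPC ask, a `join`, the next `runContinuous`) throw `InterruptedException` for no real reason.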
- Clean up the resources in `queryExecutionThread.runUninterruptibly`. This may increase the waiting time of `stop` but should be minor because the operations here are very fast (just sending an RPC message in the same process and stopping a very simple thread).
- Clear the interrupted status at the end so that it won't impact the `runContinuous` call. We may clear the interrupted status set by `stop`, but it doesn't affect the query termination because `runActivatedStream` will check `state` and exit accordingly.
I also updated the cleanup code to make sure exceptions thrown from `epochEndpoint.askSync[Unit](StopContinuousExecutionWrites)` won't stop the cleanup.
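The exception-safety part of the fix relies on `try`/`finally` chaining: even if the RPC ask fails, the remaining teardown steps still run, and only then does the original exception propagate. A simplified standalone sketch of that shape (the object, the stand-in `askSync`, and the logged step names are hypothetical, not Spark's actual API):

```scala
import scala.collection.mutable.ListBuffer

// Hypothetical sketch: cleanup steps guarded by finally, mirroring the
// structure of the patched ContinuousExecution cleanup block.
object CleanupSketch {
  val log = ListBuffer[String]()

  // Stand-in for epochEndpoint.askSync, failing to simulate a dead endpoint.
  def askSync(): Unit = throw new RuntimeException("endpoint failed")

  def cleanup(): Unit =
    try {
      askSync() // may throw...
    } finally {
      // ...but these still run before the exception propagates.
      log += "stopEndpoint"   // SparkEnv.get.rpcEnv.stop(epochEndpoint)
      log += "joinThread"     // epochUpdateThread.interrupt(); join()
      log += "cancelJobGroup" // last: may fail if SparkContext is stopped
    }
}
```

Running the whole block inside `queryExecutionThread.runUninterruptibly` additionally defers any incoming interrupt until the cleanup completes, at the cost of slightly delaying `stop`, which the commit message argues is negligible for these fast, in-process operations.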
## How was this patch tested?
Jenkins
Closes #24034 from zsxwing/SPARK-27111.
Authored-by: Shixiong Zhu <zs...@gmail.com>
Signed-off-by: Shixiong Zhu <zs...@gmail.com>
---
.../streaming/continuous/ContinuousExecution.scala | 30 +++++++++++++++++-----
1 file changed, 23 insertions(+), 7 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala
index 26b5642..aef556d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala
@@ -268,13 +268,29 @@ class ContinuousExecution(
logInfo(s"Query $id ignoring exception from reconfiguring: $t")
// interrupted by reconfiguration - swallow exception so we can restart the query
} finally {
- epochEndpoint.askSync[Unit](StopContinuousExecutionWrites)
- SparkEnv.get.rpcEnv.stop(epochEndpoint)
-
- epochUpdateThread.interrupt()
- epochUpdateThread.join()
-
- sparkSession.sparkContext.cancelJobGroup(runId.toString)
+ // The above execution may finish before getting interrupted, for example, a Spark job having
+ // 0 partitions will complete immediately. Then the interrupted status will sneak here.
+ //
+ // To handle this case, we do the two things here:
+ //
+ // 1. Clean up the resources in `queryExecutionThread.runUninterruptibly`. This may increase
+ // the waiting time of `stop` but should be minor because the operations here are very fast
+ // (just sending an RPC message in the same process and stopping a very simple thread).
+ // 2. Clear the interrupted status at the end so that it won't impact the `runContinuous`
+ // call. We may clear the interrupted status set by `stop`, but it doesn't affect the query
+ // termination because `runActivatedStream` will check `state` and exit accordingly.
+ queryExecutionThread.runUninterruptibly {
+ try {
+ epochEndpoint.askSync[Unit](StopContinuousExecutionWrites)
+ } finally {
+ SparkEnv.get.rpcEnv.stop(epochEndpoint)
+ epochUpdateThread.interrupt()
+ epochUpdateThread.join()
+ // The following line must be the last line because it may fail if SparkContext is stopped
+ sparkSession.sparkContext.cancelJobGroup(runId.toString)
+ }
+ }
+ Thread.interrupted()
}
}