You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by td...@apache.org on 2018/02/26 19:28:49 UTC
spark git commit: [SPARK-23491][SS] Remove explicit job cancellation
from ContinuousExecution reconfiguring
Repository: spark
Updated Branches:
refs/heads/master 185f5bc7d -> 7ec83658f
[SPARK-23491][SS] Remove explicit job cancellation from ContinuousExecution reconfiguring
## What changes were proposed in this pull request?
Remove queryExecutionThread.interrupt() from ContinuousExecution. As detailed in the JIRA, interrupting the thread is only relevant in the microbatch case; for continuous processing, the query execution can quickly clean itself up without it.
## How was this patch tested?
existing tests
Author: Jose Torres <jo...@databricks.com>
Closes #20622 from jose-torres/SPARK-23441.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/7ec83658
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/7ec83658
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/7ec83658
Branch: refs/heads/master
Commit: 7ec83658fbc88505dfc2d8a6f76e90db747f1292
Parents: 185f5bc
Author: Jose Torres <jo...@databricks.com>
Authored: Mon Feb 26 11:28:44 2018 -0800
Committer: Tathagata Das <ta...@gmail.com>
Committed: Mon Feb 26 11:28:44 2018 -0800
----------------------------------------------------------------------
.../streaming/continuous/ContinuousExecution.scala | 10 ++++++++--
1 file changed, 8 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/7ec83658/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala
index 2c1d6c5..daebd1d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala
@@ -236,9 +236,7 @@ class ContinuousExecution(
startTrigger()
if (reader.needsReconfiguration() && state.compareAndSet(ACTIVE, RECONFIGURING)) {
- stopSources()
if (queryExecutionThread.isAlive) {
- sparkSession.sparkContext.cancelJobGroup(runId.toString)
queryExecutionThread.interrupt()
}
false
@@ -266,12 +264,20 @@ class ContinuousExecution(
SQLExecution.withNewExecutionId(
sparkSessionForQuery, lastExecution)(lastExecution.toRdd)
}
+ } catch {
+ case t: Throwable
+ if StreamExecution.isInterruptionException(t) && state.get() == RECONFIGURING =>
+ logInfo(s"Query $id ignoring exception from reconfiguring: $t")
+ // interrupted by reconfiguration - swallow exception so we can restart the query
} finally {
epochEndpoint.askSync[Unit](StopContinuousExecutionWrites)
SparkEnv.get.rpcEnv.stop(epochEndpoint)
epochUpdateThread.interrupt()
epochUpdateThread.join()
+
+ stopSources()
+ sparkSession.sparkContext.cancelJobGroup(runId.toString)
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org