Posted to commits@spark.apache.org by ya...@apache.org on 2019/02/13 03:54:15 UTC

[spark] 01/02: [SPARK-23491][SS] Remove explicit job cancellation from ContinuousExecution reconfiguring

This is an automated email from the ASF dual-hosted git repository.

yamamuro pushed a commit to branch branch-2.3
in repository https://gitbox.apache.org/repos/asf/spark.git

commit 7f13fd0c5a79ab21c4ace2445127e6c69a7f745c
Author: Jose Torres <jo...@databricks.com>
AuthorDate: Mon Feb 26 11:28:44 2018 -0800

    [SPARK-23491][SS] Remove explicit job cancellation from ContinuousExecution reconfiguring
    
    ## What changes were proposed in this pull request?
    
    Remove queryExecutionThread.interrupt() from ContinuousExecution. As detailed in the JIRA, interrupting the thread is only relevant in the microbatch case; for continuous processing the query execution can quickly clean itself up without it.
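
    A minimal, self-contained sketch of the pattern this change relies on, using only
    the standard library (the names `ReconfigureSketch`, `runOnce`, and `reconfigure`
    are illustrative, not Spark's): the coordinator flips a shared state to
    `Reconfiguring` before interrupting the query thread, the query thread swallows an
    interrupt only when that state is set, and per-run cleanup sits in `finally` so it
    runs exactly once per incarnation.

        import java.util.concurrent.atomic.AtomicReference

        object ReconfigureSketch {
          sealed trait State
          case object Active        extends State
          case object Reconfiguring extends State

          private val state = new AtomicReference[State](Active)

          // Stand-in for one incarnation of the continuous query (runContinuous).
          private def runOnce(): Unit = {
            try {
              // Stand-in for the long-running, interruptible job (lastExecution.toRdd).
              Thread.sleep(60000L)
            } catch {
              case _: InterruptedException if state.get() == Reconfiguring =>
                // Interrupted by reconfiguration: swallow so the caller can restart.
                println("ignoring interrupt caused by reconfiguration")
            } finally {
              // Cleanup that must happen once per incarnation, however the run ended.
              println("cleaning up this incarnation")
            }
          }

          // Stand-in for the trigger logic that notices needsReconfiguration().
          private def reconfigure(queryThread: Thread): Unit = {
            if (state.compareAndSet(Active, Reconfiguring) && queryThread.isAlive) {
              queryThread.interrupt()
            }
          }

          def main(args: Array[String]): Unit = {
            val queryThread = new Thread(() => runOnce())
            queryThread.start()
            Thread.sleep(100L)  // let the "query" start running
            reconfigure(queryThread)
            queryThread.join()
          }
        }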
    
    ## How was this patch tested?
    
    Existing tests.
    
    Author: Jose Torres <jo...@databricks.com>
    
    Closes #20622 from jose-torres/SPARK-23441.
---
 .../execution/streaming/continuous/ContinuousExecution.scala   | 10 ++++++++--
 1 file changed, 8 insertions(+), 2 deletions(-)
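
Assembled from the two hunks that follow, the error handling around the query run in
ContinuousExecution reads roughly as below after this change (the body of the try
block is elided; this is a sketch, not the literal file contents):

    try {
      // ... launch the continuous query and block on lastExecution.toRdd ...
    } catch {
      case t: Throwable
          if StreamExecution.isInterruptionException(t) && state.get() == RECONFIGURING =>
        logInfo(s"Query $id ignoring exception from reconfiguring: $t")
        // interrupted by reconfiguration - swallow exception so we can restart the query
    } finally {
      epochEndpoint.askSync[Unit](StopContinuousExecutionWrites)
      SparkEnv.get.rpcEnv.stop(epochEndpoint)

      epochUpdateThread.interrupt()
      epochUpdateThread.join()

      // Previously done in the reconfiguration branch; now runs exactly once per run,
      // no matter how the run ended.
      stopSources()
      sparkSession.sparkContext.cancelJobGroup(runId.toString)
    }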

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala
index c3294d6..11df2c2 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala
@@ -236,9 +236,7 @@ class ContinuousExecution(
             startTrigger()
 
             if (reader.needsReconfiguration() && state.compareAndSet(ACTIVE, RECONFIGURING)) {
-              stopSources()
               if (queryExecutionThread.isAlive) {
-                sparkSession.sparkContext.cancelJobGroup(runId.toString)
                 queryExecutionThread.interrupt()
               }
               false
@@ -266,12 +264,20 @@ class ContinuousExecution(
         SQLExecution.withNewExecutionId(
           sparkSessionForQuery, lastExecution)(lastExecution.toRdd)
       }
+    } catch {
+      case t: Throwable
+          if StreamExecution.isInterruptionException(t) && state.get() == RECONFIGURING =>
+        logInfo(s"Query $id ignoring exception from reconfiguring: $t")
+        // interrupted by reconfiguration - swallow exception so we can restart the query
     } finally {
       epochEndpoint.askSync[Unit](StopContinuousExecutionWrites)
       SparkEnv.get.rpcEnv.stop(epochEndpoint)
 
       epochUpdateThread.interrupt()
       epochUpdateThread.join()
+
+      stopSources()
+      sparkSession.sparkContext.cancelJobGroup(runId.toString)
     }
   }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org