Posted to commits@spark.apache.org by ya...@apache.org on 2019/02/13 03:54:14 UTC

[spark] branch branch-2.3 updated (abce846 -> 55d5a19)

This is an automated email from the ASF dual-hosted git repository.

yamamuro pushed a change to branch branch-2.3
in repository https://gitbox.apache.org/repos/asf/spark.git.


    from abce846  [SPARK-23408][SS][BRANCH-2.3] Synchronize successive AddData actions in Streaming*JoinSuite
     new 7f13fd0  [SPARK-23491][SS] Remove explicit job cancellation from ContinuousExecution reconfiguring
     new 55d5a19  [SPARK-23416][SS] Add a specific stop method for ContinuousExecution.

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../execution/streaming/MicroBatchExecution.scala  | 18 +++++++++++++++
 .../sql/execution/streaming/StreamExecution.scala  | 18 ---------------
 .../streaming/continuous/ContinuousExecution.scala | 26 ++++++++++++++++++++--
 3 files changed, 42 insertions(+), 20 deletions(-)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org


[spark] 01/02: [SPARK-23491][SS] Remove explicit job cancellation from ContinuousExecution reconfiguring

Posted by ya...@apache.org.

yamamuro pushed a commit to branch branch-2.3
in repository https://gitbox.apache.org/repos/asf/spark.git

commit 7f13fd0c5a79ab21c4ace2445127e6c69a7f745c
Author: Jose Torres <jo...@databricks.com>
AuthorDate: Mon Feb 26 11:28:44 2018 -0800

    [SPARK-23491][SS] Remove explicit job cancellation from ContinuousExecution reconfiguring
    
    ## What changes were proposed in this pull request?
    
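    Remove the explicit job cancellation (sparkSession.sparkContext.cancelJobGroup) and the stopSources() call from the ContinuousExecution reconfiguring path; the query execution thread is still interrupted. For continuous processing, the query execution can quickly clean itself up without explicit cancellation (see the JIRA for details): an interruption exception raised while the state is RECONFIGURING is logged and swallowed so the query can restart, and the finally clause of runContinuous now stops the sources and cancels the job group.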
    
    ## How was this patch tested?
    
    Existing tests.
    
    Author: Jose Torres <jo...@databricks.com>
    
    Closes #20622 from jose-torres/SPARK-23441.
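The reconfiguration control flow described above can be modelled in plain Scala without Spark. This is a minimal sketch under stated assumptions, not the Spark implementation: ReconfigureSketch, runOnce, isInterruption, and the string events are hypothetical names; isInterruption is a reduced stand-in for StreamExecution.isInterruptionException; and the "cleanup" event stands in for stopSources() plus cancelJobGroup(runId.toString). The reconfiguring side moves the state from ACTIVE to RECONFIGURING and interrupts the thread, which swallows the interruption only in that state while still running its finally cleanup.

```scala
import java.util.concurrent.CountDownLatch
import java.util.concurrent.atomic.AtomicReference
import scala.collection.mutable.ListBuffer

// Hypothetical plain-Scala model of the reconfiguring path; not Spark code.
object ReconfigureSketch {
  sealed trait State
  case object Active extends State
  case object Reconfiguring extends State

  // Reduced stand-in for StreamExecution.isInterruptionException (assumption:
  // the real predicate may recognize other exception types as well).
  def isInterruption(t: Throwable): Boolean = t match {
    case _: InterruptedException                         => true
    case _: java.io.InterruptedIOException               => true
    case _: java.nio.channels.ClosedByInterruptException => true
    case _                                               => false
  }

  // One pass of the query thread: run the long-running job, swallow an
  // interruption only if it came from reconfiguration, and always clean up.
  def runOnce(state: AtomicReference[State], job: () => Unit, events: ListBuffer[String]): Unit = {
    try {
      job()
    } catch {
      case t: Throwable if isInterruption(t) && state.get() == Reconfiguring =>
        events += "swallowed" // swallow so the query can be restarted
    } finally {
      // Stand-in for stopSources() and cancelJobGroup(runId.toString).
      events += "cleanup"
    }
  }

  // The reconfiguring side: flip ACTIVE -> RECONFIGURING, then interrupt the
  // thread that is blocked in its job and wait for it to finish.
  def demo(): List[String] = {
    val state   = new AtomicReference[State](Active)
    val events  = ListBuffer.empty[String]
    val blocked = new CountDownLatch(1)
    val worker = new Thread(() =>
      runOnce(state, () => { blocked.countDown(); Thread.sleep(60000L) }, events))
    worker.start()
    blocked.await()
    if (state.compareAndSet(Active, Reconfiguring) && worker.isAlive) worker.interrupt()
    worker.join() // join() makes the worker's writes to `events` visible here
    events.toList
  }
}
```

ReconfigureSketch.demo() returns List("swallowed", "cleanup"). Because the guard checks the state as well as the exception type, calling runOnce with the state left at Active lets the same InterruptedException propagate, after the finally cleanup has run.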
---
 .../execution/streaming/continuous/ContinuousExecution.scala   | 10 ++++++++--
 1 file changed, 8 insertions(+), 2 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala
index c3294d6..11df2c2 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala
@@ -236,9 +236,7 @@ class ContinuousExecution(
             startTrigger()
 
             if (reader.needsReconfiguration() && state.compareAndSet(ACTIVE, RECONFIGURING)) {
-              stopSources()
               if (queryExecutionThread.isAlive) {
-                sparkSession.sparkContext.cancelJobGroup(runId.toString)
                 queryExecutionThread.interrupt()
               }
               false
@@ -266,12 +264,20 @@ class ContinuousExecution(
         SQLExecution.withNewExecutionId(
           sparkSessionForQuery, lastExecution)(lastExecution.toRdd)
       }
+    } catch {
+      case t: Throwable
+          if StreamExecution.isInterruptionException(t) && state.get() == RECONFIGURING =>
+        logInfo(s"Query $id ignoring exception from reconfiguring: $t")
+        // interrupted by reconfiguration - swallow exception so we can restart the query
     } finally {
       epochEndpoint.askSync[Unit](StopContinuousExecutionWrites)
       SparkEnv.get.rpcEnv.stop(epochEndpoint)
 
       epochUpdateThread.interrupt()
       epochUpdateThread.join()
+
+      stopSources()
+      sparkSession.sparkContext.cancelJobGroup(runId.toString)
     }
   }
 



[spark] 02/02: [SPARK-23416][SS] Add a specific stop method for ContinuousExecution.


yamamuro pushed a commit to branch branch-2.3
in repository https://gitbox.apache.org/repos/asf/spark.git

commit 55d5a19c8e01de945c4c9e42752ed132df4b9110
Author: Jose Torres <to...@gmail.com>
AuthorDate: Wed May 23 17:21:29 2018 -0700

    [SPARK-23416][SS] Add a specific stop method for ContinuousExecution.
    
    ## What changes were proposed in this pull request?
    
    Add a specific stop method for ContinuousExecution. The previous StreamExecution.stop() method had a race condition when applied to continuous processing: it cancelled the job group before interrupting the query execution thread, so if the cancellation round-tripped to the driver too quickly, the generic SparkException it caused would be reported as the query's cause of death. We earlier decided that SparkException should not be added to the StreamExecution.isInterruptionException() whitelist, so instead we need to ensure this never happens.
    
    ## How was this patch tested?
    
    Existing tests. I could consistently reproduce the previous flakiness by putting Thread.sleep(1000) between the first job cancellation and thread interruption in StreamExecution.stop().
    
    Author: Jose Torres <to...@gmail.com>
    
    Closes #21384 from jose-torres/fixKafka.
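The race described above can be modelled with a latch standing in for job-group cancellation. This is a hypothetical sketch, not Spark code: Query, JobCancelled (a stand-in for the generic SparkException a cancelled job produces), stopCancelFirst, and stopInterruptOnly are illustrative names. stopCancelFirst follows the ordering of the generic StreamExecution.stop() (cancel, then interrupt), with a bounded join playing the role of the Thread.sleep(1000) used to reproduce the flakiness; stopInterruptOnly follows the new ContinuousExecution.stop(), leaving cancellation to the thread's own finally clause.

```scala
import java.util.concurrent.CountDownLatch
import java.util.concurrent.atomic.AtomicBoolean

// Hypothetical plain-Scala model of the stop() race; not Spark code.
object StopOrderingSketch {
  // Stand-in for the generic SparkException raised when a job is cancelled.
  final class JobCancelled extends RuntimeException("job cancelled")

  final class Query {
    val terminated        = new AtomicBoolean(false) // stand-in for state == TERMINATED
    val jobGroupCancelled = new CountDownLatch(1)    // stand-in for cancelJobGroup
    val started           = new CountDownLatch(1)
    @volatile var deathCause: Option[Throwable] = None
    val thread = new Thread(() => body())

    private def body(): Unit =
      try {
        started.countDown()
        // The long-running job fails once its job group is cancelled; an
        // interrupt instead surfaces as InterruptedException from await().
        jobGroupCancelled.await()
        throw new JobCancelled
      } catch {
        case _: InterruptedException if terminated.get() =>
          () // interrupted intentionally by stop(): not a failure
        case t: Throwable =>
          deathCause = Some(t) // would be reported as the query's death cause
      } finally {
        jobGroupCancelled.countDown() // the thread cancels its own job group
      }
  }

  def start(): Query = {
    val q = new Query
    q.thread.start()
    q.started.await()
    q
  }

  // Ordering of the generic StreamExecution.stop(): cancel first, then interrupt.
  // The bounded join widens the gap between the two steps.
  def stopCancelFirst(q: Query, gapMs: Long): Unit = {
    q.terminated.set(true)
    if (q.thread.isAlive) {
      q.jobGroupCancelled.countDown()
      q.thread.join(gapMs)
      q.thread.interrupt()
      q.thread.join()
    }
  }

  // Ordering of ContinuousExecution.stop(): interrupt and join only.
  def stopInterruptOnly(q: Query): Unit = {
    q.terminated.set(true)
    if (q.thread.isAlive) {
      q.thread.interrupt()
      q.thread.join()
    }
  }
}
```

With cancellation first, the model's query thread observes JobCancelled before it is interrupted and records it as the death cause; with interruption only, it sees an InterruptedException while terminated is set and stops cleanly. MicroBatchExecution.stop() keeps the cancel, interrupt, join, cancel sequence.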
---
 .../sql/execution/streaming/MicroBatchExecution.scala  | 18 ++++++++++++++++++
 .../sql/execution/streaming/StreamExecution.scala      | 18 ------------------
 .../streaming/continuous/ContinuousExecution.scala     | 16 ++++++++++++++++
 3 files changed, 34 insertions(+), 18 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
index 8bf1dd3..7f09bd5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
@@ -111,6 +111,24 @@ class MicroBatchExecution(
   }
 
   /**
+   * Signals to the thread executing micro-batches that it should stop running after the next
+   * batch. This method blocks until the thread stops running.
+   */
+  override def stop(): Unit = {
+    // Set the state to TERMINATED so that the batching thread knows that it was interrupted
+    // intentionally
+    state.set(TERMINATED)
+    if (queryExecutionThread.isAlive) {
+      sparkSession.sparkContext.cancelJobGroup(runId.toString)
+      queryExecutionThread.interrupt()
+      queryExecutionThread.join()
+      // microBatchThread may spawn new jobs, so we need to cancel again to prevent a leak
+      sparkSession.sparkContext.cancelJobGroup(runId.toString)
+    }
+    logInfo(s"Query $prettyIdString was stopped")
+  }
+
+  /**
    * Repeatedly attempts to run batches as data arrives.
    */
   protected def runActivatedStream(sparkSessionForStream: SparkSession): Unit = {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
index 3fc8c78..290de87 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
@@ -379,24 +379,6 @@ abstract class StreamExecution(
   }
 
   /**
-   * Signals to the thread executing micro-batches that it should stop running after the next
-   * batch. This method blocks until the thread stops running.
-   */
-  override def stop(): Unit = {
-    // Set the state to TERMINATED so that the batching thread knows that it was interrupted
-    // intentionally
-    state.set(TERMINATED)
-    if (queryExecutionThread.isAlive) {
-      sparkSession.sparkContext.cancelJobGroup(runId.toString)
-      queryExecutionThread.interrupt()
-      queryExecutionThread.join()
-      // microBatchThread may spawn new jobs, so we need to cancel again to prevent a leak
-      sparkSession.sparkContext.cancelJobGroup(runId.toString)
-    }
-    logInfo(s"Query $prettyIdString was stopped")
-  }
-
-  /**
    * Blocks the current thread until processing for data from the given `source` has reached at
    * least the given `Offset`. This method is intended for use primarily when writing tests.
    */
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala
index 11df2c2..62adedb 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala
@@ -362,6 +362,22 @@ class ContinuousExecution(
       }
     }
   }
+
+  /**
+   * Stops the query execution thread to terminate the query.
+   */
+  override def stop(): Unit = {
+    // Set the state to TERMINATED so that the batching thread knows that it was interrupted
+    // intentionally
+    state.set(TERMINATED)
+    if (queryExecutionThread.isAlive) {
+      // The query execution thread will clean itself up in the finally clause of runContinuous.
+      // We just need to interrupt the long running job.
+      queryExecutionThread.interrupt()
+      queryExecutionThread.join()
+    }
+    logInfo(s"Query $prettyIdString was stopped")
+  }
 }
 
 object ContinuousExecution {

