You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by zs...@apache.org on 2017/07/06 01:26:33 UTC

spark git commit: [SPARK-21248][SS] The clean up codes in StreamExecution should not be interrupted

Repository: spark
Updated Branches:
  refs/heads/master c8d0aba19 -> ab866f117


[SPARK-21248][SS] The clean up codes in StreamExecution should not be interrupted

## What changes were proposed in this pull request?

This PR uses `runUninterruptibly` to prevent the cleanup code in StreamExecution from being interrupted. It also removes an optimization in `runUninterruptibly` to make sure this method never throws `InterruptedException`.

## How was this patch tested?

Jenkins

Author: Shixiong Zhu <sh...@databricks.com>

Closes #18461 from zsxwing/SPARK-21248.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ab866f11
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ab866f11
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ab866f11

Branch: refs/heads/master
Commit: ab866f117378e64dba483ead51b769ae7be31d4d
Parents: c8d0aba
Author: Shixiong Zhu <sh...@databricks.com>
Authored: Wed Jul 5 18:26:28 2017 -0700
Committer: Shixiong Zhu <sh...@databricks.com>
Committed: Wed Jul 5 18:26:28 2017 -0700

----------------------------------------------------------------------
 .../org/apache/spark/util/UninterruptibleThread.scala     | 10 +---------
 .../apache/spark/util/UninterruptibleThreadSuite.scala    |  5 ++---
 .../spark/sql/execution/streaming/StreamExecution.scala   |  6 +++++-
 3 files changed, 8 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/ab866f11/core/src/main/scala/org/apache/spark/util/UninterruptibleThread.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/UninterruptibleThread.scala b/core/src/main/scala/org/apache/spark/util/UninterruptibleThread.scala
index 27922b3..6a58ec1 100644
--- a/core/src/main/scala/org/apache/spark/util/UninterruptibleThread.scala
+++ b/core/src/main/scala/org/apache/spark/util/UninterruptibleThread.scala
@@ -55,9 +55,6 @@ private[spark] class UninterruptibleThread(
    * Run `f` uninterruptibly in `this` thread. The thread won't be interrupted before returning
    * from `f`.
    *
-   * If this method finds that `interrupt` is called before calling `f` and it's not inside another
-   * `runUninterruptibly`, it will throw `InterruptedException`.
-   *
    * Note: this method should be called only in `this` thread.
    */
   def runUninterruptibly[T](f: => T): T = {
@@ -73,12 +70,7 @@ private[spark] class UninterruptibleThread(
 
     uninterruptibleLock.synchronized {
       // Clear the interrupted status if it's set.
-      if (Thread.interrupted() || shouldInterruptThread) {
-        shouldInterruptThread = false
-        // Since it's interrupted, we don't need to run `f` which may be a long computation.
-        // Throw InterruptedException as we don't have a T to return.
-        throw new InterruptedException()
-      }
+      shouldInterruptThread = Thread.interrupted() || shouldInterruptThread
       uninterruptible = true
     }
     try {

http://git-wip-us.apache.org/repos/asf/spark/blob/ab866f11/core/src/test/scala/org/apache/spark/util/UninterruptibleThreadSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/util/UninterruptibleThreadSuite.scala b/core/src/test/scala/org/apache/spark/util/UninterruptibleThreadSuite.scala
index 39b31f8..6a190f6 100644
--- a/core/src/test/scala/org/apache/spark/util/UninterruptibleThreadSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/UninterruptibleThreadSuite.scala
@@ -68,7 +68,6 @@ class UninterruptibleThreadSuite extends SparkFunSuite {
         Uninterruptibles.awaitUninterruptibly(interruptLatch, 10, TimeUnit.SECONDS)
         try {
           runUninterruptibly {
-            assert(false, "Should not reach here")
           }
         } catch {
           case _: InterruptedException => hasInterruptedException = true
@@ -80,8 +79,8 @@ class UninterruptibleThreadSuite extends SparkFunSuite {
     t.interrupt()
     interruptLatch.countDown()
     t.join()
-    assert(hasInterruptedException === true)
-    assert(interruptStatusBeforeExit === false)
+    assert(hasInterruptedException === false)
+    assert(interruptStatusBeforeExit === true)
   }
 
   test("nested runUninterruptibly") {

http://git-wip-us.apache.org/repos/asf/spark/blob/ab866f11/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
index d5f8d2a..10c42a7 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
@@ -357,7 +357,11 @@ class StreamExecution(
         if (!NonFatal(e)) {
           throw e
         }
-    } finally {
+    } finally microBatchThread.runUninterruptibly {
+      // The whole `finally` block must run inside `runUninterruptibly` to avoid being interrupted
+      // when a query is stopped by the user. We need to make sure the following codes finish
+      // otherwise it may throw `InterruptedException` to `UncaughtExceptionHandler` (SPARK-21248).
+
       // Release latches to unblock the user codes since exception can happen in any place and we
       // may not get a chance to release them
       startLatch.countDown()


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org