You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by sr...@apache.org on 2017/01/10 14:24:53 UTC

spark git commit: [SPARK-19113][SS][TESTS] Set UncaughtExceptionHandler in onQueryStarted to ensure catching fatal errors during query initialization

Repository: spark
Updated Branches:
  refs/heads/master a2c6adcc5 -> 3ef183a94


[SPARK-19113][SS][TESTS] Set UncaughtExceptionHandler in onQueryStarted to ensure catching fatal errors during query initialization

## What changes were proposed in this pull request?

StreamTest sets `UncaughtExceptionHandler` after starting the query now. It may not be able to catch fatal errors during query initialization. This PR uses `onQueryStarted` callback to fix it.

## How was this patch tested?

Jenkins

Author: Shixiong Zhu <sh...@databricks.com>

Closes #16492 from zsxwing/SPARK-19113.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/3ef183a9
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/3ef183a9
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/3ef183a9

Branch: refs/heads/master
Commit: 3ef183a941d45b2f7ad167ea5133a93de0da5176
Parents: a2c6adc
Author: Shixiong Zhu <sh...@databricks.com>
Authored: Tue Jan 10 14:24:45 2017 +0000
Committer: Sean Owen <so...@cloudera.com>
Committed: Tue Jan 10 14:24:45 2017 +0000

----------------------------------------------------------------------
 .../spark/sql/streaming/StreamSuite.scala       |  7 +++--
 .../apache/spark/sql/streaming/StreamTest.scala | 28 +++++++++++++++-----
 2 files changed, 26 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/3ef183a9/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
index 34b0ee8..e964e64 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
@@ -238,7 +238,7 @@ class StreamSuite extends StreamTest {
     }
   }
 
-  testQuietly("fatal errors from a source should be sent to the user") {
+  testQuietly("handle fatal errors thrown from the stream thread") {
     for (e <- Seq(
       new VirtualMachineError {},
       new ThreadDeath,
@@ -259,8 +259,11 @@ class StreamSuite extends StreamTest {
         override def stop(): Unit = {}
       }
       val df = Dataset[Int](sqlContext.sparkSession, StreamingExecutionRelation(source))
-      // These error are fatal errors and should be ignored in `testStream` to not fail the test.
       testStream(df)(
+        // `ExpectFailure(isFatalError = true)` verifies two things:
+        // - Fatal errors can be propagated to `StreamingQuery.exception` and
+        //   `StreamingQuery.awaitTermination` like non fatal errors.
+        // - Fatal errors can be caught by UncaughtExceptionHandler.
         ExpectFailure(isFatalError = true)(ClassTag(e.getClass))
       )
     }

http://git-wip-us.apache.org/repos/asf/spark/blob/3ef183a9/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
index 709050d..4aa4100 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
@@ -235,7 +235,10 @@ trait StreamTest extends QueryTest with SharedSQLContext with Timeouts {
    */
   def testStream(
       _stream: Dataset[_],
-      outputMode: OutputMode = OutputMode.Append)(actions: StreamAction*): Unit = {
+      outputMode: OutputMode = OutputMode.Append)(actions: StreamAction*): Unit = synchronized {
+    // `synchronized` is added to prevent the user from calling multiple `testStream`s concurrently
+    // because this method assumes there is only one active query in its `StreamingQueryListener`
+    // and it may not work correctly when multiple `testStream`s run concurrently.
 
     val stream = _stream.toDF()
     val sparkSession = stream.sparkSession  // use the session in DF, not the default session
@@ -248,6 +251,22 @@ trait StreamTest extends QueryTest with SharedSQLContext with Timeouts {
 
     @volatile
     var streamThreadDeathCause: Throwable = null
+    // Set UncaughtExceptionHandler in `onQueryStarted` so that we can ensure catching fatal errors
+    // during query initialization.
+    val listener = new StreamingQueryListener {
+      override def onQueryStarted(event: QueryStartedEvent): Unit = {
+        // Note: this assumes there is only one query active in the `testStream` method.
+        Thread.currentThread.setUncaughtExceptionHandler(new UncaughtExceptionHandler {
+          override def uncaughtException(t: Thread, e: Throwable): Unit = {
+            streamThreadDeathCause = e
+          }
+        })
+      }
+
+      override def onQueryProgress(event: QueryProgressEvent): Unit = {}
+      override def onQueryTerminated(event: QueryTerminatedEvent): Unit = {}
+    }
+    sparkSession.streams.addListener(listener)
 
     // If the test doesn't manually start the stream, we do it automatically at the beginning.
     val startedManually =
@@ -364,12 +383,6 @@ trait StreamTest extends QueryTest with SharedSQLContext with Timeouts {
                   triggerClock = triggerClock)
                 .asInstanceOf[StreamingQueryWrapper]
                 .streamingQuery
-            currentStream.microBatchThread.setUncaughtExceptionHandler(
-              new UncaughtExceptionHandler {
-                override def uncaughtException(t: Thread, e: Throwable): Unit = {
-                  streamThreadDeathCause = e
-                }
-              })
             // Wait until the initialization finishes, because some tests need to use `logicalPlan`
             // after starting the query.
             currentStream.awaitInitialization(streamingTimeout.toMillis)
@@ -545,6 +558,7 @@ trait StreamTest extends QueryTest with SharedSQLContext with Timeouts {
         case (key, Some(value)) => sparkSession.conf.set(key, value)
         case (key, None) => sparkSession.conf.unset(key)
       }
+      sparkSession.streams.removeListener(listener)
     }
   }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org