You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by zs...@apache.org on 2016/12/22 06:02:58 UTC

spark git commit: [SPARK-18908][SS] Creating StreamingQueryException should check if logicalPlan is created

Repository: spark
Updated Branches:
  refs/heads/master e1b43dc45 -> ff7d82a20


[SPARK-18908][SS] Creating StreamingQueryException should check if logicalPlan is created

## What changes were proposed in this pull request?

This PR audits the places that use `logicalPlan` in StreamExecution and ensures they all handle the case where `logicalPlan` cannot be created.

In addition, this PR also fixes the following issues in `StreamingQueryException`:
- `StreamingQueryException` and `StreamExecution` have a circular dependency: the `StreamingQueryException` constructor calls `StreamExecution`'s `toDebugString`, which in turn uses `StreamingQueryException`. As a result, a `null` value is output in the error message.
- The stack trace is duplicated when calling `Throwable.printStackTrace`, because `StreamingQueryException`'s `toString` contains the stack trace.

## How was this patch tested?

Tested with the updated `test("max files per trigger - incorrect values")`. I found this issue when I switched from `testStream` to real query code to verify the failure in this test.

Author: Shixiong Zhu <sh...@databricks.com>

Closes #16322 from zsxwing/SPARK-18907.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ff7d82a2
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ff7d82a2
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ff7d82a2

Branch: refs/heads/master
Commit: ff7d82a207e8bef7779c27378f7a50a138627341
Parents: e1b43dc
Author: Shixiong Zhu <sh...@databricks.com>
Authored: Wed Dec 21 22:02:57 2016 -0800
Committer: Shixiong Zhu <sh...@databricks.com>
Committed: Wed Dec 21 22:02:57 2016 -0800

----------------------------------------------------------------------
 .../execution/streaming/StreamExecution.scala   | 141 ++++++++++++-------
 .../sql/streaming/StreamingQueryException.scala |  28 +---
 .../sql/streaming/FileStreamSourceSuite.scala   |  39 +++--
 .../spark/sql/streaming/StreamSuite.scala       |   3 +-
 .../apache/spark/sql/streaming/StreamTest.scala |  52 +++++--
 .../streaming/StreamingQueryListenerSuite.scala |   2 +-
 .../sql/streaming/StreamingQuerySuite.scala     |   4 +-
 7 files changed, 165 insertions(+), 104 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/ff7d82a2/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
index e05200d..a35950e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
@@ -31,7 +31,6 @@ import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.encoders.RowEncoder
 import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap, CurrentBatchTimestamp, CurrentDate, CurrentTimestamp}
 import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan}
-import org.apache.spark.sql.catalyst.util._
 import org.apache.spark.sql.execution.QueryExecution
 import org.apache.spark.sql.execution.command.ExplainCommand
 import org.apache.spark.sql.streaming._
@@ -67,6 +66,7 @@ class StreamExecution(
   private val awaitBatchLock = new ReentrantLock(true)
   private val awaitBatchLockCondition = awaitBatchLock.newCondition()
 
+  private val initializationLatch = new CountDownLatch(1)
   private val startLatch = new CountDownLatch(1)
   private val terminationLatch = new CountDownLatch(1)
 
@@ -118,9 +118,22 @@ class StreamExecution(
   private val prettyIdString =
     Option(name).map(_ + " ").getOrElse("") + s"[id = $id, runId = $runId]"
 
+  /**
+   * All stream sources present in the query plan. This will be set when generating logical plan.
+   */
+  @volatile protected var sources: Seq[Source] = Seq.empty
+
+  /**
+   * A list of unique sources in the query plan. This will be set when generating logical plan.
+   */
+  @volatile private var uniqueSources: Seq[Source] = Seq.empty
+
   override lazy val logicalPlan: LogicalPlan = {
+    assert(microBatchThread eq Thread.currentThread,
+      "logicalPlan must be initialized in StreamExecutionThread " +
+        s"but the current thread was ${Thread.currentThread}")
     var nextSourceId = 0L
-    analyzedPlan.transform {
+    val _logicalPlan = analyzedPlan.transform {
       case StreamingRelation(dataSource, _, output) =>
         // Materialize source to avoid creating it in every batch
         val metadataPath = s"$checkpointRoot/sources/$nextSourceId"
@@ -130,22 +143,18 @@ class StreamExecution(
         // "df.logicalPlan" has already used attributes of the previous `output`.
         StreamingExecutionRelation(source, output)
     }
+    sources = _logicalPlan.collect { case s: StreamingExecutionRelation => s.source }
+    uniqueSources = sources.distinct
+    _logicalPlan
   }
 
-  /** All stream sources present in the query plan. */
-  protected lazy val sources =
-    logicalPlan.collect { case s: StreamingExecutionRelation => s.source }
-
-  /** A list of unique sources in the query plan. */
-  private lazy val uniqueSources = sources.distinct
-
   private val triggerExecutor = trigger match {
     case t: ProcessingTime => ProcessingTimeExecutor(t, triggerClock)
   }
 
   /** Defines the internal state of execution */
   @volatile
-  private var state: State = INITIALIZED
+  private var state: State = INITIALIZING
 
   @volatile
   var lastExecution: QueryExecution = _
@@ -186,8 +195,11 @@ class StreamExecution(
    */
   val offsetLog = new OffsetSeqLog(sparkSession, checkpointFile("offsets"))
 
+  /** Whether all fields of the query have been initialized */
+  private def isInitialized: Boolean = state != INITIALIZING
+
   /** Whether the query is currently active or not */
-  override def isActive: Boolean = state == ACTIVE
+  override def isActive: Boolean = state != TERMINATED
 
   /** Returns the [[StreamingQueryException]] if the query was terminated by an exception. */
   override def exception: Option[StreamingQueryException] = Option(streamDeathCause)
@@ -216,9 +228,6 @@ class StreamExecution(
    */
   private def runBatches(): Unit = {
     try {
-      // Mark ACTIVE and then post the event. QueryStarted event is synchronously sent to listeners,
-      // so must mark this as ACTIVE first.
-      state = ACTIVE
       if (sparkSession.sessionState.conf.streamingMetricsEnabled) {
         sparkSession.sparkContext.env.metricsSystem.registerSource(streamMetrics)
       }
@@ -235,6 +244,9 @@ class StreamExecution(
       updateStatusMessage("Initializing sources")
       // force initialization of the logical plan so that the sources can be created
       logicalPlan
+      state = ACTIVE
+      // Unblock `awaitInitialization`
+      initializationLatch.countDown()
 
       triggerExecutor.execute(() => {
         startTrigger()
@@ -282,7 +294,7 @@ class StreamExecution(
         updateStatusMessage("Stopped")
       case e: Throwable =>
         streamDeathCause = new StreamingQueryException(
-          this,
+          toDebugString(includeLogicalPlan = isInitialized),
           s"Query $prettyIdString terminated with exception: ${e.getMessage}",
           e,
           committedOffsets.toOffsetSeq(sources, offsetSeqMetadata).toString,
@@ -295,17 +307,25 @@ class StreamExecution(
           throw e
         }
     } finally {
-      state = TERMINATED
-      currentStatus = status.copy(isTriggerActive = false, isDataAvailable = false)
+      // Release latches to unblock the user codes since exception can happen in any place and we
+      // may not get a chance to release them
+      startLatch.countDown()
+      initializationLatch.countDown()
 
-      // Update metrics and status
-      sparkSession.sparkContext.env.metricsSystem.removeSource(streamMetrics)
+      try {
+        state = TERMINATED
+        currentStatus = status.copy(isTriggerActive = false, isDataAvailable = false)
 
-      // Notify others
-      sparkSession.streams.notifyQueryTermination(StreamExecution.this)
-      postEvent(
-       new QueryTerminatedEvent(id, runId, exception.map(_.cause).map(Utils.exceptionString)))
-      terminationLatch.countDown()
+        // Update metrics and status
+        sparkSession.sparkContext.env.metricsSystem.removeSource(streamMetrics)
+
+        // Notify others
+        sparkSession.streams.notifyQueryTermination(StreamExecution.this)
+        postEvent(
+          new QueryTerminatedEvent(id, runId, exception.map(_.cause).map(Utils.exceptionString)))
+      } finally {
+        terminationLatch.countDown()
+      }
     }
   }
 
@@ -537,6 +557,7 @@ class StreamExecution(
    * least the given `Offset`. This method is intended for use primarily when writing tests.
    */
   private[sql] def awaitOffset(source: Source, newOffset: Offset): Unit = {
+    assertAwaitThread()
     def notDone = {
       val localCommittedOffsets = committedOffsets
       !localCommittedOffsets.contains(source) || localCommittedOffsets(source) != newOffset
@@ -559,7 +580,38 @@ class StreamExecution(
   /** A flag to indicate that a batch has completed with no new data available. */
   @volatile private var noNewData = false
 
+  /**
+   * Assert that the await APIs should not be called in the stream thread. Otherwise, it may cause
+   * dead-lock, e.g., calling any await APIs in `StreamingQueryListener.onQueryStarted` will block
+   * the stream thread forever.
+   */
+  private def assertAwaitThread(): Unit = {
+    if (microBatchThread eq Thread.currentThread) {
+      throw new IllegalStateException(
+        "Cannot wait for a query state from the same thread that is running the query")
+    }
+  }
+
+  /**
+   * Await until all fields of the query have been initialized.
+   */
+  def awaitInitialization(timeoutMs: Long): Unit = {
+    assertAwaitThread()
+    require(timeoutMs > 0, "Timeout has to be positive")
+    if (streamDeathCause != null) {
+      throw streamDeathCause
+    }
+    initializationLatch.await(timeoutMs, TimeUnit.MILLISECONDS)
+    if (streamDeathCause != null) {
+      throw streamDeathCause
+    }
+  }
+
   override def processAllAvailable(): Unit = {
+    assertAwaitThread()
+    if (streamDeathCause != null) {
+      throw streamDeathCause
+    }
     awaitBatchLock.lock()
     try {
       noNewData = false
@@ -578,9 +630,7 @@ class StreamExecution(
   }
 
   override def awaitTermination(): Unit = {
-    if (state == INITIALIZED) {
-      throw new IllegalStateException("Cannot wait for termination on a query that has not started")
-    }
+    assertAwaitThread()
     terminationLatch.await()
     if (streamDeathCause != null) {
       throw streamDeathCause
@@ -588,9 +638,7 @@ class StreamExecution(
   }
 
   override def awaitTermination(timeoutMs: Long): Boolean = {
-    if (state == INITIALIZED) {
-      throw new IllegalStateException("Cannot wait for termination on a query that has not started")
-    }
+    assertAwaitThread()
     require(timeoutMs > 0, "Timeout has to be positive")
     terminationLatch.await(timeoutMs, TimeUnit.MILLISECONDS)
     if (streamDeathCause != null) {
@@ -623,27 +671,24 @@ class StreamExecution(
     s"Streaming Query $prettyIdString [state = $state]"
   }
 
-  def toDebugString: String = {
-    val deathCauseStr = if (streamDeathCause != null) {
-      "Error:\n" + stackTraceToString(streamDeathCause.cause)
-    } else ""
-    s"""
-       |=== Streaming Query ===
-       |Identifier: $prettyIdString
-       |Current Offsets: $committedOffsets
-       |
-       |Current State: $state
-       |Thread State: ${microBatchThread.getState}
-       |
-       |Logical Plan:
-       |$logicalPlan
-       |
-       |$deathCauseStr
-     """.stripMargin
+  private def toDebugString(includeLogicalPlan: Boolean): String = {
+    val debugString =
+      s"""|=== Streaming Query ===
+          |Identifier: $prettyIdString
+          |Current Committed Offsets: $committedOffsets
+          |Current Available Offsets: $availableOffsets
+          |
+          |Current State: $state
+          |Thread State: ${microBatchThread.getState}""".stripMargin
+    if (includeLogicalPlan) {
+      debugString + s"\n\nLogical Plan:\n$logicalPlan"
+    } else {
+      debugString
+    }
   }
 
   trait State
-  case object INITIALIZED extends State
+  case object INITIALIZING extends State
   case object ACTIVE extends State
   case object TERMINATED extends State
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/ff7d82a2/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryException.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryException.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryException.scala
index a96150a..c53c295 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryException.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryException.scala
@@ -18,7 +18,6 @@
 package org.apache.spark.sql.streaming
 
 import org.apache.spark.annotation.Experimental
-import org.apache.spark.sql.execution.streaming.{Offset, OffsetSeq, StreamExecution}
 
 /**
  * :: Experimental ::
@@ -31,35 +30,18 @@ import org.apache.spark.sql.execution.streaming.{Offset, OffsetSeq, StreamExecut
  * @since 2.0.0
  */
 @Experimental
-class StreamingQueryException private(
-    causeString: String,
+class StreamingQueryException private[sql](
+    private val queryDebugString: String,
     val message: String,
     val cause: Throwable,
     val startOffset: String,
     val endOffset: String)
   extends Exception(message, cause) {
 
-  private[sql] def this(
-      query: StreamingQuery,
-      message: String,
-      cause: Throwable,
-      startOffset: String,
-      endOffset: String) {
-    this(
-      // scalastyle:off
-      s"""${classOf[StreamingQueryException].getName}: ${cause.getMessage} ${cause.getStackTrace.take(10).mkString("", "\n|\t", "\n")}
-         |
-         |${query.asInstanceOf[StreamExecution].toDebugString}
-         """.stripMargin,
-      // scalastyle:on
-      message,
-      cause,
-      startOffset,
-      endOffset)
-  }
-
   /** Time when the exception occurred */
   val time: Long = System.currentTimeMillis
 
-  override def toString(): String = causeString
+  override def toString(): String =
+    s"""${classOf[StreamingQueryException].getName}: ${cause.getMessage}
+       |$queryDebugString""".stripMargin
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/ff7d82a2/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
index 55d927a..8a9fa94 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
@@ -815,21 +815,31 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
   }
 
   test("max files per trigger - incorrect values") {
-    withTempDir { case src =>
-      def testMaxFilePerTriggerValue(value: String): Unit = {
-        val df = spark.readStream.option("maxFilesPerTrigger", value).text(src.getCanonicalPath)
-        val e = intercept[IllegalArgumentException] {
-          testStream(df)()
-        }
-        Seq("maxFilesPerTrigger", value, "positive integer").foreach { s =>
-          assert(e.getMessage.contains(s))
+    val testTable = "maxFilesPerTrigger_test"
+    withTable(testTable) {
+      withTempDir { case src =>
+        def testMaxFilePerTriggerValue(value: String): Unit = {
+          val df = spark.readStream.option("maxFilesPerTrigger", value).text(src.getCanonicalPath)
+          val e = intercept[StreamingQueryException] {
+            // Note: `maxFilesPerTrigger` is checked in the stream thread when creating the source
+            val q = df.writeStream.format("memory").queryName(testTable).start()
+            try {
+              q.processAllAvailable()
+            } finally {
+              q.stop()
+            }
+          }
+          assert(e.getCause.isInstanceOf[IllegalArgumentException])
+          Seq("maxFilesPerTrigger", value, "positive integer").foreach { s =>
+            assert(e.getMessage.contains(s))
+          }
         }
-      }
 
-      testMaxFilePerTriggerValue("not-a-integer")
-      testMaxFilePerTriggerValue("-1")
-      testMaxFilePerTriggerValue("0")
-      testMaxFilePerTriggerValue("10.1")
+        testMaxFilePerTriggerValue("not-a-integer")
+        testMaxFilePerTriggerValue("-1")
+        testMaxFilePerTriggerValue("0")
+        testMaxFilePerTriggerValue("10.1")
+      }
     }
   }
 
@@ -1202,7 +1212,8 @@ class FileStreamSourceStressTestSuite extends FileStreamSourceTest {
   }
 }
 
-/** Fake FileSystem to test whether the method `fs.exists` is called during
+/**
+ * Fake FileSystem to test whether the method `fs.exists` is called during
  * `DataSource.resolveRelation`.
  */
 class ExistsThrowsExceptionFileSystem extends RawLocalFileSystem {

http://git-wip-us.apache.org/repos/asf/spark/blob/ff7d82a2/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
index b8fa82d..34b0ee8 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
@@ -259,8 +259,9 @@ class StreamSuite extends StreamTest {
         override def stop(): Unit = {}
       }
       val df = Dataset[Int](sqlContext.sparkSession, StreamingExecutionRelation(source))
+      // These error are fatal errors and should be ignored in `testStream` to not fail the test.
       testStream(df)(
-        ExpectFailure()(ClassTag(e.getClass))
+        ExpectFailure(isFatalError = true)(ClassTag(e.getClass))
       )
     }
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/ff7d82a2/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
index 6fbbbb1..709050d 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
@@ -167,10 +167,17 @@ trait StreamTest extends QueryTest with SharedSQLContext with Timeouts {
   /** Advance the trigger clock's time manually. */
   case class AdvanceManualClock(timeToAdd: Long) extends StreamAction
 
-  /** Signals that a failure is expected and should not kill the test. */
-  case class ExpectFailure[T <: Throwable : ClassTag]() extends StreamAction {
+  /**
+   * Signals that a failure is expected and should not kill the test.
+   *
+   * @param isFatalError if this is a fatal error. If so, the error should also be caught by
+   *                     UncaughtExceptionHandler.
+   */
+  case class ExpectFailure[T <: Throwable : ClassTag](
+      isFatalError: Boolean = false) extends StreamAction {
     val causeClass: Class[T] = implicitly[ClassTag[T]].runtimeClass.asInstanceOf[Class[T]]
-    override def toString(): String = s"ExpectFailure[${causeClass.getName}]"
+    override def toString(): String =
+      s"ExpectFailure[${causeClass.getName}, isFatalError: $isFatalError]"
   }
 
   /** Assert that a body is true */
@@ -240,7 +247,7 @@ trait StreamTest extends QueryTest with SharedSQLContext with Timeouts {
     val resetConfValues = mutable.Map[String, Option[String]]()
 
     @volatile
-    var streamDeathCause: Throwable = null
+    var streamThreadDeathCause: Throwable = null
 
     // If the test doesn't manually start the stream, we do it automatically at the beginning.
     val startedManually =
@@ -271,7 +278,7 @@ trait StreamTest extends QueryTest with SharedSQLContext with Timeouts {
          |Output Mode: $outputMode
          |Stream state: $currentOffsets
          |Thread state: $threadState
-         |${if (streamDeathCause != null) stackTraceToString(streamDeathCause) else ""}
+         |${if (streamThreadDeathCause != null) stackTraceToString(streamThreadDeathCause) else ""}
          |
          |== Sink ==
          |${sink.toDebugString}
@@ -360,9 +367,12 @@ trait StreamTest extends QueryTest with SharedSQLContext with Timeouts {
             currentStream.microBatchThread.setUncaughtExceptionHandler(
               new UncaughtExceptionHandler {
                 override def uncaughtException(t: Thread, e: Throwable): Unit = {
-                  streamDeathCause = e
+                  streamThreadDeathCause = e
                 }
               })
+            // Wait until the initialization finishes, because some tests need to use `logicalPlan`
+            // after starting the query.
+            currentStream.awaitInitialization(streamingTimeout.toMillis)
 
           case AdvanceManualClock(timeToAdd) =>
             verify(currentStream != null,
@@ -396,8 +406,9 @@ trait StreamTest extends QueryTest with SharedSQLContext with Timeouts {
                   currentStream.exception.map(_.toString()).getOrElse(""))
             } catch {
               case _: InterruptedException =>
-              case _: org.scalatest.exceptions.TestFailedDueToTimeoutException =>
-                failTest("Timed out while stopping and waiting for microbatchthread to terminate.")
+              case e: org.scalatest.exceptions.TestFailedDueToTimeoutException =>
+                failTest(
+                  "Timed out while stopping and waiting for microbatchthread to terminate.", e)
               case t: Throwable =>
                 failTest("Error while stopping stream", t)
             } finally {
@@ -421,16 +432,24 @@ trait StreamTest extends QueryTest with SharedSQLContext with Timeouts {
               verify(exception.cause.getClass === ef.causeClass,
                 "incorrect cause in exception returned by query.exception()\n" +
                   s"\tExpected: ${ef.causeClass}\n\tReturned: ${exception.cause.getClass}")
+              if (ef.isFatalError) {
+                // This is a fatal error, `streamThreadDeathCause` should be set to this error in
+                // UncaughtExceptionHandler.
+                verify(streamThreadDeathCause != null &&
+                  streamThreadDeathCause.getClass === ef.causeClass,
+                  "UncaughtExceptionHandler didn't receive the correct error\n" +
+                    s"\tExpected: ${ef.causeClass}\n\tReturned: $streamThreadDeathCause")
+                streamThreadDeathCause = null
+              }
             } catch {
               case _: InterruptedException =>
-              case _: org.scalatest.exceptions.TestFailedDueToTimeoutException =>
-                failTest("Timed out while waiting for failure")
+              case e: org.scalatest.exceptions.TestFailedDueToTimeoutException =>
+                failTest("Timed out while waiting for failure", e)
               case t: Throwable =>
                 failTest("Error while checking stream failure", t)
             } finally {
               lastStream = currentStream
               currentStream = null
-              streamDeathCause = null
             }
 
           case a: AssertOnQuery =>
@@ -508,11 +527,14 @@ trait StreamTest extends QueryTest with SharedSQLContext with Timeouts {
         }
         pos += 1
       }
+      if (streamThreadDeathCause != null) {
+        failTest("Stream Thread Died", streamThreadDeathCause)
+      }
     } catch {
-      case _: InterruptedException if streamDeathCause != null =>
-        failTest("Stream Thread Died")
-      case _: org.scalatest.exceptions.TestFailedDueToTimeoutException =>
-        failTest("Timed out waiting for stream")
+      case _: InterruptedException if streamThreadDeathCause != null =>
+        failTest("Stream Thread Died", streamThreadDeathCause)
+      case e: org.scalatest.exceptions.TestFailedDueToTimeoutException =>
+        failTest("Timed out waiting for stream", e)
     } finally {
       if (currentStream != null && currentStream.microBatchThread.isAlive) {
         currentStream.stop()

http://git-wip-us.apache.org/repos/asf/spark/blob/ff7d82a2/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala
index a057d1d..4596aa1 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala
@@ -111,7 +111,7 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter {
         StartStream(ProcessingTime(100), triggerClock = clock),
         AddData(inputData, 0),
         AdvanceManualClock(100),
-        ExpectFailure[SparkException],
+        ExpectFailure[SparkException](),
         AssertOnQuery { query =>
           eventually(Timeout(streamingTimeout)) {
             assert(listener.terminationEvent !== null)

http://git-wip-us.apache.org/repos/asf/spark/blob/ff7d82a2/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
index 6c4bb35..1525ad5 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
@@ -142,7 +142,7 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging {
       StartStream(),
       AssertOnQuery(_.isActive === true),
       AddData(inputData, 0),
-      ExpectFailure[SparkException],
+      ExpectFailure[SparkException](),
       AssertOnQuery(_.isActive === false),
       TestAwaitTermination(ExpectException[SparkException]),
       TestAwaitTermination(ExpectException[SparkException], timeoutMs = 2000),
@@ -306,7 +306,7 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging {
       StartStream(ProcessingTime(100), triggerClock = clock),
       AddData(inputData, 0),
       AdvanceManualClock(100),
-      ExpectFailure[SparkException],
+      ExpectFailure[SparkException](),
       AssertOnQuery(_.status.isDataAvailable === false),
       AssertOnQuery(_.status.isTriggerActive === false),
       AssertOnQuery(_.status.message.startsWith("Terminated with exception"))


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org