Posted to commits@spark.apache.org by zs...@apache.org on 2016/12/22 06:02:58 UTC
spark git commit: [SPARK-18908][SS] Creating StreamingQueryException should check if logicalPlan is created
Repository: spark
Updated Branches:
refs/heads/master e1b43dc45 -> ff7d82a20
[SPARK-18908][SS] Creating StreamingQueryException should check if logicalPlan is created
## What changes were proposed in this pull request?
This PR audits the places that use `logicalPlan` in StreamExecution and ensures that they all handle the case where `logicalPlan` cannot be created.
In addition, this PR fixes the following issues in `StreamingQueryException`:
- `StreamingQueryException` and `StreamExecution` are cyclically dependent: `StreamingQueryException`'s constructor calls `StreamExecution`'s `toDebugString`, which itself depends on the `StreamingQueryException` that is still being constructed, so the error message ends up containing a `null` value (see the sketch after this list).
- The stack trace is duplicated when calling `Throwable.printStackTrace`, because `StreamingQueryException`'s `toString` also contains the stack trace.
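
For illustration, here is a minimal, self-contained sketch of that cycle, using hypothetical `Query`/`QueryException` stand-ins rather than the actual Spark classes:

```scala
// Simplified stand-ins for StreamExecution and StreamingQueryException.
class Query {
  // Assigned only after the exception below has been fully constructed,
  // mirroring StreamExecution.streamDeathCause.
  @volatile var deathCause: QueryException = null

  // Reads deathCause, which is still null while the exception is being built.
  def toDebugString: String = s"=== Query ===\nError: $deathCause"

  def fail(cause: Throwable): Unit = {
    deathCause = new QueryException(this, cause)
  }
}

class QueryException(query: Query, cause: Throwable)
    extends Exception(cause.getMessage, cause) {
  // Rendered eagerly during construction, before the caller has assigned
  // query.deathCause, so the dump below ends up containing "null".
  private val causeString: String = query.toDebugString
  override def toString: String = causeString
}

object CycleDemo extends App {
  val q = new Query
  q.fail(new IllegalArgumentException("boom"))
  println(q.deathCause) // prints "=== Query ===" followed by "Error: null"
}
```

The patch breaks the cycle by rendering the debug string inside `StreamExecution` first and passing the finished string into the exception's constructor.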
## How was this patch tested?
The updated `test("max files per trigger - incorrect values")`. I found this issue when I switched from `testStream` to the real code path to verify the failure in this test.
Author: Shixiong Zhu <sh...@databricks.com>
Closes #16322 from zsxwing/SPARK-18907.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ff7d82a2
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ff7d82a2
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ff7d82a2
Branch: refs/heads/master
Commit: ff7d82a207e8bef7779c27378f7a50a138627341
Parents: e1b43dc
Author: Shixiong Zhu <sh...@databricks.com>
Authored: Wed Dec 21 22:02:57 2016 -0800
Committer: Shixiong Zhu <sh...@databricks.com>
Committed: Wed Dec 21 22:02:57 2016 -0800
----------------------------------------------------------------------
.../execution/streaming/StreamExecution.scala | 141 ++++++++++++-------
.../sql/streaming/StreamingQueryException.scala | 28 +---
.../sql/streaming/FileStreamSourceSuite.scala | 39 +++--
.../spark/sql/streaming/StreamSuite.scala | 3 +-
.../apache/spark/sql/streaming/StreamTest.scala | 52 +++++--
.../streaming/StreamingQueryListenerSuite.scala | 2 +-
.../sql/streaming/StreamingQuerySuite.scala | 4 +-
7 files changed, 165 insertions(+), 104 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/ff7d82a2/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
index e05200d..a35950e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
@@ -31,7 +31,6 @@ import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.encoders.RowEncoder
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap, CurrentBatchTimestamp, CurrentDate, CurrentTimestamp}
import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan}
-import org.apache.spark.sql.catalyst.util._
import org.apache.spark.sql.execution.QueryExecution
import org.apache.spark.sql.execution.command.ExplainCommand
import org.apache.spark.sql.streaming._
@@ -67,6 +66,7 @@ class StreamExecution(
private val awaitBatchLock = new ReentrantLock(true)
private val awaitBatchLockCondition = awaitBatchLock.newCondition()
+ private val initializationLatch = new CountDownLatch(1)
private val startLatch = new CountDownLatch(1)
private val terminationLatch = new CountDownLatch(1)
@@ -118,9 +118,22 @@ class StreamExecution(
private val prettyIdString =
Option(name).map(_ + " ").getOrElse("") + s"[id = $id, runId = $runId]"
+ /**
+ * All stream sources present in the query plan. This will be set when generating the logical plan.
+ */
+ @volatile protected var sources: Seq[Source] = Seq.empty
+
+ /**
+ * A list of unique sources in the query plan. This will be set when generating the logical plan.
+ */
+ @volatile private var uniqueSources: Seq[Source] = Seq.empty
+
override lazy val logicalPlan: LogicalPlan = {
+ assert(microBatchThread eq Thread.currentThread,
+ "logicalPlan must be initialized in StreamExecutionThread " +
+ s"but the current thread was ${Thread.currentThread}")
var nextSourceId = 0L
- analyzedPlan.transform {
+ val _logicalPlan = analyzedPlan.transform {
case StreamingRelation(dataSource, _, output) =>
// Materialize source to avoid creating it in every batch
val metadataPath = s"$checkpointRoot/sources/$nextSourceId"
@@ -130,22 +143,18 @@ class StreamExecution(
// "df.logicalPlan" has already used attributes of the previous `output`.
StreamingExecutionRelation(source, output)
}
+ sources = _logicalPlan.collect { case s: StreamingExecutionRelation => s.source }
+ uniqueSources = sources.distinct
+ _logicalPlan
}
- /** All stream sources present in the query plan. */
- protected lazy val sources =
- logicalPlan.collect { case s: StreamingExecutionRelation => s.source }
-
- /** A list of unique sources in the query plan. */
- private lazy val uniqueSources = sources.distinct
-
private val triggerExecutor = trigger match {
case t: ProcessingTime => ProcessingTimeExecutor(t, triggerClock)
}
/** Defines the internal state of execution */
@volatile
- private var state: State = INITIALIZED
+ private var state: State = INITIALIZING
@volatile
var lastExecution: QueryExecution = _
@@ -186,8 +195,11 @@ class StreamExecution(
*/
val offsetLog = new OffsetSeqLog(sparkSession, checkpointFile("offsets"))
+ /** Whether all fields of the query have been initialized */
+ private def isInitialized: Boolean = state != INITIALIZING
+
/** Whether the query is currently active or not */
- override def isActive: Boolean = state == ACTIVE
+ override def isActive: Boolean = state != TERMINATED
/** Returns the [[StreamingQueryException]] if the query was terminated by an exception. */
override def exception: Option[StreamingQueryException] = Option(streamDeathCause)
@@ -216,9 +228,6 @@ class StreamExecution(
*/
private def runBatches(): Unit = {
try {
- // Mark ACTIVE and then post the event. QueryStarted event is synchronously sent to listeners,
- // so must mark this as ACTIVE first.
- state = ACTIVE
if (sparkSession.sessionState.conf.streamingMetricsEnabled) {
sparkSession.sparkContext.env.metricsSystem.registerSource(streamMetrics)
}
@@ -235,6 +244,9 @@ class StreamExecution(
updateStatusMessage("Initializing sources")
// force initialization of the logical plan so that the sources can be created
logicalPlan
+ state = ACTIVE
+ // Unblock `awaitInitialization`
+ initializationLatch.countDown()
triggerExecutor.execute(() => {
startTrigger()
@@ -282,7 +294,7 @@ class StreamExecution(
updateStatusMessage("Stopped")
case e: Throwable =>
streamDeathCause = new StreamingQueryException(
- this,
+ toDebugString(includeLogicalPlan = isInitialized),
s"Query $prettyIdString terminated with exception: ${e.getMessage}",
e,
committedOffsets.toOffsetSeq(sources, offsetSeqMetadata).toString,
@@ -295,17 +307,25 @@ class StreamExecution(
throw e
}
} finally {
- state = TERMINATED
- currentStatus = status.copy(isTriggerActive = false, isDataAvailable = false)
+ // Release latches to unblock user code, since an exception can happen anywhere and we
+ // may not get a chance to release them
+ startLatch.countDown()
+ initializationLatch.countDown()
- // Update metrics and status
- sparkSession.sparkContext.env.metricsSystem.removeSource(streamMetrics)
+ try {
+ state = TERMINATED
+ currentStatus = status.copy(isTriggerActive = false, isDataAvailable = false)
- // Notify others
- sparkSession.streams.notifyQueryTermination(StreamExecution.this)
- postEvent(
- new QueryTerminatedEvent(id, runId, exception.map(_.cause).map(Utils.exceptionString)))
- terminationLatch.countDown()
+ // Update metrics and status
+ sparkSession.sparkContext.env.metricsSystem.removeSource(streamMetrics)
+
+ // Notify others
+ sparkSession.streams.notifyQueryTermination(StreamExecution.this)
+ postEvent(
+ new QueryTerminatedEvent(id, runId, exception.map(_.cause).map(Utils.exceptionString)))
+ } finally {
+ terminationLatch.countDown()
+ }
}
}
@@ -537,6 +557,7 @@ class StreamExecution(
* least the given `Offset`. This method is intended for use primarily when writing tests.
*/
private[sql] def awaitOffset(source: Source, newOffset: Offset): Unit = {
+ assertAwaitThread()
def notDone = {
val localCommittedOffsets = committedOffsets
!localCommittedOffsets.contains(source) || localCommittedOffsets(source) != newOffset
@@ -559,7 +580,38 @@ class StreamExecution(
/** A flag to indicate that a batch has completed with no new data available. */
@volatile private var noNewData = false
+ /**
+ * Assert that the await APIs are not called from the stream thread. Otherwise, it may cause a
+ * deadlock, e.g., calling any await API in `StreamingQueryListener.onQueryStarted` will block
+ * the stream thread forever.
+ */
+ private def assertAwaitThread(): Unit = {
+ if (microBatchThread eq Thread.currentThread) {
+ throw new IllegalStateException(
+ "Cannot wait for a query state from the same thread that is running the query")
+ }
+ }
+
+ /**
+ * Await until all fields of the query have been initialized.
+ */
+ def awaitInitialization(timeoutMs: Long): Unit = {
+ assertAwaitThread()
+ require(timeoutMs > 0, "Timeout has to be positive")
+ if (streamDeathCause != null) {
+ throw streamDeathCause
+ }
+ initializationLatch.await(timeoutMs, TimeUnit.MILLISECONDS)
+ if (streamDeathCause != null) {
+ throw streamDeathCause
+ }
+ }
+
override def processAllAvailable(): Unit = {
+ assertAwaitThread()
+ if (streamDeathCause != null) {
+ throw streamDeathCause
+ }
awaitBatchLock.lock()
try {
noNewData = false
@@ -578,9 +630,7 @@ class StreamExecution(
}
override def awaitTermination(): Unit = {
- if (state == INITIALIZED) {
- throw new IllegalStateException("Cannot wait for termination on a query that has not started")
- }
+ assertAwaitThread()
terminationLatch.await()
if (streamDeathCause != null) {
throw streamDeathCause
@@ -588,9 +638,7 @@ class StreamExecution(
}
override def awaitTermination(timeoutMs: Long): Boolean = {
- if (state == INITIALIZED) {
- throw new IllegalStateException("Cannot wait for termination on a query that has not started")
- }
+ assertAwaitThread()
require(timeoutMs > 0, "Timeout has to be positive")
terminationLatch.await(timeoutMs, TimeUnit.MILLISECONDS)
if (streamDeathCause != null) {
@@ -623,27 +671,24 @@ class StreamExecution(
s"Streaming Query $prettyIdString [state = $state]"
}
- def toDebugString: String = {
- val deathCauseStr = if (streamDeathCause != null) {
- "Error:\n" + stackTraceToString(streamDeathCause.cause)
- } else ""
- s"""
- |=== Streaming Query ===
- |Identifier: $prettyIdString
- |Current Offsets: $committedOffsets
- |
- |Current State: $state
- |Thread State: ${microBatchThread.getState}
- |
- |Logical Plan:
- |$logicalPlan
- |
- |$deathCauseStr
- """.stripMargin
+ private def toDebugString(includeLogicalPlan: Boolean): String = {
+ val debugString =
+ s"""|=== Streaming Query ===
+ |Identifier: $prettyIdString
+ |Current Committed Offsets: $committedOffsets
+ |Current Available Offsets: $availableOffsets
+ |
+ |Current State: $state
+ |Thread State: ${microBatchThread.getState}""".stripMargin
+ if (includeLogicalPlan) {
+ debugString + s"\n\nLogical Plan:\n$logicalPlan"
+ } else {
+ debugString
+ }
}
trait State
- case object INITIALIZED extends State
+ case object INITIALIZING extends State
case object ACTIVE extends State
case object TERMINATED extends State
}
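
To make the new lifecycle concrete, here is a hedged, self-contained sketch of the initialization handshake this diff introduces, using simplified names rather than the real `StreamExecution`: the stream thread initializes the plan, flips the state, and opens a `CountDownLatch`; the `finally` block releases the latch unconditionally so `awaitInitialization` can never hang on failure, and `assertAwaitThread` rejects waits issued from the stream thread itself, which would otherwise deadlock.

```scala
import java.util.concurrent.{CountDownLatch, TimeUnit}

// Simplified stand-in for StreamExecution; names are illustrative only.
class MiniStream {
  @volatile private var state = "INITIALIZING"
  @volatile private var deathCause: Throwable = null
  private val initializationLatch = new CountDownLatch(1)

  private val streamThread = new Thread(() => {
    try {
      // Force initialization of the logical plan here; this may throw.
      state = "ACTIVE"
    } catch {
      case e: Throwable => deathCause = e
    } finally {
      // Always release the latch, so waiters never block forever on failure.
      initializationLatch.countDown()
    }
  })

  def start(): Unit = streamThread.start()

  // Waiting on the stream thread from itself (e.g. inside a listener callback
  // that runs on it) would deadlock, hence the assertion.
  private def assertAwaitThread(): Unit =
    if (streamThread eq Thread.currentThread) {
      throw new IllegalStateException(
        "Cannot wait for a query state from the same thread that is running the query")
    }

  def awaitInitialization(timeoutMs: Long): Unit = {
    assertAwaitThread()
    require(timeoutMs > 0, "Timeout has to be positive")
    initializationLatch.await(timeoutMs, TimeUnit.MILLISECONDS)
    if (deathCause != null) throw deathCause
  }
}
```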
http://git-wip-us.apache.org/repos/asf/spark/blob/ff7d82a2/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryException.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryException.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryException.scala
index a96150a..c53c295 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryException.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryException.scala
@@ -18,7 +18,6 @@
package org.apache.spark.sql.streaming
import org.apache.spark.annotation.Experimental
-import org.apache.spark.sql.execution.streaming.{Offset, OffsetSeq, StreamExecution}
/**
* :: Experimental ::
@@ -31,35 +30,18 @@ import org.apache.spark.sql.execution.streaming.{Offset, OffsetSeq, StreamExecut
* @since 2.0.0
*/
@Experimental
-class StreamingQueryException private(
- causeString: String,
+class StreamingQueryException private[sql](
+ private val queryDebugString: String,
val message: String,
val cause: Throwable,
val startOffset: String,
val endOffset: String)
extends Exception(message, cause) {
- private[sql] def this(
- query: StreamingQuery,
- message: String,
- cause: Throwable,
- startOffset: String,
- endOffset: String) {
- this(
- // scalastyle:off
- s"""${classOf[StreamingQueryException].getName}: ${cause.getMessage} ${cause.getStackTrace.take(10).mkString("", "\n|\t", "\n")}
- |
- |${query.asInstanceOf[StreamExecution].toDebugString}
- """.stripMargin,
- // scalastyle:on
- message,
- cause,
- startOffset,
- endOffset)
- }
-
/** Time when the exception occurred */
val time: Long = System.currentTimeMillis
- override def toString(): String = causeString
+ override def toString(): String =
+ s"""${classOf[StreamingQueryException].getName}: ${cause.getMessage}
+ |$queryDebugString""".stripMargin
}
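
The duplicated-stack-trace bullet from the description can be reproduced in a few lines. The sketch below uses a hypothetical exception class, not the Spark one: `Throwable.printStackTrace` prints `toString()` first and then walks the frames itself, so a `toString` that embeds frames prints them twice.

```scala
// Hypothetical exception reproducing the duplication fixed above.
class NoisyException(cause: Throwable)
    extends Exception(cause.getMessage, cause) {
  // Before the fix, toString carried a rendering of the stack trace itself.
  override def toString: String =
    s"${getClass.getName}: ${cause.getMessage}\n" +
      cause.getStackTrace.take(10).mkString("\t", "\n\t", "")
}

object DuplicationDemo extends App {
  // printStackTrace prints toString() (frames included) and then prints the
  // frames again in its own "Caused by:" section.
  new NoisyException(new IllegalArgumentException("bad option")).printStackTrace()
}
```

After the patch, `toString` carries only the class name, the cause's message, and the pre-rendered query debug string, so the frames are printed exactly once.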
http://git-wip-us.apache.org/repos/asf/spark/blob/ff7d82a2/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
index 55d927a..8a9fa94 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
@@ -815,21 +815,31 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
}
test("max files per trigger - incorrect values") {
- withTempDir { case src =>
- def testMaxFilePerTriggerValue(value: String): Unit = {
- val df = spark.readStream.option("maxFilesPerTrigger", value).text(src.getCanonicalPath)
- val e = intercept[IllegalArgumentException] {
- testStream(df)()
- }
- Seq("maxFilesPerTrigger", value, "positive integer").foreach { s =>
- assert(e.getMessage.contains(s))
+ val testTable = "maxFilesPerTrigger_test"
+ withTable(testTable) {
+ withTempDir { case src =>
+ def testMaxFilePerTriggerValue(value: String): Unit = {
+ val df = spark.readStream.option("maxFilesPerTrigger", value).text(src.getCanonicalPath)
+ val e = intercept[StreamingQueryException] {
+ // Note: `maxFilesPerTrigger` is checked in the stream thread when creating the source
+ val q = df.writeStream.format("memory").queryName(testTable).start()
+ try {
+ q.processAllAvailable()
+ } finally {
+ q.stop()
+ }
+ }
+ assert(e.getCause.isInstanceOf[IllegalArgumentException])
+ Seq("maxFilesPerTrigger", value, "positive integer").foreach { s =>
+ assert(e.getMessage.contains(s))
+ }
}
- }
- testMaxFilePerTriggerValue("not-a-integer")
- testMaxFilePerTriggerValue("-1")
- testMaxFilePerTriggerValue("0")
- testMaxFilePerTriggerValue("10.1")
+ testMaxFilePerTriggerValue("not-a-integer")
+ testMaxFilePerTriggerValue("-1")
+ testMaxFilePerTriggerValue("0")
+ testMaxFilePerTriggerValue("10.1")
+ }
}
}
@@ -1202,7 +1212,8 @@ class FileStreamSourceStressTestSuite extends FileStreamSourceTest {
}
}
-/** Fake FileSystem to test whether the method `fs.exists` is called during
+/**
+ * Fake FileSystem to test whether the method `fs.exists` is called during
* `DataSource.resolveRelation`.
*/
class ExistsThrowsExceptionFileSystem extends RawLocalFileSystem {
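
Because `maxFilesPerTrigger` is now validated on the stream thread, the test can no longer intercept the `IllegalArgumentException` directly; it intercepts the wrapping `StreamingQueryException` and inspects `getCause`. A small helper capturing that pattern might look like the following (a hypothetical helper, not part of the patch; assumes ScalaTest's `intercept`):

```scala
import scala.reflect.ClassTag

import org.scalatest.Assertions._

import org.apache.spark.sql.streaming.StreamingQueryException

object StreamFailures {
  // Hypothetical helper: run a streaming action expected to die on the stream
  // thread, and unwrap the root cause from the StreamingQueryException.
  def expectStreamFailure[C <: Throwable : ClassTag](body: => Unit): C = {
    val ct = implicitly[ClassTag[C]]
    val e = intercept[StreamingQueryException](body)
    assert(ct.runtimeClass.isInstance(e.getCause),
      s"expected cause ${ct.runtimeClass.getName} but got ${e.getCause}")
    e.getCause.asInstanceOf[C]
  }
}
```

The body of `testMaxFilePerTriggerValue` above is essentially this pattern inlined.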
http://git-wip-us.apache.org/repos/asf/spark/blob/ff7d82a2/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
index b8fa82d..34b0ee8 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
@@ -259,8 +259,9 @@ class StreamSuite extends StreamTest {
override def stop(): Unit = {}
}
val df = Dataset[Int](sqlContext.sparkSession, StreamingExecutionRelation(source))
+ // These errors are fatal and should be ignored in `testStream` so they do not fail the test.
testStream(df)(
- ExpectFailure()(ClassTag(e.getClass))
+ ExpectFailure(isFatalError = true)(ClassTag(e.getClass))
)
}
}
http://git-wip-us.apache.org/repos/asf/spark/blob/ff7d82a2/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
index 6fbbbb1..709050d 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
@@ -167,10 +167,17 @@ trait StreamTest extends QueryTest with SharedSQLContext with Timeouts {
/** Advance the trigger clock's time manually. */
case class AdvanceManualClock(timeToAdd: Long) extends StreamAction
- /** Signals that a failure is expected and should not kill the test. */
- case class ExpectFailure[T <: Throwable : ClassTag]() extends StreamAction {
+ /**
+ * Signals that a failure is expected and should not kill the test.
+ *
+ * @param isFatalError if this is a fatal error. If so, the error should also be caught by
+ * UncaughtExceptionHandler.
+ */
+ case class ExpectFailure[T <: Throwable : ClassTag](
+ isFatalError: Boolean = false) extends StreamAction {
val causeClass: Class[T] = implicitly[ClassTag[T]].runtimeClass.asInstanceOf[Class[T]]
- override def toString(): String = s"ExpectFailure[${causeClass.getName}]"
+ override def toString(): String =
+ s"ExpectFailure[${causeClass.getName}, isFatalError: $isFatalError]"
}
/** Assert that a body is true */
@@ -240,7 +247,7 @@ trait StreamTest extends QueryTest with SharedSQLContext with Timeouts {
val resetConfValues = mutable.Map[String, Option[String]]()
@volatile
- var streamDeathCause: Throwable = null
+ var streamThreadDeathCause: Throwable = null
// If the test doesn't manually start the stream, we do it automatically at the beginning.
val startedManually =
@@ -271,7 +278,7 @@ trait StreamTest extends QueryTest with SharedSQLContext with Timeouts {
|Output Mode: $outputMode
|Stream state: $currentOffsets
|Thread state: $threadState
- |${if (streamDeathCause != null) stackTraceToString(streamDeathCause) else ""}
+ |${if (streamThreadDeathCause != null) stackTraceToString(streamThreadDeathCause) else ""}
|
|== Sink ==
|${sink.toDebugString}
@@ -360,9 +367,12 @@ trait StreamTest extends QueryTest with SharedSQLContext with Timeouts {
currentStream.microBatchThread.setUncaughtExceptionHandler(
new UncaughtExceptionHandler {
override def uncaughtException(t: Thread, e: Throwable): Unit = {
- streamDeathCause = e
+ streamThreadDeathCause = e
}
})
+ // Wait until the initialization finishes, because some tests need to use `logicalPlan`
+ // after starting the query.
+ currentStream.awaitInitialization(streamingTimeout.toMillis)
case AdvanceManualClock(timeToAdd) =>
verify(currentStream != null,
@@ -396,8 +406,9 @@ trait StreamTest extends QueryTest with SharedSQLContext with Timeouts {
currentStream.exception.map(_.toString()).getOrElse(""))
} catch {
case _: InterruptedException =>
- case _: org.scalatest.exceptions.TestFailedDueToTimeoutException =>
- failTest("Timed out while stopping and waiting for microbatchthread to terminate.")
+ case e: org.scalatest.exceptions.TestFailedDueToTimeoutException =>
+ failTest(
+ "Timed out while stopping and waiting for microbatchthread to terminate.", e)
case t: Throwable =>
failTest("Error while stopping stream", t)
} finally {
@@ -421,16 +432,24 @@ trait StreamTest extends QueryTest with SharedSQLContext with Timeouts {
verify(exception.cause.getClass === ef.causeClass,
"incorrect cause in exception returned by query.exception()\n" +
s"\tExpected: ${ef.causeClass}\n\tReturned: ${exception.cause.getClass}")
+ if (ef.isFatalError) {
+ // This is a fatal error, `streamThreadDeathCause` should be set to this error in
+ // UncaughtExceptionHandler.
+ verify(streamThreadDeathCause != null &&
+ streamThreadDeathCause.getClass === ef.causeClass,
+ "UncaughtExceptionHandler didn't receive the correct error\n" +
+ s"\tExpected: ${ef.causeClass}\n\tReturned: $streamThreadDeathCause")
+ streamThreadDeathCause = null
+ }
} catch {
case _: InterruptedException =>
- case _: org.scalatest.exceptions.TestFailedDueToTimeoutException =>
- failTest("Timed out while waiting for failure")
+ case e: org.scalatest.exceptions.TestFailedDueToTimeoutException =>
+ failTest("Timed out while waiting for failure", e)
case t: Throwable =>
failTest("Error while checking stream failure", t)
} finally {
lastStream = currentStream
currentStream = null
- streamDeathCause = null
}
case a: AssertOnQuery =>
@@ -508,11 +527,14 @@ trait StreamTest extends QueryTest with SharedSQLContext with Timeouts {
}
pos += 1
}
+ if (streamThreadDeathCause != null) {
+ failTest("Stream Thread Died", streamThreadDeathCause)
+ }
} catch {
- case _: InterruptedException if streamDeathCause != null =>
- failTest("Stream Thread Died")
- case _: org.scalatest.exceptions.TestFailedDueToTimeoutException =>
- failTest("Timed out waiting for stream")
+ case _: InterruptedException if streamThreadDeathCause != null =>
+ failTest("Stream Thread Died", streamThreadDeathCause)
+ case e: org.scalatest.exceptions.TestFailedDueToTimeoutException =>
+ failTest("Timed out waiting for stream", e)
} finally {
if (currentStream != null && currentStream.microBatchThread.isAlive) {
currentStream.stop()
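
For the new `isFatalError` flag in `ExpectFailure`, the verification relies on the handler installed at `StartStream` time. The following self-contained sketch (illustrative names only, not the test harness itself) shows the mechanism being checked: a fatal error thrown on the stream thread reaches that thread's `UncaughtExceptionHandler`.

```scala
object FatalErrorHandlerDemo extends App {
  @volatile var streamThreadDeathCause: Throwable = null

  // Stand-in for the micro-batch thread dying with a fatal error.
  val streamThread = new Thread(() => throw new OutOfMemoryError("simulated fatal error"))
  // Mirrors the handler StreamTest installs on currentStream.microBatchThread.
  streamThread.setUncaughtExceptionHandler(
    (_: Thread, e: Throwable) => streamThreadDeathCause = e)
  streamThread.start()
  streamThread.join()

  assert(streamThreadDeathCause.isInstanceOf[OutOfMemoryError],
    s"expected OutOfMemoryError, got $streamThreadDeathCause")
}
```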
http://git-wip-us.apache.org/repos/asf/spark/blob/ff7d82a2/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala
index a057d1d..4596aa1 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala
@@ -111,7 +111,7 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter {
StartStream(ProcessingTime(100), triggerClock = clock),
AddData(inputData, 0),
AdvanceManualClock(100),
- ExpectFailure[SparkException],
+ ExpectFailure[SparkException](),
AssertOnQuery { query =>
eventually(Timeout(streamingTimeout)) {
assert(listener.terminationEvent !== null)
http://git-wip-us.apache.org/repos/asf/spark/blob/ff7d82a2/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
index 6c4bb35..1525ad5 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
@@ -142,7 +142,7 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging {
StartStream(),
AssertOnQuery(_.isActive === true),
AddData(inputData, 0),
- ExpectFailure[SparkException],
+ ExpectFailure[SparkException](),
AssertOnQuery(_.isActive === false),
TestAwaitTermination(ExpectException[SparkException]),
TestAwaitTermination(ExpectException[SparkException], timeoutMs = 2000),
@@ -306,7 +306,7 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging {
StartStream(ProcessingTime(100), triggerClock = clock),
AddData(inputData, 0),
AdvanceManualClock(100),
- ExpectFailure[SparkException],
+ ExpectFailure[SparkException](),
AssertOnQuery(_.status.isDataAvailable === false),
AssertOnQuery(_.status.isTriggerActive === false),
AssertOnQuery(_.status.message.startsWith("Terminated with exception"))