You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by td...@apache.org on 2016/11/30 07:09:01 UTC
spark git commit: [SPARK-18516][STRUCTURED STREAMING] Follow up PR to
add StreamingQuery.status to Python
Repository: spark
Updated Branches:
refs/heads/master 4c82ca86d -> bc09a2b8c
[SPARK-18516][STRUCTURED STREAMING] Follow up PR to add StreamingQuery.status to Python
## What changes were proposed in this pull request?
- Add StreamingQueryStatus.json
- Make StreamingQueryStatus a regular class instead of a case class (to avoid unnecessarily exposing the implicit companion object StreamingQueryStatus, consistent with StreamingQueryProgress)
- Add StreamingQuery.status to Python
- Fix post-termination status
## How was this patch tested?
New unit tests
Author: Tathagata Das <ta...@gmail.com>
Closes #16075 from tdas/SPARK-18516-1.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/bc09a2b8
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/bc09a2b8
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/bc09a2b8
Branch: refs/heads/master
Commit: bc09a2b8c3b03a207a6e20627f2c5ec23c1efe8c
Parents: 4c82ca8
Author: Tathagata Das <ta...@gmail.com>
Authored: Tue Nov 29 23:08:56 2016 -0800
Committer: Tathagata Das <ta...@gmail.com>
Committed: Tue Nov 29 23:08:56 2016 -0800
----------------------------------------------------------------------
python/pyspark/sql/streaming.py | 8 ++
python/pyspark/sql/tests.py | 5 +
.../execution/streaming/ProgressReporter.scala | 5 +-
.../execution/streaming/StreamExecution.scala | 4 +
.../sql/streaming/StreamingQueryStatus.scala | 38 +++++-
.../apache/spark/sql/streaming/progress.scala | 9 +-
.../streaming/StreamingQueryListenerSuite.scala | 29 +----
.../streaming/StreamingQueryProgressSuite.scala | 98 ---------------
.../StreamingQueryStatusAndProgressSuite.scala | 120 +++++++++++++++++++
.../sql/streaming/StreamingQuerySuite.scala | 49 +++++---
10 files changed, 219 insertions(+), 146 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/bc09a2b8/python/pyspark/sql/streaming.py
----------------------------------------------------------------------
diff --git a/python/pyspark/sql/streaming.py b/python/pyspark/sql/streaming.py
index c420b0d..84f01d3 100644
--- a/python/pyspark/sql/streaming.py
+++ b/python/pyspark/sql/streaming.py
@@ -90,6 +90,14 @@ class StreamingQuery(object):
@property
@since(2.1)
+ def status(self):
+ """
+ Returns the current status of the query.
+ """
+ return json.loads(self._jsq.status().json())
+
+ @property
+ @since(2.1)
def recentProgresses(self):
"""Returns an array of the most recent [[StreamingQueryProgress]] updates for this query.
The number of progress updates retained for each stream is configured by Spark session
http://git-wip-us.apache.org/repos/asf/spark/blob/bc09a2b8/python/pyspark/sql/tests.py
----------------------------------------------------------------------
diff --git a/python/pyspark/sql/tests.py b/python/pyspark/sql/tests.py
index 7151f95..b7b2a59 100644
--- a/python/pyspark/sql/tests.py
+++ b/python/pyspark/sql/tests.py
@@ -1097,9 +1097,14 @@ class SQLTests(ReusedPySparkTestCase):
q.processAllAvailable()
lastProgress = q.lastProgress
recentProgresses = q.recentProgresses
+ status = q.status
self.assertEqual(lastProgress['name'], q.name)
self.assertEqual(lastProgress['id'], q.id)
self.assertTrue(any(p == lastProgress for p in recentProgresses))
+ self.assertTrue(
+ "message" in status and
+ "isDataAvailable" in status and
+ "isTriggerActive" in status)
finally:
q.stop()
shutil.rmtree(tmpPath)
http://git-wip-us.apache.org/repos/asf/spark/blob/bc09a2b8/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala
index b7b6e19..ba77e7c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala
@@ -70,11 +70,12 @@ trait ProgressReporter extends Logging {
private val progressBuffer = new mutable.Queue[StreamingQueryProgress]()
@volatile
- protected var currentStatus: StreamingQueryStatus =
- StreamingQueryStatus(
+ protected var currentStatus: StreamingQueryStatus = {
+ new StreamingQueryStatus(
message = "Initializing StreamExecution",
isDataAvailable = false,
isTriggerActive = false)
+ }
/** Returns the current status of the query. */
def status: StreamingQueryStatus = currentStatus
http://git-wip-us.apache.org/repos/asf/spark/blob/bc09a2b8/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
index e4f31af..6d0e269 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
@@ -238,8 +238,10 @@ class StreamExecution(
updateStatusMessage("Waiting for next trigger")
isTerminated
})
+ updateStatusMessage("Stopped")
} catch {
case _: InterruptedException if state == TERMINATED => // interrupted by stop()
+ updateStatusMessage("Stopped")
case e: Throwable =>
streamDeathCause = new StreamingQueryException(
this,
@@ -247,6 +249,7 @@ class StreamExecution(
e,
Some(committedOffsets.toOffsetSeq(sources, streamExecutionMetadata.json)))
logError(s"Query $name terminated with error", e)
+ updateStatusMessage(s"Terminated with exception: ${e.getMessage}")
// Rethrow the fatal errors to allow the user using `Thread.UncaughtExceptionHandler` to
// handle them
if (!NonFatal(e)) {
@@ -254,6 +257,7 @@ class StreamExecution(
}
} finally {
state = TERMINATED
+ currentStatus = status.copy(isTriggerActive = false, isDataAvailable = false)
// Update metrics and status
sparkSession.sparkContext.env.metricsSystem.removeSource(streamMetrics)
http://git-wip-us.apache.org/repos/asf/spark/blob/bc09a2b8/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryStatus.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryStatus.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryStatus.scala
index 4c1a7ce..44befa0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryStatus.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryStatus.scala
@@ -17,6 +17,11 @@
package org.apache.spark.sql.streaming
+import org.json4s._
+import org.json4s.JsonAST.JValue
+import org.json4s.JsonDSL._
+import org.json4s.jackson.JsonMethods._
+
/**
* Reports information about the instantaneous status of a streaming query.
*
@@ -27,7 +32,32 @@ package org.apache.spark.sql.streaming
*
* @since 2.1.0
*/
-case class StreamingQueryStatus protected[sql](
- message: String,
- isDataAvailable: Boolean,
- isTriggerActive: Boolean)
+class StreamingQueryStatus protected[sql](
+ val message: String,
+ val isDataAvailable: Boolean,
+ val isTriggerActive: Boolean) {
+
+ /** The compact JSON representation of this status. */
+ def json: String = compact(render(jsonValue))
+
+ /** The pretty (i.e. indented) JSON representation of this status. */
+ def prettyJson: String = pretty(render(jsonValue))
+
+ override def toString: String = prettyJson
+
+ private[sql] def copy(
+ message: String = this.message,
+ isDataAvailable: Boolean = this.isDataAvailable,
+ isTriggerActive: Boolean = this.isTriggerActive): StreamingQueryStatus = {
+ new StreamingQueryStatus(
+ message = message,
+ isDataAvailable = isDataAvailable,
+ isTriggerActive = isTriggerActive)
+ }
+
+ private[sql] def jsonValue: JValue = {
+ ("message" -> JString(message.toString)) ~
+ ("isDataAvailable" -> JBool(isDataAvailable)) ~
+ ("isTriggerActive" -> JBool(isTriggerActive))
+ }
+}
http://git-wip-us.apache.org/repos/asf/spark/blob/bc09a2b8/sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala
index 7129fa4..4c82474 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala
@@ -23,7 +23,6 @@ import java.util.UUID
import scala.collection.JavaConverters._
import scala.util.control.NonFatal
-import org.apache.jute.compiler.JLong
import org.json4s._
import org.json4s.JsonAST.JValue
import org.json4s.JsonDSL._
@@ -85,10 +84,10 @@ class StreamingQueryProgress private[sql](
/** The aggregate (across all sources) rate at which Spark is processing data. */
def processedRowsPerSecond: Double = sources.map(_.processedRowsPerSecond).sum
- /** The compact JSON representation of this status. */
+ /** The compact JSON representation of this progress. */
def json: String = compact(render(jsonValue))
- /** The pretty (i.e. indented) JSON representation of this status. */
+ /** The pretty (i.e. indented) JSON representation of this progress. */
def prettyJson: String = pretty(render(jsonValue))
override def toString: String = prettyJson
@@ -179,10 +178,10 @@ class SourceProgress protected[sql](
class SinkProgress protected[sql](
val description: String) {
- /** The compact JSON representation of this status. */
+ /** The compact JSON representation of this progress. */
def json: String = compact(render(jsonValue))
- /** The pretty (i.e. indented) JSON representation of this status. */
+ /** The pretty (i.e. indented) JSON representation of this progress. */
def prettyJson: String = pretty(render(jsonValue))
override def toString: String = prettyJson
http://git-wip-us.apache.org/repos/asf/spark/blob/bc09a2b8/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala
index c68f953..08b93e7 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala
@@ -106,6 +106,11 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter {
assert(listener.terminationEvent !== null)
assert(listener.terminationEvent.id === query.id)
assert(listener.terminationEvent.exception.nonEmpty)
+ // Make sure that the exception message reported through listener
+ // contains the actual exception and relevant stack trace
+ assert(!listener.terminationEvent.exception.get.contains("StreamingQueryException"))
+ assert(listener.terminationEvent.exception.get.contains("java.lang.ArithmeticException"))
+ assert(listener.terminationEvent.exception.get.contains("StreamingQueryListenerSuite"))
listener.checkAsyncErrors()
true
}
@@ -159,28 +164,6 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter {
}
}
- testQuietly("exception should be reported in QueryTerminated") {
- val listener = new EventCollector
- withListenerAdded(listener) {
- val input = MemoryStream[Int]
- testStream(input.toDS.map(_ / 0))(
- StartStream(),
- AddData(input, 1),
- ExpectFailure[SparkException](),
- Assert {
- spark.sparkContext.listenerBus.waitUntilEmpty(10000)
- assert(listener.terminationEvent !== null)
- assert(listener.terminationEvent.exception.nonEmpty)
- // Make sure that the exception message reported through listener
- // contains the actual exception and relevant stack trace
- assert(!listener.terminationEvent.exception.get.contains("StreamingQueryException"))
- assert(listener.terminationEvent.exception.get.contains("java.lang.ArithmeticException"))
- assert(listener.terminationEvent.exception.get.contains("StreamingQueryListenerSuite"))
- }
- )
- }
- }
-
test("QueryStartedEvent serialization") {
val queryStarted = new StreamingQueryListener.QueryStartedEvent(UUID.randomUUID(), "name")
val json = JsonProtocol.sparkEventToJson(queryStarted)
@@ -190,7 +173,7 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter {
test("QueryProgressEvent serialization") {
val event = new StreamingQueryListener.QueryProgressEvent(
- StreamingQueryProgressSuite.testProgress)
+ StreamingQueryStatusAndProgressSuite.testProgress)
val json = JsonProtocol.sparkEventToJson(event)
val newEvent = JsonProtocol.sparkEventFromJson(json)
.asInstanceOf[StreamingQueryListener.QueryProgressEvent]
http://git-wip-us.apache.org/repos/asf/spark/blob/bc09a2b8/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryProgressSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryProgressSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryProgressSuite.scala
deleted file mode 100644
index 45d29f6..0000000
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryProgressSuite.scala
+++ /dev/null
@@ -1,98 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.streaming
-
-import java.util.UUID
-
-import scala.collection.JavaConverters._
-
-import org.json4s._
-import org.json4s.jackson.JsonMethods._
-
-import org.apache.spark.SparkFunSuite
-import org.apache.spark.sql.streaming.StreamingQueryProgressSuite._
-
-
-class StreamingQueryProgressSuite extends SparkFunSuite {
-
- test("prettyJson") {
- val json = testProgress.prettyJson
- assert(json ===
- s"""
- |{
- | "id" : "${testProgress.id.toString}",
- | "name" : "name",
- | "timestamp" : 1,
- | "numInputRows" : 678,
- | "inputRowsPerSecond" : 10.0,
- | "durationMs" : {
- | "total" : 0
- | },
- | "currentWatermark" : 3,
- | "stateOperators" : [ {
- | "numRowsTotal" : 0,
- | "numRowsUpdated" : 1
- | } ],
- | "sources" : [ {
- | "description" : "source",
- | "startOffset" : 123,
- | "endOffset" : 456,
- | "numInputRows" : 678,
- | "inputRowsPerSecond" : 10.0
- | } ],
- | "sink" : {
- | "description" : "sink"
- | }
- |}
- """.stripMargin.trim)
- assert(compact(parse(json)) === testProgress.json)
-
- }
-
- test("json") {
- assert(compact(parse(testProgress.json)) === testProgress.json)
- }
-
- test("toString") {
- assert(testProgress.toString === testProgress.prettyJson)
- }
-}
-
-object StreamingQueryProgressSuite {
- val testProgress = new StreamingQueryProgress(
- id = UUID.randomUUID(),
- name = "name",
- timestamp = 1L,
- batchId = 2L,
- durationMs = Map("total" -> 0L).mapValues(long2Long).asJava,
- currentWatermark = 3L,
- stateOperators = Array(new StateOperatorProgress(numRowsTotal = 0, numRowsUpdated = 1)),
- sources = Array(
- new SourceProgress(
- description = "source",
- startOffset = "123",
- endOffset = "456",
- numInputRows = 678,
- inputRowsPerSecond = 10.0,
- processedRowsPerSecond = Double.PositiveInfinity // should not be present in the json
- )
- ),
- sink = new SinkProgress("sink")
- )
-}
-
http://git-wip-us.apache.org/repos/asf/spark/blob/bc09a2b8/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryStatusAndProgressSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryStatusAndProgressSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryStatusAndProgressSuite.scala
new file mode 100644
index 0000000..4da712f
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryStatusAndProgressSuite.scala
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.streaming
+
+import java.util.UUID
+
+import scala.collection.JavaConverters._
+
+import org.json4s._
+import org.json4s.jackson.JsonMethods._
+
+import org.apache.spark.SparkFunSuite
+import org.apache.spark.sql.streaming.StreamingQueryStatusAndProgressSuite._
+
+
+class StreamingQueryStatusAndProgressSuite extends SparkFunSuite {
+
+ test("StreamingQueryProgress - prettyJson") {
+ val json = testProgress.prettyJson
+ assert(json ===
+ s"""
+ |{
+ | "id" : "${testProgress.id.toString}",
+ | "name" : "name",
+ | "timestamp" : 1,
+ | "numInputRows" : 678,
+ | "inputRowsPerSecond" : 10.0,
+ | "durationMs" : {
+ | "total" : 0
+ | },
+ | "currentWatermark" : 3,
+ | "stateOperators" : [ {
+ | "numRowsTotal" : 0,
+ | "numRowsUpdated" : 1
+ | } ],
+ | "sources" : [ {
+ | "description" : "source",
+ | "startOffset" : 123,
+ | "endOffset" : 456,
+ | "numInputRows" : 678,
+ | "inputRowsPerSecond" : 10.0
+ | } ],
+ | "sink" : {
+ | "description" : "sink"
+ | }
+ |}
+ """.stripMargin.trim)
+ assert(compact(parse(json)) === testProgress.json)
+
+ }
+
+ test("StreamingQueryProgress - json") {
+ assert(compact(parse(testProgress.json)) === testProgress.json)
+ }
+
+ test("StreamingQueryProgress - toString") {
+ assert(testProgress.toString === testProgress.prettyJson)
+ }
+
+ test("StreamingQueryStatus - prettyJson") {
+ val json = testStatus.prettyJson
+ assert(json ===
+ """
+ |{
+ | "message" : "active",
+ | "isDataAvailable" : true,
+ | "isTriggerActive" : false
+ |}
+ """.stripMargin.trim)
+ }
+
+ test("StreamingQueryStatus - json") {
+ assert(compact(parse(testStatus.json)) === testStatus.json)
+ }
+
+ test("StreamingQueryStatus - toString") {
+ assert(testStatus.toString === testStatus.prettyJson)
+ }
+}
+
+object StreamingQueryStatusAndProgressSuite {
+ val testProgress = new StreamingQueryProgress(
+ id = UUID.randomUUID(),
+ name = "name",
+ timestamp = 1L,
+ batchId = 2L,
+ durationMs = Map("total" -> 0L).mapValues(long2Long).asJava,
+ currentWatermark = 3L,
+ stateOperators = Array(new StateOperatorProgress(numRowsTotal = 0, numRowsUpdated = 1)),
+ sources = Array(
+ new SourceProgress(
+ description = "source",
+ startOffset = "123",
+ endOffset = "456",
+ numInputRows = 678,
+ inputRowsPerSecond = 10.0,
+ processedRowsPerSecond = Double.PositiveInfinity // should not be present in the json
+ )
+ ),
+ sink = new SinkProgress("sink")
+ )
+
+ val testStatus = new StreamingQueryStatus("active", true, false)
+}
+
http://git-wip-us.apache.org/repos/asf/spark/blob/bc09a2b8/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
index 4f3b4a2..56abe12 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
@@ -77,7 +77,7 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging {
q2.stop()
}
- testQuietly("lifecycle states and awaitTermination") {
+ testQuietly("isActive, exception, and awaitTermination") {
val inputData = MemoryStream[Int]
val mapped = inputData.toDS().map { 6 / _}
@@ -110,7 +110,7 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging {
)
}
- testQuietly("query statuses and progresses") {
+ testQuietly("status, lastProgress, and recentProgresses") {
import StreamingQuerySuite._
clock = new StreamManualClock
@@ -133,10 +133,10 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging {
}
// This is to make sure that the query waits for the manual clock to be 600 the first time there is data
- val mapped = inputData.toDS().agg(count("*")).as[Long].coalesce(1).map { x =>
+ val mapped = inputData.toDS().as[Long].map { x =>
clock.waitTillTime(1100)
- x
- }
+ 10 / x
+ }.agg(count("*")).as[Long]
case class AssertStreamExecThreadToWaitForClock()
extends AssertOnQuery(q => {
@@ -151,25 +151,26 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging {
true
}, "")
+ var lastProgressBeforeStop: StreamingQueryProgress = null
+
testStream(mapped, OutputMode.Complete)(
StartStream(ProcessingTime(100), triggerClock = clock),
AssertStreamExecThreadToWaitForClock(),
AssertOnQuery(_.status.isDataAvailable === false),
AssertOnQuery(_.status.isTriggerActive === false),
- // TODO: test status.message before trigger has started
- // AssertOnQuery(_.lastProgress === null) // there is an empty trigger as soon as started
+ AssertOnQuery(_.status.message === "Waiting for next trigger"),
AssertOnQuery(_.recentProgresses.count(_.numInputRows > 0) === 0),
- // Test status while offset is being fetched
+ // Test status and progress while offset is being fetched
AddData(inputData, 1, 2),
AdvanceManualClock(100), // time = 100 to start new trigger, will block on getOffset
AssertStreamExecThreadToWaitForClock(),
AssertOnQuery(_.status.isDataAvailable === false),
AssertOnQuery(_.status.isTriggerActive === true),
- AssertOnQuery(_.status.message.toLowerCase.contains("getting offsets from")),
+ AssertOnQuery(_.status.message.startsWith("Getting offsets from")),
AssertOnQuery(_.recentProgresses.count(_.numInputRows > 0) === 0),
- // Test status while batch is being fetched
+ // Test status and progress while batch is being fetched
AdvanceManualClock(200), // time = 300 to unblock getOffset, will block on getBatch
AssertStreamExecThreadToWaitForClock(),
AssertOnQuery(_.status.isDataAvailable === true),
@@ -177,14 +178,14 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging {
AssertOnQuery(_.status.message === "Processing new data"),
AssertOnQuery(_.recentProgresses.count(_.numInputRows > 0) === 0),
- // Test status while batch is being processed
+ // Test status and progress while batch is being processed
AdvanceManualClock(300), // time = 600 to unblock getBatch, will block in Spark job
AssertOnQuery(_.status.isDataAvailable === true),
AssertOnQuery(_.status.isTriggerActive === true),
AssertOnQuery(_.status.message === "Processing new data"),
AssertOnQuery(_.recentProgresses.count(_.numInputRows > 0) === 0),
- // Test status while batch processing has completed
+ // Test status and progress while batch processing has completed
AdvanceManualClock(500), // time = 1100 to unblock job
AssertOnQuery { _ => clock.getTimeMillis() === 1100 },
CheckAnswer(2),
@@ -237,12 +238,32 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging {
true
},
- // Test status after data is not available for a trigger
+ // Test status and progress after data is not available for a trigger
AdvanceManualClock(100), // allow another trigger
AssertStreamExecThreadToWaitForClock(),
AssertOnQuery(_.status.isDataAvailable === false),
AssertOnQuery(_.status.isTriggerActive === false),
- AssertOnQuery(_.status.message === "Waiting for next trigger")
+ AssertOnQuery(_.status.message === "Waiting for next trigger"),
+
+ // Test status and progress after query stopped
+ AssertOnQuery { query =>
+ lastProgressBeforeStop = query.lastProgress
+ true
+ },
+ StopStream,
+ AssertOnQuery(_.lastProgress.json === lastProgressBeforeStop.json),
+ AssertOnQuery(_.status.isDataAvailable === false),
+ AssertOnQuery(_.status.isTriggerActive === false),
+ AssertOnQuery(_.status.message === "Stopped"),
+
+ // Test status and progress after query terminated with error
+ StartStream(ProcessingTime(100), triggerClock = clock),
+ AddData(inputData, 0),
+ AdvanceManualClock(100),
+ ExpectFailure[SparkException],
+ AssertOnQuery(_.status.isDataAvailable === false),
+ AssertOnQuery(_.status.isTriggerActive === false),
+ AssertOnQuery(_.status.message.startsWith("Terminated with exception"))
)
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org