You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by td...@apache.org on 2016/11/30 07:09:01 UTC

spark git commit: [SPARK-18516][STRUCTURED STREAMING] Follow up PR to add StreamingQuery.status to Python

Repository: spark
Updated Branches:
  refs/heads/master 4c82ca86d -> bc09a2b8c


[SPARK-18516][STRUCTURED STREAMING] Follow up PR to add StreamingQuery.status to Python

## What changes were proposed in this pull request?
- Add StreamingQueryStatus.json
- Make it not case class (to avoid unnecessarily exposing implicit object StreamingQueryStatus, consistent with StreamingQueryProgress)
- Add StreamingQuery.status to Python
- Fix post-termination status

## How was this patch tested?
New unit tests

Author: Tathagata Das <ta...@gmail.com>

Closes #16075 from tdas/SPARK-18516-1.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/bc09a2b8
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/bc09a2b8
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/bc09a2b8

Branch: refs/heads/master
Commit: bc09a2b8c3b03a207a6e20627f2c5ec23c1efe8c
Parents: 4c82ca8
Author: Tathagata Das <ta...@gmail.com>
Authored: Tue Nov 29 23:08:56 2016 -0800
Committer: Tathagata Das <ta...@gmail.com>
Committed: Tue Nov 29 23:08:56 2016 -0800

----------------------------------------------------------------------
 python/pyspark/sql/streaming.py                 |   8 ++
 python/pyspark/sql/tests.py                     |   5 +
 .../execution/streaming/ProgressReporter.scala  |   5 +-
 .../execution/streaming/StreamExecution.scala   |   4 +
 .../sql/streaming/StreamingQueryStatus.scala    |  38 +++++-
 .../apache/spark/sql/streaming/progress.scala   |   9 +-
 .../streaming/StreamingQueryListenerSuite.scala |  29 +----
 .../streaming/StreamingQueryProgressSuite.scala |  98 ---------------
 .../StreamingQueryStatusAndProgressSuite.scala  | 120 +++++++++++++++++++
 .../sql/streaming/StreamingQuerySuite.scala     |  49 +++++---
 10 files changed, 219 insertions(+), 146 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/bc09a2b8/python/pyspark/sql/streaming.py
----------------------------------------------------------------------
diff --git a/python/pyspark/sql/streaming.py b/python/pyspark/sql/streaming.py
index c420b0d..84f01d3 100644
--- a/python/pyspark/sql/streaming.py
+++ b/python/pyspark/sql/streaming.py
@@ -90,6 +90,14 @@ class StreamingQuery(object):
 
     @property
     @since(2.1)
+    def status(self):
+        """
+        Returns the current status of the query.
+        """
+        return json.loads(self._jsq.status().json())
+
+    @property
+    @since(2.1)
     def recentProgresses(self):
         """Returns an array of the most recent [[StreamingQueryProgress]] updates for this query.
         The number of progress updates retained for each stream is configured by Spark session

http://git-wip-us.apache.org/repos/asf/spark/blob/bc09a2b8/python/pyspark/sql/tests.py
----------------------------------------------------------------------
diff --git a/python/pyspark/sql/tests.py b/python/pyspark/sql/tests.py
index 7151f95..b7b2a59 100644
--- a/python/pyspark/sql/tests.py
+++ b/python/pyspark/sql/tests.py
@@ -1097,9 +1097,14 @@ class SQLTests(ReusedPySparkTestCase):
             q.processAllAvailable()
             lastProgress = q.lastProgress
             recentProgresses = q.recentProgresses
+            status = q.status
             self.assertEqual(lastProgress['name'], q.name)
             self.assertEqual(lastProgress['id'], q.id)
             self.assertTrue(any(p == lastProgress for p in recentProgresses))
+            self.assertTrue(
+                "message" in status and
+                "isDataAvailable" in status and
+                "isTriggerActive" in status)
         finally:
             q.stop()
             shutil.rmtree(tmpPath)

http://git-wip-us.apache.org/repos/asf/spark/blob/bc09a2b8/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala
index b7b6e19..ba77e7c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala
@@ -70,11 +70,12 @@ trait ProgressReporter extends Logging {
   private val progressBuffer = new mutable.Queue[StreamingQueryProgress]()
 
   @volatile
-  protected var currentStatus: StreamingQueryStatus =
-    StreamingQueryStatus(
+  protected var currentStatus: StreamingQueryStatus = {
+    new StreamingQueryStatus(
       message = "Initializing StreamExecution",
       isDataAvailable = false,
       isTriggerActive = false)
+  }
 
   /** Returns the current status of the query. */
   def status: StreamingQueryStatus = currentStatus

http://git-wip-us.apache.org/repos/asf/spark/blob/bc09a2b8/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
index e4f31af..6d0e269 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
@@ -238,8 +238,10 @@ class StreamExecution(
         updateStatusMessage("Waiting for next trigger")
         isTerminated
       })
+      updateStatusMessage("Stopped")
     } catch {
       case _: InterruptedException if state == TERMINATED => // interrupted by stop()
+        updateStatusMessage("Stopped")
       case e: Throwable =>
         streamDeathCause = new StreamingQueryException(
           this,
@@ -247,6 +249,7 @@ class StreamExecution(
           e,
           Some(committedOffsets.toOffsetSeq(sources, streamExecutionMetadata.json)))
         logError(s"Query $name terminated with error", e)
+        updateStatusMessage(s"Terminated with exception: ${e.getMessage}")
         // Rethrow the fatal errors to allow the user using `Thread.UncaughtExceptionHandler` to
         // handle them
         if (!NonFatal(e)) {
@@ -254,6 +257,7 @@ class StreamExecution(
         }
     } finally {
       state = TERMINATED
+      currentStatus = status.copy(isTriggerActive = false, isDataAvailable = false)
 
       // Update metrics and status
       sparkSession.sparkContext.env.metricsSystem.removeSource(streamMetrics)

http://git-wip-us.apache.org/repos/asf/spark/blob/bc09a2b8/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryStatus.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryStatus.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryStatus.scala
index 4c1a7ce..44befa0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryStatus.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryStatus.scala
@@ -17,6 +17,11 @@
 
 package org.apache.spark.sql.streaming
 
+import org.json4s._
+import org.json4s.JsonAST.JValue
+import org.json4s.JsonDSL._
+import org.json4s.jackson.JsonMethods._
+
 /**
  * Reports information about the instantaneous status of a streaming query.
  *
@@ -27,7 +32,32 @@ package org.apache.spark.sql.streaming
  *
  * @since 2.1.0
  */
-case class StreamingQueryStatus protected[sql](
-    message: String,
-    isDataAvailable: Boolean,
-    isTriggerActive: Boolean)
+class StreamingQueryStatus protected[sql](
+    val message: String,
+    val isDataAvailable: Boolean,
+    val isTriggerActive: Boolean) {
+
+  /** The compact JSON representation of this status. */
+  def json: String = compact(render(jsonValue))
+
+  /** The pretty (i.e. indented) JSON representation of this status. */
+  def prettyJson: String = pretty(render(jsonValue))
+
+  override def toString: String = prettyJson
+
+  private[sql] def copy(
+      message: String = this.message,
+      isDataAvailable: Boolean = this.isDataAvailable,
+      isTriggerActive: Boolean = this.isTriggerActive): StreamingQueryStatus = {
+    new StreamingQueryStatus(
+      message = message,
+      isDataAvailable = isDataAvailable,
+      isTriggerActive = isTriggerActive)
+  }
+
+  private[sql] def jsonValue: JValue = {
+    ("message" -> JString(message.toString)) ~
+    ("isDataAvailable" -> JBool(isDataAvailable)) ~
+    ("isTriggerActive" -> JBool(isTriggerActive))
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/bc09a2b8/sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala
index 7129fa4..4c82474 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala
@@ -23,7 +23,6 @@ import java.util.UUID
 import scala.collection.JavaConverters._
 import scala.util.control.NonFatal
 
-import org.apache.jute.compiler.JLong
 import org.json4s._
 import org.json4s.JsonAST.JValue
 import org.json4s.JsonDSL._
@@ -85,10 +84,10 @@ class StreamingQueryProgress private[sql](
   /** The aggregate (across all sources) rate at which Spark is processing data. */
   def processedRowsPerSecond: Double = sources.map(_.processedRowsPerSecond).sum
 
-  /** The compact JSON representation of this status. */
+  /** The compact JSON representation of this progress. */
   def json: String = compact(render(jsonValue))
 
-  /** The pretty (i.e. indented) JSON representation of this status. */
+  /** The pretty (i.e. indented) JSON representation of this progress. */
   def prettyJson: String = pretty(render(jsonValue))
 
   override def toString: String = prettyJson
@@ -179,10 +178,10 @@ class SourceProgress protected[sql](
 class SinkProgress protected[sql](
     val description: String) {
 
-  /** The compact JSON representation of this status. */
+  /** The compact JSON representation of this progress. */
   def json: String = compact(render(jsonValue))
 
-  /** The pretty (i.e. indented) JSON representation of this status. */
+  /** The pretty (i.e. indented) JSON representation of this progress. */
   def prettyJson: String = pretty(render(jsonValue))
 
   override def toString: String = prettyJson

http://git-wip-us.apache.org/repos/asf/spark/blob/bc09a2b8/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala
index c68f953..08b93e7 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala
@@ -106,6 +106,11 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter {
           assert(listener.terminationEvent !== null)
           assert(listener.terminationEvent.id === query.id)
           assert(listener.terminationEvent.exception.nonEmpty)
+          // Make sure that the exception message reported through listener
+          // contains the actual exception and relevant stack trace
+          assert(!listener.terminationEvent.exception.get.contains("StreamingQueryException"))
+          assert(listener.terminationEvent.exception.get.contains("java.lang.ArithmeticException"))
+          assert(listener.terminationEvent.exception.get.contains("StreamingQueryListenerSuite"))
           listener.checkAsyncErrors()
           true
         }
@@ -159,28 +164,6 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter {
     }
   }
 
-  testQuietly("exception should be reported in QueryTerminated") {
-    val listener = new EventCollector
-    withListenerAdded(listener) {
-      val input = MemoryStream[Int]
-      testStream(input.toDS.map(_ / 0))(
-        StartStream(),
-        AddData(input, 1),
-        ExpectFailure[SparkException](),
-        Assert {
-          spark.sparkContext.listenerBus.waitUntilEmpty(10000)
-          assert(listener.terminationEvent !== null)
-          assert(listener.terminationEvent.exception.nonEmpty)
-          // Make sure that the exception message reported through listener
-          // contains the actual exception and relevant stack trace
-          assert(!listener.terminationEvent.exception.get.contains("StreamingQueryException"))
-          assert(listener.terminationEvent.exception.get.contains("java.lang.ArithmeticException"))
-          assert(listener.terminationEvent.exception.get.contains("StreamingQueryListenerSuite"))
-        }
-      )
-    }
-  }
-
   test("QueryStartedEvent serialization") {
     val queryStarted = new StreamingQueryListener.QueryStartedEvent(UUID.randomUUID(), "name")
     val json = JsonProtocol.sparkEventToJson(queryStarted)
@@ -190,7 +173,7 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter {
 
   test("QueryProgressEvent serialization") {
     val event = new StreamingQueryListener.QueryProgressEvent(
-      StreamingQueryProgressSuite.testProgress)
+      StreamingQueryStatusAndProgressSuite.testProgress)
     val json = JsonProtocol.sparkEventToJson(event)
     val newEvent = JsonProtocol.sparkEventFromJson(json)
       .asInstanceOf[StreamingQueryListener.QueryProgressEvent]

http://git-wip-us.apache.org/repos/asf/spark/blob/bc09a2b8/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryProgressSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryProgressSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryProgressSuite.scala
deleted file mode 100644
index 45d29f6..0000000
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryProgressSuite.scala
+++ /dev/null
@@ -1,98 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.streaming
-
-import java.util.UUID
-
-import scala.collection.JavaConverters._
-
-import org.json4s._
-import org.json4s.jackson.JsonMethods._
-
-import org.apache.spark.SparkFunSuite
-import org.apache.spark.sql.streaming.StreamingQueryProgressSuite._
-
-
-class StreamingQueryProgressSuite extends SparkFunSuite {
-
-  test("prettyJson") {
-    val json = testProgress.prettyJson
-    assert(json ===
-      s"""
-        |{
-        |  "id" : "${testProgress.id.toString}",
-        |  "name" : "name",
-        |  "timestamp" : 1,
-        |  "numInputRows" : 678,
-        |  "inputRowsPerSecond" : 10.0,
-        |  "durationMs" : {
-        |    "total" : 0
-        |  },
-        |  "currentWatermark" : 3,
-        |  "stateOperators" : [ {
-        |    "numRowsTotal" : 0,
-        |    "numRowsUpdated" : 1
-        |  } ],
-        |  "sources" : [ {
-        |    "description" : "source",
-        |    "startOffset" : 123,
-        |    "endOffset" : 456,
-        |    "numInputRows" : 678,
-        |    "inputRowsPerSecond" : 10.0
-        |  } ],
-        |  "sink" : {
-        |    "description" : "sink"
-        |  }
-        |}
-      """.stripMargin.trim)
-    assert(compact(parse(json)) === testProgress.json)
-
-  }
-
-  test("json") {
-    assert(compact(parse(testProgress.json)) === testProgress.json)
-  }
-
-  test("toString") {
-    assert(testProgress.toString === testProgress.prettyJson)
-  }
-}
-
-object StreamingQueryProgressSuite {
-  val testProgress = new StreamingQueryProgress(
-    id = UUID.randomUUID(),
-    name = "name",
-    timestamp = 1L,
-    batchId = 2L,
-    durationMs = Map("total" -> 0L).mapValues(long2Long).asJava,
-    currentWatermark = 3L,
-    stateOperators = Array(new StateOperatorProgress(numRowsTotal = 0, numRowsUpdated = 1)),
-    sources = Array(
-      new SourceProgress(
-        description = "source",
-        startOffset = "123",
-        endOffset = "456",
-        numInputRows = 678,
-        inputRowsPerSecond = 10.0,
-        processedRowsPerSecond = Double.PositiveInfinity  // should not be present in the json
-      )
-    ),
-    sink = new SinkProgress("sink")
-  )
-}
-

http://git-wip-us.apache.org/repos/asf/spark/blob/bc09a2b8/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryStatusAndProgressSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryStatusAndProgressSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryStatusAndProgressSuite.scala
new file mode 100644
index 0000000..4da712f
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryStatusAndProgressSuite.scala
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.streaming
+
+import java.util.UUID
+
+import scala.collection.JavaConverters._
+
+import org.json4s._
+import org.json4s.jackson.JsonMethods._
+
+import org.apache.spark.SparkFunSuite
+import org.apache.spark.sql.streaming.StreamingQueryStatusAndProgressSuite._
+
+
+class StreamingQueryStatusAndProgressSuite extends SparkFunSuite {
+
+  test("StreamingQueryProgress - prettyJson") {
+    val json = testProgress.prettyJson
+    assert(json ===
+      s"""
+        |{
+        |  "id" : "${testProgress.id.toString}",
+        |  "name" : "name",
+        |  "timestamp" : 1,
+        |  "numInputRows" : 678,
+        |  "inputRowsPerSecond" : 10.0,
+        |  "durationMs" : {
+        |    "total" : 0
+        |  },
+        |  "currentWatermark" : 3,
+        |  "stateOperators" : [ {
+        |    "numRowsTotal" : 0,
+        |    "numRowsUpdated" : 1
+        |  } ],
+        |  "sources" : [ {
+        |    "description" : "source",
+        |    "startOffset" : 123,
+        |    "endOffset" : 456,
+        |    "numInputRows" : 678,
+        |    "inputRowsPerSecond" : 10.0
+        |  } ],
+        |  "sink" : {
+        |    "description" : "sink"
+        |  }
+        |}
+      """.stripMargin.trim)
+    assert(compact(parse(json)) === testProgress.json)
+
+  }
+
+  test("StreamingQueryProgress - json") {
+    assert(compact(parse(testProgress.json)) === testProgress.json)
+  }
+
+  test("StreamingQueryProgress - toString") {
+    assert(testProgress.toString === testProgress.prettyJson)
+  }
+
+  test("StreamingQueryStatus - prettyJson") {
+    val json = testStatus.prettyJson
+    assert(json ===
+      """
+        |{
+        |  "message" : "active",
+        |  "isDataAvailable" : true,
+        |  "isTriggerActive" : false
+        |}
+      """.stripMargin.trim)
+  }
+
+  test("StreamingQueryStatus - json") {
+    assert(compact(parse(testStatus.json)) === testStatus.json)
+  }
+
+  test("StreamingQueryStatus - toString") {
+    assert(testStatus.toString === testStatus.prettyJson)
+  }
+}
+
+object StreamingQueryStatusAndProgressSuite {
+  val testProgress = new StreamingQueryProgress(
+    id = UUID.randomUUID(),
+    name = "name",
+    timestamp = 1L,
+    batchId = 2L,
+    durationMs = Map("total" -> 0L).mapValues(long2Long).asJava,
+    currentWatermark = 3L,
+    stateOperators = Array(new StateOperatorProgress(numRowsTotal = 0, numRowsUpdated = 1)),
+    sources = Array(
+      new SourceProgress(
+        description = "source",
+        startOffset = "123",
+        endOffset = "456",
+        numInputRows = 678,
+        inputRowsPerSecond = 10.0,
+        processedRowsPerSecond = Double.PositiveInfinity  // should not be present in the json
+      )
+    ),
+    sink = new SinkProgress("sink")
+  )
+
+  val testStatus = new StreamingQueryStatus("active", true, false)
+}
+

http://git-wip-us.apache.org/repos/asf/spark/blob/bc09a2b8/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
index 4f3b4a2..56abe12 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
@@ -77,7 +77,7 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging {
     q2.stop()
   }
 
-  testQuietly("lifecycle states and awaitTermination") {
+  testQuietly("isActive, exception, and awaitTermination") {
     val inputData = MemoryStream[Int]
     val mapped = inputData.toDS().map { 6 / _}
 
@@ -110,7 +110,7 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging {
     )
   }
 
-  testQuietly("query statuses and progresses") {
+  testQuietly("status, lastProgress, and recentProgresses") {
     import StreamingQuerySuite._
     clock = new StreamManualClock
 
@@ -133,10 +133,10 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging {
     }
 
     // This is to make sure thatquery waits for manual clock to be 600 first time there is data
-    val mapped = inputData.toDS().agg(count("*")).as[Long].coalesce(1).map { x =>
+    val mapped = inputData.toDS().as[Long].map { x =>
       clock.waitTillTime(1100)
-      x
-    }
+      10 / x
+    }.agg(count("*")).as[Long]
 
     case class AssertStreamExecThreadToWaitForClock()
       extends AssertOnQuery(q => {
@@ -151,25 +151,26 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging {
         true
       }, "")
 
+    var lastProgressBeforeStop: StreamingQueryProgress = null
+
     testStream(mapped, OutputMode.Complete)(
       StartStream(ProcessingTime(100), triggerClock = clock),
       AssertStreamExecThreadToWaitForClock(),
       AssertOnQuery(_.status.isDataAvailable === false),
       AssertOnQuery(_.status.isTriggerActive === false),
-      // TODO: test status.message before trigger has started
-      // AssertOnQuery(_.lastProgress === null)  // there is an empty trigger as soon as started
+      AssertOnQuery(_.status.message === "Waiting for next trigger"),
       AssertOnQuery(_.recentProgresses.count(_.numInputRows > 0) === 0),
 
-      // Test status while offset is being fetched
+      // Test status and progress while offset is being fetched
       AddData(inputData, 1, 2),
       AdvanceManualClock(100), // time = 100 to start new trigger, will block on getOffset
       AssertStreamExecThreadToWaitForClock(),
       AssertOnQuery(_.status.isDataAvailable === false),
       AssertOnQuery(_.status.isTriggerActive === true),
-      AssertOnQuery(_.status.message.toLowerCase.contains("getting offsets from")),
+      AssertOnQuery(_.status.message.startsWith("Getting offsets from")),
       AssertOnQuery(_.recentProgresses.count(_.numInputRows > 0) === 0),
 
-      // Test status while batch is being fetched
+      // Test status and progress while batch is being fetched
       AdvanceManualClock(200), // time = 300 to unblock getOffset, will block on getBatch
       AssertStreamExecThreadToWaitForClock(),
       AssertOnQuery(_.status.isDataAvailable === true),
@@ -177,14 +178,14 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging {
       AssertOnQuery(_.status.message === "Processing new data"),
       AssertOnQuery(_.recentProgresses.count(_.numInputRows > 0) === 0),
 
-      // Test status while batch is being processed
+      // Test status and progress while batch is being processed
       AdvanceManualClock(300), // time = 600 to unblock getBatch, will block in Spark job
       AssertOnQuery(_.status.isDataAvailable === true),
       AssertOnQuery(_.status.isTriggerActive === true),
       AssertOnQuery(_.status.message === "Processing new data"),
       AssertOnQuery(_.recentProgresses.count(_.numInputRows > 0) === 0),
 
-      // Test status while batch processing has completed
+      // Test status and progress while batch processing has completed
       AdvanceManualClock(500), // time = 1100 to unblock job
       AssertOnQuery { _ => clock.getTimeMillis() === 1100 },
       CheckAnswer(2),
@@ -237,12 +238,32 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging {
         true
       },
 
-      // Test status after data is not available for a trigger
+      // Test status and progress after data is not available for a trigger
       AdvanceManualClock(100), // allow another trigger
       AssertStreamExecThreadToWaitForClock(),
       AssertOnQuery(_.status.isDataAvailable === false),
       AssertOnQuery(_.status.isTriggerActive === false),
-      AssertOnQuery(_.status.message === "Waiting for next trigger")
+      AssertOnQuery(_.status.message === "Waiting for next trigger"),
+
+      // Test status and progress after query stopped
+      AssertOnQuery { query =>
+        lastProgressBeforeStop = query.lastProgress
+        true
+      },
+      StopStream,
+      AssertOnQuery(_.lastProgress.json === lastProgressBeforeStop.json),
+      AssertOnQuery(_.status.isDataAvailable === false),
+      AssertOnQuery(_.status.isTriggerActive === false),
+      AssertOnQuery(_.status.message === "Stopped"),
+
+      // Test status and progress after query terminated with error
+      StartStream(ProcessingTime(100), triggerClock = clock),
+      AddData(inputData, 0),
+      AdvanceManualClock(100),
+      ExpectFailure[SparkException],
+      AssertOnQuery(_.status.isDataAvailable === false),
+      AssertOnQuery(_.status.isTriggerActive === false),
+      AssertOnQuery(_.status.message.startsWith("Terminated with exception"))
     )
   }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org