You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by zs...@apache.org on 2016/11/16 18:01:01 UTC

spark git commit: [SPARK-18459][SPARK-18460][STRUCTUREDSTREAMING] Rename triggerId to batchId and add triggerDetails to json in StreamingQueryStatus

Repository: spark
Updated Branches:
  refs/heads/master 608ecc512 -> 0048ce7ce


[SPARK-18459][SPARK-18460][STRUCTUREDSTREAMING] Rename triggerId to batchId and add triggerDetails to json in StreamingQueryStatus

## What changes were proposed in this pull request?

SPARK-18459: triggerId seems like a number that should be increasing with each trigger, whether or not there is data in it. However, actually, triggerId increases only when there is a batch of data in a trigger. So it's better to rename it to batchId.

SPARK-18460: triggerDetails was missing from the JSON representation. Fixed it.

## How was this patch tested?
Updated existing unit tests.

Author: Tathagata Das <ta...@gmail.com>

Closes #15895 from tdas/SPARK-18459.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/0048ce7c
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/0048ce7c
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/0048ce7c

Branch: refs/heads/master
Commit: 0048ce7ce64b02cbb6a1c4a2963a0b1b9541047e
Parents: 608ecc5
Author: Tathagata Das <ta...@gmail.com>
Authored: Wed Nov 16 10:00:59 2016 -0800
Committer: Shixiong Zhu <sh...@databricks.com>
Committed: Wed Nov 16 10:00:59 2016 -0800

----------------------------------------------------------------------
 python/pyspark/sql/streaming.py                 |  6 +++---
 .../sql/execution/streaming/StreamMetrics.scala |  8 +++----
 .../sql/streaming/StreamingQueryStatus.scala    |  4 ++--
 .../streaming/StreamMetricsSuite.scala          |  8 +++----
 .../streaming/StreamingQueryListenerSuite.scala |  4 ++--
 .../streaming/StreamingQueryStatusSuite.scala   | 22 ++++++++++++++++++--
 6 files changed, 35 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/0048ce7c/python/pyspark/sql/streaming.py
----------------------------------------------------------------------
diff --git a/python/pyspark/sql/streaming.py b/python/pyspark/sql/streaming.py
index f326f16..0e4589b 100644
--- a/python/pyspark/sql/streaming.py
+++ b/python/pyspark/sql/streaming.py
@@ -212,12 +212,12 @@ class StreamingQueryStatus(object):
             Processing rate 23.5 rows/sec
             Latency: 345.0 ms
             Trigger details:
+                batchId: 5
                 isDataPresentInTrigger: true
                 isTriggerActive: true
                 latency.getBatch.total: 20
                 latency.getOffset.total: 10
                 numRows.input.total: 100
-                triggerId: 5
             Source statuses [1 source]:
                 Source 1 - MySource1
                     Available offset: 0
@@ -341,8 +341,8 @@ class StreamingQueryStatus(object):
         If no trigger is currently active, then it will have details of the last completed trigger.
 
         >>> sqs.triggerDetails
-        {u'triggerId': u'5', u'latency.getBatch.total': u'20', u'numRows.input.total': u'100',
-        u'isTriggerActive': u'true', u'latency.getOffset.total': u'10',
+        {u'latency.getBatch.total': u'20', u'numRows.input.total': u'100',
+        u'isTriggerActive': u'true', u'batchId': u'5', u'latency.getOffset.total': u'10',
         u'isDataPresentInTrigger': u'true'}
         """
         return self._jsqs.triggerDetails()

http://git-wip-us.apache.org/repos/asf/spark/blob/0048ce7c/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamMetrics.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamMetrics.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamMetrics.scala
index 5645554..942e6ed 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamMetrics.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamMetrics.scala
@@ -78,13 +78,13 @@ class StreamMetrics(sources: Set[Source], triggerClock: Clock, codahaleSourceNam
 
   // =========== Setter methods ===========
 
-  def reportTriggerStarted(triggerId: Long): Unit = synchronized {
+  def reportTriggerStarted(batchId: Long): Unit = synchronized {
     numInputRows.clear()
     triggerDetails.clear()
     sourceTriggerDetails.values.foreach(_.clear())
 
-    reportTriggerDetail(TRIGGER_ID, triggerId)
-    sources.foreach(s => reportSourceTriggerDetail(s, TRIGGER_ID, triggerId))
+    reportTriggerDetail(BATCH_ID, batchId)
+    sources.foreach(s => reportSourceTriggerDetail(s, BATCH_ID, batchId))
     reportTriggerDetail(IS_TRIGGER_ACTIVE, true)
     currentTriggerStartTimestamp = triggerClock.getTimeMillis()
     reportTriggerDetail(START_TIMESTAMP, currentTriggerStartTimestamp)
@@ -217,7 +217,7 @@ object StreamMetrics extends Logging {
   }
 
 
-  val TRIGGER_ID = "triggerId"
+  val BATCH_ID = "batchId"
   val IS_TRIGGER_ACTIVE = "isTriggerActive"
   val IS_DATA_PRESENT_IN_TRIGGER = "isDataPresentInTrigger"
   val STATUS_MESSAGE = "statusMessage"

http://git-wip-us.apache.org/repos/asf/spark/blob/0048ce7c/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryStatus.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryStatus.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryStatus.scala
index 99c7729..ba732ff 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryStatus.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryStatus.scala
@@ -102,7 +102,7 @@ class StreamingQueryStatus private(
     ("inputRate" -> JDouble(inputRate)) ~
     ("processingRate" -> JDouble(processingRate)) ~
     ("latency" -> latency.map(JDouble).getOrElse(JNothing)) ~
-    ("triggerDetails" -> JsonProtocol.mapToJson(triggerDetails.asScala))
+    ("triggerDetails" -> JsonProtocol.mapToJson(triggerDetails.asScala)) ~
     ("sourceStatuses" -> JArray(sourceStatuses.map(_.jsonValue).toList)) ~
     ("sinkStatus" -> sinkStatus.jsonValue)
   }
@@ -151,7 +151,7 @@ private[sql] object StreamingQueryStatus {
         desc = "MySink",
         offsetDesc = OffsetSeq(Some(LongOffset(1)) :: None :: Nil).toString),
       triggerDetails = Map(
-        TRIGGER_ID -> "5",
+        BATCH_ID -> "5",
         IS_TRIGGER_ACTIVE -> "true",
         IS_DATA_PRESENT_IN_TRIGGER -> "true",
         GET_OFFSET_LATENCY -> "10",

http://git-wip-us.apache.org/repos/asf/spark/blob/0048ce7c/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/StreamMetricsSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/StreamMetricsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/StreamMetricsSuite.scala
index 938423d..38c4ece 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/StreamMetricsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/StreamMetricsSuite.scala
@@ -50,10 +50,10 @@ class StreamMetricsSuite extends SparkFunSuite {
     assert(sm.currentSourceProcessingRate(source) === 0.0)
     assert(sm.currentLatency() === None)
     assert(sm.currentTriggerDetails() ===
-      Map(TRIGGER_ID -> "1", IS_TRIGGER_ACTIVE -> "true",
+      Map(BATCH_ID -> "1", IS_TRIGGER_ACTIVE -> "true",
         START_TIMESTAMP -> "0", "key" -> "value"))
     assert(sm.currentSourceTriggerDetails(source) ===
-      Map(TRIGGER_ID -> "1", "key2" -> "value2"))
+      Map(BATCH_ID -> "1", "key2" -> "value2"))
 
     // Finishing the trigger should calculate the rates, except input rate which needs
     // to have another trigger interval
@@ -66,11 +66,11 @@ class StreamMetricsSuite extends SparkFunSuite {
     assert(sm.currentSourceProcessingRate(source) === 100.0)
     assert(sm.currentLatency() === None)
     assert(sm.currentTriggerDetails() ===
-      Map(TRIGGER_ID -> "1", IS_TRIGGER_ACTIVE -> "false",
+      Map(BATCH_ID -> "1", IS_TRIGGER_ACTIVE -> "false",
         START_TIMESTAMP -> "0", FINISH_TIMESTAMP -> "1000",
         NUM_INPUT_ROWS -> "100", "key" -> "value"))
     assert(sm.currentSourceTriggerDetails(source) ===
-      Map(TRIGGER_ID -> "1", NUM_SOURCE_INPUT_ROWS -> "100", "key2" -> "value2"))
+      Map(BATCH_ID -> "1", NUM_SOURCE_INPUT_ROWS -> "100", "key2" -> "value2"))
 
     // After another trigger starts, the rates and latencies should not change until
     // new rows are reported

http://git-wip-us.apache.org/repos/asf/spark/blob/0048ce7c/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala
index cebb32a..98f3bec 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala
@@ -84,7 +84,7 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter {
       AssertOnLastQueryStatus { status: StreamingQueryStatus =>
         // Check the correctness of the trigger info of the last completed batch reported by
         // onQueryProgress
-        assert(status.triggerDetails.containsKey("triggerId"))
+        assert(status.triggerDetails.containsKey("batchId"))
         assert(status.triggerDetails.get("isTriggerActive") === "false")
         assert(status.triggerDetails.get("isDataPresentInTrigger") === "true")
 
@@ -104,7 +104,7 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter {
         assert(status.triggerDetails.get("numRows.state.aggregation1.updated") === "1")
 
         assert(status.sourceStatuses.length === 1)
-        assert(status.sourceStatuses(0).triggerDetails.containsKey("triggerId"))
+        assert(status.sourceStatuses(0).triggerDetails.containsKey("batchId"))
         assert(status.sourceStatuses(0).triggerDetails.get("latency.getOffset.source") === "100")
         assert(status.sourceStatuses(0).triggerDetails.get("latency.getBatch.source") === "200")
         assert(status.sourceStatuses(0).triggerDetails.get("numRows.input.source") === "2")

http://git-wip-us.apache.org/repos/asf/spark/blob/0048ce7c/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryStatusSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryStatusSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryStatusSuite.scala
index 6af19fb..50a7d92 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryStatusSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryStatusSuite.scala
@@ -48,12 +48,12 @@ class StreamingQueryStatusSuite extends SparkFunSuite {
         |    Processing rate 23.5 rows/sec
         |    Latency: 345.0 ms
         |    Trigger details:
+        |        batchId: 5
         |        isDataPresentInTrigger: true
         |        isTriggerActive: true
         |        latency.getBatch.total: 20
         |        latency.getOffset.total: 10
         |        numRows.input.total: 100
-        |        triggerId: 5
         |    Source statuses [1 source]:
         |        Source 1 - MySource1
         |            Available offset: 0
@@ -72,7 +72,11 @@ class StreamingQueryStatusSuite extends SparkFunSuite {
   test("json") {
     assert(StreamingQueryStatus.testStatus.json ===
       """
-        |{"sourceStatuses":[{"description":"MySource1","offsetDesc":"0","inputRate":15.5,
+        |{"name":"query","id":1,"timestamp":123,"inputRate":15.5,"processingRate":23.5,
+        |"latency":345.0,"triggerDetails":{"latency.getBatch.total":"20",
+        |"numRows.input.total":"100","isTriggerActive":"true","batchId":"5",
+        |"latency.getOffset.total":"10","isDataPresentInTrigger":"true"},
+        |"sourceStatuses":[{"description":"MySource1","offsetDesc":"0","inputRate":15.5,
         |"processingRate":23.5,"triggerDetails":{"numRows.input.source":"100",
         |"latency.getOffset.source":"10","latency.getBatch.source":"20"}}],
         |"sinkStatus":{"description":"MySink","offsetDesc":"[1, -]"}}
@@ -84,6 +88,20 @@ class StreamingQueryStatusSuite extends SparkFunSuite {
       StreamingQueryStatus.testStatus.prettyJson ===
         """
           |{
+          |  "name" : "query",
+          |  "id" : 1,
+          |  "timestamp" : 123,
+          |  "inputRate" : 15.5,
+          |  "processingRate" : 23.5,
+          |  "latency" : 345.0,
+          |  "triggerDetails" : {
+          |    "latency.getBatch.total" : "20",
+          |    "numRows.input.total" : "100",
+          |    "isTriggerActive" : "true",
+          |    "batchId" : "5",
+          |    "latency.getOffset.total" : "10",
+          |    "isDataPresentInTrigger" : "true"
+          |  },
           |  "sourceStatuses" : [ {
           |    "description" : "MySource1",
           |    "offsetDesc" : "0",


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org