You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by zs...@apache.org on 2016/12/07 01:04:29 UTC

spark git commit: [SPARK-18734][SS] Represent timestamp in StreamingQueryProgress as formatted string instead of millis

Repository: spark
Updated Branches:
  refs/heads/master 4cc8d8906 -> 539bb3cf9


[SPARK-18734][SS] Represent timestamp in StreamingQueryProgress as formatted string instead of millis

## What changes were proposed in this pull request?

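The timestamp in StreamingQueryProgress is easier to read while debugging when it is a formatted string (ISO 8601 format, in UTC) than when it is a number of milliseconds since the epoch.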

## How was this patch tested?
Updated the existing unit tests to expect the formatted timestamp string instead of a millisecond value.

Author: Tathagata Das <ta...@gmail.com>

Closes #16166 from tdas/SPARK-18734.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/539bb3cf
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/539bb3cf
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/539bb3cf

Branch: refs/heads/master
Commit: 539bb3cf9573be5cd86e7e6502523ce89c0de170
Parents: 4cc8d89
Author: Tathagata Das <ta...@gmail.com>
Authored: Tue Dec 6 17:04:26 2016 -0800
Committer: Shixiong Zhu <sh...@databricks.com>
Committed: Tue Dec 6 17:04:26 2016 -0800

----------------------------------------------------------------------
 .../spark/sql/execution/streaming/ProgressReporter.scala     | 8 ++++++--
 .../main/scala/org/apache/spark/sql/streaming/progress.scala | 6 +++---
 .../sql/streaming/StreamingQueryStatusAndProgressSuite.scala | 8 ++++----
 .../org/apache/spark/sql/streaming/StreamingQuerySuite.scala | 2 +-
 4 files changed, 14 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/539bb3cf/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala
index d95f552..12d0c1e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala
@@ -17,7 +17,8 @@
 
 package org.apache.spark.sql.execution.streaming
 
-import java.util.UUID
+import java.text.SimpleDateFormat
+import java.util.{Date, TimeZone, UUID}
 
 import scala.collection.mutable
 import scala.collection.JavaConverters._
@@ -78,6 +79,9 @@ trait ProgressReporter extends Logging {
   // The timestamp we report an event that has no input data
   private var lastNoDataProgressEventTime = Long.MinValue
 
+  private val timestampFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'") // ISO8601
+  timestampFormat.setTimeZone(TimeZone.getTimeZone("UTC"))
+
   @volatile
   protected var currentStatus: StreamingQueryStatus = {
     new StreamingQueryStatus(
@@ -156,7 +160,7 @@ trait ProgressReporter extends Logging {
       id = id,
       runId = runId,
       name = name,
-      timestamp = currentTriggerStartTimestamp,
+      timestamp = timestampFormat.format(new Date(currentTriggerStartTimestamp)),
       batchId = currentBatchId,
       durationMs = currentDurationsMs.toMap.mapValues(long2Long).asJava,
       currentWatermark = offsetSeqMetadata.batchWatermarkMs,

http://git-wip-us.apache.org/repos/asf/spark/blob/539bb3cf/sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala
index f768080..d156875 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala
@@ -29,6 +29,7 @@ import org.json4s.JsonDSL._
 import org.json4s.jackson.JsonMethods._
 
 import org.apache.spark.annotation.Experimental
+import org.apache.spark.sql.catalyst.util.DateTimeUtils
 
 /**
  * :: Experimental ::
@@ -76,7 +77,7 @@ class StreamingQueryProgress private[sql](
   val id: UUID,
   val runId: UUID,
   val name: String,
-  val timestamp: Long,
+  val timestamp: String,
   val batchId: Long,
   val durationMs: ju.Map[String, java.lang.Long],
   val currentWatermark: Long,
@@ -109,7 +110,7 @@ class StreamingQueryProgress private[sql](
     ("id" -> JString(id.toString)) ~
     ("runId" -> JString(runId.toString)) ~
     ("name" -> JString(name)) ~
-    ("timestamp" -> JInt(timestamp)) ~
+    ("timestamp" -> JString(timestamp)) ~
     ("numInputRows" -> JInt(numInputRows)) ~
     ("inputRowsPerSecond" -> safeDoubleToJValue(inputRowsPerSecond)) ~
     ("processedRowsPerSecond" -> safeDoubleToJValue(processedRowsPerSecond)) ~
@@ -121,7 +122,6 @@ class StreamingQueryProgress private[sql](
     ("stateOperators" -> JArray(stateOperators.map(_.jsonValue).toList)) ~
     ("sources" -> JArray(sources.map(_.jsonValue).toList)) ~
     ("sink" -> sink.jsonValue)
-
   }
 }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/539bb3cf/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryStatusAndProgressSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryStatusAndProgressSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryStatusAndProgressSuite.scala
index 96f19db..193c943 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryStatusAndProgressSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryStatusAndProgressSuite.scala
@@ -38,7 +38,7 @@ class StreamingQueryStatusAndProgressSuite extends SparkFunSuite {
         |  "id" : "${testProgress1.id.toString}",
         |  "runId" : "${testProgress1.runId.toString}",
         |  "name" : "myName",
-        |  "timestamp" : 1,
+        |  "timestamp" : "2016-12-05T20:54:20.827Z",
         |  "numInputRows" : 678,
         |  "inputRowsPerSecond" : 10.0,
         |  "durationMs" : {
@@ -71,7 +71,7 @@ class StreamingQueryStatusAndProgressSuite extends SparkFunSuite {
          |  "id" : "${testProgress2.id.toString}",
          |  "runId" : "${testProgress2.runId.toString}",
          |  "name" : null,
-         |  "timestamp" : 1,
+         |  "timestamp" : "2016-12-05T20:54:20.827Z",
          |  "numInputRows" : 678,
          |  "durationMs" : {
          |    "total" : 0
@@ -131,7 +131,7 @@ object StreamingQueryStatusAndProgressSuite {
     id = UUID.randomUUID,
     runId = UUID.randomUUID,
     name = "myName",
-    timestamp = 1L,
+    timestamp = "2016-12-05T20:54:20.827Z",
     batchId = 2L,
     durationMs = Map("total" -> 0L).mapValues(long2Long).asJava,
     currentWatermark = 3L,
@@ -153,7 +153,7 @@ object StreamingQueryStatusAndProgressSuite {
     id = UUID.randomUUID,
     runId = UUID.randomUUID,
     name = null, // should not be present in the json
-    timestamp = 1L,
+    timestamp = "2016-12-05T20:54:20.827Z",
     batchId = 2L,
     durationMs = Map("total" -> 0L).mapValues(long2Long).asJava,
     currentWatermark = 3L,

http://git-wip-us.apache.org/repos/asf/spark/blob/539bb3cf/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
index 893cb76..55dd1a5 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
@@ -243,7 +243,7 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging {
         assert(progress.id === query.id)
         assert(progress.name === query.name)
         assert(progress.batchId === 0)
-        assert(progress.timestamp === 100)
+        assert(progress.timestamp === "1970-01-01T00:00:00.100Z") // 100 ms in UTC
         assert(progress.numInputRows === 2)
         assert(progress.processedRowsPerSecond === 2.0)
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org