You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by td...@apache.org on 2016/12/13 22:14:47 UTC

spark git commit: [SPARK-18834][SS] Expose event time stats through StreamingQueryProgress

Repository: spark
Updated Branches:
  refs/heads/master aebf44e50 -> c68fb426d


[SPARK-18834][SS] Expose event time stats through StreamingQueryProgress

## What changes were proposed in this pull request?

- Changed `StreamingQueryProgress.currentWatermark` to `StreamingQueryProgress.eventTime`, which is a `Map[String, String]` that may contain the following keys: "max", "min", "avg", "watermark". All of them are ISO8601-formatted UTC timestamp strings.

- `StreamingQueryProgress.timestamp` keeps its name; its documentation now states that it is the beginning time of the trigger in ISO8601 format, i.e. a UTC timestamp of when the trigger was started.

## How was this patch tested?

Updated tests

Author: Tathagata Das <ta...@gmail.com>

Closes #16258 from tdas/SPARK-18834.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/c68fb426
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/c68fb426
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/c68fb426

Branch: refs/heads/master
Commit: c68fb426d4ac05414fb402aa1f30f4c98df103ad
Parents: aebf44e
Author: Tathagata Das <ta...@gmail.com>
Authored: Tue Dec 13 14:14:25 2016 -0800
Committer: Tathagata Das <ta...@gmail.com>
Committed: Tue Dec 13 14:14:25 2016 -0800

----------------------------------------------------------------------
 .../streaming/EventTimeWatermarkExec.scala      | 55 ++++++++++++++------
 .../execution/streaming/ProgressReporter.scala  | 38 ++++++++++----
 .../execution/streaming/StreamExecution.scala   | 33 ++++++------
 .../apache/spark/sql/streaming/progress.scala   | 31 +++++++----
 .../streaming/StreamingQueryListenerSuite.scala |  3 ++
 .../StreamingQueryStatusAndProgressSuite.scala  | 16 ++++--
 .../sql/streaming/StreamingQuerySuite.scala     |  2 +
 .../spark/sql/streaming/WatermarkSuite.scala    | 49 +++++++++++++----
 8 files changed, 161 insertions(+), 66 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/c68fb426/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/EventTimeWatermarkExec.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/EventTimeWatermarkExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/EventTimeWatermarkExec.scala
index 4c8cb06..e8570d0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/EventTimeWatermarkExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/EventTimeWatermarkExec.scala
@@ -17,8 +17,6 @@
 
 package org.apache.spark.sql.execution.streaming
 
-import scala.math.max
-
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.{Attribute, UnsafeProjection}
@@ -28,24 +26,48 @@ import org.apache.spark.sql.types.MetadataBuilder
 import org.apache.spark.unsafe.types.CalendarInterval
 import org.apache.spark.util.AccumulatorV2
 
-/** Tracks the maximum positive long seen. */
-class MaxLong(protected var currentValue: Long = 0)
-  extends AccumulatorV2[Long, Long] {
+/** Class for collecting event time stats with an accumulator */
+case class EventTimeStats(var max: Long, var min: Long, var sum: Long, var count: Long) {
+  def add(eventTime: Long): Unit = {
+    this.max = math.max(this.max, eventTime)
+    this.min = math.min(this.min, eventTime)
+    this.sum += eventTime
+    this.count += 1
+  }
+
+  def merge(that: EventTimeStats): Unit = {
+    this.max = math.max(this.max, that.max)
+    this.min = math.min(this.min, that.min)
+    this.sum += that.sum
+    this.count += that.count
+  }
+
+  def avg: Long = sum / count
+}
+
+object EventTimeStats {
+  def zero: EventTimeStats = EventTimeStats(
+    max = Long.MinValue, min = Long.MaxValue, sum = 0L, count = 0L)
+}
+
+/** Accumulator that collects stats on event time in a batch. */
+class EventTimeStatsAccum(protected var currentStats: EventTimeStats = EventTimeStats.zero)
+  extends AccumulatorV2[Long, EventTimeStats] {
 
-  override def isZero: Boolean = value == 0
-  override def value: Long = currentValue
-  override def copy(): AccumulatorV2[Long, Long] = new MaxLong(currentValue)
+  override def isZero: Boolean = value == EventTimeStats.zero
+  override def value: EventTimeStats = currentStats
+  override def copy(): AccumulatorV2[Long, EventTimeStats] = new EventTimeStatsAccum(currentStats)
 
   override def reset(): Unit = {
-    currentValue = 0
+    currentStats = EventTimeStats.zero
   }
 
   override def add(v: Long): Unit = {
-    currentValue = max(v, value)
+    currentStats.add(v)
   }
 
-  override def merge(other: AccumulatorV2[Long, Long]): Unit = {
-    currentValue = max(value, other.value)
+  override def merge(other: AccumulatorV2[Long, EventTimeStats]): Unit = {
+    currentStats.merge(other.value)
   }
 }
 
@@ -54,22 +76,21 @@ class MaxLong(protected var currentValue: Long = 0)
  * adding appropriate metadata to this column, this operator also tracks the maximum observed event
  * time. Based on the maximum observed time and a user specified delay, we can calculate the
  * `watermark` after which we assume we will no longer see late records for a particular time
- * period.
+ * period. Note that event time is measured in milliseconds.
  */
 case class EventTimeWatermarkExec(
     eventTime: Attribute,
     delay: CalendarInterval,
     child: SparkPlan) extends SparkPlan {
 
-  // TODO: Use Spark SQL Metrics?
-  val maxEventTime = new MaxLong
-  sparkContext.register(maxEventTime)
+  val eventTimeStats = new EventTimeStatsAccum()
+  sparkContext.register(eventTimeStats)
 
   override protected def doExecute(): RDD[InternalRow] = {
     child.execute().mapPartitions { iter =>
       val getEventTime = UnsafeProjection.create(eventTime :: Nil, child.output)
       iter.map { row =>
-        maxEventTime.add(getEventTime(row).getLong(0))
+        eventTimeStats.add(getEventTime(row).getLong(0) / 1000)
         row
       }
     }

http://git-wip-us.apache.org/repos/asf/spark/blob/c68fb426/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala
index 40e3151..549b936 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala
@@ -41,7 +41,9 @@ import org.apache.spark.util.Clock
 trait ProgressReporter extends Logging {
 
   case class ExecutionStats(
-    inputRows: Map[Source, Long], stateOperators: Seq[StateOperatorProgress])
+    inputRows: Map[Source, Long],
+    stateOperators: Seq[StateOperatorProgress],
+    eventTimeStats: Map[String, String])
 
   // Internal state of the stream, required for computing metrics.
   protected def id: UUID
@@ -127,12 +129,7 @@ trait ProgressReporter extends Logging {
   protected def finishTrigger(hasNewData: Boolean): Unit = {
     currentTriggerEndTimestamp = triggerClock.getTimeMillis()
 
-    val executionStats: ExecutionStats = if (!hasNewData) {
-      ExecutionStats(Map.empty, Seq.empty)
-    } else {
-      extractExecutionStats
-    }
-
+    val executionStats = extractExecutionStats(hasNewData)
     val processingTimeSec =
       (currentTriggerEndTimestamp - currentTriggerStartTimestamp).toDouble / 1000
 
@@ -160,10 +157,10 @@ trait ProgressReporter extends Logging {
       id = id,
       runId = runId,
       name = name,
-      timestamp = timestampFormat.format(new Date(currentTriggerStartTimestamp)),
+      timestamp = formatTimestamp(currentTriggerStartTimestamp),
       batchId = currentBatchId,
       durationMs = currentDurationsMs.toMap.mapValues(long2Long).asJava,
-      currentWatermark = offsetSeqMetadata.batchWatermarkMs,
+      eventTime = executionStats.eventTimeStats.asJava,
       stateOperators = executionStats.stateOperators.toArray,
       sources = sourceProgress.toArray,
       sink = sinkProgress)
@@ -184,7 +181,13 @@ trait ProgressReporter extends Logging {
   }
 
   /** Extracts statistics from the most recent query execution. */
-  private def extractExecutionStats: ExecutionStats = {
+  private def extractExecutionStats(hasNewData: Boolean): ExecutionStats = {
+    val watermarkTimestamp = Map("watermark" -> formatTimestamp(offsetSeqMetadata.batchWatermarkMs))
+
+    if (!hasNewData) {
+      return ExecutionStats(Map.empty, Seq.empty, watermarkTimestamp)
+    }
+
     // We want to associate execution plan leaves to sources that generate them, so that we match
     // the their metrics (e.g. numOutputRows) to the sources. To do this we do the following.
     // Consider the translation from the streaming logical plan to the final executed plan.
@@ -241,7 +244,16 @@ trait ProgressReporter extends Logging {
         numRowsUpdated = node.metrics.get("numUpdatedStateRows").map(_.value).getOrElse(0L))
     }
 
-    ExecutionStats(numInputRows, stateOperators)
+    val eventTimeStats = lastExecution.executedPlan.collect {
+      case e: EventTimeWatermarkExec if e.eventTimeStats.value.count > 0 =>
+        val stats = e.eventTimeStats.value
+        Map(
+          "max" -> stats.max,
+          "min" -> stats.min,
+          "avg" -> stats.avg).mapValues(formatTimestamp)
+    }.headOption.getOrElse(Map.empty) ++ watermarkTimestamp
+
+    ExecutionStats(numInputRows, stateOperators, eventTimeStats)
   }
 
   /** Records the duration of running `body` for the next query progress update. */
@@ -257,6 +269,10 @@ trait ProgressReporter extends Logging {
     result
   }
 
+  private def formatTimestamp(millis: Long): String = {
+    timestampFormat.format(new Date(millis))
+  }
+
   /** Updates the message returned in `status`. */
   protected def updateStatusMessage(message: String): Unit = {
     currentStatus = currentStatus.copy(message = message)

http://git-wip-us.apache.org/repos/asf/spark/blob/c68fb426/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
index 9fe6819..8f97d95 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
@@ -382,6 +382,24 @@ class StreamExecution(
     if (hasNewData) {
       // Current batch timestamp in milliseconds
       offsetSeqMetadata.batchTimestampMs = triggerClock.getTimeMillis()
+      // Update the eventTime watermark if we find one in the plan.
+      if (lastExecution != null) {
+        lastExecution.executedPlan.collect {
+          case e: EventTimeWatermarkExec if e.eventTimeStats.value.count > 0 =>
+            logDebug(s"Observed event time stats: ${e.eventTimeStats.value}")
+            e.eventTimeStats.value.max - e.delay.milliseconds
+        }.headOption.foreach { newWatermarkMs =>
+          if (newWatermarkMs > offsetSeqMetadata.batchWatermarkMs) {
+            logInfo(s"Updating eventTime watermark to: $newWatermarkMs ms")
+            offsetSeqMetadata.batchWatermarkMs = newWatermarkMs
+          } else {
+            logDebug(
+              s"Event time didn't move: $newWatermarkMs < " +
+                s"${offsetSeqMetadata.batchWatermarkMs}")
+          }
+        }
+      }
+
       updateStatusMessage("Writing offsets to log")
       reportTimeTaken("walCommit") {
         assert(offsetLog.add(
@@ -485,21 +503,6 @@ class StreamExecution(
       sink.addBatch(currentBatchId, nextBatch)
     }
 
-    // Update the eventTime watermark if we find one in the plan.
-    lastExecution.executedPlan.collect {
-      case e: EventTimeWatermarkExec =>
-        logTrace(s"Maximum observed eventTime: ${e.maxEventTime.value}")
-        (e.maxEventTime.value / 1000) - e.delay.milliseconds()
-    }.headOption.foreach { newWatermark =>
-      if (newWatermark > offsetSeqMetadata.batchWatermarkMs) {
-        logInfo(s"Updating eventTime watermark to: $newWatermark ms")
-        offsetSeqMetadata.batchWatermarkMs = newWatermark
-      } else {
-        logTrace(s"Event time didn't move: $newWatermark < " +
-          s"$offsetSeqMetadata.currentEventTimeWatermark")
-      }
-    }
-
     awaitBatchLock.lock()
     try {
       // Wake up any threads that are waiting for the stream to progress.

http://git-wip-us.apache.org/repos/asf/spark/blob/c68fb426/sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala
index d156875..e219cfd 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala
@@ -18,6 +18,7 @@
 package org.apache.spark.sql.streaming
 
 import java.{util => ju}
+import java.lang.{Long => JLong}
 import java.util.UUID
 
 import scala.collection.JavaConverters._
@@ -29,7 +30,6 @@ import org.json4s.JsonDSL._
 import org.json4s.jackson.JsonMethods._
 
 import org.apache.spark.annotation.Experimental
-import org.apache.spark.sql.catalyst.util.DateTimeUtils
 
 /**
  * :: Experimental ::
@@ -61,13 +61,20 @@ class StateOperatorProgress private[sql](
  * @param id An unique query id that persists across restarts. See `StreamingQuery.id()`.
  * @param runId A query id that is unique for every start/restart. See `StreamingQuery.runId()`.
  * @param name User-specified name of the query, null if not specified.
- * @param timestamp Timestamp (ms) of the beginning of the trigger.
+ * @param timestamp Beginning time of the trigger in ISO8601 format, i.e. UTC timestamps.
  * @param batchId A unique id for the current batch of data being processed.  Note that in the
  *                case of retries after a failure a given batchId my be executed more than once.
  *                Similarly, when there is no data to be processed, the batchId will not be
  *                incremented.
  * @param durationMs The amount of time taken to perform various operations in milliseconds.
- * @param currentWatermark The current event time watermark in milliseconds
+ * @param eventTime Statistics of event time seen in this batch. It may contain the following keys:
+ *                 {
+ *                   "max" -> "2016-12-05T20:54:20.827Z"  // maximum event time seen in this trigger
+ *                   "min" -> "2016-12-05T20:54:20.827Z"  // minimum event time seen in this trigger
+ *                   "avg" -> "2016-12-05T20:54:20.827Z"  // average event time seen in this trigger
+ *                   "watermark" -> "2016-12-05T20:54:20.827Z"  // watermark used in this trigger
+ *                 }
+ *                 All timestamps are in ISO8601 format, i.e. UTC timestamps.
  * @param stateOperators Information about operators in the query that store state.
  * @param sources detailed statistics on data being read from each of the streaming sources.
  * @since 2.1.0
@@ -79,8 +86,8 @@ class StreamingQueryProgress private[sql](
   val name: String,
   val timestamp: String,
   val batchId: Long,
-  val durationMs: ju.Map[String, java.lang.Long],
-  val currentWatermark: Long,
+  val durationMs: ju.Map[String, JLong],
+  val eventTime: ju.Map[String, String],
   val stateOperators: Array[StateOperatorProgress],
   val sources: Array[SourceProgress],
   val sink: SinkProgress) {
@@ -107,6 +114,13 @@ class StreamingQueryProgress private[sql](
       if (value.isNaN || value.isInfinity) JNothing else JDouble(value)
     }
 
+    /** Convert map to JValue while handling empty maps. Also, this sorts the keys. */
+    def safeMapToJValue[T](map: ju.Map[String, T], valueToJValue: T => JValue): JValue = {
+      if (map.isEmpty) return JNothing
+      val keys = map.asScala.keySet.toSeq.sorted
+      keys.map { k => k -> valueToJValue(map.get(k)) : JObject }.reduce(_ ~ _)
+    }
+
     ("id" -> JString(id.toString)) ~
     ("runId" -> JString(runId.toString)) ~
     ("name" -> JString(name)) ~
@@ -114,11 +128,8 @@ class StreamingQueryProgress private[sql](
     ("numInputRows" -> JInt(numInputRows)) ~
     ("inputRowsPerSecond" -> safeDoubleToJValue(inputRowsPerSecond)) ~
     ("processedRowsPerSecond" -> safeDoubleToJValue(processedRowsPerSecond)) ~
-    ("durationMs" -> durationMs
-        .asScala
-        .map { case (k, v) => k -> JInt(v.toLong): JObject }
-        .reduce(_ ~ _)) ~
-    ("currentWatermark" -> JInt(currentWatermark)) ~
+    ("durationMs" -> safeMapToJValue[JLong](durationMs, v => JInt(v.toLong))) ~
+    ("eventTime" -> safeMapToJValue[String](eventTime, s => JString(s))) ~
     ("stateOperators" -> JArray(stateOperators.map(_.jsonValue).toList)) ~
     ("sources" -> JArray(sources.map(_.jsonValue).toList)) ~
     ("sink" -> sink.jsonValue)

http://git-wip-us.apache.org/repos/asf/spark/blob/c68fb426/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala
index f75f5b5..7c6745ac 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala
@@ -185,9 +185,12 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter {
 
   test("QueryProgressEvent serialization") {
     def testSerialization(event: QueryProgressEvent): Unit = {
+      import scala.collection.JavaConverters._
       val json = JsonProtocol.sparkEventToJson(event)
       val newEvent = JsonProtocol.sparkEventFromJson(json).asInstanceOf[QueryProgressEvent]
       assert(newEvent.progress.json === event.progress.json)  // json as a proxy for equality
+      assert(newEvent.progress.durationMs.asScala === event.progress.durationMs.asScala)
+      assert(newEvent.progress.eventTime.asScala === event.progress.eventTime.asScala)
     }
     testSerialization(new QueryProgressEvent(StreamingQueryStatusAndProgressSuite.testProgress1))
     testSerialization(new QueryProgressEvent(StreamingQueryStatusAndProgressSuite.testProgress2))

http://git-wip-us.apache.org/repos/asf/spark/blob/c68fb426/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryStatusAndProgressSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryStatusAndProgressSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryStatusAndProgressSuite.scala
index 193c943..c970743 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryStatusAndProgressSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryStatusAndProgressSuite.scala
@@ -44,7 +44,12 @@ class StreamingQueryStatusAndProgressSuite extends SparkFunSuite {
         |  "durationMs" : {
         |    "total" : 0
         |  },
-        |  "currentWatermark" : 3,
+        |  "eventTime" : {
+        |    "avg" : "2016-12-05T20:54:20.827Z",
+        |    "max" : "2016-12-05T20:54:20.827Z",
+        |    "min" : "2016-12-05T20:54:20.827Z",
+        |    "watermark" : "2016-12-05T20:54:20.827Z"
+        |  },
         |  "stateOperators" : [ {
         |    "numRowsTotal" : 0,
         |    "numRowsUpdated" : 1
@@ -76,7 +81,6 @@ class StreamingQueryStatusAndProgressSuite extends SparkFunSuite {
          |  "durationMs" : {
          |    "total" : 0
          |  },
-         |  "currentWatermark" : 3,
          |  "stateOperators" : [ {
          |    "numRowsTotal" : 0,
          |    "numRowsUpdated" : 1
@@ -134,7 +138,11 @@ object StreamingQueryStatusAndProgressSuite {
     timestamp = "2016-12-05T20:54:20.827Z",
     batchId = 2L,
     durationMs = Map("total" -> 0L).mapValues(long2Long).asJava,
-    currentWatermark = 3L,
+    eventTime = Map(
+      "max" -> "2016-12-05T20:54:20.827Z",
+      "min" -> "2016-12-05T20:54:20.827Z",
+      "avg" -> "2016-12-05T20:54:20.827Z",
+      "watermark" -> "2016-12-05T20:54:20.827Z").asJava,
     stateOperators = Array(new StateOperatorProgress(numRowsTotal = 0, numRowsUpdated = 1)),
     sources = Array(
       new SourceProgress(
@@ -156,7 +164,7 @@ object StreamingQueryStatusAndProgressSuite {
     timestamp = "2016-12-05T20:54:20.827Z",
     batchId = 2L,
     durationMs = Map("total" -> 0L).mapValues(long2Long).asJava,
-    currentWatermark = 3L,
+    eventTime = Map.empty[String, String].asJava,  // empty maps should be handled correctly
     stateOperators = Array(new StateOperatorProgress(numRowsTotal = 0, numRowsUpdated = 1)),
     sources = Array(
       new SourceProgress(

http://git-wip-us.apache.org/repos/asf/spark/blob/c68fb426/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
index c66d6b1..afd788c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
@@ -17,6 +17,8 @@
 
 package org.apache.spark.sql.streaming
 
+import scala.collection.JavaConverters._
+
 import org.apache.commons.lang3.RandomStringUtils
 import org.scalactic.TolerantNumerics
 import org.scalatest.concurrent.Eventually._

http://git-wip-us.apache.org/repos/asf/spark/blob/c68fb426/sql/core/src/test/scala/org/apache/spark/sql/streaming/WatermarkSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/WatermarkSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/WatermarkSuite.scala
index 12f3c3e..f1cc19c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/WatermarkSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/WatermarkSuite.scala
@@ -17,6 +17,9 @@
 
 package org.apache.spark.sql.streaming
 
+import java.{util => ju}
+import java.text.SimpleDateFormat
+
 import org.scalatest.BeforeAndAfter
 
 import org.apache.spark.internal.Logging
@@ -50,8 +53,7 @@ class WatermarkSuite extends StreamTest with BeforeAndAfter with Logging {
   }
 
 
-  test("watermark metric") {
-
+  test("event time and watermark metrics") {
     val inputData = MemoryStream[Int]
 
     val windowedAggregation = inputData.toDF()
@@ -61,21 +63,43 @@ class WatermarkSuite extends StreamTest with BeforeAndAfter with Logging {
         .agg(count("*") as 'count)
         .select($"window".getField("start").cast("long").as[Long], $"count".as[Long])
 
+    def assertEventStats(body: ju.Map[String, String] => Unit): AssertOnQuery = AssertOnQuery { q =>
+      body(q.recentProgress.filter(_.numInputRows > 0).lastOption.get.eventTime)
+      true
+    }
+
     testStream(windowedAggregation)(
       AddData(inputData, 15),
       CheckAnswer(),
-      AssertOnQuery { query =>
-        query.lastProgress.currentWatermark === 5000
+      assertEventStats { e =>
+        assert(e.get("max") === formatTimestamp(15))
+        assert(e.get("min") === formatTimestamp(15))
+        assert(e.get("avg") === formatTimestamp(15))
+        assert(e.get("watermark") === formatTimestamp(0))
       },
-      AddData(inputData, 15),
+      AddData(inputData, 10, 12, 14),
       CheckAnswer(),
-      AssertOnQuery { query =>
-        query.lastProgress.currentWatermark === 5000
+      assertEventStats { e =>
+        assert(e.get("max") === formatTimestamp(14))
+        assert(e.get("min") === formatTimestamp(10))
+        assert(e.get("avg") === formatTimestamp(12))
+        assert(e.get("watermark") === formatTimestamp(5))
       },
       AddData(inputData, 25),
       CheckAnswer(),
-      AssertOnQuery { query =>
-        query.lastProgress.currentWatermark === 15000
+      assertEventStats { e =>
+        assert(e.get("max") === formatTimestamp(25))
+        assert(e.get("min") === formatTimestamp(25))
+        assert(e.get("avg") === formatTimestamp(25))
+        assert(e.get("watermark") === formatTimestamp(5))
+      },
+      AddData(inputData, 25),
+      CheckAnswer((10, 3)),
+      assertEventStats { e =>
+        assert(e.get("max") === formatTimestamp(25))
+        assert(e.get("min") === formatTimestamp(25))
+        assert(e.get("avg") === formatTimestamp(25))
+        assert(e.get("watermark") === formatTimestamp(15))
       }
     )
   }
@@ -206,4 +230,11 @@ class WatermarkSuite extends StreamTest with BeforeAndAfter with Logging {
       CheckAnswer((10, 1))
     )
   }
+
+  private val timestampFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'") // ISO8601
+  timestampFormat.setTimeZone(ju.TimeZone.getTimeZone("UTC"))
+
+  private def formatTimestamp(sec: Long): String = {
+    timestampFormat.format(new ju.Date(sec * 1000))
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org