Posted to commits@spark.apache.org by td...@apache.org on 2016/12/13 22:14:47 UTC
spark git commit: [SPARK-18834][SS] Expose event time stats through StreamingQueryProgress
Repository: spark
Updated Branches:
refs/heads/master aebf44e50 -> c68fb426d
[SPARK-18834][SS] Expose event time stats through StreamingQueryProgress
## What changes were proposed in this pull request?
- Replaced `StreamingQueryProgress.currentWatermark` with `StreamingQueryProgress.eventTime`, a `Map[String, String]` that may contain the keys "max", "min", "avg", and "watermark". All values are ISO8601-formatted UTC timestamp strings.
- Corrected the docs for `StreamingQueryProgress.timestamp`: it is the beginning time of the trigger as an ISO8601-formatted UTC string, not a millisecond value.
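
For illustration, a rough sketch of how a caller might read the new map (`query` here is an assumed handle to a started streaming query, not part of this patch):

```scala
// Hypothetical usage sketch: `query` is assumed to be a started StreamingQuery.
val progress = query.lastProgress
if (progress != null) {
  // eventTime is a java.util.Map[String, String]. "max"/"min"/"avg" appear only
  // when the trigger processed rows; "watermark" is always reported.
  Option(progress.eventTime.get("max")).foreach(t => println(s"max event time: $t"))
  Option(progress.eventTime.get("watermark")).foreach(t => println(s"watermark: $t"))
}
```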
## How was this patch tested?
Updated tests
Author: Tathagata Das <ta...@gmail.com>
Closes #16258 from tdas/SPARK-18834.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/c68fb426
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/c68fb426
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/c68fb426
Branch: refs/heads/master
Commit: c68fb426d4ac05414fb402aa1f30f4c98df103ad
Parents: aebf44e
Author: Tathagata Das <ta...@gmail.com>
Authored: Tue Dec 13 14:14:25 2016 -0800
Committer: Tathagata Das <ta...@gmail.com>
Committed: Tue Dec 13 14:14:25 2016 -0800
----------------------------------------------------------------------
.../streaming/EventTimeWatermarkExec.scala | 55 ++++++++++++++------
.../execution/streaming/ProgressReporter.scala | 38 ++++++++++----
.../execution/streaming/StreamExecution.scala | 33 ++++++------
.../apache/spark/sql/streaming/progress.scala | 31 +++++++----
.../streaming/StreamingQueryListenerSuite.scala | 3 ++
.../StreamingQueryStatusAndProgressSuite.scala | 16 ++++--
.../sql/streaming/StreamingQuerySuite.scala | 2 +
.../spark/sql/streaming/WatermarkSuite.scala | 49 +++++++++++++----
8 files changed, 161 insertions(+), 66 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/c68fb426/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/EventTimeWatermarkExec.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/EventTimeWatermarkExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/EventTimeWatermarkExec.scala
index 4c8cb06..e8570d0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/EventTimeWatermarkExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/EventTimeWatermarkExec.scala
@@ -17,8 +17,6 @@
package org.apache.spark.sql.execution.streaming
-import scala.math.max
-
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{Attribute, UnsafeProjection}
@@ -28,24 +26,48 @@ import org.apache.spark.sql.types.MetadataBuilder
import org.apache.spark.unsafe.types.CalendarInterval
import org.apache.spark.util.AccumulatorV2
-/** Tracks the maximum positive long seen. */
-class MaxLong(protected var currentValue: Long = 0)
- extends AccumulatorV2[Long, Long] {
+/** Class for collecting event time stats with an accumulator */
+case class EventTimeStats(var max: Long, var min: Long, var sum: Long, var count: Long) {
+ def add(eventTime: Long): Unit = {
+ this.max = math.max(this.max, eventTime)
+ this.min = math.min(this.min, eventTime)
+ this.sum += eventTime
+ this.count += 1
+ }
+
+ def merge(that: EventTimeStats): Unit = {
+ this.max = math.max(this.max, that.max)
+ this.min = math.min(this.min, that.min)
+ this.sum += that.sum
+ this.count += that.count
+ }
+
+ def avg: Long = sum / count
+}
+
+object EventTimeStats {
+ def zero: EventTimeStats = EventTimeStats(
+ max = Long.MinValue, min = Long.MaxValue, sum = 0L, count = 0L)
+}
+
+/** Accumulator that collects stats on event time in a batch. */
+class EventTimeStatsAccum(protected var currentStats: EventTimeStats = EventTimeStats.zero)
+ extends AccumulatorV2[Long, EventTimeStats] {
- override def isZero: Boolean = value == 0
- override def value: Long = currentValue
- override def copy(): AccumulatorV2[Long, Long] = new MaxLong(currentValue)
+ override def isZero: Boolean = value == EventTimeStats.zero
+ override def value: EventTimeStats = currentStats
+ override def copy(): AccumulatorV2[Long, EventTimeStats] = new EventTimeStatsAccum(currentStats)
override def reset(): Unit = {
- currentValue = 0
+ currentStats = EventTimeStats.zero
}
override def add(v: Long): Unit = {
- currentValue = max(v, value)
+ currentStats.add(v)
}
- override def merge(other: AccumulatorV2[Long, Long]): Unit = {
- currentValue = max(value, other.value)
+ override def merge(other: AccumulatorV2[Long, EventTimeStats]): Unit = {
+ currentStats.merge(other.value)
}
}
@@ -54,22 +76,21 @@ class MaxLong(protected var currentValue: Long = 0)
* adding appropriate metadata to this column, this operator also tracks the maximum observed event
* time. Based on the maximum observed time and a user specified delay, we can calculate the
* `watermark` after which we assume we will no longer see late records for a particular time
- * period.
+ * period. Note that event time is measured in milliseconds.
*/
case class EventTimeWatermarkExec(
eventTime: Attribute,
delay: CalendarInterval,
child: SparkPlan) extends SparkPlan {
- // TODO: Use Spark SQL Metrics?
- val maxEventTime = new MaxLong
- sparkContext.register(maxEventTime)
+ val eventTimeStats = new EventTimeStatsAccum()
+ sparkContext.register(eventTimeStats)
override protected def doExecute(): RDD[InternalRow] = {
child.execute().mapPartitions { iter =>
val getEventTime = UnsafeProjection.create(eventTime :: Nil, child.output)
iter.map { row =>
- maxEventTime.add(getEventTime(row).getLong(0))
+ eventTimeStats.add(getEventTime(row).getLong(0) / 1000)
row
}
}
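
As a quick sanity sketch of the `EventTimeStats` semantics above (standalone, assuming those definitions are in scope):

```scala
// Sanity sketch, assuming the EventTimeStats definitions above are in scope.
val a = EventTimeStats.zero
Seq(10L, 12L, 14L).foreach(a.add)   // max=14, min=10, sum=36, count=3
val b = EventTimeStats.zero
b.add(25L)
a.merge(b)                          // max=25, min=10, sum=61, count=4
assert(a.avg == 15L)                // integer division: 61 / 4 = 15
```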
http://git-wip-us.apache.org/repos/asf/spark/blob/c68fb426/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala
index 40e3151..549b936 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala
@@ -41,7 +41,9 @@ import org.apache.spark.util.Clock
trait ProgressReporter extends Logging {
case class ExecutionStats(
- inputRows: Map[Source, Long], stateOperators: Seq[StateOperatorProgress])
+ inputRows: Map[Source, Long],
+ stateOperators: Seq[StateOperatorProgress],
+ eventTimeStats: Map[String, String])
// Internal state of the stream, required for computing metrics.
protected def id: UUID
@@ -127,12 +129,7 @@ trait ProgressReporter extends Logging {
protected def finishTrigger(hasNewData: Boolean): Unit = {
currentTriggerEndTimestamp = triggerClock.getTimeMillis()
- val executionStats: ExecutionStats = if (!hasNewData) {
- ExecutionStats(Map.empty, Seq.empty)
- } else {
- extractExecutionStats
- }
-
+ val executionStats = extractExecutionStats(hasNewData)
val processingTimeSec =
(currentTriggerEndTimestamp - currentTriggerStartTimestamp).toDouble / 1000
@@ -160,10 +157,10 @@ trait ProgressReporter extends Logging {
id = id,
runId = runId,
name = name,
- timestamp = timestampFormat.format(new Date(currentTriggerStartTimestamp)),
+ timestamp = formatTimestamp(currentTriggerStartTimestamp),
batchId = currentBatchId,
durationMs = currentDurationsMs.toMap.mapValues(long2Long).asJava,
- currentWatermark = offsetSeqMetadata.batchWatermarkMs,
+ eventTime = executionStats.eventTimeStats.asJava,
stateOperators = executionStats.stateOperators.toArray,
sources = sourceProgress.toArray,
sink = sinkProgress)
@@ -184,7 +181,13 @@ trait ProgressReporter extends Logging {
}
/** Extracts statistics from the most recent query execution. */
- private def extractExecutionStats: ExecutionStats = {
+ private def extractExecutionStats(hasNewData: Boolean): ExecutionStats = {
+ val watermarkTimestamp = Map("watermark" -> formatTimestamp(offsetSeqMetadata.batchWatermarkMs))
+
+ if (!hasNewData) {
+ return ExecutionStats(Map.empty, Seq.empty, watermarkTimestamp)
+ }
+
// We want to associate execution plan leaves to sources that generate them, so that we match
// their metrics (e.g. numOutputRows) to the sources. To do this we do the following.
// Consider the translation from the streaming logical plan to the final executed plan.
@@ -241,7 +244,16 @@ trait ProgressReporter extends Logging {
numRowsUpdated = node.metrics.get("numUpdatedStateRows").map(_.value).getOrElse(0L))
}
- ExecutionStats(numInputRows, stateOperators)
+ val eventTimeStats = lastExecution.executedPlan.collect {
+ case e: EventTimeWatermarkExec if e.eventTimeStats.value.count > 0 =>
+ val stats = e.eventTimeStats.value
+ Map(
+ "max" -> stats.max,
+ "min" -> stats.min,
+ "avg" -> stats.avg).mapValues(formatTimestamp)
+ }.headOption.getOrElse(Map.empty) ++ watermarkTimestamp
+
+ ExecutionStats(numInputRows, stateOperators, eventTimeStats)
}
/** Records the duration of running `body` for the next query progress update. */
@@ -257,6 +269,10 @@ trait ProgressReporter extends Logging {
result
}
+ private def formatTimestamp(millis: Long): String = {
+ timestampFormat.format(new Date(millis))
+ }
+
/** Updates the message returned in `status`. */
protected def updateStatusMessage(message: String): Unit = {
currentStatus = currentStatus.copy(message = message)
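
For reference, a standalone sketch of the ISO8601/UTC conversion that `formatTimestamp` performs; the pattern here is assumed to match the `timestampFormat` member that `ProgressReporter` defines outside this diff:

```scala
import java.text.SimpleDateFormat
import java.util.{Date, TimeZone}

// Standalone sketch; the pattern is assumed to match ProgressReporter's
// timestampFormat, which is defined outside this diff.
val fmt = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'")
fmt.setTimeZone(TimeZone.getTimeZone("UTC"))

def formatTimestamp(millis: Long): String = fmt.format(new Date(millis))

formatTimestamp(0L)  // "1970-01-01T00:00:00.000Z"
```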
http://git-wip-us.apache.org/repos/asf/spark/blob/c68fb426/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
index 9fe6819..8f97d95 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
@@ -382,6 +382,24 @@ class StreamExecution(
if (hasNewData) {
// Current batch timestamp in milliseconds
offsetSeqMetadata.batchTimestampMs = triggerClock.getTimeMillis()
+ // Update the eventTime watermark if we find one in the plan.
+ if (lastExecution != null) {
+ lastExecution.executedPlan.collect {
+ case e: EventTimeWatermarkExec if e.eventTimeStats.value.count > 0 =>
+ logDebug(s"Observed event time stats: ${e.eventTimeStats.value}")
+ e.eventTimeStats.value.max - e.delay.milliseconds
+ }.headOption.foreach { newWatermarkMs =>
+ if (newWatermarkMs > offsetSeqMetadata.batchWatermarkMs) {
+ logInfo(s"Updating eventTime watermark to: $newWatermarkMs ms")
+ offsetSeqMetadata.batchWatermarkMs = newWatermarkMs
+ } else {
+ logDebug(
+ s"Event time didn't move: $newWatermarkMs < " +
+ s"${offsetSeqMetadata.batchWatermarkMs}")
+ }
+ }
+ }
+
updateStatusMessage("Writing offsets to log")
reportTimeTaken("walCommit") {
assert(offsetLog.add(
@@ -485,21 +503,6 @@ class StreamExecution(
sink.addBatch(currentBatchId, nextBatch)
}
- // Update the eventTime watermark if we find one in the plan.
- lastExecution.executedPlan.collect {
- case e: EventTimeWatermarkExec =>
- logTrace(s"Maximum observed eventTime: ${e.maxEventTime.value}")
- (e.maxEventTime.value / 1000) - e.delay.milliseconds()
- }.headOption.foreach { newWatermark =>
- if (newWatermark > offsetSeqMetadata.batchWatermarkMs) {
- logInfo(s"Updating eventTime watermark to: $newWatermark ms")
- offsetSeqMetadata.batchWatermarkMs = newWatermark
- } else {
- logTrace(s"Event time didn't move: $newWatermark < " +
- s"$offsetSeqMetadata.currentEventTimeWatermark")
- }
- }
-
awaitBatchLock.lock()
try {
// Wake up any threads that are waiting for the stream to progress.
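
The update rule moved above reduces to taking the maximum observed event time minus the user-specified delay, advancing only monotonically; in isolation:

```scala
// Isolated sketch of the watermark advance rule: candidate = max event time
// minus delay, and the watermark only ever moves forward.
def advanceWatermark(currentMs: Long, maxEventTimeMs: Long, delayMs: Long): Long =
  math.max(currentMs, maxEventTimeMs - delayMs)

advanceWatermark(0L, 15000L, 10000L)     // 5000: advances to 5 s
advanceWatermark(5000L, 14000L, 10000L)  // 5000: the 4 s candidate is ignored
```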
http://git-wip-us.apache.org/repos/asf/spark/blob/c68fb426/sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala
index d156875..e219cfd 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala
@@ -18,6 +18,7 @@
package org.apache.spark.sql.streaming
import java.{util => ju}
+import java.lang.{Long => JLong}
import java.util.UUID
import scala.collection.JavaConverters._
@@ -29,7 +30,6 @@ import org.json4s.JsonDSL._
import org.json4s.jackson.JsonMethods._
import org.apache.spark.annotation.Experimental
-import org.apache.spark.sql.catalyst.util.DateTimeUtils
/**
* :: Experimental ::
@@ -61,13 +61,20 @@ class StateOperatorProgress private[sql](
* @param id A unique query id that persists across restarts. See `StreamingQuery.id()`.
* @param runId A query id that is unique for every start/restart. See `StreamingQuery.runId()`.
* @param name User-specified name of the query, null if not specified.
- * @param timestamp Timestamp (ms) of the beginning of the trigger.
+ * @param timestamp Beginning time of the trigger in ISO8601 format, i.e. UTC timestamps.
* @param batchId A unique id for the current batch of data being processed. Note that in the
* case of retries after a failure a given batchId may be executed more than once.
* Similarly, when there is no data to be processed, the batchId will not be
* incremented.
* @param durationMs The amount of time taken to perform various operations in milliseconds.
- * @param currentWatermark The current event time watermark in milliseconds
+ * @param eventTime Statistics of event time seen in this batch. It may contain the following keys:
+ * {
+ * "max" -> "2016-12-05T20:54:20.827Z" // maximum event time seen in this trigger
+ * "min" -> "2016-12-05T20:54:20.827Z" // minimum event time seen in this trigger
+ * "avg" -> "2016-12-05T20:54:20.827Z" // average event time seen in this trigger
+ * "watermark" -> "2016-12-05T20:54:20.827Z" // watermark used in this trigger
+ * }
+ * All timestamps are in ISO8601 format, i.e. UTC timestamps.
* @param stateOperators Information about operators in the query that store state.
* @param sources detailed statistics on data being read from each of the streaming sources.
* @since 2.1.0
@@ -79,8 +86,8 @@ class StreamingQueryProgress private[sql](
val name: String,
val timestamp: String,
val batchId: Long,
- val durationMs: ju.Map[String, java.lang.Long],
- val currentWatermark: Long,
+ val durationMs: ju.Map[String, JLong],
+ val eventTime: ju.Map[String, String],
val stateOperators: Array[StateOperatorProgress],
val sources: Array[SourceProgress],
val sink: SinkProgress) {
@@ -107,6 +114,13 @@ class StreamingQueryProgress private[sql](
if (value.isNaN || value.isInfinity) JNothing else JDouble(value)
}
+ /** Convert map to JValue while handling empty maps. Also, this sorts the keys. */
+ def safeMapToJValue[T](map: ju.Map[String, T], valueToJValue: T => JValue): JValue = {
+ if (map.isEmpty) return JNothing
+ val keys = map.asScala.keySet.toSeq.sorted
+ keys.map { k => k -> valueToJValue(map.get(k)) : JObject }.reduce(_ ~ _)
+ }
+
("id" -> JString(id.toString)) ~
("runId" -> JString(runId.toString)) ~
("name" -> JString(name)) ~
@@ -114,11 +128,8 @@ class StreamingQueryProgress private[sql](
("numInputRows" -> JInt(numInputRows)) ~
("inputRowsPerSecond" -> safeDoubleToJValue(inputRowsPerSecond)) ~
("processedRowsPerSecond" -> safeDoubleToJValue(processedRowsPerSecond)) ~
- ("durationMs" -> durationMs
- .asScala
- .map { case (k, v) => k -> JInt(v.toLong): JObject }
- .reduce(_ ~ _)) ~
- ("currentWatermark" -> JInt(currentWatermark)) ~
+ ("durationMs" -> safeMapToJValue[JLong](durationMs, v => JInt(v.toLong))) ~
+ ("eventTime" -> safeMapToJValue[String](eventTime, s => JString(s))) ~
("stateOperators" -> JArray(stateOperators.map(_.jsonValue).toList)) ~
("sources" -> JArray(sources.map(_.jsonValue).toList)) ~
("sink" -> sink.jsonValue)
http://git-wip-us.apache.org/repos/asf/spark/blob/c68fb426/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala
index f75f5b5..7c6745ac 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala
@@ -185,9 +185,12 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter {
test("QueryProgressEvent serialization") {
def testSerialization(event: QueryProgressEvent): Unit = {
+ import scala.collection.JavaConverters._
val json = JsonProtocol.sparkEventToJson(event)
val newEvent = JsonProtocol.sparkEventFromJson(json).asInstanceOf[QueryProgressEvent]
assert(newEvent.progress.json === event.progress.json) // json as a proxy for equality
+ assert(newEvent.progress.durationMs.asScala === event.progress.durationMs.asScala)
+ assert(newEvent.progress.eventTime.asScala === event.progress.eventTime.asScala)
}
testSerialization(new QueryProgressEvent(StreamingQueryStatusAndProgressSuite.testProgress1))
testSerialization(new QueryProgressEvent(StreamingQueryStatusAndProgressSuite.testProgress2))
http://git-wip-us.apache.org/repos/asf/spark/blob/c68fb426/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryStatusAndProgressSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryStatusAndProgressSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryStatusAndProgressSuite.scala
index 193c943..c970743 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryStatusAndProgressSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryStatusAndProgressSuite.scala
@@ -44,7 +44,12 @@ class StreamingQueryStatusAndProgressSuite extends SparkFunSuite {
| "durationMs" : {
| "total" : 0
| },
- | "currentWatermark" : 3,
+ | "eventTime" : {
+ | "avg" : "2016-12-05T20:54:20.827Z",
+ | "max" : "2016-12-05T20:54:20.827Z",
+ | "min" : "2016-12-05T20:54:20.827Z",
+ | "watermark" : "2016-12-05T20:54:20.827Z"
+ | },
| "stateOperators" : [ {
| "numRowsTotal" : 0,
| "numRowsUpdated" : 1
@@ -76,7 +81,6 @@ class StreamingQueryStatusAndProgressSuite extends SparkFunSuite {
| "durationMs" : {
| "total" : 0
| },
- | "currentWatermark" : 3,
| "stateOperators" : [ {
| "numRowsTotal" : 0,
| "numRowsUpdated" : 1
@@ -134,7 +138,11 @@ object StreamingQueryStatusAndProgressSuite {
timestamp = "2016-12-05T20:54:20.827Z",
batchId = 2L,
durationMs = Map("total" -> 0L).mapValues(long2Long).asJava,
- currentWatermark = 3L,
+ eventTime = Map(
+ "max" -> "2016-12-05T20:54:20.827Z",
+ "min" -> "2016-12-05T20:54:20.827Z",
+ "avg" -> "2016-12-05T20:54:20.827Z",
+ "watermark" -> "2016-12-05T20:54:20.827Z").asJava,
stateOperators = Array(new StateOperatorProgress(numRowsTotal = 0, numRowsUpdated = 1)),
sources = Array(
new SourceProgress(
@@ -156,7 +164,7 @@ object StreamingQueryStatusAndProgressSuite {
timestamp = "2016-12-05T20:54:20.827Z",
batchId = 2L,
durationMs = Map("total" -> 0L).mapValues(long2Long).asJava,
- currentWatermark = 3L,
+ eventTime = Map.empty[String, String].asJava, // empty maps should be handled correctly
stateOperators = Array(new StateOperatorProgress(numRowsTotal = 0, numRowsUpdated = 1)),
sources = Array(
new SourceProgress(
http://git-wip-us.apache.org/repos/asf/spark/blob/c68fb426/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
index c66d6b1..afd788c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
@@ -17,6 +17,8 @@
package org.apache.spark.sql.streaming
+import scala.collection.JavaConverters._
+
import org.apache.commons.lang3.RandomStringUtils
import org.scalactic.TolerantNumerics
import org.scalatest.concurrent.Eventually._
http://git-wip-us.apache.org/repos/asf/spark/blob/c68fb426/sql/core/src/test/scala/org/apache/spark/sql/streaming/WatermarkSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/WatermarkSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/WatermarkSuite.scala
index 12f3c3e..f1cc19c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/WatermarkSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/WatermarkSuite.scala
@@ -17,6 +17,9 @@
package org.apache.spark.sql.streaming
+import java.{util => ju}
+import java.text.SimpleDateFormat
+
import org.scalatest.BeforeAndAfter
import org.apache.spark.internal.Logging
@@ -50,8 +53,7 @@ class WatermarkSuite extends StreamTest with BeforeAndAfter with Logging {
}
- test("watermark metric") {
-
+ test("event time and watermark metrics") {
val inputData = MemoryStream[Int]
val windowedAggregation = inputData.toDF()
@@ -61,21 +63,43 @@ class WatermarkSuite extends StreamTest with BeforeAndAfter with Logging {
.agg(count("*") as 'count)
.select($"window".getField("start").cast("long").as[Long], $"count".as[Long])
+ def assertEventStats(body: ju.Map[String, String] => Unit): AssertOnQuery = AssertOnQuery { q =>
+ body(q.recentProgress.filter(_.numInputRows > 0).lastOption.get.eventTime)
+ true
+ }
+
testStream(windowedAggregation)(
AddData(inputData, 15),
CheckAnswer(),
- AssertOnQuery { query =>
- query.lastProgress.currentWatermark === 5000
+ assertEventStats { e =>
+ assert(e.get("max") === formatTimestamp(15))
+ assert(e.get("min") === formatTimestamp(15))
+ assert(e.get("avg") === formatTimestamp(15))
+ assert(e.get("watermark") === formatTimestamp(0))
},
- AddData(inputData, 15),
+ AddData(inputData, 10, 12, 14),
CheckAnswer(),
- AssertOnQuery { query =>
- query.lastProgress.currentWatermark === 5000
+ assertEventStats { e =>
+ assert(e.get("max") === formatTimestamp(14))
+ assert(e.get("min") === formatTimestamp(10))
+ assert(e.get("avg") === formatTimestamp(12))
+ assert(e.get("watermark") === formatTimestamp(5))
},
AddData(inputData, 25),
CheckAnswer(),
- AssertOnQuery { query =>
- query.lastProgress.currentWatermark === 15000
+ assertEventStats { e =>
+ assert(e.get("max") === formatTimestamp(25))
+ assert(e.get("min") === formatTimestamp(25))
+ assert(e.get("avg") === formatTimestamp(25))
+ assert(e.get("watermark") === formatTimestamp(5))
+ },
+ AddData(inputData, 25),
+ CheckAnswer((10, 3)),
+ assertEventStats { e =>
+ assert(e.get("max") === formatTimestamp(25))
+ assert(e.get("min") === formatTimestamp(25))
+ assert(e.get("avg") === formatTimestamp(25))
+ assert(e.get("watermark") === formatTimestamp(15))
}
)
}
@@ -206,4 +230,11 @@ class WatermarkSuite extends StreamTest with BeforeAndAfter with Logging {
CheckAnswer((10, 1))
)
}
+
+ private val timestampFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'") // ISO8601
+ timestampFormat.setTimeZone(ju.TimeZone.getTimeZone("UTC"))
+
+ private def formatTimestamp(sec: Long): String = {
+ timestampFormat.format(new ju.Date(sec * 1000))
+ }
}