You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by zs...@apache.org on 2016/12/21 18:44:22 UTC
spark git commit: [SPARK-18894][SS] Fix event time watermark delay
threshold specified in months or years
Repository: spark
Updated Branches:
refs/heads/master 1a6438897 -> 607a1e63d
[SPARK-18894][SS] Fix event time watermark delay threshold specified in months or years
## What changes were proposed in this pull request?
Two changes
- Fix how delays specified in months and years are translated to milliseconds
- Following up on #16258, do not show the watermark in query progress when the query has no watermarking
## How was this patch tested?
Updated and new unit tests
Author: Tathagata Das <ta...@gmail.com>
Closes #16304 from tdas/SPARK-18834-1.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/607a1e63
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/607a1e63
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/607a1e63
Branch: refs/heads/master
Commit: 607a1e63dbc9269b806a9f537e1d041029333cdd
Parents: 1a64388
Author: Tathagata Das <ta...@gmail.com>
Authored: Wed Dec 21 10:44:20 2016 -0800
Committer: Shixiong Zhu <sh...@databricks.com>
Committed: Wed Dec 21 10:44:20 2016 -0800
----------------------------------------------------------------------
.../streaming/EventTimeWatermarkExec.scala | 7 +-
.../execution/streaming/ProgressReporter.scala | 7 +-
.../execution/streaming/StreamExecution.scala | 2 +-
.../sql/streaming/EventTimeWatermarkSuite.scala | 287 +++++++++++++++++++
.../spark/sql/streaming/WatermarkSuite.scala | 240 ----------------
5 files changed, 299 insertions(+), 244 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/607a1e63/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/EventTimeWatermarkExec.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/EventTimeWatermarkExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/EventTimeWatermarkExec.scala
index e8570d0..5a9a99e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/EventTimeWatermarkExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/EventTimeWatermarkExec.scala
@@ -84,6 +84,11 @@ case class EventTimeWatermarkExec(
child: SparkPlan) extends SparkPlan {
val eventTimeStats = new EventTimeStatsAccum()
+ val delayMs = {
+ val millisPerMonth = CalendarInterval.MICROS_PER_DAY / 1000 * 31
+ delay.milliseconds + delay.months * millisPerMonth
+ }
+
sparkContext.register(eventTimeStats)
override protected def doExecute(): RDD[InternalRow] = {
@@ -101,7 +106,7 @@ case class EventTimeWatermarkExec(
if (a semanticEquals eventTime) {
val updatedMetadata = new MetadataBuilder()
.withMetadata(a.metadata)
- .putLong(EventTimeWatermark.delayKey, delay.milliseconds)
+ .putLong(EventTimeWatermark.delayKey, delayMs)
.build()
a.withMetadata(updatedMetadata)
http://git-wip-us.apache.org/repos/asf/spark/blob/607a1e63/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala
index 2386f33..c5e9eae 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala
@@ -25,7 +25,7 @@ import scala.collection.JavaConverters._
import org.apache.spark.internal.Logging
import org.apache.spark.sql.{DataFrame, SparkSession}
-import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.catalyst.plans.logical.{EventTimeWatermark, LogicalPlan}
import org.apache.spark.sql.execution.QueryExecution
import org.apache.spark.sql.streaming._
import org.apache.spark.sql.streaming.StreamingQueryListener.QueryProgressEvent
@@ -182,7 +182,10 @@ trait ProgressReporter extends Logging {
/** Extracts statistics from the most recent query execution. */
private def extractExecutionStats(hasNewData: Boolean): ExecutionStats = {
- val watermarkTimestamp = Map("watermark" -> formatTimestamp(offsetSeqMetadata.batchWatermarkMs))
+ val hasEventTime = logicalPlan.collect { case e: EventTimeWatermark => e }.nonEmpty
+ val watermarkTimestamp =
+ if (hasEventTime) Map("watermark" -> formatTimestamp(offsetSeqMetadata.batchWatermarkMs))
+ else Map.empty[String, String]
if (!hasNewData) {
return ExecutionStats(Map.empty, Seq.empty, watermarkTimestamp)
http://git-wip-us.apache.org/repos/asf/spark/blob/607a1e63/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
index 8f97d95..e05200d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
@@ -387,7 +387,7 @@ class StreamExecution(
lastExecution.executedPlan.collect {
case e: EventTimeWatermarkExec if e.eventTimeStats.value.count > 0 =>
logDebug(s"Observed event time stats: ${e.eventTimeStats.value}")
- e.eventTimeStats.value.max - e.delay.milliseconds
+ e.eventTimeStats.value.max - e.delayMs
}.headOption.foreach { newWatermarkMs =>
if (newWatermarkMs > offsetSeqMetadata.batchWatermarkMs) {
logInfo(s"Updating eventTime watermark to: $newWatermarkMs ms")
http://git-wip-us.apache.org/repos/asf/spark/blob/607a1e63/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala
new file mode 100644
index 0000000..bdfba95
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala
@@ -0,0 +1,287 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.streaming
+
+import java.{util => ju}
+import java.text.SimpleDateFormat
+import java.util.{Calendar, Date}
+
+import org.scalatest.BeforeAndAfter
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.{AnalysisException, Row}
+import org.apache.spark.sql.execution.streaming._
+import org.apache.spark.sql.functions.{count, window}
+import org.apache.spark.sql.InternalOutputModes.Complete
+
+class EventTimeWatermarkSuite extends StreamTest with BeforeAndAfter with Logging {
+
+ import testImplicits._
+
+ after {
+ sqlContext.streams.active.foreach(_.stop())
+ }
+
+ test("error on bad column") {
+ val inputData = MemoryStream[Int].toDF()
+ val e = intercept[AnalysisException] {
+ inputData.withWatermark("badColumn", "1 minute")
+ }
+ assert(e.getMessage contains "badColumn")
+ }
+
+ test("error on wrong type") {
+ val inputData = MemoryStream[Int].toDF()
+ val e = intercept[AnalysisException] {
+ inputData.withWatermark("value", "1 minute")
+ }
+ assert(e.getMessage contains "value")
+ assert(e.getMessage contains "int")
+ }
+
+ test("event time and watermark metrics") {
+ // No event time metrics when there is no watermarking
+ val inputData1 = MemoryStream[Int]
+ val aggWithoutWatermark = inputData1.toDF()
+ .withColumn("eventTime", $"value".cast("timestamp"))
+ .groupBy(window($"eventTime", "5 seconds") as 'window)
+ .agg(count("*") as 'count)
+ .select($"window".getField("start").cast("long").as[Long], $"count".as[Long])
+
+ testStream(aggWithoutWatermark, outputMode = Complete)(
+ AddData(inputData1, 15),
+ CheckAnswer((15, 1)),
+ assertEventStats { e => assert(e.isEmpty) },
+ AddData(inputData1, 10, 12, 14),
+ CheckAnswer((10, 3), (15, 1)),
+ assertEventStats { e => assert(e.isEmpty) }
+ )
+
+ // All event time metrics where watermarking is set
+ val inputData2 = MemoryStream[Int]
+ val aggWithWatermark = inputData2.toDF()
+ .withColumn("eventTime", $"value".cast("timestamp"))
+ .withWatermark("eventTime", "10 seconds")
+ .groupBy(window($"eventTime", "5 seconds") as 'window)
+ .agg(count("*") as 'count)
+ .select($"window".getField("start").cast("long").as[Long], $"count".as[Long])
+
+ testStream(aggWithWatermark)(
+ AddData(inputData2, 15),
+ CheckAnswer(),
+ assertEventStats { e =>
+ assert(e.get("max") === formatTimestamp(15))
+ assert(e.get("min") === formatTimestamp(15))
+ assert(e.get("avg") === formatTimestamp(15))
+ assert(e.get("watermark") === formatTimestamp(0))
+ },
+ AddData(inputData2, 10, 12, 14),
+ CheckAnswer(),
+ assertEventStats { e =>
+ assert(e.get("max") === formatTimestamp(14))
+ assert(e.get("min") === formatTimestamp(10))
+ assert(e.get("avg") === formatTimestamp(12))
+ assert(e.get("watermark") === formatTimestamp(5))
+ },
+ AddData(inputData2, 25),
+ CheckAnswer(),
+ assertEventStats { e =>
+ assert(e.get("max") === formatTimestamp(25))
+ assert(e.get("min") === formatTimestamp(25))
+ assert(e.get("avg") === formatTimestamp(25))
+ assert(e.get("watermark") === formatTimestamp(5))
+ },
+ AddData(inputData2, 25),
+ CheckAnswer((10, 3)),
+ assertEventStats { e =>
+ assert(e.get("max") === formatTimestamp(25))
+ assert(e.get("min") === formatTimestamp(25))
+ assert(e.get("avg") === formatTimestamp(25))
+ assert(e.get("watermark") === formatTimestamp(15))
+ }
+ )
+ }
+
+ test("append-mode watermark aggregation") {
+ val inputData = MemoryStream[Int]
+
+ val windowedAggregation = inputData.toDF()
+ .withColumn("eventTime", $"value".cast("timestamp"))
+ .withWatermark("eventTime", "10 seconds")
+ .groupBy(window($"eventTime", "5 seconds") as 'window)
+ .agg(count("*") as 'count)
+ .select($"window".getField("start").cast("long").as[Long], $"count".as[Long])
+
+ testStream(windowedAggregation)(
+ AddData(inputData, 10, 11, 12, 13, 14, 15),
+ CheckAnswer(),
+ AddData(inputData, 25), // Advance watermark to 15 seconds
+ CheckAnswer(),
+ AddData(inputData, 25), // Evict items less than previous watermark.
+ CheckAnswer((10, 5))
+ )
+ }
+
+ test("delay in months and years handled correctly") {
+ val currentTimeMs = System.currentTimeMillis
+ val currentTime = new Date(currentTimeMs)
+
+ val input = MemoryStream[Long]
+ val aggWithWatermark = input.toDF()
+ .withColumn("eventTime", $"value".cast("timestamp"))
+ .withWatermark("eventTime", "2 years 5 months")
+ .groupBy(window($"eventTime", "5 seconds") as 'window)
+ .agg(count("*") as 'count)
+ .select($"window".getField("start").cast("long").as[Long], $"count".as[Long])
+
+ def monthsSinceEpoch(date: Date): Int = { date.getYear * 12 + date.getMonth }
+
+ testStream(aggWithWatermark)(
+ AddData(input, currentTimeMs / 1000),
+ CheckAnswer(),
+ AddData(input, currentTimeMs / 1000),
+ CheckAnswer(),
+ assertEventStats { e =>
+ assert(timestampFormat.parse(e.get("max")).getTime === (currentTimeMs / 1000) * 1000)
+ val watermarkTime = timestampFormat.parse(e.get("watermark"))
+ assert(monthsSinceEpoch(currentTime) - monthsSinceEpoch(watermarkTime) === 29)
+ }
+ )
+ }
+
+ test("recovery") {
+ val inputData = MemoryStream[Int]
+ val df = inputData.toDF()
+ .withColumn("eventTime", $"value".cast("timestamp"))
+ .withWatermark("eventTime", "10 seconds")
+ .groupBy(window($"eventTime", "5 seconds") as 'window)
+ .agg(count("*") as 'count)
+ .select($"window".getField("start").cast("long").as[Long], $"count".as[Long])
+
+ testStream(df)(
+ AddData(inputData, 10, 11, 12, 13, 14, 15),
+ CheckLastBatch(),
+ AddData(inputData, 25), // Advance watermark to 15 seconds
+ StopStream,
+ StartStream(),
+ CheckLastBatch(),
+ AddData(inputData, 25), // Evict items less than previous watermark.
+ CheckLastBatch((10, 5)),
+ StopStream,
+ AssertOnQuery { q => // clear the sink
+ q.sink.asInstanceOf[MemorySink].clear()
+ true
+ },
+ StartStream(),
+ CheckLastBatch((10, 5)), // Recompute last batch and re-evict timestamp 10
+ AddData(inputData, 30), // Advance watermark to 20 seconds
+ CheckLastBatch(),
+ StopStream,
+ StartStream(), // Watermark should still be 15 seconds
+ AddData(inputData, 17),
+ CheckLastBatch(), // We still do not see next batch
+ AddData(inputData, 30), // Advance watermark to 20 seconds
+ CheckLastBatch(),
+ AddData(inputData, 30), // Evict items less than previous watermark.
+ CheckLastBatch((15, 2)) // Ensure we see next window
+ )
+ }
+
+ test("dropping old data") {
+ val inputData = MemoryStream[Int]
+
+ val windowedAggregation = inputData.toDF()
+ .withColumn("eventTime", $"value".cast("timestamp"))
+ .withWatermark("eventTime", "10 seconds")
+ .groupBy(window($"eventTime", "5 seconds") as 'window)
+ .agg(count("*") as 'count)
+ .select($"window".getField("start").cast("long").as[Long], $"count".as[Long])
+
+ testStream(windowedAggregation)(
+ AddData(inputData, 10, 11, 12),
+ CheckAnswer(),
+ AddData(inputData, 25), // Advance watermark to 15 seconds
+ CheckAnswer(),
+ AddData(inputData, 25), // Evict items less than previous watermark.
+ CheckAnswer((10, 3)),
+ AddData(inputData, 10), // 10 is later than 15 second watermark
+ CheckAnswer((10, 3)),
+ AddData(inputData, 25),
+ CheckAnswer((10, 3)) // Should not emit an incorrect partial result.
+ )
+ }
+
+ test("complete mode") {
+ val inputData = MemoryStream[Int]
+
+ val windowedAggregation = inputData.toDF()
+ .withColumn("eventTime", $"value".cast("timestamp"))
+ .withWatermark("eventTime", "10 seconds")
+ .groupBy(window($"eventTime", "5 seconds") as 'window)
+ .agg(count("*") as 'count)
+ .select($"window".getField("start").cast("long").as[Long], $"count".as[Long])
+
+ // No eviction when asked to compute complete results.
+ testStream(windowedAggregation, OutputMode.Complete)(
+ AddData(inputData, 10, 11, 12),
+ CheckAnswer((10, 3)),
+ AddData(inputData, 25),
+ CheckAnswer((10, 3), (25, 1)),
+ AddData(inputData, 25),
+ CheckAnswer((10, 3), (25, 2)),
+ AddData(inputData, 10),
+ CheckAnswer((10, 4), (25, 2)),
+ AddData(inputData, 25),
+ CheckAnswer((10, 4), (25, 3))
+ )
+ }
+
+ test("group by on raw timestamp") {
+ val inputData = MemoryStream[Int]
+
+ val windowedAggregation = inputData.toDF()
+ .withColumn("eventTime", $"value".cast("timestamp"))
+ .withWatermark("eventTime", "10 seconds")
+ .groupBy($"eventTime")
+ .agg(count("*") as 'count)
+ .select($"eventTime".cast("long").as[Long], $"count".as[Long])
+
+ testStream(windowedAggregation)(
+ AddData(inputData, 10),
+ CheckAnswer(),
+ AddData(inputData, 25), // Advance watermark to 15 seconds
+ CheckAnswer(),
+ AddData(inputData, 25), // Evict items less than previous watermark.
+ CheckAnswer((10, 1))
+ )
+ }
+
+ private def assertEventStats(body: ju.Map[String, String] => Unit): AssertOnQuery = {
+ AssertOnQuery { q =>
+ body(q.recentProgress.filter(_.numInputRows > 0).lastOption.get.eventTime)
+ true
+ }
+ }
+
+ private val timestampFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'") // ISO8601
+ timestampFormat.setTimeZone(ju.TimeZone.getTimeZone("UTC"))
+
+ private def formatTimestamp(sec: Long): String = {
+ timestampFormat.format(new ju.Date(sec * 1000))
+ }
+}
http://git-wip-us.apache.org/repos/asf/spark/blob/607a1e63/sql/core/src/test/scala/org/apache/spark/sql/streaming/WatermarkSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/WatermarkSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/WatermarkSuite.scala
deleted file mode 100644
index f1cc19c..0000000
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/WatermarkSuite.scala
+++ /dev/null
@@ -1,240 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.streaming
-
-import java.{util => ju}
-import java.text.SimpleDateFormat
-
-import org.scalatest.BeforeAndAfter
-
-import org.apache.spark.internal.Logging
-import org.apache.spark.sql.{AnalysisException, Row}
-import org.apache.spark.sql.execution.streaming._
-import org.apache.spark.sql.functions.{count, window}
-
-class WatermarkSuite extends StreamTest with BeforeAndAfter with Logging {
-
- import testImplicits._
-
- after {
- sqlContext.streams.active.foreach(_.stop())
- }
-
- test("error on bad column") {
- val inputData = MemoryStream[Int].toDF()
- val e = intercept[AnalysisException] {
- inputData.withWatermark("badColumn", "1 minute")
- }
- assert(e.getMessage contains "badColumn")
- }
-
- test("error on wrong type") {
- val inputData = MemoryStream[Int].toDF()
- val e = intercept[AnalysisException] {
- inputData.withWatermark("value", "1 minute")
- }
- assert(e.getMessage contains "value")
- assert(e.getMessage contains "int")
- }
-
-
- test("event time and watermark metrics") {
- val inputData = MemoryStream[Int]
-
- val windowedAggregation = inputData.toDF()
- .withColumn("eventTime", $"value".cast("timestamp"))
- .withWatermark("eventTime", "10 seconds")
- .groupBy(window($"eventTime", "5 seconds") as 'window)
- .agg(count("*") as 'count)
- .select($"window".getField("start").cast("long").as[Long], $"count".as[Long])
-
- def assertEventStats(body: ju.Map[String, String] => Unit): AssertOnQuery = AssertOnQuery { q =>
- body(q.recentProgress.filter(_.numInputRows > 0).lastOption.get.eventTime)
- true
- }
-
- testStream(windowedAggregation)(
- AddData(inputData, 15),
- CheckAnswer(),
- assertEventStats { e =>
- assert(e.get("max") === formatTimestamp(15))
- assert(e.get("min") === formatTimestamp(15))
- assert(e.get("avg") === formatTimestamp(15))
- assert(e.get("watermark") === formatTimestamp(0))
- },
- AddData(inputData, 10, 12, 14),
- CheckAnswer(),
- assertEventStats { e =>
- assert(e.get("max") === formatTimestamp(14))
- assert(e.get("min") === formatTimestamp(10))
- assert(e.get("avg") === formatTimestamp(12))
- assert(e.get("watermark") === formatTimestamp(5))
- },
- AddData(inputData, 25),
- CheckAnswer(),
- assertEventStats { e =>
- assert(e.get("max") === formatTimestamp(25))
- assert(e.get("min") === formatTimestamp(25))
- assert(e.get("avg") === formatTimestamp(25))
- assert(e.get("watermark") === formatTimestamp(5))
- },
- AddData(inputData, 25),
- CheckAnswer((10, 3)),
- assertEventStats { e =>
- assert(e.get("max") === formatTimestamp(25))
- assert(e.get("min") === formatTimestamp(25))
- assert(e.get("avg") === formatTimestamp(25))
- assert(e.get("watermark") === formatTimestamp(15))
- }
- )
- }
-
- test("append-mode watermark aggregation") {
- val inputData = MemoryStream[Int]
-
- val windowedAggregation = inputData.toDF()
- .withColumn("eventTime", $"value".cast("timestamp"))
- .withWatermark("eventTime", "10 seconds")
- .groupBy(window($"eventTime", "5 seconds") as 'window)
- .agg(count("*") as 'count)
- .select($"window".getField("start").cast("long").as[Long], $"count".as[Long])
-
- testStream(windowedAggregation)(
- AddData(inputData, 10, 11, 12, 13, 14, 15),
- CheckAnswer(),
- AddData(inputData, 25), // Advance watermark to 15 seconds
- CheckAnswer(),
- AddData(inputData, 25), // Evict items less than previous watermark.
- CheckAnswer((10, 5))
- )
- }
-
- test("recovery") {
- val inputData = MemoryStream[Int]
- val df = inputData.toDF()
- .withColumn("eventTime", $"value".cast("timestamp"))
- .withWatermark("eventTime", "10 seconds")
- .groupBy(window($"eventTime", "5 seconds") as 'window)
- .agg(count("*") as 'count)
- .select($"window".getField("start").cast("long").as[Long], $"count".as[Long])
-
- testStream(df)(
- AddData(inputData, 10, 11, 12, 13, 14, 15),
- CheckLastBatch(),
- AddData(inputData, 25), // Advance watermark to 15 seconds
- StopStream,
- StartStream(),
- CheckLastBatch(),
- AddData(inputData, 25), // Evict items less than previous watermark.
- CheckLastBatch((10, 5)),
- StopStream,
- AssertOnQuery { q => // clear the sink
- q.sink.asInstanceOf[MemorySink].clear()
- true
- },
- StartStream(),
- CheckLastBatch((10, 5)), // Recompute last batch and re-evict timestamp 10
- AddData(inputData, 30), // Advance watermark to 20 seconds
- CheckLastBatch(),
- StopStream,
- StartStream(), // Watermark should still be 15 seconds
- AddData(inputData, 17),
- CheckLastBatch(), // We still do not see next batch
- AddData(inputData, 30), // Advance watermark to 20 seconds
- CheckLastBatch(),
- AddData(inputData, 30), // Evict items less than previous watermark.
- CheckLastBatch((15, 2)) // Ensure we see next window
- )
- }
-
- test("dropping old data") {
- val inputData = MemoryStream[Int]
-
- val windowedAggregation = inputData.toDF()
- .withColumn("eventTime", $"value".cast("timestamp"))
- .withWatermark("eventTime", "10 seconds")
- .groupBy(window($"eventTime", "5 seconds") as 'window)
- .agg(count("*") as 'count)
- .select($"window".getField("start").cast("long").as[Long], $"count".as[Long])
-
- testStream(windowedAggregation)(
- AddData(inputData, 10, 11, 12),
- CheckAnswer(),
- AddData(inputData, 25), // Advance watermark to 15 seconds
- CheckAnswer(),
- AddData(inputData, 25), // Evict items less than previous watermark.
- CheckAnswer((10, 3)),
- AddData(inputData, 10), // 10 is later than 15 second watermark
- CheckAnswer((10, 3)),
- AddData(inputData, 25),
- CheckAnswer((10, 3)) // Should not emit an incorrect partial result.
- )
- }
-
- test("complete mode") {
- val inputData = MemoryStream[Int]
-
- val windowedAggregation = inputData.toDF()
- .withColumn("eventTime", $"value".cast("timestamp"))
- .withWatermark("eventTime", "10 seconds")
- .groupBy(window($"eventTime", "5 seconds") as 'window)
- .agg(count("*") as 'count)
- .select($"window".getField("start").cast("long").as[Long], $"count".as[Long])
-
- // No eviction when asked to compute complete results.
- testStream(windowedAggregation, OutputMode.Complete)(
- AddData(inputData, 10, 11, 12),
- CheckAnswer((10, 3)),
- AddData(inputData, 25),
- CheckAnswer((10, 3), (25, 1)),
- AddData(inputData, 25),
- CheckAnswer((10, 3), (25, 2)),
- AddData(inputData, 10),
- CheckAnswer((10, 4), (25, 2)),
- AddData(inputData, 25),
- CheckAnswer((10, 4), (25, 3))
- )
- }
-
- test("group by on raw timestamp") {
- val inputData = MemoryStream[Int]
-
- val windowedAggregation = inputData.toDF()
- .withColumn("eventTime", $"value".cast("timestamp"))
- .withWatermark("eventTime", "10 seconds")
- .groupBy($"eventTime")
- .agg(count("*") as 'count)
- .select($"eventTime".cast("long").as[Long], $"count".as[Long])
-
- testStream(windowedAggregation)(
- AddData(inputData, 10),
- CheckAnswer(),
- AddData(inputData, 25), // Advance watermark to 15 seconds
- CheckAnswer(),
- AddData(inputData, 25), // Evict items less than previous watermark.
- CheckAnswer((10, 1))
- )
- }
-
- private val timestampFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'") // ISO8601
- timestampFormat.setTimeZone(ju.TimeZone.getTimeZone("UTC"))
-
- private def formatTimestamp(sec: Long): String = {
- timestampFormat.format(new ju.Date(sec * 1000))
- }
-}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org