Posted to commits@spark.apache.org by zs...@apache.org on 2016/12/21 18:44:22 UTC

spark git commit: [SPARK-18894][SS] Fix event time watermark delay threshold specified in months or years

Repository: spark
Updated Branches:
  refs/heads/master 1a6438897 -> 607a1e63d


[SPARK-18894][SS] Fix event time watermark delay threshold specified in months or years

## What changes were proposed in this pull request?

Two changes:
- Fix how delays specified in months and years are translated to milliseconds (a worked sketch of the conversion follows below)
- Following up on #16258, do not show the watermark when the query has no watermarking
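As a hedged illustration of the first change, here is a minimal, self-contained Scala sketch of the fixed conversion (standalone arithmetic, not the actual Spark internals; it assumes CalendarInterval's convention of 31 days per month, as used in the patch below):

```scala
// Sketch of the fixed delay-to-milliseconds conversion, assuming
// CalendarInterval's 31-day month convention (see EventTimeWatermarkExec below).
object DelayToMillis {
  val MicrosPerDay: Long = 24L * 60 * 60 * 1000 * 1000 // 86,400,000,000 us

  // `months` and `milliseconds` as parsed from an interval string,
  // e.g. "2 years 5 months" => months = 29, milliseconds = 0
  def delayMs(months: Int, milliseconds: Long): Long = {
    val millisPerMonth = MicrosPerDay / 1000 * 31 // 2,678,400,000 ms
    milliseconds + months * millisPerMonth
  }

  def main(args: Array[String]): Unit = {
    println(delayMs(29, 0)) // 77673600000 ms == 899 days (29 months x 31 days)
  }
}
```

Previously only the milliseconds component of the interval was stored, so a delay given purely in months or years collapsed to 0 ms.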

## How was this patch tested?
Updated and new unit tests

Author: Tathagata Das <ta...@gmail.com>

Closes #16304 from tdas/SPARK-18834-1.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/607a1e63
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/607a1e63
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/607a1e63

Branch: refs/heads/master
Commit: 607a1e63dbc9269b806a9f537e1d041029333cdd
Parents: 1a64388
Author: Tathagata Das <ta...@gmail.com>
Authored: Wed Dec 21 10:44:20 2016 -0800
Committer: Shixiong Zhu <sh...@databricks.com>
Committed: Wed Dec 21 10:44:20 2016 -0800

----------------------------------------------------------------------
 .../streaming/EventTimeWatermarkExec.scala      |   7 +-
 .../execution/streaming/ProgressReporter.scala  |   7 +-
 .../execution/streaming/StreamExecution.scala   |   2 +-
 .../sql/streaming/EventTimeWatermarkSuite.scala | 287 +++++++++++++++++++
 .../spark/sql/streaming/WatermarkSuite.scala    | 240 ----------------
 5 files changed, 299 insertions(+), 244 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/607a1e63/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/EventTimeWatermarkExec.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/EventTimeWatermarkExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/EventTimeWatermarkExec.scala
index e8570d0..5a9a99e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/EventTimeWatermarkExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/EventTimeWatermarkExec.scala
@@ -84,6 +84,11 @@ case class EventTimeWatermarkExec(
     child: SparkPlan) extends SparkPlan {
 
   val eventTimeStats = new EventTimeStatsAccum()
+  val delayMs = {
+    val millisPerMonth = CalendarInterval.MICROS_PER_DAY / 1000 * 31
+    delay.milliseconds + delay.months * millisPerMonth
+  }
+
   sparkContext.register(eventTimeStats)
 
   override protected def doExecute(): RDD[InternalRow] = {
@@ -101,7 +106,7 @@ case class EventTimeWatermarkExec(
     if (a semanticEquals eventTime) {
       val updatedMetadata = new MetadataBuilder()
           .withMetadata(a.metadata)
-          .putLong(EventTimeWatermark.delayKey, delay.milliseconds)
+          .putLong(EventTimeWatermark.delayKey, delayMs)
           .build()
 
       a.withMetadata(updatedMetadata)

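A worked example of the `delayMs` computation introduced above (plain arithmetic, not Spark code):

```scala
// CalendarInterval.MICROS_PER_DAY is 86,400,000,000, so:
val millisPerMonth = 86400000000L / 1000 * 31   // 2,678,400,000 ms per month
// A delay of "2 years 5 months" parses to months = 29, milliseconds = 0:
val delayMs = 0L + 29 * millisPerMonth          // 77,673,600,000 ms = 899 days
```
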
http://git-wip-us.apache.org/repos/asf/spark/blob/607a1e63/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala
index 2386f33..c5e9eae 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala
@@ -25,7 +25,7 @@ import scala.collection.JavaConverters._
 
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.{DataFrame, SparkSession}
-import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.catalyst.plans.logical.{EventTimeWatermark, LogicalPlan}
 import org.apache.spark.sql.execution.QueryExecution
 import org.apache.spark.sql.streaming._
 import org.apache.spark.sql.streaming.StreamingQueryListener.QueryProgressEvent
@@ -182,7 +182,10 @@ trait ProgressReporter extends Logging {
 
   /** Extracts statistics from the most recent query execution. */
   private def extractExecutionStats(hasNewData: Boolean): ExecutionStats = {
-    val watermarkTimestamp = Map("watermark" -> formatTimestamp(offsetSeqMetadata.batchWatermarkMs))
+    val hasEventTime = logicalPlan.collect { case e: EventTimeWatermark => e }.nonEmpty
+    val watermarkTimestamp =
+      if (hasEventTime) Map("watermark" -> formatTimestamp(offsetSeqMetadata.batchWatermarkMs))
+      else Map.empty[String, String]
 
     if (!hasNewData) {
       return ExecutionStats(Map.empty, Seq.empty, watermarkTimestamp)

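A self-contained sketch of the reporting rule this hunk introduces (the method name and Timestamp formatting here are illustrative, not the exact Spark internals):

```scala
// The "watermark" entry appears in the event-time metrics only when the
// query's logical plan actually contains an event-time watermark node.
def watermarkMetric(hasEventTime: Boolean, batchWatermarkMs: Long): Map[String, String] =
  if (hasEventTime) Map("watermark" -> new java.sql.Timestamp(batchWatermarkMs).toString)
  else Map.empty[String, String]

println(watermarkMetric(hasEventTime = false, batchWatermarkMs = 0L))     // Map()
println(watermarkMetric(hasEventTime = true,  batchWatermarkMs = 15000L)) // Map(watermark -> ...)
```

This is the behavior the "no event time metrics when there is no watermarking" assertions in the new EventTimeWatermarkSuite below verify.
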
http://git-wip-us.apache.org/repos/asf/spark/blob/607a1e63/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
index 8f97d95..e05200d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
@@ -387,7 +387,7 @@ class StreamExecution(
         lastExecution.executedPlan.collect {
           case e: EventTimeWatermarkExec if e.eventTimeStats.value.count > 0 =>
             logDebug(s"Observed event time stats: ${e.eventTimeStats.value}")
-            e.eventTimeStats.value.max - e.delay.milliseconds
+            e.eventTimeStats.value.max - e.delayMs
         }.headOption.foreach { newWatermarkMs =>
           if (newWatermarkMs > offsetSeqMetadata.batchWatermarkMs) {
             logInfo(s"Updating eventTime watermark to: $newWatermarkMs ms")

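A worked example of the update rule above, matching the "Advance watermark to 15 seconds" steps in the tests below (plain arithmetic, not Spark code):

```scala
val maxEventTimeMs = 25000L                    // largest observed event time (25 s)
val delayMs        = 10000L                    // a "10 seconds" watermark delay
val newWatermarkMs = maxEventTimeMs - delayMs  // 15000 ms, i.e. watermark = 15 s
// The watermark only moves forward: it is updated only when
// newWatermarkMs > offsetSeqMetadata.batchWatermarkMs.
```
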
http://git-wip-us.apache.org/repos/asf/spark/blob/607a1e63/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala
new file mode 100644
index 0000000..bdfba95
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala
@@ -0,0 +1,287 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.streaming
+
+import java.{util => ju}
+import java.text.SimpleDateFormat
+import java.util.{Calendar, Date}
+
+import org.scalatest.BeforeAndAfter
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.{AnalysisException, Row}
+import org.apache.spark.sql.execution.streaming._
+import org.apache.spark.sql.functions.{count, window}
+import org.apache.spark.sql.InternalOutputModes.Complete
+
+class EventTimeWatermarkSuite extends StreamTest with BeforeAndAfter with Logging {
+
+  import testImplicits._
+
+  after {
+    sqlContext.streams.active.foreach(_.stop())
+  }
+
+  test("error on bad column") {
+    val inputData = MemoryStream[Int].toDF()
+    val e = intercept[AnalysisException] {
+      inputData.withWatermark("badColumn", "1 minute")
+    }
+    assert(e.getMessage contains "badColumn")
+  }
+
+  test("error on wrong type") {
+    val inputData = MemoryStream[Int].toDF()
+    val e = intercept[AnalysisException] {
+      inputData.withWatermark("value", "1 minute")
+    }
+    assert(e.getMessage contains "value")
+    assert(e.getMessage contains "int")
+  }
+
+  test("event time and watermark metrics") {
+    // No event time metrics when there is no watermarking
+    val inputData1 = MemoryStream[Int]
+    val aggWithoutWatermark = inputData1.toDF()
+      .withColumn("eventTime", $"value".cast("timestamp"))
+      .groupBy(window($"eventTime", "5 seconds") as 'window)
+      .agg(count("*") as 'count)
+      .select($"window".getField("start").cast("long").as[Long], $"count".as[Long])
+
+    testStream(aggWithoutWatermark, outputMode = Complete)(
+      AddData(inputData1, 15),
+      CheckAnswer((15, 1)),
+      assertEventStats { e => assert(e.isEmpty) },
+      AddData(inputData1, 10, 12, 14),
+      CheckAnswer((10, 3), (15, 1)),
+      assertEventStats { e => assert(e.isEmpty) }
+    )
+
+    // All event time metrics when watermarking is set
+    val inputData2 = MemoryStream[Int]
+    val aggWithWatermark = inputData2.toDF()
+        .withColumn("eventTime", $"value".cast("timestamp"))
+        .withWatermark("eventTime", "10 seconds")
+        .groupBy(window($"eventTime", "5 seconds") as 'window)
+        .agg(count("*") as 'count)
+        .select($"window".getField("start").cast("long").as[Long], $"count".as[Long])
+
+    testStream(aggWithWatermark)(
+      AddData(inputData2, 15),
+      CheckAnswer(),
+      assertEventStats { e =>
+        assert(e.get("max") === formatTimestamp(15))
+        assert(e.get("min") === formatTimestamp(15))
+        assert(e.get("avg") === formatTimestamp(15))
+        assert(e.get("watermark") === formatTimestamp(0))
+      },
+      AddData(inputData2, 10, 12, 14),
+      CheckAnswer(),
+      assertEventStats { e =>
+        assert(e.get("max") === formatTimestamp(14))
+        assert(e.get("min") === formatTimestamp(10))
+        assert(e.get("avg") === formatTimestamp(12))
+        assert(e.get("watermark") === formatTimestamp(5))
+      },
+      AddData(inputData2, 25),
+      CheckAnswer(),
+      assertEventStats { e =>
+        assert(e.get("max") === formatTimestamp(25))
+        assert(e.get("min") === formatTimestamp(25))
+        assert(e.get("avg") === formatTimestamp(25))
+        assert(e.get("watermark") === formatTimestamp(5))
+      },
+      AddData(inputData2, 25),
+      CheckAnswer((10, 3)),
+      assertEventStats { e =>
+        assert(e.get("max") === formatTimestamp(25))
+        assert(e.get("min") === formatTimestamp(25))
+        assert(e.get("avg") === formatTimestamp(25))
+        assert(e.get("watermark") === formatTimestamp(15))
+      }
+    )
+  }
+
+  test("append-mode watermark aggregation") {
+    val inputData = MemoryStream[Int]
+
+    val windowedAggregation = inputData.toDF()
+      .withColumn("eventTime", $"value".cast("timestamp"))
+      .withWatermark("eventTime", "10 seconds")
+      .groupBy(window($"eventTime", "5 seconds") as 'window)
+      .agg(count("*") as 'count)
+      .select($"window".getField("start").cast("long").as[Long], $"count".as[Long])
+
+    testStream(windowedAggregation)(
+      AddData(inputData, 10, 11, 12, 13, 14, 15),
+      CheckAnswer(),
+      AddData(inputData, 25), // Advance watermark to 15 seconds
+      CheckAnswer(),
+      AddData(inputData, 25), // Evict items less than previous watermark.
+      CheckAnswer((10, 5))
+    )
+  }
+
+  test("delay in months and years handled correctly") {
+    val currentTimeMs = System.currentTimeMillis
+    val currentTime = new Date(currentTimeMs)
+
+    val input = MemoryStream[Long]
+    val aggWithWatermark = input.toDF()
+      .withColumn("eventTime", $"value".cast("timestamp"))
+      .withWatermark("eventTime", "2 years 5 months")
+      .groupBy(window($"eventTime", "5 seconds") as 'window)
+      .agg(count("*") as 'count)
+      .select($"window".getField("start").cast("long").as[Long], $"count".as[Long])
+
+    def monthsSinceEpoch(date: Date): Int = { date.getYear * 12 + date.getMonth }
+
+    testStream(aggWithWatermark)(
+      AddData(input, currentTimeMs / 1000),
+      CheckAnswer(),
+      AddData(input, currentTimeMs / 1000),
+      CheckAnswer(),
+      assertEventStats { e =>
+        assert(timestampFormat.parse(e.get("max")).getTime === (currentTimeMs / 1000) * 1000)
+        val watermarkTime = timestampFormat.parse(e.get("watermark"))
+        assert(monthsSinceEpoch(currentTime) - monthsSinceEpoch(watermarkTime) === 29)
+      }
+    )
+  }
+
+  test("recovery") {
+    val inputData = MemoryStream[Int]
+    val df = inputData.toDF()
+      .withColumn("eventTime", $"value".cast("timestamp"))
+      .withWatermark("eventTime", "10 seconds")
+      .groupBy(window($"eventTime", "5 seconds") as 'window)
+      .agg(count("*") as 'count)
+      .select($"window".getField("start").cast("long").as[Long], $"count".as[Long])
+
+    testStream(df)(
+      AddData(inputData, 10, 11, 12, 13, 14, 15),
+      CheckLastBatch(),
+      AddData(inputData, 25), // Advance watermark to 15 seconds
+      StopStream,
+      StartStream(),
+      CheckLastBatch(),
+      AddData(inputData, 25), // Evict items less than previous watermark.
+      CheckLastBatch((10, 5)),
+      StopStream,
+      AssertOnQuery { q => // clear the sink
+        q.sink.asInstanceOf[MemorySink].clear()
+        true
+      },
+      StartStream(),
+      CheckLastBatch((10, 5)), // Recompute last batch and re-evict timestamp 10
+      AddData(inputData, 30), // Advance watermark to 20 seconds
+      CheckLastBatch(),
+      StopStream,
+      StartStream(), // Watermark should still be 15 seconds
+      AddData(inputData, 17),
+      CheckLastBatch(), // We still do not see next batch
+      AddData(inputData, 30), // Advance watermark to 20 seconds
+      CheckLastBatch(),
+      AddData(inputData, 30), // Evict items less than previous watermark.
+      CheckLastBatch((15, 2)) // Ensure we see next window
+    )
+  }
+
+  test("dropping old data") {
+    val inputData = MemoryStream[Int]
+
+    val windowedAggregation = inputData.toDF()
+        .withColumn("eventTime", $"value".cast("timestamp"))
+        .withWatermark("eventTime", "10 seconds")
+        .groupBy(window($"eventTime", "5 seconds") as 'window)
+        .agg(count("*") as 'count)
+        .select($"window".getField("start").cast("long").as[Long], $"count".as[Long])
+
+    testStream(windowedAggregation)(
+      AddData(inputData, 10, 11, 12),
+      CheckAnswer(),
+      AddData(inputData, 25),     // Advance watermark to 15 seconds
+      CheckAnswer(),
+      AddData(inputData, 25),     // Evict items less than previous watermark.
+      CheckAnswer((10, 3)),
+      AddData(inputData, 10),     // 10 is later than 15 second watermark
+      CheckAnswer((10, 3)),
+      AddData(inputData, 25),
+      CheckAnswer((10, 3))        // Should not emit an incorrect partial result.
+    )
+  }
+
+  test("complete mode") {
+    val inputData = MemoryStream[Int]
+
+    val windowedAggregation = inputData.toDF()
+        .withColumn("eventTime", $"value".cast("timestamp"))
+        .withWatermark("eventTime", "10 seconds")
+        .groupBy(window($"eventTime", "5 seconds") as 'window)
+        .agg(count("*") as 'count)
+        .select($"window".getField("start").cast("long").as[Long], $"count".as[Long])
+
+    // No eviction when asked to compute complete results.
+    testStream(windowedAggregation, OutputMode.Complete)(
+      AddData(inputData, 10, 11, 12),
+      CheckAnswer((10, 3)),
+      AddData(inputData, 25),
+      CheckAnswer((10, 3), (25, 1)),
+      AddData(inputData, 25),
+      CheckAnswer((10, 3), (25, 2)),
+      AddData(inputData, 10),
+      CheckAnswer((10, 4), (25, 2)),
+      AddData(inputData, 25),
+      CheckAnswer((10, 4), (25, 3))
+    )
+  }
+
+  test("group by on raw timestamp") {
+    val inputData = MemoryStream[Int]
+
+    val windowedAggregation = inputData.toDF()
+        .withColumn("eventTime", $"value".cast("timestamp"))
+        .withWatermark("eventTime", "10 seconds")
+        .groupBy($"eventTime")
+        .agg(count("*") as 'count)
+        .select($"eventTime".cast("long").as[Long], $"count".as[Long])
+
+    testStream(windowedAggregation)(
+      AddData(inputData, 10),
+      CheckAnswer(),
+      AddData(inputData, 25), // Advance watermark to 15 seconds
+      CheckAnswer(),
+      AddData(inputData, 25), // Evict items less than previous watermark.
+      CheckAnswer((10, 1))
+    )
+  }
+
+  private def assertEventStats(body: ju.Map[String, String] => Unit): AssertOnQuery = {
+    AssertOnQuery { q =>
+      body(q.recentProgress.filter(_.numInputRows > 0).lastOption.get.eventTime)
+      true
+    }
+  }
+
+  private val timestampFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'") // ISO8601
+  timestampFormat.setTimeZone(ju.TimeZone.getTimeZone("UTC"))
+
+  private def formatTimestamp(sec: Long): String = {
+    timestampFormat.format(new ju.Date(sec * 1000))
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/607a1e63/sql/core/src/test/scala/org/apache/spark/sql/streaming/WatermarkSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/WatermarkSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/WatermarkSuite.scala
deleted file mode 100644
index f1cc19c..0000000
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/WatermarkSuite.scala
+++ /dev/null
@@ -1,240 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.streaming
-
-import java.{util => ju}
-import java.text.SimpleDateFormat
-
-import org.scalatest.BeforeAndAfter
-
-import org.apache.spark.internal.Logging
-import org.apache.spark.sql.{AnalysisException, Row}
-import org.apache.spark.sql.execution.streaming._
-import org.apache.spark.sql.functions.{count, window}
-
-class WatermarkSuite extends StreamTest with BeforeAndAfter with Logging {
-
-  import testImplicits._
-
-  after {
-    sqlContext.streams.active.foreach(_.stop())
-  }
-
-  test("error on bad column") {
-    val inputData = MemoryStream[Int].toDF()
-    val e = intercept[AnalysisException] {
-      inputData.withWatermark("badColumn", "1 minute")
-    }
-    assert(e.getMessage contains "badColumn")
-  }
-
-  test("error on wrong type") {
-    val inputData = MemoryStream[Int].toDF()
-    val e = intercept[AnalysisException] {
-      inputData.withWatermark("value", "1 minute")
-    }
-    assert(e.getMessage contains "value")
-    assert(e.getMessage contains "int")
-  }
-
-
-  test("event time and watermark metrics") {
-    val inputData = MemoryStream[Int]
-
-    val windowedAggregation = inputData.toDF()
-        .withColumn("eventTime", $"value".cast("timestamp"))
-        .withWatermark("eventTime", "10 seconds")
-        .groupBy(window($"eventTime", "5 seconds") as 'window)
-        .agg(count("*") as 'count)
-        .select($"window".getField("start").cast("long").as[Long], $"count".as[Long])
-
-    def assertEventStats(body: ju.Map[String, String] => Unit): AssertOnQuery = AssertOnQuery { q =>
-      body(q.recentProgress.filter(_.numInputRows > 0).lastOption.get.eventTime)
-      true
-    }
-
-    testStream(windowedAggregation)(
-      AddData(inputData, 15),
-      CheckAnswer(),
-      assertEventStats { e =>
-        assert(e.get("max") === formatTimestamp(15))
-        assert(e.get("min") === formatTimestamp(15))
-        assert(e.get("avg") === formatTimestamp(15))
-        assert(e.get("watermark") === formatTimestamp(0))
-      },
-      AddData(inputData, 10, 12, 14),
-      CheckAnswer(),
-      assertEventStats { e =>
-        assert(e.get("max") === formatTimestamp(14))
-        assert(e.get("min") === formatTimestamp(10))
-        assert(e.get("avg") === formatTimestamp(12))
-        assert(e.get("watermark") === formatTimestamp(5))
-      },
-      AddData(inputData, 25),
-      CheckAnswer(),
-      assertEventStats { e =>
-        assert(e.get("max") === formatTimestamp(25))
-        assert(e.get("min") === formatTimestamp(25))
-        assert(e.get("avg") === formatTimestamp(25))
-        assert(e.get("watermark") === formatTimestamp(5))
-      },
-      AddData(inputData, 25),
-      CheckAnswer((10, 3)),
-      assertEventStats { e =>
-        assert(e.get("max") === formatTimestamp(25))
-        assert(e.get("min") === formatTimestamp(25))
-        assert(e.get("avg") === formatTimestamp(25))
-        assert(e.get("watermark") === formatTimestamp(15))
-      }
-    )
-  }
-
-  test("append-mode watermark aggregation") {
-    val inputData = MemoryStream[Int]
-
-    val windowedAggregation = inputData.toDF()
-      .withColumn("eventTime", $"value".cast("timestamp"))
-      .withWatermark("eventTime", "10 seconds")
-      .groupBy(window($"eventTime", "5 seconds") as 'window)
-      .agg(count("*") as 'count)
-      .select($"window".getField("start").cast("long").as[Long], $"count".as[Long])
-
-    testStream(windowedAggregation)(
-      AddData(inputData, 10, 11, 12, 13, 14, 15),
-      CheckAnswer(),
-      AddData(inputData, 25), // Advance watermark to 15 seconds
-      CheckAnswer(),
-      AddData(inputData, 25), // Evict items less than previous watermark.
-      CheckAnswer((10, 5))
-    )
-  }
-
-  test("recovery") {
-    val inputData = MemoryStream[Int]
-    val df = inputData.toDF()
-      .withColumn("eventTime", $"value".cast("timestamp"))
-      .withWatermark("eventTime", "10 seconds")
-      .groupBy(window($"eventTime", "5 seconds") as 'window)
-      .agg(count("*") as 'count)
-      .select($"window".getField("start").cast("long").as[Long], $"count".as[Long])
-
-    testStream(df)(
-      AddData(inputData, 10, 11, 12, 13, 14, 15),
-      CheckLastBatch(),
-      AddData(inputData, 25), // Advance watermark to 15 seconds
-      StopStream,
-      StartStream(),
-      CheckLastBatch(),
-      AddData(inputData, 25), // Evict items less than previous watermark.
-      CheckLastBatch((10, 5)),
-      StopStream,
-      AssertOnQuery { q => // clear the sink
-        q.sink.asInstanceOf[MemorySink].clear()
-        true
-      },
-      StartStream(),
-      CheckLastBatch((10, 5)), // Recompute last batch and re-evict timestamp 10
-      AddData(inputData, 30), // Advance watermark to 20 seconds
-      CheckLastBatch(),
-      StopStream,
-      StartStream(), // Watermark should still be 15 seconds
-      AddData(inputData, 17),
-      CheckLastBatch(), // We still do not see next batch
-      AddData(inputData, 30), // Advance watermark to 20 seconds
-      CheckLastBatch(),
-      AddData(inputData, 30), // Evict items less than previous watermark.
-      CheckLastBatch((15, 2)) // Ensure we see next window
-    )
-  }
-
-  test("dropping old data") {
-    val inputData = MemoryStream[Int]
-
-    val windowedAggregation = inputData.toDF()
-        .withColumn("eventTime", $"value".cast("timestamp"))
-        .withWatermark("eventTime", "10 seconds")
-        .groupBy(window($"eventTime", "5 seconds") as 'window)
-        .agg(count("*") as 'count)
-        .select($"window".getField("start").cast("long").as[Long], $"count".as[Long])
-
-    testStream(windowedAggregation)(
-      AddData(inputData, 10, 11, 12),
-      CheckAnswer(),
-      AddData(inputData, 25),     // Advance watermark to 15 seconds
-      CheckAnswer(),
-      AddData(inputData, 25),     // Evict items less than previous watermark.
-      CheckAnswer((10, 3)),
-      AddData(inputData, 10),     // 10 is later than 15 second watermark
-      CheckAnswer((10, 3)),
-      AddData(inputData, 25),
-      CheckAnswer((10, 3))        // Should not emit an incorrect partial result.
-    )
-  }
-
-  test("complete mode") {
-    val inputData = MemoryStream[Int]
-
-    val windowedAggregation = inputData.toDF()
-        .withColumn("eventTime", $"value".cast("timestamp"))
-        .withWatermark("eventTime", "10 seconds")
-        .groupBy(window($"eventTime", "5 seconds") as 'window)
-        .agg(count("*") as 'count)
-        .select($"window".getField("start").cast("long").as[Long], $"count".as[Long])
-
-    // No eviction when asked to compute complete results.
-    testStream(windowedAggregation, OutputMode.Complete)(
-      AddData(inputData, 10, 11, 12),
-      CheckAnswer((10, 3)),
-      AddData(inputData, 25),
-      CheckAnswer((10, 3), (25, 1)),
-      AddData(inputData, 25),
-      CheckAnswer((10, 3), (25, 2)),
-      AddData(inputData, 10),
-      CheckAnswer((10, 4), (25, 2)),
-      AddData(inputData, 25),
-      CheckAnswer((10, 4), (25, 3))
-    )
-  }
-
-  test("group by on raw timestamp") {
-    val inputData = MemoryStream[Int]
-
-    val windowedAggregation = inputData.toDF()
-        .withColumn("eventTime", $"value".cast("timestamp"))
-        .withWatermark("eventTime", "10 seconds")
-        .groupBy($"eventTime")
-        .agg(count("*") as 'count)
-        .select($"eventTime".cast("long").as[Long], $"count".as[Long])
-
-    testStream(windowedAggregation)(
-      AddData(inputData, 10),
-      CheckAnswer(),
-      AddData(inputData, 25), // Advance watermark to 15 seconds
-      CheckAnswer(),
-      AddData(inputData, 25), // Evict items less than previous watermark.
-      CheckAnswer((10, 1))
-    )
-  }
-
-  private val timestampFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'") // ISO8601
-  timestampFormat.setTimeZone(ju.TimeZone.getTimeZone("UTC"))
-
-  private def formatTimestamp(sec: Long): String = {
-    timestampFormat.format(new ju.Date(sec * 1000))
-  }
-}

