Posted to commits@spark.apache.org by td...@apache.org on 2017/09/16 04:10:14 UTC

spark git commit: [SPARK-22017] Take minimum of all watermark execs in StreamExecution.

Repository: spark
Updated Branches:
  refs/heads/master c7307acda -> 0bad10d3e


[SPARK-22017] Take minimum of all watermark execs in StreamExecution.

## What changes were proposed in this pull request?

Take the minimum of all watermark exec nodes as the "real" watermark in StreamExecution, rather than picking one arbitrarily. Previously, the update used `headOption`, so when a plan contained multiple `EventTimeWatermarkExec` nodes (e.g. a union of two watermarked streams), only the first node found determined the global watermark, which could advance it past data a slower stream still needed.
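
At a high level, each `EventTimeWatermarkExec` node now keeps its own monotonically increasing watermark, and the query-level watermark is the minimum across all of them. A minimal standalone sketch of that bookkeeping (plain Scala; `maxEventTimeMs` and `delayMs` are illustrative stand-ins for each node's `eventTimeStats` and delay):

    import scala.collection.mutable

    // One entry per watermark operator, keyed by its position in the physical plan.
    val watermarkMsMap = mutable.Map[Int, Long]()

    // Record a new observation for one node; a node's watermark only moves forward.
    def updateNodeWatermark(index: Int, maxEventTimeMs: Long, delayMs: Long): Unit = {
      val newWatermarkMs = maxEventTimeMs - delayMs
      if (watermarkMsMap.get(index).forall(newWatermarkMs > _)) {
        watermarkMsMap.put(index, newWatermarkMs)
      }
    }

    updateNodeWatermark(0, maxEventTimeMs = 11000, delayMs = 10000) // left:  11s - 10s = 1s
    updateNodeWatermark(1, maxEventTimeMs = 8000, delayMs = 5000)   // right:  8s -  5s = 3s

    // The global watermark is the minimum over all nodes, so the slowest side wins.
    assert(watermarkMsMap.values.min == 1000L)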

## How was this patch tested?

A new unit test ("watermark with 2 streams" in EventTimeWatermarkSuite) that unions two streams with different watermark delays and checks the global watermark across batches and a query restart.

Author: Jose Torres <jo...@databricks.com>

Closes #19239 from joseph-torres/SPARK-22017.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/0bad10d3
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/0bad10d3
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/0bad10d3

Branch: refs/heads/master
Commit: 0bad10d3e36d3238c7ee7c0fc5465072734b3ae4
Parents: c7307ac
Author: Jose Torres <jo...@databricks.com>
Authored: Fri Sep 15 21:10:07 2017 -0700
Committer: Tathagata Das <ta...@gmail.com>
Committed: Fri Sep 15 21:10:07 2017 -0700

----------------------------------------------------------------------
 .../streaming/IncrementalExecution.scala        |  2 +-
 .../execution/streaming/StreamExecution.scala   | 39 ++++++++--
 .../sql/streaming/EventTimeWatermarkSuite.scala | 78 ++++++++++++++++++++
 3 files changed, 113 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/0bad10d3/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala
index 258a642..19d9598 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala
@@ -39,7 +39,7 @@ class IncrementalExecution(
     val checkpointLocation: String,
     val runId: UUID,
     val currentBatchId: Long,
-    offsetSeqMetadata: OffsetSeqMetadata)
+    val offsetSeqMetadata: OffsetSeqMetadata)
   extends QueryExecution(sparkSession, logicalPlan) with Logging {
 
   // Modified planner with stateful operations.

http://git-wip-us.apache.org/repos/asf/spark/blob/0bad10d3/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
index 952e431..b27a59b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
@@ -130,6 +130,16 @@ class StreamExecution(
   protected var offsetSeqMetadata = OffsetSeqMetadata(
     batchWatermarkMs = 0, batchTimestampMs = 0, sparkSession.conf)
 
+  /**
+   * A map of current watermarks, keyed by the position of the watermark operator in the
+   * physical plan.
+   *
+   * This state is 'soft state', which does not affect the correctness and semantics of watermarks
+   * and is not persisted across query restarts.
+   * The fault-tolerant watermark state is in offsetSeqMetadata.
+   */
+  protected val watermarkMsMap: MutableMap[Int, Long] = MutableMap()
+
   override val id: UUID = UUID.fromString(streamMetadata.id)
 
   override val runId: UUID = UUID.randomUUID
@@ -560,13 +570,32 @@ class StreamExecution(
     }
     if (hasNewData) {
       var batchWatermarkMs = offsetSeqMetadata.batchWatermarkMs
-      // Update the eventTime watermark if we find one in the plan.
+      // Update the eventTime watermarks if we find any in the plan.
       if (lastExecution != null) {
         lastExecution.executedPlan.collect {
-          case e: EventTimeWatermarkExec if e.eventTimeStats.value.count > 0 =>
-            logDebug(s"Observed event time stats: ${e.eventTimeStats.value}")
-            e.eventTimeStats.value.max - e.delayMs
-        }.headOption.foreach { newWatermarkMs =>
+          case e: EventTimeWatermarkExec => e
+        }.zipWithIndex.foreach {
+          case (e, index) if e.eventTimeStats.value.count > 0 =>
+            logDebug(s"Observed event time stats $index: ${e.eventTimeStats.value}")
+            val newWatermarkMs = e.eventTimeStats.value.max - e.delayMs
+            val prevWatermarkMs = watermarkMsMap.get(index)
+            if (prevWatermarkMs.isEmpty || newWatermarkMs > prevWatermarkMs.get) {
+              watermarkMsMap.put(index, newWatermarkMs)
+            }
+
+          // Populate 0 if we haven't seen any data yet for this watermark node.
+          case (_, index) =>
+            if (!watermarkMsMap.isDefinedAt(index)) {
+              watermarkMsMap.put(index, 0)
+            }
+        }
+
+        // Update the global watermark to the minimum of all watermark nodes.
+        // This is the safest option, because only the global watermark is fault-tolerant. Making
+        // it the minimum of all individual watermarks guarantees it will never advance past where
+        // any individual watermark operator would be if it were in a plan by itself.
+        if (watermarkMsMap.nonEmpty) {
+          val newWatermarkMs = watermarkMsMap.minBy(_._2)._2
           if (newWatermarkMs > batchWatermarkMs) {
             logInfo(s"Updating eventTime watermark to: $newWatermarkMs ms")
             batchWatermarkMs = newWatermarkMs

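A quick illustration of the `minBy` step above, runnable in a Scala REPL (the map contents are made up):

    scala> val watermarkMsMap = scala.collection.mutable.Map(0 -> 1000L, 1 -> 3000L)
    scala> watermarkMsMap.minBy(_._2)._2  // the smallest per-node watermark wins
    res1: Long = 1000

Because watermarkMsMap is soft state, it is rebuilt from fresh data after a restart; the fault-tolerant floor is offsetSeqMetadata.batchWatermarkMs, and the `newWatermarkMs > batchWatermarkMs` guard keeps the recovered global watermark from moving backwards.
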
http://git-wip-us.apache.org/repos/asf/spark/blob/0bad10d3/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala
index 4f19fa0..f3e8cf9 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala
@@ -300,6 +300,84 @@ class EventTimeWatermarkSuite extends StreamTest with BeforeAndAfter with Matche
     )
   }
 
+  test("watermark with 2 streams") {
+    import org.apache.spark.sql.functions.sum
+    val first = MemoryStream[Int]
+
+    val firstDf = first.toDF()
+      .withColumn("eventTime", $"value".cast("timestamp"))
+      .withWatermark("eventTime", "10 seconds")
+      .select('value)
+
+    val second = MemoryStream[Int]
+
+    val secondDf = second.toDF()
+      .withColumn("eventTime", $"value".cast("timestamp"))
+      .withWatermark("eventTime", "5 seconds")
+      .select('value)
+
+    withTempDir { checkpointDir =>
+      val unionWriter = firstDf.union(secondDf).agg(sum('value))
+        .writeStream
+        .option("checkpointLocation", checkpointDir.getCanonicalPath)
+        .format("memory")
+        .outputMode("complete")
+        .queryName("test")
+
+      val union = unionWriter.start()
+
+      def getWatermarkAfterData(
+          firstData: Seq[Int] = Seq.empty,
+          secondData: Seq[Int] = Seq.empty,
+          query: StreamingQuery = union): Long = {
+        if (firstData.nonEmpty) first.addData(firstData)
+        if (secondData.nonEmpty) second.addData(secondData)
+        query.processAllAvailable()
+        // add a dummy batch so lastExecution has the new watermark
+        first.addData(0)
+        query.processAllAvailable()
+        // get last watermark
+        val lastExecution = query.asInstanceOf[StreamingQueryWrapper].streamingQuery.lastExecution
+        lastExecution.offsetSeqMetadata.batchWatermarkMs
+      }
+
+      // Global watermark starts at 0 until we get data from both sides
+      assert(getWatermarkAfterData(firstData = Seq(11)) == 0)
+      assert(getWatermarkAfterData(secondData = Seq(6)) == 1000)
+      // Global watermark stays at the left watermark (1s) while the right watermark moves to 3s
+      assert(getWatermarkAfterData(secondData = Seq(8)) == 1000)
+      // Global watermark switches to the right-side value 3s when the left watermark goes higher
+      assert(getWatermarkAfterData(firstData = Seq(21)) == 3000)
+      // Global watermark goes back to left
+      assert(getWatermarkAfterData(secondData = Seq(17, 28, 39)) == 11000)
+      // Global watermark stays on left as long as it's below right
+      assert(getWatermarkAfterData(firstData = Seq(31)) == 21000)
+      assert(getWatermarkAfterData(firstData = Seq(41)) == 31000)
+      // Global watermark switches back to right again
+      assert(getWatermarkAfterData(firstData = Seq(51)) == 34000)
+
+      // Global watermark is updated correctly with simultaneous data from both sides
+      assert(getWatermarkAfterData(firstData = Seq(100), secondData = Seq(100)) == 90000)
+      assert(getWatermarkAfterData(firstData = Seq(120), secondData = Seq(110)) == 105000)
+      assert(getWatermarkAfterData(firstData = Seq(130), secondData = Seq(125)) == 120000)
+
+      // Global watermark doesn't decrement with simultaneous data
+      assert(getWatermarkAfterData(firstData = Seq(100), secondData = Seq(100)) == 120000)
+      assert(getWatermarkAfterData(firstData = Seq(140), secondData = Seq(100)) == 120000)
+      assert(getWatermarkAfterData(firstData = Seq(100), secondData = Seq(135)) == 130000)
+
+      // Global watermark recovers after restart, but left side watermark ahead of it does not.
+      assert(getWatermarkAfterData(firstData = Seq(200), secondData = Seq(190)) == 185000)
+      union.stop()
+      val union2 = unionWriter.start()
+      assert(getWatermarkAfterData(query = union2) == 185000)
+      // Even though the left side was ahead of 185000 in the last execution, the watermark won't
+      // increment until it gets past it in this execution.
+      assert(getWatermarkAfterData(secondData = Seq(200), query = union2) == 185000)
+      assert(getWatermarkAfterData(firstData = Seq(200), query = union2) == 190000)
+    }
+  }
+
   test("complete mode") {
     val inputData = MemoryStream[Int]
 

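For context, a sketch of the kind of query this change affects, adapted from the new test but using the built-in rate source instead of the test-only MemoryStream (assumes an active SparkSession named `spark`; names are illustrative):

    import org.apache.spark.sql.functions.sum
    import spark.implicits._

    // Two streams watermarked with different delays, unioned into one aggregate.
    // With this patch the query's global watermark is the minimum of the two,
    // so the 10-second side holds it back until that side catches up.
    val left = spark.readStream.format("rate").load()
      .withWatermark("timestamp", "10 seconds")
      .select($"value")
    val right = spark.readStream.format("rate").load()
      .withWatermark("timestamp", "5 seconds")
      .select($"value")

    val query = left.union(right)
      .agg(sum($"value"))
      .writeStream
      .outputMode("complete")
      .format("memory")
      .queryName("unionAgg")
      .start()
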

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org