You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by td...@apache.org on 2017/09/16 04:10:14 UTC
spark git commit: [SPARK-22017] Take minimum of all watermark execs in StreamExecution.
Repository: spark
Updated Branches:
refs/heads/master c7307acda -> 0bad10d3e
[SPARK-22017] Take minimum of all watermark execs in StreamExecution.
## What changes were proposed in this pull request?
Take the minimum of all watermark exec nodes as the "real" watermark in StreamExecution, rather than picking one arbitrarily.
## How was this patch tested?
Added a new unit test, "watermark with 2 streams", to EventTimeWatermarkSuite.
Author: Jose Torres <jo...@databricks.com>
Closes #19239 from joseph-torres/SPARK-22017.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/0bad10d3
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/0bad10d3
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/0bad10d3
Branch: refs/heads/master
Commit: 0bad10d3e36d3238c7ee7c0fc5465072734b3ae4
Parents: c7307ac
Author: Jose Torres <jo...@databricks.com>
Authored: Fri Sep 15 21:10:07 2017 -0700
Committer: Tathagata Das <ta...@gmail.com>
Committed: Fri Sep 15 21:10:07 2017 -0700
----------------------------------------------------------------------
.../streaming/IncrementalExecution.scala | 2 +-
.../execution/streaming/StreamExecution.scala | 39 ++++++++--
.../sql/streaming/EventTimeWatermarkSuite.scala | 78 ++++++++++++++++++++
3 files changed, 113 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/0bad10d3/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala
index 258a642..19d9598 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala
@@ -39,7 +39,7 @@ class IncrementalExecution(
val checkpointLocation: String,
val runId: UUID,
val currentBatchId: Long,
- offsetSeqMetadata: OffsetSeqMetadata)
+ val offsetSeqMetadata: OffsetSeqMetadata)
extends QueryExecution(sparkSession, logicalPlan) with Logging {
// Modified planner with stateful operations.
http://git-wip-us.apache.org/repos/asf/spark/blob/0bad10d3/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
index 952e431..b27a59b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
@@ -130,6 +130,16 @@ class StreamExecution(
protected var offsetSeqMetadata = OffsetSeqMetadata(
batchWatermarkMs = 0, batchTimestampMs = 0, sparkSession.conf)
+ /**
+ * A map of current watermarks, keyed by the position of the watermark operator in the
+ * physical plan.
+ *
+ * This state is 'soft state', which does not affect the correctness and semantics of watermarks
+ * and is not persisted across query restarts.
+ * The fault-tolerant watermark state is in offsetSeqMetadata.
+ */
+ protected val watermarkMsMap: MutableMap[Int, Long] = MutableMap()
+
override val id: UUID = UUID.fromString(streamMetadata.id)
override val runId: UUID = UUID.randomUUID
@@ -560,13 +570,32 @@ class StreamExecution(
}
if (hasNewData) {
var batchWatermarkMs = offsetSeqMetadata.batchWatermarkMs
- // Update the eventTime watermark if we find one in the plan.
+ // Update the eventTime watermarks if we find any in the plan.
if (lastExecution != null) {
lastExecution.executedPlan.collect {
- case e: EventTimeWatermarkExec if e.eventTimeStats.value.count > 0 =>
- logDebug(s"Observed event time stats: ${e.eventTimeStats.value}")
- e.eventTimeStats.value.max - e.delayMs
- }.headOption.foreach { newWatermarkMs =>
+ case e: EventTimeWatermarkExec => e
+ }.zipWithIndex.foreach {
+ case (e, index) if e.eventTimeStats.value.count > 0 =>
+ logDebug(s"Observed event time stats $index: ${e.eventTimeStats.value}")
+ val newWatermarkMs = e.eventTimeStats.value.max - e.delayMs
+ val prevWatermarkMs = watermarkMsMap.get(index)
+ if (prevWatermarkMs.isEmpty || newWatermarkMs > prevWatermarkMs.get) {
+ watermarkMsMap.put(index, newWatermarkMs)
+ }
+
+ // Populate 0 if we haven't seen any data yet for this watermark node.
+ case (_, index) =>
+ if (!watermarkMsMap.isDefinedAt(index)) {
+ watermarkMsMap.put(index, 0)
+ }
+ }
+
+ // Update the global watermark to the minimum of all watermark nodes.
+ // This is the safest option, because only the global watermark is fault-tolerant. Making
+ // it the minimum of all individual watermarks guarantees it will never advance past where
+ // any individual watermark operator would be if it were in a plan by itself.
+ if(!watermarkMsMap.isEmpty) {
+ val newWatermarkMs = watermarkMsMap.minBy(_._2)._2
if (newWatermarkMs > batchWatermarkMs) {
logInfo(s"Updating eventTime watermark to: $newWatermarkMs ms")
batchWatermarkMs = newWatermarkMs
http://git-wip-us.apache.org/repos/asf/spark/blob/0bad10d3/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala
index 4f19fa0..f3e8cf9 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala
@@ -300,6 +300,84 @@ class EventTimeWatermarkSuite extends StreamTest with BeforeAndAfter with Matche
)
}
+ test("watermark with 2 streams") {
+ import org.apache.spark.sql.functions.sum
+ val first = MemoryStream[Int]
+
+ val firstDf = first.toDF()
+ .withColumn("eventTime", $"value".cast("timestamp"))
+ .withWatermark("eventTime", "10 seconds")
+ .select('value)
+
+ val second = MemoryStream[Int]
+
+ val secondDf = second.toDF()
+ .withColumn("eventTime", $"value".cast("timestamp"))
+ .withWatermark("eventTime", "5 seconds")
+ .select('value)
+
+ withTempDir { checkpointDir =>
+ val unionWriter = firstDf.union(secondDf).agg(sum('value))
+ .writeStream
+ .option("checkpointLocation", checkpointDir.getCanonicalPath)
+ .format("memory")
+ .outputMode("complete")
+ .queryName("test")
+
+ val union = unionWriter.start()
+
+ def getWatermarkAfterData(
+ firstData: Seq[Int] = Seq.empty,
+ secondData: Seq[Int] = Seq.empty,
+ query: StreamingQuery = union): Long = {
+ if (firstData.nonEmpty) first.addData(firstData)
+ if (secondData.nonEmpty) second.addData(secondData)
+ query.processAllAvailable()
+ // add a dummy batch so lastExecution has the new watermark
+ first.addData(0)
+ query.processAllAvailable()
+ // get last watermark
+ val lastExecution = query.asInstanceOf[StreamingQueryWrapper].streamingQuery.lastExecution
+ lastExecution.offsetSeqMetadata.batchWatermarkMs
+ }
+
+ // Global watermark starts at 0 until we get data from both sides
+ assert(getWatermarkAfterData(firstData = Seq(11)) == 0)
+ assert(getWatermarkAfterData(secondData = Seq(6)) == 1000)
+ // Global watermark stays at the left watermark (1s) when the right watermark moves to 3s
+ assert(getWatermarkAfterData(secondData = Seq(8)) == 1000)
+ // Global watermark switches to the right side value (3s) when the left watermark goes higher
+ assert(getWatermarkAfterData(firstData = Seq(21)) == 3000)
+ // Global watermark goes back to left
+ assert(getWatermarkAfterData(secondData = Seq(17, 28, 39)) == 11000)
+ // Global watermark stays on left as long as it's below right
+ assert(getWatermarkAfterData(firstData = Seq(31)) == 21000)
+ assert(getWatermarkAfterData(firstData = Seq(41)) == 31000)
+ // Global watermark switches back to right again
+ assert(getWatermarkAfterData(firstData = Seq(51)) == 34000)
+
+ // Global watermark is updated correctly with simultaneous data from both sides
+ assert(getWatermarkAfterData(firstData = Seq(100), secondData = Seq(100)) == 90000)
+ assert(getWatermarkAfterData(firstData = Seq(120), secondData = Seq(110)) == 105000)
+ assert(getWatermarkAfterData(firstData = Seq(130), secondData = Seq(125)) == 120000)
+
+ // Global watermark doesn't decrement with simultaneous data
+ assert(getWatermarkAfterData(firstData = Seq(100), secondData = Seq(100)) == 120000)
+ assert(getWatermarkAfterData(firstData = Seq(140), secondData = Seq(100)) == 120000)
+ assert(getWatermarkAfterData(firstData = Seq(100), secondData = Seq(135)) == 130000)
+
+ // Global watermark recovers after restart, but left side watermark ahead of it does not.
+ assert(getWatermarkAfterData(firstData = Seq(200), secondData = Seq(190)) == 185000)
+ union.stop()
+ val union2 = unionWriter.start()
+ assert(getWatermarkAfterData(query = union2) == 185000)
+ // Even though the left side was ahead of 185000 in the last execution, the watermark won't
+ // increment until it gets past it in this execution.
+ assert(getWatermarkAfterData(secondData = Seq(200), query = union2) == 185000)
+ assert(getWatermarkAfterData(firstData = Seq(200), query = union2) == 190000)
+ }
+ }
+
test("complete mode") {
val inputData = MemoryStream[Int]
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org