You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by zs...@apache.org on 2017/03/08 04:35:00 UTC
spark git commit: [SPARK-19859][SS] The new watermark should override
the old one
Repository: spark
Updated Branches:
refs/heads/master ca849ac4e -> d8830c503
[SPARK-19859][SS] The new watermark should override the old one
## What changes were proposed in this pull request?
The new watermark should override the old one. Otherwise, we just pick up the first column that has a watermark, which may be unexpected.
## How was this patch tested?
Tested with the new unit test added to EventTimeWatermarkSuite.
Author: Shixiong Zhu <sh...@databricks.com>
Closes #17199 from zsxwing/SPARK-19859.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d8830c50
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d8830c50
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d8830c50
Branch: refs/heads/master
Commit: d8830c5039d9c7c5ef03631904c32873ab558e22
Parents: ca849ac
Author: Shixiong Zhu <sh...@databricks.com>
Authored: Tue Mar 7 20:34:55 2017 -0800
Committer: Shixiong Zhu <sh...@databricks.com>
Committed: Tue Mar 7 20:34:55 2017 -0800
----------------------------------------------------------------------
.../catalyst/plans/logical/EventTimeWatermark.scala | 7 +++++++
.../spark/sql/streaming/EventTimeWatermarkSuite.scala | 14 ++++++++++++++
2 files changed, 21 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/d8830c50/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/EventTimeWatermark.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/EventTimeWatermark.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/EventTimeWatermark.scala
index 77309ce..62f68a6 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/EventTimeWatermark.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/EventTimeWatermark.scala
@@ -42,6 +42,13 @@ case class EventTimeWatermark(
.putLong(EventTimeWatermark.delayKey, delay.milliseconds)
.build()
a.withMetadata(updatedMetadata)
+ } else if (a.metadata.contains(EventTimeWatermark.delayKey)) {
+ // Remove existing watermark
+ val updatedMetadata = new MetadataBuilder()
+ .withMetadata(a.metadata)
+ .remove(EventTimeWatermark.delayKey)
+ .build()
+ a.withMetadata(updatedMetadata)
} else {
a
}
http://git-wip-us.apache.org/repos/asf/spark/blob/d8830c50/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala
index c34d119..c768525 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala
@@ -25,6 +25,7 @@ import org.scalatest.BeforeAndAfter
import org.apache.spark.internal.Logging
import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.catalyst.plans.logical.EventTimeWatermark
import org.apache.spark.sql.execution.streaming._
import org.apache.spark.sql.functions.{count, window}
import org.apache.spark.sql.streaming.OutputMode._
@@ -305,6 +306,19 @@ class EventTimeWatermarkSuite extends StreamTest with BeforeAndAfter with Loggin
)
}
+ test("the new watermark should override the old one") {
+ val df = MemoryStream[(Long, Long)].toDF()
+ .withColumn("first", $"_1".cast("timestamp"))
+ .withColumn("second", $"_2".cast("timestamp"))
+ .withWatermark("first", "1 minute")
+ .withWatermark("second", "2 minutes")
+
+ val eventTimeColumns = df.logicalPlan.output
+ .filter(_.metadata.contains(EventTimeWatermark.delayKey))
+ assert(eventTimeColumns.size === 1)
+ assert(eventTimeColumns(0).name === "second")
+ }
+
private def assertNumStateRows(numTotalRows: Long): AssertOnQuery = AssertOnQuery { q =>
val progressWithData = q.recentProgress.filter(_.numInputRows > 0).lastOption.get
assert(progressWithData.stateOperators(0).numRowsTotal === numTotalRows)
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org