You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by zs...@apache.org on 2017/08/02 21:02:16 UTC
spark git commit: [SPARK-21546][SS] dropDuplicates should ignore
watermark when it's not a key
Repository: spark
Updated Branches:
refs/heads/master 9456176da -> 0d26b3aa5
[SPARK-21546][SS] dropDuplicates should ignore watermark when it's not a key
## What changes were proposed in this pull request?
When the watermark column is not one of the key columns passed to `dropDuplicates`, the query currently crashes. This PR fixes the issue by ignoring the watermark in that case.
## How was this patch tested?
The new unit test.
Author: Shixiong Zhu <sh...@databricks.com>
Closes #18822 from zsxwing/SPARK-21546.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/0d26b3aa
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/0d26b3aa
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/0d26b3aa
Branch: refs/heads/master
Commit: 0d26b3aa55f9cc75096b0e2b309f64fe3270b9a5
Parents: 9456176
Author: Shixiong Zhu <sh...@databricks.com>
Authored: Wed Aug 2 14:02:13 2017 -0700
Committer: Shixiong Zhu <sh...@databricks.com>
Committed: Wed Aug 2 14:02:13 2017 -0700
----------------------------------------------------------------------
.../sql/execution/streaming/statefulOperators.scala | 9 +++++++--
.../apache/spark/sql/streaming/DeduplicateSuite.scala | 13 +++++++++++++
2 files changed, 20 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/0d26b3aa/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala
index 6addab6..e463563 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala
@@ -156,8 +156,13 @@ trait WatermarkSupport extends UnaryExecNode {
}
/** Predicate based on keys that matches data older than the watermark */
- lazy val watermarkPredicateForKeys: Option[Predicate] =
- watermarkExpression.map(newPredicate(_, keyExpressions))
+ lazy val watermarkPredicateForKeys: Option[Predicate] = watermarkExpression.flatMap { e =>
+ if (keyExpressions.exists(_.metadata.contains(EventTimeWatermark.delayKey))) {
+ Some(newPredicate(e, keyExpressions))
+ } else {
+ None
+ }
+ }
/** Predicate based on the child output that matches data older than the watermark. */
lazy val watermarkPredicateForData: Option[Predicate] =
http://git-wip-us.apache.org/repos/asf/spark/blob/0d26b3aa/sql/core/src/test/scala/org/apache/spark/sql/streaming/DeduplicateSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/DeduplicateSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/DeduplicateSuite.scala
index a15c2cf..e858b7d 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/DeduplicateSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/DeduplicateSuite.scala
@@ -268,4 +268,17 @@ class DeduplicateSuite extends StateStoreMetricsTest with BeforeAndAfterAll {
CheckLastBatch(7)
)
}
+
+ test("SPARK-21546: dropDuplicates should ignore watermark when it's not a key") {
+ val input = MemoryStream[(Int, Int)]
+ val df = input.toDS.toDF("id", "time")
+ .withColumn("time", $"time".cast("timestamp"))
+ .withWatermark("time", "1 second")
+ .dropDuplicates("id")
+ .select($"id", $"time".cast("long"))
+ testStream(df)(
+ AddData(input, 1 -> 1, 1 -> 2, 2 -> 2),
+ CheckLastBatch(1 -> 1, 2 -> 2)
+ )
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org