You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by zs...@apache.org on 2017/08/02 21:02:16 UTC

spark git commit: [SPARK-21546][SS] dropDuplicates should ignore watermark when it's not a key

Repository: spark
Updated Branches:
  refs/heads/master 9456176da -> 0d26b3aa5


[SPARK-21546][SS] dropDuplicates should ignore watermark when it's not a key

## What changes were proposed in this pull request?

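When the watermark column is not one of the columns passed to `dropDuplicates`, the streaming query currently crashes. This PR fixes the issue by ignoring the watermark when building the key-based predicate in that case.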

## How was this patch tested?

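A new unit test was added to `DeduplicateSuite` ("SPARK-21546: dropDuplicates should ignore watermark when it's not a key").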

Author: Shixiong Zhu <sh...@databricks.com>

Closes #18822 from zsxwing/SPARK-21546.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/0d26b3aa
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/0d26b3aa
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/0d26b3aa

Branch: refs/heads/master
Commit: 0d26b3aa55f9cc75096b0e2b309f64fe3270b9a5
Parents: 9456176
Author: Shixiong Zhu <sh...@databricks.com>
Authored: Wed Aug 2 14:02:13 2017 -0700
Committer: Shixiong Zhu <sh...@databricks.com>
Committed: Wed Aug 2 14:02:13 2017 -0700

----------------------------------------------------------------------
 .../sql/execution/streaming/statefulOperators.scala    |  9 +++++++--
 .../apache/spark/sql/streaming/DeduplicateSuite.scala  | 13 +++++++++++++
 2 files changed, 20 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/0d26b3aa/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala
index 6addab6..e463563 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala
@@ -156,8 +156,13 @@ trait WatermarkSupport extends UnaryExecNode {
   }
 
   /** Predicate based on keys that matches data older than the watermark */
-  lazy val watermarkPredicateForKeys: Option[Predicate] =
-    watermarkExpression.map(newPredicate(_, keyExpressions))
+  lazy val watermarkPredicateForKeys: Option[Predicate] = watermarkExpression.flatMap { e =>
+    if (keyExpressions.exists(_.metadata.contains(EventTimeWatermark.delayKey))) {
+      Some(newPredicate(e, keyExpressions))
+    } else {
+      None
+    }
+  }
 
   /** Predicate based on the child output that matches data older than the watermark. */
   lazy val watermarkPredicateForData: Option[Predicate] =

http://git-wip-us.apache.org/repos/asf/spark/blob/0d26b3aa/sql/core/src/test/scala/org/apache/spark/sql/streaming/DeduplicateSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/DeduplicateSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/DeduplicateSuite.scala
index a15c2cf..e858b7d 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/DeduplicateSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/DeduplicateSuite.scala
@@ -268,4 +268,17 @@ class DeduplicateSuite extends StateStoreMetricsTest with BeforeAndAfterAll {
       CheckLastBatch(7)
     )
   }
+
+  test("SPARK-21546: dropDuplicates should ignore watermark when it's not a key") {
+    val input = MemoryStream[(Int, Int)]
+    val df = input.toDS.toDF("id", "time")
+      .withColumn("time", $"time".cast("timestamp"))
+      .withWatermark("time", "1 second")
+      .dropDuplicates("id")
+      .select($"id", $"time".cast("long"))
+    testStream(df)(
+      AddData(input, 1 -> 1, 1 -> 2, 2 -> 2),
+      CheckLastBatch(1 -> 1, 2 -> 2)
+    )
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org