You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by zs...@apache.org on 2017/03/08 04:32:54 UTC
spark git commit: [SPARK-19841][SS] watermarkPredicate should filter
based on keys
Repository: spark
Updated Branches:
refs/heads/master b9783a92f -> ca849ac4e
[SPARK-19841][SS] watermarkPredicate should filter based on keys
## What changes were proposed in this pull request?
`StreamingDeduplicateExec.watermarkPredicate` should filter based on keys. Otherwise, it may generate a wrong answer if the watermark column in `keyExpression` has a different position in the row.
`StateStoreSaveExec` has the same code, but its parent can make sure the watermark column positions in `keyExpression` and `row` are the same.
## How was this patch tested?
The added test.
Author: Shixiong Zhu <sh...@databricks.com>
Closes #17183 from zsxwing/SPARK-19841.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ca849ac4
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ca849ac4
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ca849ac4
Branch: refs/heads/master
Commit: ca849ac4e8fc520a4a12949b62b9730c5dfa097d
Parents: b9783a9
Author: Shixiong Zhu <sh...@databricks.com>
Authored: Tue Mar 7 20:32:51 2017 -0800
Committer: Shixiong Zhu <sh...@databricks.com>
Committed: Tue Mar 7 20:32:51 2017 -0800
----------------------------------------------------------------------
.../execution/streaming/statefulOperators.scala | 28 ++++++++++++++------
.../spark/sql/streaming/DeduplicateSuite.scala | 19 +++++++++++++
2 files changed, 39 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/ca849ac4/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala
index d925297..cbf656a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala
@@ -68,7 +68,7 @@ trait StateStoreWriter extends StatefulOperator {
}
/** An operator that supports watermark. */
-trait WatermarkSupport extends SparkPlan {
+trait WatermarkSupport extends UnaryExecNode {
/** The keys that may have a watermark attribute. */
def keyExpressions: Seq[Attribute]
@@ -76,8 +76,8 @@ trait WatermarkSupport extends SparkPlan {
/** The watermark value. */
def eventTimeWatermark: Option[Long]
- /** Generate a predicate that matches data older than the watermark */
- lazy val watermarkPredicate: Option[Predicate] = {
+ /** Generate an expression that matches data older than the watermark */
+ lazy val watermarkExpression: Option[Expression] = {
val optionalWatermarkAttribute =
keyExpressions.find(_.metadata.contains(EventTimeWatermark.delayKey))
@@ -96,9 +96,19 @@ trait WatermarkSupport extends SparkPlan {
}
logInfo(s"Filtering state store on: $evictionExpression")
- newPredicate(evictionExpression, keyExpressions)
+ evictionExpression
}
}
+
+ /** Generate a predicate based on keys that matches data older than the watermark */
+ lazy val watermarkPredicateForKeys: Option[Predicate] =
+ watermarkExpression.map(newPredicate(_, keyExpressions))
+
+ /**
+ * Generate a predicate based on the child output that matches data older than the watermark.
+ */
+ lazy val watermarkPredicate: Option[Predicate] =
+ watermarkExpression.map(newPredicate(_, child.output))
}
/**
@@ -192,7 +202,7 @@ case class StateStoreSaveExec(
}
// Assumption: Append mode can be done only when watermark has been specified
- store.remove(watermarkPredicate.get.eval _)
+ store.remove(watermarkPredicateForKeys.get.eval _)
store.commit()
numTotalStateRows += store.numKeys()
@@ -215,7 +225,9 @@ case class StateStoreSaveExec(
override def hasNext: Boolean = {
if (!baseIterator.hasNext) {
// Remove old aggregates if watermark specified
- if (watermarkPredicate.nonEmpty) store.remove(watermarkPredicate.get.eval _)
+ if (watermarkPredicateForKeys.nonEmpty) {
+ store.remove(watermarkPredicateForKeys.get.eval _)
+ }
store.commit()
numTotalStateRows += store.numKeys()
false
@@ -361,7 +373,7 @@ case class StreamingDeduplicateExec(
val numUpdatedStateRows = longMetric("numUpdatedStateRows")
val baseIterator = watermarkPredicate match {
- case Some(predicate) => iter.filter((row: InternalRow) => !predicate.eval(row))
+ case Some(predicate) => iter.filter(row => !predicate.eval(row))
case None => iter
}
@@ -381,7 +393,7 @@ case class StreamingDeduplicateExec(
}
CompletionIterator[InternalRow, Iterator[InternalRow]](result, {
- watermarkPredicate.foreach(f => store.remove(f.eval _))
+ watermarkPredicateForKeys.foreach(f => store.remove(f.eval _))
store.commit()
numTotalStateRows += store.numKeys()
})
http://git-wip-us.apache.org/repos/asf/spark/blob/ca849ac4/sql/core/src/test/scala/org/apache/spark/sql/streaming/DeduplicateSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/DeduplicateSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/DeduplicateSuite.scala
index 7ea7162..a15c2cf 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/DeduplicateSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/DeduplicateSuite.scala
@@ -249,4 +249,23 @@ class DeduplicateSuite extends StateStoreMetricsTest with BeforeAndAfterAll {
}
}
}
+
+ test("SPARK-19841: watermarkPredicate should filter based on keys") {
+ val input = MemoryStream[(Int, Int)]
+ val df = input.toDS.toDF("time", "id")
+ .withColumn("time", $"time".cast("timestamp"))
+ .withWatermark("time", "1 second")
+ .dropDuplicates("id", "time") // Change the column positions
+ .select($"id")
+ testStream(df)(
+ AddData(input, 1 -> 1, 1 -> 1, 1 -> 2),
+ CheckLastBatch(1, 2),
+ AddData(input, 1 -> 1, 2 -> 3, 2 -> 4),
+ CheckLastBatch(3, 4),
+ AddData(input, 1 -> 0, 1 -> 1, 3 -> 5, 3 -> 6), // Drop (1 -> 0, 1 -> 1) due to watermark
+ CheckLastBatch(5, 6),
+ AddData(input, 1 -> 0, 4 -> 7), // Drop (1 -> 0) due to watermark
+ CheckLastBatch(7)
+ )
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org