Posted to commits@spark.apache.org by td...@apache.org on 2016/11/15 00:46:30 UTC

spark git commit: [SPARK-18124] Observed delay based Event Time Watermarks

Repository: spark
Updated Branches:
  refs/heads/master bd85603ba -> c07187823


[SPARK-18124] Observed delay based Event Time Watermarks

This PR adds a new method `withWatermark` to the `Dataset` API, which can be used to specify an _event time watermark_.  An event time watermark allows the streaming engine to reason about the point in time after which we no longer expect to see late data.  This PR also augments `StreamExecution` to use this watermark for several purposes:
  - To know when a given time window aggregation is finalized, and thus when its results can be emitted under output modes that do not allow updates (e.g. `Append` mode).
  - To minimize the amount of state that we need to keep for on-going aggregations, by evicting state for groups that are no longer expected to change.  Note that we still maintain all state if the query requires it (i.e. if the event time is not present in the `groupBy` or when running in `Complete` mode).

The following example emits windowed counts of records, waiting up to 5 minutes for late data to arrive:
```scala
df.withWatermark("eventTime", "5 minutes")
  .groupBy(window($"eventTime", "1 minute") as 'window)
  .count()
  .writeStream
  .format("console")
  .mode("append") // In append mode, we only output finalized aggregations.
  .start()
```

### Calculating the watermark
The current watermark is computed by taking the `MAX(eventTime)` seen this epoch across all of the partitions in the query and subtracting a user-defined _delayThreshold_.  An additional constraint is that the watermark must increase monotonically.

Note that since this value is only coordinated across partitions occasionally, the actual watermark used is only guaranteed to be at least `delayThreshold` behind the actual event time.  In some cases we may still process records that arrive more than `delayThreshold` late.
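
As a rough sketch (not the PR's actual code), the per-trigger update amounts to the following, where `nextWatermark`, `maxEventTimeMs`, and `prevWatermarkMs` are hypothetical names for the per-epoch maximum observed event time and the previously committed watermark:
```scala
// Illustrative sketch of the watermark update, assuming all times are in milliseconds.
// `nextWatermark`, `maxEventTimeMs`, and `prevWatermarkMs` are hypothetical names,
// not identifiers from this PR.
def nextWatermark(maxEventTimeMs: Long, delayThresholdMs: Long, prevWatermarkMs: Long): Long = {
  val candidate = maxEventTimeMs - delayThresholdMs
  // The watermark must increase monotonically, so it never moves backwards.
  math.max(candidate, prevWatermarkMs)
}
```
In the actual change, `StreamExecution` collects the maximum observed event time from `EventTimeWatermarkExec` after each batch and only updates `currentEventTimeWatermark` when the new value is larger.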

This mechanism was chosen for the initial implementation over a processing-time-based watermark for two reasons:
  - it is robust to downtime that could affect processing delay
  - it does not require syncing of clocks or timezones between the producer and the processing engine.

### Other notable implementation details
 - A new trigger metric `eventTimeWatermark` reports the current value of the watermark.
 - We mark the event time column in the `Attribute` metadata using the key `spark.watermarkDelayMs`.  This allows downstream operations to know which column holds the event time.  Operations like `window` propagate this metadata (see the sketch after this list).
 - `explain()` marks the event time column with a suffix of `-T${delayMs}ms` to ease debugging of how this information is propagated.
 - Currently, we don't filter out late records up front; instead we rely on the state store to avoid emitting records that are both added and evicted by the watermark in the same epoch.
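
A minimal sketch of how the delay travels with the column, mirroring the `EventTimeWatermark.output` logic added in this PR (the helper names `tagEventTime` and `isEventTimeColumn` are illustrative only):
```scala
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.plans.logical.EventTimeWatermark
import org.apache.spark.sql.types.MetadataBuilder

// Attach the watermark delay to the column's metadata under
// EventTimeWatermark.delayKey ("spark.watermarkDelayMs").
def tagEventTime(a: Attribute, delayMs: Long): Attribute = {
  val updated = new MetadataBuilder()
    .withMetadata(a.metadata)
    .putLong(EventTimeWatermark.delayKey, delayMs)
    .build()
  a.withMetadata(updated)
}

// Downstream operators (e.g. the Append-mode check and StateStoreSaveExec)
// recognize the event time column by looking for this key.
def isEventTimeColumn(a: Attribute): Boolean =
  a.metadata.contains(EventTimeWatermark.delayKey)
```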

### Remaining in this PR
 - [ ] The test for recovery is currently failing because we don't record the watermark used in the offset log.  We will need to do so to ensure determinism, but this is deferred until #15626 is merged.

### Other follow-ups
There are some natural additional features that we should consider for future work:
 - The ability to write records that arrive too late to an external store, in case out-of-band remediation is required.
 - `Update` mode so you can get partial results before a group is evicted.
 - Other mechanisms for calculating the watermark.  In particular, a watermark based on quantiles would be more robust to outliers.

Author: Michael Armbrust <mi...@databricks.com>

Closes #15702 from marmbrus/watermarks.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/c0718782
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/c0718782
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/c0718782

Branch: refs/heads/master
Commit: c07187823a98f0d1a0f58c06e28a27e1abed157a
Parents: bd85603
Author: Michael Armbrust <mi...@databricks.com>
Authored: Mon Nov 14 16:46:26 2016 -0800
Committer: Tathagata Das <ta...@gmail.com>
Committed: Mon Nov 14 16:46:26 2016 -0800

----------------------------------------------------------------------
 .../spark/unsafe/types/CalendarInterval.java    |   4 +
 .../apache/spark/sql/AnalysisException.scala    |   3 +-
 .../spark/sql/catalyst/analysis/Analyzer.scala  |   8 +-
 .../sql/catalyst/analysis/CheckAnalysis.scala   |  10 +
 .../analysis/UnsupportedOperationChecker.scala  |  18 +-
 .../sql/catalyst/analysis/unresolved.scala      |   3 +-
 .../catalyst/expressions/namedExpressions.scala |  17 +-
 .../plans/logical/EventTimeWatermark.scala      |  51 +++++
 .../scala/org/apache/spark/sql/Dataset.scala    |  40 +++-
 .../spark/sql/execution/SparkStrategies.scala   |  12 +-
 .../sql/execution/aggregate/AggUtils.scala      |   9 +-
 .../spark/sql/execution/command/commands.scala  |   2 +-
 .../streaming/EventTimeWatermarkExec.scala      |  93 +++++++++
 .../sql/execution/streaming/ForeachSink.scala   |   3 +-
 .../streaming/IncrementalExecution.scala        |  12 +-
 .../execution/streaming/StatefulAggregate.scala | 170 ++++++++++-------
 .../execution/streaming/StreamExecution.scala   |  25 ++-
 .../sql/execution/streaming/StreamMetrics.scala |   1 +
 .../state/HDFSBackedStateStoreProvider.scala    |  23 +--
 .../execution/streaming/state/StateStore.scala  |   7 +-
 .../streaming/state/StateStoreSuite.scala       |   6 +-
 .../spark/sql/streaming/WatermarkSuite.scala    | 191 +++++++++++++++++++
 22 files changed, 597 insertions(+), 111 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/c0718782/common/unsafe/src/main/java/org/apache/spark/unsafe/types/CalendarInterval.java
----------------------------------------------------------------------
diff --git a/common/unsafe/src/main/java/org/apache/spark/unsafe/types/CalendarInterval.java b/common/unsafe/src/main/java/org/apache/spark/unsafe/types/CalendarInterval.java
index 518ed64..a7b0e6f 100644
--- a/common/unsafe/src/main/java/org/apache/spark/unsafe/types/CalendarInterval.java
+++ b/common/unsafe/src/main/java/org/apache/spark/unsafe/types/CalendarInterval.java
@@ -252,6 +252,10 @@ public final class CalendarInterval implements Serializable {
   public final int months;
   public final long microseconds;
 
+  public final long milliseconds() {
+    return this.microseconds / MICROS_PER_MILLI;
+  }
+
   public CalendarInterval(int months, long microseconds) {
     this.months = months;
     this.microseconds = microseconds;

http://git-wip-us.apache.org/repos/asf/spark/blob/c0718782/sql/catalyst/src/main/scala/org/apache/spark/sql/AnalysisException.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/AnalysisException.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/AnalysisException.scala
index 7defb9d..ff85761 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/AnalysisException.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/AnalysisException.scala
@@ -31,7 +31,8 @@ class AnalysisException protected[sql] (
     val message: String,
     val line: Option[Int] = None,
     val startPosition: Option[Int] = None,
-    val plan: Option[LogicalPlan] = None,
+    // Some plans fail to serialize due to bugs in scala collections.
+    @transient val plan: Option[LogicalPlan] = None,
     val cause: Option[Throwable] = None)
   extends Exception(message, cause.orNull) with Serializable {
 

http://git-wip-us.apache.org/repos/asf/spark/blob/c0718782/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index c14f353..ec5f710 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -2272,7 +2272,13 @@ object TimeWindowing extends Rule[LogicalPlan] {
           windowExpressions.head.timeColumn.resolved &&
           windowExpressions.head.checkInputDataTypes().isSuccess) {
         val window = windowExpressions.head
-        val windowAttr = AttributeReference("window", window.dataType)()
+
+        val metadata = window.timeColumn match {
+          case a: Attribute => a.metadata
+          case _ => Metadata.empty
+        }
+        val windowAttr =
+          AttributeReference("window", window.dataType, metadata = metadata)()
 
         val maxNumOverlapping = math.ceil(window.windowDuration * 1.0 / window.slideDuration).toInt
         val windows = Seq.tabulate(maxNumOverlapping + 1) { i =>

http://git-wip-us.apache.org/repos/asf/spark/blob/c0718782/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
index 7b75c1f..98e50d0 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
@@ -148,6 +148,16 @@ trait CheckAnalysis extends PredicateHelper {
         }
 
         operator match {
+          case etw: EventTimeWatermark =>
+            etw.eventTime.dataType match {
+              case s: StructType
+                if s.find(_.name == "end").map(_.dataType) == Some(TimestampType) =>
+              case _: TimestampType =>
+              case _ =>
+                failAnalysis(
+                  s"Event time must be defined on a window or a timestamp, but " +
+                  s"${etw.eventTime.name} is of type ${etw.eventTime.dataType.simpleString}")
+            }
           case f: Filter if f.condition.dataType != BooleanType =>
             failAnalysis(
               s"filter expression '${f.condition.sql}' " +

http://git-wip-us.apache.org/repos/asf/spark/blob/c0718782/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala
index e81370c..c054fcb 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala
@@ -18,6 +18,7 @@
 package org.apache.spark.sql.catalyst.analysis
 
 import org.apache.spark.sql.{AnalysisException, InternalOutputModes}
+import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.catalyst.plans._
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.streaming.OutputMode
@@ -55,9 +56,20 @@ object UnsupportedOperationChecker {
     // Disallow some output mode
     outputMode match {
       case InternalOutputModes.Append if aggregates.nonEmpty =>
-        throwError(
-          s"$outputMode output mode not supported when there are streaming aggregations on " +
-            s"streaming DataFrames/DataSets")(plan)
+        val aggregate = aggregates.head
+
+        // Find any attributes that are associated with an eventTime watermark.
+        val watermarkAttributes = aggregate.groupingExpressions.collect {
+          case a: Attribute if a.metadata.contains(EventTimeWatermark.delayKey) => a
+        }
+
+        // We can append rows to the sink once the group is under the watermark. Without this
+        // watermark a group is never "finished" so we would never output anything.
+        if (watermarkAttributes.isEmpty) {
+          throwError(
+            s"$outputMode output mode not supported when there are streaming aggregations on " +
+                s"streaming DataFrames/DataSets")(plan)
+        }
 
       case InternalOutputModes.Complete | InternalOutputModes.Update if aggregates.isEmpty =>
         throwError(

http://git-wip-us.apache.org/repos/asf/spark/blob/c0718782/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala
index 235ae04..36ed9ba 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala
@@ -25,7 +25,7 @@ import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, Codege
 import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan}
 import org.apache.spark.sql.catalyst.trees.TreeNode
 import org.apache.spark.sql.catalyst.util.quoteIdentifier
-import org.apache.spark.sql.types.{DataType, StructType}
+import org.apache.spark.sql.types.{DataType, Metadata, StructType}
 
 /**
  * Thrown when an invalid attempt is made to access a property of a tree that has yet to be fully
@@ -98,6 +98,7 @@ case class UnresolvedAttribute(nameParts: Seq[String]) extends Attribute with Un
   override def withNullability(newNullability: Boolean): UnresolvedAttribute = this
   override def withQualifier(newQualifier: Option[String]): UnresolvedAttribute = this
   override def withName(newName: String): UnresolvedAttribute = UnresolvedAttribute.quoted(newName)
+  override def withMetadata(newMetadata: Metadata): Attribute = this
 
   override def toString: String = s"'$name"
 

http://git-wip-us.apache.org/repos/asf/spark/blob/c0718782/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala
index 306a99d..1274757 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala
@@ -22,6 +22,7 @@ import java.util.{Objects, UUID}
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
 import org.apache.spark.sql.catalyst.expressions.codegen._
+import org.apache.spark.sql.catalyst.plans.logical.EventTimeWatermark
 import org.apache.spark.sql.catalyst.util.quoteIdentifier
 import org.apache.spark.sql.types._
 
@@ -104,6 +105,7 @@ abstract class Attribute extends LeafExpression with NamedExpression with NullIn
   def withNullability(newNullability: Boolean): Attribute
   def withQualifier(newQualifier: Option[String]): Attribute
   def withName(newName: String): Attribute
+  def withMetadata(newMetadata: Metadata): Attribute
 
   override def toAttribute: Attribute = this
   def newInstance(): Attribute
@@ -292,11 +294,22 @@ case class AttributeReference(
     }
   }
 
+  override def withMetadata(newMetadata: Metadata): Attribute = {
+    AttributeReference(name, dataType, nullable, newMetadata)(exprId, qualifier, isGenerated)
+  }
+
   override protected final def otherCopyArgs: Seq[AnyRef] = {
     exprId :: qualifier :: isGenerated :: Nil
   }
 
-  override def toString: String = s"$name#${exprId.id}$typeSuffix"
+  /** Used to signal the column used to calculate an eventTime watermark (e.g. a#1-T{delayMs}) */
+  private def delaySuffix = if (metadata.contains(EventTimeWatermark.delayKey)) {
+    s"-T${metadata.getLong(EventTimeWatermark.delayKey)}ms"
+  } else {
+    ""
+  }
+
+  override def toString: String = s"$name#${exprId.id}$typeSuffix$delaySuffix"
 
   // Since the expression id is not in the first constructor it is missing from the default
   // tree string.
@@ -332,6 +345,8 @@ case class PrettyAttribute(
   override def withQualifier(newQualifier: Option[String]): Attribute =
     throw new UnsupportedOperationException
   override def withName(newName: String): Attribute = throw new UnsupportedOperationException
+  override def withMetadata(newMetadata: Metadata): Attribute =
+    throw new UnsupportedOperationException
   override def qualifier: Option[String] = throw new UnsupportedOperationException
   override def exprId: ExprId = throw new UnsupportedOperationException
   override def nullable: Boolean = true

http://git-wip-us.apache.org/repos/asf/spark/blob/c0718782/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/EventTimeWatermark.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/EventTimeWatermark.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/EventTimeWatermark.scala
new file mode 100644
index 0000000..4224a79
--- /dev/null
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/EventTimeWatermark.scala
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.plans.logical
+
+import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
+import org.apache.spark.sql.types.MetadataBuilder
+import org.apache.spark.unsafe.types.CalendarInterval
+
+object EventTimeWatermark {
+  /** The [[org.apache.spark.sql.types.Metadata]] key used to hold the eventTime watermark delay. */
+  val delayKey = "spark.watermarkDelayMs"
+}
+
+/**
+ * Used to mark a user specified column as holding the event time for a row.
+ */
+case class EventTimeWatermark(
+    eventTime: Attribute,
+    delay: CalendarInterval,
+    child: LogicalPlan) extends LogicalPlan {
+
+  // Update the metadata on the eventTime column to include the desired delay.
+  override val output: Seq[Attribute] = child.output.map { a =>
+    if (a semanticEquals eventTime) {
+      val updatedMetadata = new MetadataBuilder()
+        .withMetadata(a.metadata)
+        .putLong(EventTimeWatermark.delayKey, delay.milliseconds)
+        .build()
+      a.withMetadata(updatedMetadata)
+    } else {
+      a
+    }
+  }
+
+  override val children: Seq[LogicalPlan] = child :: Nil
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/c0718782/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
index eb2b20a..af30683 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -50,6 +50,7 @@ import org.apache.spark.sql.execution.python.EvaluatePython
 import org.apache.spark.sql.streaming.DataStreamWriter
 import org.apache.spark.sql.types._
 import org.apache.spark.storage.StorageLevel
+import org.apache.spark.unsafe.types.CalendarInterval
 import org.apache.spark.util.Utils
 
 private[sql] object Dataset {
@@ -476,7 +477,7 @@ class Dataset[T] private[sql](
    * `collect()`, will throw an [[AnalysisException]] when there is a streaming
    * source present.
    *
-   * @group basic
+   * @group streaming
    * @since 2.0.0
    */
   @Experimental
@@ -496,8 +497,6 @@ class Dataset[T] private[sql](
   /**
    * Returns a checkpointed version of this Dataset.
    *
-   * @param eager When true, materializes the underlying checkpointed RDD eagerly.
-   *
    * @group basic
    * @since 2.1.0
    */
@@ -536,6 +535,41 @@ class Dataset[T] private[sql](
   }
 
   /**
+   * :: Experimental ::
+   * Defines an event time watermark for this [[Dataset]]. A watermark tracks a point in time
+   * before which we assume no more late data is going to arrive.
+   *
+   * Spark will use this watermark for several purposes:
+   *  - To know when a given time window aggregation can be finalized and thus can be emitted when
+   *    using output modes that do not allow updates.
+   *  - To minimize the amount of state that we need to keep for on-going aggregations.
+   *
+   *  The current watermark is computed by looking at the `MAX(eventTime)` seen across
+   *  all of the partitions in the query minus a user specified `delayThreshold`.  Due to the cost
+   *  of coordinating this value across partitions, the actual watermark used is only guaranteed
+   *  to be at least `delayThreshold` behind the actual event time.  In some cases we may still
+   *  process records that arrive more than `delayThreshold` late.
+   *
+   * @param eventTime the name of the column that contains the event time of the row.
+   * @param delayThreshold the minimum delay to wait to data to arrive late, relative to the latest
+   *                       record that has been processed in the form of an interval
+   *                       (e.g. "1 minute" or "5 hours").
+   *
+   * @group streaming
+   * @since 2.1.0
+   */
+  @Experimental
+  @InterfaceStability.Evolving
+  // We only accept an existing column name, not a derived column here as a watermark that is
+  // defined on a derived column cannot referenced elsewhere in the plan.
+  def withWatermark(eventTime: String, delayThreshold: String): Dataset[T] = withTypedPlan {
+    val parsedDelay =
+      Option(CalendarInterval.fromString("interval " + delayThreshold))
+        .getOrElse(throw new AnalysisException(s"Unable to parse time delay '$delayThreshold'"))
+    EventTimeWatermark(UnresolvedAttribute(eventTime), parsedDelay, logicalPlan)
+  }
+
+  /**
    * Displays the Dataset in a tabular form. Strings more than 20 characters will be truncated,
    * and all cells will be aligned right. For example:
    * {{{

http://git-wip-us.apache.org/repos/asf/spark/blob/c0718782/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
index 190fdd8..2308ae8 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
@@ -18,20 +18,23 @@
 package org.apache.spark.sql.execution
 
 import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.{execution, SaveMode, Strategy}
+import org.apache.spark.sql.{SaveMode, Strategy}
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.encoders.RowEncoder
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.planning._
 import org.apache.spark.sql.catalyst.plans._
-import org.apache.spark.sql.catalyst.plans.logical.{BroadcastHint, LogicalPlan}
+import org.apache.spark.sql.catalyst.plans.logical.{BroadcastHint, EventTimeWatermark, LogicalPlan}
 import org.apache.spark.sql.catalyst.plans.physical._
+import org.apache.spark.sql.execution
 import org.apache.spark.sql.execution.columnar.{InMemoryRelation, InMemoryTableScanExec}
 import org.apache.spark.sql.execution.command._
 import org.apache.spark.sql.execution.datasources._
 import org.apache.spark.sql.execution.exchange.ShuffleExchange
 import org.apache.spark.sql.execution.joins.{BuildLeft, BuildRight}
-import org.apache.spark.sql.execution.streaming.{MemoryPlan, StreamingExecutionRelation, StreamingRelation, StreamingRelationExec}
+import org.apache.spark.sql.execution.streaming._
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.streaming.StreamingQuery
 
 /**
  * Converts a logical plan into zero or more SparkPlans.  This API is exposed for experimenting
@@ -224,6 +227,9 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
    */
   object StatefulAggregationStrategy extends Strategy {
     override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
+      case EventTimeWatermark(columnName, delay, child) =>
+        EventTimeWatermarkExec(columnName, delay, planLater(child)) :: Nil
+
       case PhysicalAggregation(
         namedGroupingExpressions, aggregateExpressions, rewrittenResultExpressions, child) =>
 

http://git-wip-us.apache.org/repos/asf/spark/blob/c0718782/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/AggUtils.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/AggUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/AggUtils.scala
index 3c8ef1a..8b8ccf4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/AggUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/AggUtils.scala
@@ -328,8 +328,13 @@ object AggUtils {
     }
     // Note: stateId and returnAllStates are filled in later with preparation rules
     // in IncrementalExecution.
-    val saved = StateStoreSaveExec(
-      groupingAttributes, stateId = None, returnAllStates = None, partialMerged2)
+    val saved =
+      StateStoreSaveExec(
+        groupingAttributes,
+        stateId = None,
+        outputMode = None,
+        eventTimeWatermark = None,
+        partialMerged2)
 
     val finalAndCompleteAggregate: SparkPlan = {
       val finalAggregateExpressions = functionsWithoutDistinct.map(_.copy(mode = Final))

http://git-wip-us.apache.org/repos/asf/spark/blob/c0718782/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala
index d82e54e..52d8dc2 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala
@@ -104,7 +104,7 @@ case class ExplainCommand(
       if (logicalPlan.isStreaming) {
         // This is used only by explaining `Dataset/DataFrame` created by `spark.readStream`, so the
         // output mode does not matter since there is no `Sink`.
-        new IncrementalExecution(sparkSession, logicalPlan, OutputMode.Append(), "<unknown>", 0)
+        new IncrementalExecution(sparkSession, logicalPlan, OutputMode.Append(), "<unknown>", 0, 0)
       } else {
         sparkSession.sessionState.executePlan(logicalPlan)
       }

http://git-wip-us.apache.org/repos/asf/spark/blob/c0718782/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/EventTimeWatermarkExec.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/EventTimeWatermarkExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/EventTimeWatermarkExec.scala
new file mode 100644
index 0000000..4c8cb06
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/EventTimeWatermarkExec.scala
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming
+
+import scala.math.max
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.{Attribute, UnsafeProjection}
+import org.apache.spark.sql.catalyst.plans.logical.EventTimeWatermark
+import org.apache.spark.sql.execution.SparkPlan
+import org.apache.spark.sql.types.MetadataBuilder
+import org.apache.spark.unsafe.types.CalendarInterval
+import org.apache.spark.util.AccumulatorV2
+
+/** Tracks the maximum positive long seen. */
+class MaxLong(protected var currentValue: Long = 0)
+  extends AccumulatorV2[Long, Long] {
+
+  override def isZero: Boolean = value == 0
+  override def value: Long = currentValue
+  override def copy(): AccumulatorV2[Long, Long] = new MaxLong(currentValue)
+
+  override def reset(): Unit = {
+    currentValue = 0
+  }
+
+  override def add(v: Long): Unit = {
+    currentValue = max(v, value)
+  }
+
+  override def merge(other: AccumulatorV2[Long, Long]): Unit = {
+    currentValue = max(value, other.value)
+  }
+}
+
+/**
+ * Used to mark a column as the containing the event time for a given record. In addition to
+ * adding appropriate metadata to this column, this operator also tracks the maximum observed event
+ * time. Based on the maximum observed time and a user specified delay, we can calculate the
+ * `watermark` after which we assume we will no longer see late records for a particular time
+ * period.
+ */
+case class EventTimeWatermarkExec(
+    eventTime: Attribute,
+    delay: CalendarInterval,
+    child: SparkPlan) extends SparkPlan {
+
+  // TODO: Use Spark SQL Metrics?
+  val maxEventTime = new MaxLong
+  sparkContext.register(maxEventTime)
+
+  override protected def doExecute(): RDD[InternalRow] = {
+    child.execute().mapPartitions { iter =>
+      val getEventTime = UnsafeProjection.create(eventTime :: Nil, child.output)
+      iter.map { row =>
+        maxEventTime.add(getEventTime(row).getLong(0))
+        row
+      }
+    }
+  }
+
+  // Update the metadata on the eventTime column to include the desired delay.
+  override val output: Seq[Attribute] = child.output.map { a =>
+    if (a semanticEquals eventTime) {
+      val updatedMetadata = new MetadataBuilder()
+          .withMetadata(a.metadata)
+          .putLong(EventTimeWatermark.delayKey, delay.milliseconds)
+          .build()
+
+      a.withMetadata(updatedMetadata)
+    } else {
+      a
+    }
+  }
+
+  override def children: Seq[SparkPlan] = child :: Nil
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/c0718782/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ForeachSink.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ForeachSink.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ForeachSink.scala
index 24f98b9..f5c550d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ForeachSink.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ForeachSink.scala
@@ -60,7 +60,8 @@ class ForeachSink[T : Encoder](writer: ForeachWriter[T]) extends Sink with Seria
             deserialized,
             data.queryExecution.asInstanceOf[IncrementalExecution].outputMode,
             data.queryExecution.asInstanceOf[IncrementalExecution].checkpointLocation,
-            data.queryExecution.asInstanceOf[IncrementalExecution].currentBatchId)
+            data.queryExecution.asInstanceOf[IncrementalExecution].currentBatchId,
+            data.queryExecution.asInstanceOf[IncrementalExecution].currentEventTimeWatermark)
           incrementalExecution.toRdd.mapPartitions { rows =>
             rows.map(_.get(0, objectType))
           }.asInstanceOf[RDD[T]]

http://git-wip-us.apache.org/repos/asf/spark/blob/c0718782/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala
index 05294df..e9d072f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala
@@ -32,11 +32,13 @@ class IncrementalExecution(
     logicalPlan: LogicalPlan,
     val outputMode: OutputMode,
     val checkpointLocation: String,
-    val currentBatchId: Long)
+    val currentBatchId: Long,
+    val currentEventTimeWatermark: Long)
   extends QueryExecution(sparkSession, logicalPlan) {
 
   // TODO: make this always part of planning.
-  val stateStrategy = sparkSession.sessionState.planner.StatefulAggregationStrategy +:
+  val stateStrategy =
+    sparkSession.sessionState.planner.StatefulAggregationStrategy +:
     sparkSession.sessionState.planner.StreamingRelationStrategy +:
     sparkSession.sessionState.experimentalMethods.extraStrategies
 
@@ -57,17 +59,17 @@ class IncrementalExecution(
   val state = new Rule[SparkPlan] {
 
     override def apply(plan: SparkPlan): SparkPlan = plan transform {
-      case StateStoreSaveExec(keys, None, None,
+      case StateStoreSaveExec(keys, None, None, None,
              UnaryExecNode(agg,
                StateStoreRestoreExec(keys2, None, child))) =>
         val stateId = OperatorStateId(checkpointLocation, operatorId, currentBatchId)
-        val returnAllStates = if (outputMode == InternalOutputModes.Complete) true else false
         operatorId += 1
 
         StateStoreSaveExec(
           keys,
           Some(stateId),
-          Some(returnAllStates),
+          Some(outputMode),
+          Some(currentEventTimeWatermark),
           agg.withNewChildren(
             StateStoreRestoreExec(
               keys,

http://git-wip-us.apache.org/repos/asf/spark/blob/c0718782/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StatefulAggregate.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StatefulAggregate.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StatefulAggregate.scala
index ad8238f..7af978a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StatefulAggregate.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StatefulAggregate.scala
@@ -21,12 +21,17 @@ import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.errors._
 import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
+import org.apache.spark.sql.catalyst.expressions.codegen.{GeneratePredicate, GenerateUnsafeProjection}
 import org.apache.spark.sql.catalyst.plans.physical.Partitioning
 import org.apache.spark.sql.execution
+import org.apache.spark.sql.InternalOutputModes._
+import org.apache.spark.sql.catalyst.plans.logical.EventTimeWatermark
 import org.apache.spark.sql.execution.metric.SQLMetrics
 import org.apache.spark.sql.execution.streaming.state._
 import org.apache.spark.sql.execution.SparkPlan
+import org.apache.spark.sql.streaming.OutputMode
+import org.apache.spark.sql.types.StructType
+
 
 /** Used to identify the state store for a given operator. */
 case class OperatorStateId(
@@ -92,8 +97,9 @@ case class StateStoreRestoreExec(
  */
 case class StateStoreSaveExec(
     keyExpressions: Seq[Attribute],
-    stateId: Option[OperatorStateId],
-    returnAllStates: Option[Boolean],
+    stateId: Option[OperatorStateId] = None,
+    outputMode: Option[OutputMode] = None,
+    eventTimeWatermark: Option[Long] = None,
     child: SparkPlan)
   extends execution.UnaryExecNode with StatefulOperator {
 
@@ -104,9 +110,9 @@ case class StateStoreSaveExec(
 
   override protected def doExecute(): RDD[InternalRow] = {
     metrics // force lazy init at driver
-    assert(returnAllStates.nonEmpty,
-      "Incorrect planning in IncrementalExecution, returnAllStates have not been set")
-    val saveAndReturnFunc = if (returnAllStates.get) saveAndReturnAll _ else saveAndReturnUpdated _
+    assert(outputMode.nonEmpty,
+      "Incorrect planning in IncrementalExecution, outputMode has not been set")
+
     child.execute().mapPartitionsWithStateStore(
       getStateId.checkpointLocation,
       operatorId = getStateId.operatorId,
@@ -114,75 +120,95 @@ case class StateStoreSaveExec(
       keyExpressions.toStructType,
       child.output.toStructType,
       sqlContext.sessionState,
-      Some(sqlContext.streams.stateStoreCoordinator)
-    )(saveAndReturnFunc)
+      Some(sqlContext.streams.stateStoreCoordinator)) { (store, iter) =>
+        val getKey = GenerateUnsafeProjection.generate(keyExpressions, child.output)
+        val numOutputRows = longMetric("numOutputRows")
+        val numTotalStateRows = longMetric("numTotalStateRows")
+        val numUpdatedStateRows = longMetric("numUpdatedStateRows")
+
+        outputMode match {
+          // Update and output all rows in the StateStore.
+          case Some(Complete) =>
+            while (iter.hasNext) {
+              val row = iter.next().asInstanceOf[UnsafeRow]
+              val key = getKey(row)
+              store.put(key.copy(), row.copy())
+              numUpdatedStateRows += 1
+            }
+            store.commit()
+            numTotalStateRows += store.numKeys()
+            store.iterator().map { case (k, v) =>
+              numOutputRows += 1
+              v.asInstanceOf[InternalRow]
+            }
+
+          // Update and output only rows being evicted from the StateStore
+          case Some(Append) =>
+            while (iter.hasNext) {
+              val row = iter.next().asInstanceOf[UnsafeRow]
+              val key = getKey(row)
+              store.put(key.copy(), row.copy())
+              numUpdatedStateRows += 1
+            }
+
+            val watermarkAttribute =
+              keyExpressions.find(_.metadata.contains(EventTimeWatermark.delayKey)).get
+            // If we are evicting based on a window, use the end of the window.  Otherwise just
+            // use the attribute itself.
+            val evictionExpression =
+              if (watermarkAttribute.dataType.isInstanceOf[StructType]) {
+                LessThanOrEqual(
+                  GetStructField(watermarkAttribute, 1),
+                  Literal(eventTimeWatermark.get * 1000))
+              } else {
+                LessThanOrEqual(
+                  watermarkAttribute,
+                  Literal(eventTimeWatermark.get * 1000))
+              }
+
+            logInfo(s"Filtering state store on: $evictionExpression")
+            val predicate = newPredicate(evictionExpression, keyExpressions)
+            store.remove(predicate.eval)
+
+            store.commit()
+
+            numTotalStateRows += store.numKeys()
+            store.updates().filter(_.isInstanceOf[ValueRemoved]).map { removed =>
+              numOutputRows += 1
+              removed.value.asInstanceOf[InternalRow]
+            }
+
+          // Update and output modified rows from the StateStore.
+          case Some(Update) =>
+            new Iterator[InternalRow] {
+              private[this] val baseIterator = iter
+
+              override def hasNext: Boolean = {
+                if (!baseIterator.hasNext) {
+                  store.commit()
+                  numTotalStateRows += store.numKeys()
+                  false
+                } else {
+                  true
+                }
+              }
+
+              override def next(): InternalRow = {
+                val row = baseIterator.next().asInstanceOf[UnsafeRow]
+                val key = getKey(row)
+                store.put(key.copy(), row.copy())
+                numOutputRows += 1
+                numUpdatedStateRows += 1
+                row
+              }
+            }
+
+          case _ => throw new UnsupportedOperationException(s"Invalid output mode: $outputMode")
+        }
+    }
   }
 
   override def output: Seq[Attribute] = child.output
 
   override def outputPartitioning: Partitioning = child.outputPartitioning
-
-  /**
-   * Save all the rows to the state store, and return all the rows in the state store.
-   * Note that this returns an iterator that pipelines the saving to store with downstream
-   * processing.
-   */
-  private def saveAndReturnUpdated(
-      store: StateStore,
-      iter: Iterator[InternalRow]): Iterator[InternalRow] = {
-    val numOutputRows = longMetric("numOutputRows")
-    val numTotalStateRows = longMetric("numTotalStateRows")
-    val numUpdatedStateRows = longMetric("numUpdatedStateRows")
-
-    new Iterator[InternalRow] {
-      private[this] val baseIterator = iter
-      private[this] val getKey = GenerateUnsafeProjection.generate(keyExpressions, child.output)
-
-      override def hasNext: Boolean = {
-        if (!baseIterator.hasNext) {
-          store.commit()
-          numTotalStateRows += store.numKeys()
-          false
-        } else {
-          true
-        }
-      }
-
-      override def next(): InternalRow = {
-        val row = baseIterator.next().asInstanceOf[UnsafeRow]
-        val key = getKey(row)
-        store.put(key.copy(), row.copy())
-        numOutputRows += 1
-        numUpdatedStateRows += 1
-        row
-      }
-    }
-  }
-
-  /**
-   * Save all the rows to the state store, and return all the rows in the state store.
-   * Note that the saving to store is blocking; only after all the rows have been saved
-   * is the iterator on the update store data is generated.
-   */
-  private def saveAndReturnAll(
-      store: StateStore,
-      iter: Iterator[InternalRow]): Iterator[InternalRow] = {
-    val getKey = GenerateUnsafeProjection.generate(keyExpressions, child.output)
-    val numOutputRows = longMetric("numOutputRows")
-    val numTotalStateRows = longMetric("numTotalStateRows")
-    val numUpdatedStateRows = longMetric("numUpdatedStateRows")
-
-    while (iter.hasNext) {
-      val row = iter.next().asInstanceOf[UnsafeRow]
-      val key = getKey(row)
-      store.put(key.copy(), row.copy())
-      numUpdatedStateRows += 1
-    }
-    store.commit()
-    numTotalStateRows += store.numKeys()
-    store.iterator().map { case (k, v) =>
-      numOutputRows += 1
-      v.asInstanceOf[InternalRow]
-    }
-  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/c0718782/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
index 57e89f8..3ca6fea 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
@@ -92,6 +92,9 @@ class StreamExecution(
   /** The current batchId or -1 if execution has not yet been initialized. */
   private var currentBatchId: Long = -1
 
+  /** The current eventTime watermark, used to bound the lateness of data that will processed. */
+  private var currentEventTimeWatermark: Long = 0
+
   /** All stream sources present in the query plan. */
   private val sources =
     logicalPlan.collect { case s: StreamingExecutionRelation => s.source }
@@ -427,7 +430,8 @@ class StreamExecution(
         triggerLogicalPlan,
         outputMode,
         checkpointFile("state"),
-        currentBatchId)
+        currentBatchId,
+        currentEventTimeWatermark)
       lastExecution.executedPlan // Force the lazy generation of execution plan
     }
 
@@ -436,6 +440,25 @@ class StreamExecution(
     sink.addBatch(currentBatchId, nextBatch)
     reportNumRows(executedPlan, triggerLogicalPlan, newData)
 
+    // Update the eventTime watermark if we find one in the plan.
+    // TODO: Does this need to be an AttributeMap?
+    lastExecution.executedPlan.collect {
+      case e: EventTimeWatermarkExec =>
+        logTrace(s"Maximum observed eventTime: ${e.maxEventTime.value}")
+        (e.maxEventTime.value / 1000) - e.delay.milliseconds()
+    }.headOption.foreach { newWatermark =>
+      if (newWatermark > currentEventTimeWatermark) {
+        logInfo(s"Updating eventTime watermark to: $newWatermark ms")
+        currentEventTimeWatermark = newWatermark
+      } else {
+        logTrace(s"Event time didn't move: $newWatermark < $currentEventTimeWatermark")
+      }
+
+      if (newWatermark != 0) {
+        streamMetrics.reportTriggerDetail(EVENT_TIME_WATERMARK, newWatermark)
+      }
+    }
+
     awaitBatchLock.lock()
     try {
       // Wake up any threads that are waiting for the stream to progress.

http://git-wip-us.apache.org/repos/asf/spark/blob/c0718782/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamMetrics.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamMetrics.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamMetrics.scala
index e98d188..5645554 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamMetrics.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamMetrics.scala
@@ -221,6 +221,7 @@ object StreamMetrics extends Logging {
   val IS_TRIGGER_ACTIVE = "isTriggerActive"
   val IS_DATA_PRESENT_IN_TRIGGER = "isDataPresentInTrigger"
   val STATUS_MESSAGE = "statusMessage"
+  val EVENT_TIME_WATERMARK = "eventTimeWatermark"
 
   val START_TIMESTAMP = "timestamp.triggerStart"
   val GET_OFFSET_TIMESTAMP = "timestamp.afterGetOffset"

http://git-wip-us.apache.org/repos/asf/spark/blob/c0718782/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala
index f07feaa..493fdaa 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala
@@ -109,7 +109,7 @@ private[state] class HDFSBackedStateStoreProvider(
         case Some(ValueAdded(_, _)) =>
           // Value did not exist in previous version and was added already, keep it marked as added
           allUpdates.put(key, ValueAdded(key, value))
-        case Some(ValueUpdated(_, _)) | Some(KeyRemoved(_)) =>
+        case Some(ValueUpdated(_, _)) | Some(ValueRemoved(_, _)) =>
           // Value existed in previous version and updated/removed, mark it as updated
           allUpdates.put(key, ValueUpdated(key, value))
         case None =>
@@ -124,24 +124,25 @@ private[state] class HDFSBackedStateStoreProvider(
     /** Remove keys that match the following condition */
     override def remove(condition: UnsafeRow => Boolean): Unit = {
       verify(state == UPDATING, "Cannot remove after already committed or aborted")
-
-      val keyIter = mapToUpdate.keySet().iterator()
-      while (keyIter.hasNext) {
-        val key = keyIter.next
-        if (condition(key)) {
-          keyIter.remove()
+      val entryIter = mapToUpdate.entrySet().iterator()
+      while (entryIter.hasNext) {
+        val entry = entryIter.next
+        if (condition(entry.getKey)) {
+          val value = entry.getValue
+          val key = entry.getKey
+          entryIter.remove()
 
           Option(allUpdates.get(key)) match {
             case Some(ValueUpdated(_, _)) | None =>
               // Value existed in previous version and maybe was updated, mark removed
-              allUpdates.put(key, KeyRemoved(key))
+              allUpdates.put(key, ValueRemoved(key, value))
             case Some(ValueAdded(_, _)) =>
               // Value did not exist in previous version and was added, should not appear in updates
               allUpdates.remove(key)
-            case Some(KeyRemoved(_)) =>
+            case Some(ValueRemoved(_, _)) =>
               // Remove already in update map, no need to change
           }
-          writeToDeltaFile(tempDeltaFileStream, KeyRemoved(key))
+          writeToDeltaFile(tempDeltaFileStream, ValueRemoved(key, value))
         }
       }
     }
@@ -334,7 +335,7 @@ private[state] class HDFSBackedStateStoreProvider(
         writeUpdate(key, value)
       case ValueUpdated(key, value) =>
         writeUpdate(key, value)
-      case KeyRemoved(key) =>
+      case ValueRemoved(key, value) =>
         writeRemove(key)
     }
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/c0718782/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala
index 7132e28..9bc6c0e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala
@@ -99,13 +99,16 @@ trait StateStoreProvider {
 
 
 /** Trait representing updates made to a [[StateStore]]. */
-sealed trait StoreUpdate
+sealed trait StoreUpdate {
+  def key: UnsafeRow
+  def value: UnsafeRow
+}
 
 case class ValueAdded(key: UnsafeRow, value: UnsafeRow) extends StoreUpdate
 
 case class ValueUpdated(key: UnsafeRow, value: UnsafeRow) extends StoreUpdate
 
-case class KeyRemoved(key: UnsafeRow) extends StoreUpdate
+case class ValueRemoved(key: UnsafeRow, value: UnsafeRow) extends StoreUpdate
 
 
 /**

http://git-wip-us.apache.org/repos/asf/spark/blob/c0718782/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala
index 533cd0c..05fc734 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala
@@ -668,11 +668,11 @@ private[state] object StateStoreSuite {
   }
 
   def updatesToSet(iterator: Iterator[StoreUpdate]): Set[TestUpdate] = {
-    iterator.map { _ match {
+    iterator.map {
       case ValueAdded(key, value) => Added(rowToString(key), rowToInt(value))
       case ValueUpdated(key, value) => Updated(rowToString(key), rowToInt(value))
-      case KeyRemoved(key) => Removed(rowToString(key))
-    }}.toSet
+      case ValueRemoved(key, _) => Removed(rowToString(key))
+    }.toSet
   }
 }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/c0718782/sql/core/src/test/scala/org/apache/spark/sql/streaming/WatermarkSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/WatermarkSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/WatermarkSuite.scala
new file mode 100644
index 0000000..3617ec0
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/WatermarkSuite.scala
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.streaming
+
+import org.scalatest.BeforeAndAfter
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.execution.streaming._
+import org.apache.spark.sql.functions.{count, window}
+
+class WatermarkSuite extends StreamTest with BeforeAndAfter with Logging {
+
+  import testImplicits._
+
+  after {
+    sqlContext.streams.active.foreach(_.stop())
+  }
+
+  test("error on bad column") {
+    val inputData = MemoryStream[Int].toDF()
+    val e = intercept[AnalysisException] {
+      inputData.withWatermark("badColumn", "1 minute")
+    }
+    assert(e.getMessage contains "badColumn")
+  }
+
+  test("error on wrong type") {
+    val inputData = MemoryStream[Int].toDF()
+    val e = intercept[AnalysisException] {
+      inputData.withWatermark("value", "1 minute")
+    }
+    assert(e.getMessage contains "value")
+    assert(e.getMessage contains "int")
+  }
+
+
+  test("watermark metric") {
+    val inputData = MemoryStream[Int]
+
+    val windowedAggregation = inputData.toDF()
+        .withColumn("eventTime", $"value".cast("timestamp"))
+        .withWatermark("eventTime", "10 seconds")
+        .groupBy(window($"eventTime", "5 seconds") as 'window)
+        .agg(count("*") as 'count)
+        .select($"window".getField("start").cast("long").as[Long], $"count".as[Long])
+
+    testStream(windowedAggregation)(
+      AddData(inputData, 15),
+      AssertOnLastQueryStatus { status =>
+        status.triggerDetails.get(StreamMetrics.EVENT_TIME_WATERMARK) === "5000"
+      },
+      AddData(inputData, 15),
+      AssertOnLastQueryStatus { status =>
+        status.triggerDetails.get(StreamMetrics.EVENT_TIME_WATERMARK) === "5000"
+      },
+      AddData(inputData, 25),
+      AssertOnLastQueryStatus { status =>
+        status.triggerDetails.get(StreamMetrics.EVENT_TIME_WATERMARK) === "15000"
+      }
+    )
+  }
+
+  test("append-mode watermark aggregation") {
+    val inputData = MemoryStream[Int]
+
+    val windowedAggregation = inputData.toDF()
+      .withColumn("eventTime", $"value".cast("timestamp"))
+      .withWatermark("eventTime", "10 seconds")
+      .groupBy(window($"eventTime", "5 seconds") as 'window)
+      .agg(count("*") as 'count)
+      .select($"window".getField("start").cast("long").as[Long], $"count".as[Long])
+
+    testStream(windowedAggregation)(
+      AddData(inputData, 10, 11, 12, 13, 14, 15),
+      CheckAnswer(),
+      AddData(inputData, 25), // Advance watermark to 15 seconds
+      CheckAnswer(),
+      AddData(inputData, 25), // Evict items less than previous watermark.
+      CheckAnswer((10, 5))
+    )
+  }
+
+  ignore("recovery") {
+    val inputData = MemoryStream[Int]
+
+    val windowedAggregation = inputData.toDF()
+        .withColumn("eventTime", $"value".cast("timestamp"))
+        .withWatermark("eventTime", "10 seconds")
+        .groupBy(window($"eventTime", "5 seconds") as 'window)
+        .agg(count("*") as 'count)
+        .select($"window".getField("start").cast("long").as[Long], $"count".as[Long])
+
+    testStream(windowedAggregation)(
+      AddData(inputData, 10, 11, 12, 13, 14, 15),
+      CheckAnswer(),
+      AddData(inputData, 25), // Advance watermark to 15 seconds
+      StopStream,
+      StartStream(),
+      CheckAnswer(),
+      AddData(inputData, 25), // Evict items less than previous watermark.
+      StopStream,
+      StartStream(),
+      CheckAnswer((10, 5))
+    )
+  }
+
+  test("dropping old data") {
+    val inputData = MemoryStream[Int]
+
+    val windowedAggregation = inputData.toDF()
+        .withColumn("eventTime", $"value".cast("timestamp"))
+        .withWatermark("eventTime", "10 seconds")
+        .groupBy(window($"eventTime", "5 seconds") as 'window)
+        .agg(count("*") as 'count)
+        .select($"window".getField("start").cast("long").as[Long], $"count".as[Long])
+
+    testStream(windowedAggregation)(
+      AddData(inputData, 10, 11, 12),
+      CheckAnswer(),
+      AddData(inputData, 25),     // Advance watermark to 15 seconds
+      CheckAnswer(),
+      AddData(inputData, 25),     // Evict items less than previous watermark.
+      CheckAnswer((10, 3)),
+      AddData(inputData, 10),     // 10 is later than 15 second watermark
+      CheckAnswer((10, 3)),
+      AddData(inputData, 25),
+      CheckAnswer((10, 3))        // Should not emit an incorrect partial result.
+    )
+  }
+
+  test("complete mode") {
+    val inputData = MemoryStream[Int]
+
+    val windowedAggregation = inputData.toDF()
+        .withColumn("eventTime", $"value".cast("timestamp"))
+        .withWatermark("eventTime", "10 seconds")
+        .groupBy(window($"eventTime", "5 seconds") as 'window)
+        .agg(count("*") as 'count)
+        .select($"window".getField("start").cast("long").as[Long], $"count".as[Long])
+
+    // No eviction when asked to compute complete results.
+    testStream(windowedAggregation, OutputMode.Complete)(
+      AddData(inputData, 10, 11, 12),
+      CheckAnswer((10, 3)),
+      AddData(inputData, 25),
+      CheckAnswer((10, 3), (25, 1)),
+      AddData(inputData, 25),
+      CheckAnswer((10, 3), (25, 2)),
+      AddData(inputData, 10),
+      CheckAnswer((10, 4), (25, 2)),
+      AddData(inputData, 25),
+      CheckAnswer((10, 4), (25, 3))
+    )
+  }
+
+  test("group by on raw timestamp") {
+    val inputData = MemoryStream[Int]
+
+    val windowedAggregation = inputData.toDF()
+        .withColumn("eventTime", $"value".cast("timestamp"))
+        .withWatermark("eventTime", "10 seconds")
+        .groupBy($"eventTime")
+        .agg(count("*") as 'count)
+        .select($"eventTime".cast("long").as[Long], $"count".as[Long])
+
+    testStream(windowedAggregation)(
+      AddData(inputData, 10),
+      CheckAnswer(),
+      AddData(inputData, 25), // Advance watermark to 15 seconds
+      CheckAnswer(),
+      AddData(inputData, 25), // Evict items less than previous watermark.
+      CheckAnswer((10, 1))
+    )
+  }
+}

