Posted to commits@spark.apache.org by td...@apache.org on 2018/05/04 23:35:28 UTC

spark git commit: [SPARK-24157][SS] Enabled no-data batches in MicroBatchExecution for streaming aggregation and deduplication.

Repository: spark
Updated Branches:
  refs/heads/master af4dc5028 -> 47b5b6852


[SPARK-24157][SS] Enabled no-data batches in MicroBatchExecution for streaming aggregation and deduplication.

## What changes were proposed in this pull request?

This PR enables MicroBatchExecution to run no-data batches if some SparkPlan requires running another batch to output results based on the updated watermark / processing time. In this PR, I have enabled streaming aggregations and streaming deduplication to automatically run an additional batch even when no new data is available. See https://issues.apache.org/jira/browse/SPARK-24156 for more context.
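
For context, here is a minimal, illustrative sketch (not part of this commit) of a user-facing query that the new flag affects; the rate source, console sink, and column choices are assumptions of this sketch, while the conf key `spark.sql.streaming.noDataMicroBatchesEnabled` is the one added in the diff below.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{count, window}

val spark = SparkSession.builder().master("local[2]").appName("no-data-batches").getOrCreate()
import spark.implicits._

// Defaults to true; with no-data batches enabled, an advancing watermark can
// finalize append-mode windows and clean up state without waiting for new input.
spark.conf.set("spark.sql.streaming.noDataMicroBatchesEnabled", "true")

val counts = spark.readStream
  .format("rate").option("rowsPerSecond", "10").load()   // built-in rate source: (timestamp, value)
  .withWatermark("timestamp", "10 seconds")
  .groupBy(window($"timestamp", "5 seconds"))
  .agg(count("*") as "count")

val query = counts.writeStream
  .outputMode("append")
  .format("console")
  .start()
```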

Major changes/refactoring done in this PR:
- Refactoring MicroBatchExecution - A long-standing point of confusion (at least to me) in the MicroBatchExecution control flow was that `populateStartOffsets` internally called `constructNextBatch`, which was not obvious from the name "populateStartOffsets" and made the control flow of the main trigger execution loop hard to follow (the main loop in `runActivatedStream` called `constructNextBatch`, but only if `populateStartOffsets` had not already called it). The refactoring makes this cleaner; a condensed sketch of the new flow follows this list.
    - `populateStartOffsets` now only updates `availableOffsets` and `committedOffsets`; it does not call `constructNextBatch`.
    - The main loop in `runActivatedStream` calls `constructNextBatch`, which returns true or false depending on whether the next batch is ready to be executed. This method is now idempotent: once a batch has been constructed, it keeps returning true until that batch has been executed.
    - If the next batch is ready, we call `runBatch`; otherwise we sleep for the polling interval.
    - That's it.
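
As a reading aid, here is a condensed, self-contained model of the new loop (simplified from the `runActivatedStream` diff below; the stub method bodies and the plain `while` loop are placeholders for the real trigger executor and engine methods).

```scala
object TriggerLoopSketch {
  var currentBatchId: Long = -1L
  @volatile var isActive: Boolean = true
  val pollingDelayMs: Long = 10L

  // Stubs standing in for the real engine methods.
  def populateStartOffsets(): Unit = { currentBatchId = 0 }  // now only restores offsets
  def constructNextBatch(noDataBatchesEnabled: Boolean): Boolean =
    scala.util.Random.nextBoolean()  // stand-in for: new data available OR a stateful op needs another batch
  def runBatch(): Unit = println(s"running batch $currentBatchId")

  def runActivatedStream(noDataBatchesEnabled: Boolean): Unit = {
    while (isActive) {
      if (currentBatchId < 0) populateStartOffsets()         // done once per start/restart
      val currentBatchIsRunnable = constructNextBatch(noDataBatchesEnabled)
      if (currentBatchIsRunnable) {
        runBatch()                                           // new data, or no-data batch for state cleanup
        currentBatchId += 1
      } else {
        Thread.sleep(pollingDelayMs)                         // "Waiting for data to arrive"
      }
    }
  }
}
```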

- Refactoring watermark management logic - This has been refactored out of `MicroBatchExecution` into a separate class, `WatermarkTracker`, to simplify `MicroBatchExecution`.
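
To make the tracker's rule concrete, here is a small self-contained model of its core logic (the real `WatermarkTracker`, shown in full in the diff below, reads per-operator event-time stats from `EventTimeWatermarkExec` nodes; passing the per-operator candidate watermarks in directly is an assumption of this sketch).

```scala
import scala.collection.mutable

class WatermarkModel {
  private val operatorToWatermarkMap = mutable.HashMap[Int, Long]()
  private var watermarkMs: Long = 0L

  def update(candidatesMs: Seq[Option[Long]]): Long = {
    if (candidatesMs.isEmpty) return watermarkMs
    candidatesMs.zipWithIndex.foreach {
      case (Some(newMs), i) =>
        // Each operator's watermark only moves forward.
        if (operatorToWatermarkMap.get(i).forall(newMs > _)) operatorToWatermarkMap.put(i, newMs)
      case (None, i) =>
        // No data seen yet for this watermark node: treat it as 0.
        operatorToWatermarkMap.getOrElseUpdate(i, 0L)
    }
    // Global watermark is the minimum across nodes, and it never regresses.
    val candidate = operatorToWatermarkMap.values.min
    if (candidate > watermarkMs) watermarkMs = candidate
    watermarkMs
  }
}

val wm = new WatermarkModel
assert(wm.update(Seq(Some(15000L), None)) == 0L)              // one node has seen no data yet
assert(wm.update(Seq(Some(20000L), Some(12000L))) == 12000L)  // minimum across nodes
assert(wm.update(Seq(Some(5000L), Some(8000L))) == 12000L)    // never moves backwards
```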

- New method `shouldRunAnotherBatch` in `IncrementalExecution` - This returns true if any stateful operator in the last execution plan requires another batch for state cleanup, etc. It is used in `constructNextBatch` to decide whether to construct the next batch.
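
The per-operator predicate itself is small; here is a self-contained model of the decision (mirroring the `StateStoreSaveExec` and `StreamingDeduplicateExec` overrides in the diff below; the `Metadata` case class and the millisecond values are stand-ins for `OffsetSeqMetadata` and real watermarks).

```scala
case class Metadata(batchWatermarkMs: Long)

// The real StateStoreSaveExec additionally requires Append or Update output mode.
def shouldRunAnotherBatch(operatorWatermarkMs: Option[Long], newMetadata: Metadata): Boolean =
  operatorWatermarkMs.isDefined && newMetadata.batchWatermarkMs > operatorWatermarkMs.get

assert(shouldRunAnotherBatch(Some(15000L), Metadata(batchWatermarkMs = 25000L)))  // watermark advanced: clean up
assert(!shouldRunAnotherBatch(Some(15000L), Metadata(batchWatermarkMs = 15000L))) // watermark unchanged: nothing to do
assert(!shouldRunAnotherBatch(None, Metadata(batchWatermarkMs = 25000L)))         // operator has no event-time watermark
```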

- Changes to stream testing framework - Many tests used CheckLastBatch to validate answers. This assumed that there would be no more batches after the last set of input had been processed, so the last batch was the one whose output corresponded to the last input. This is not true anymore. To account for that, I made two changes (a usage sketch follows this list).
    - `CheckNewAnswer` is a new test action that verifies the new rows generated since the last time the answer was checked by `CheckAnswer`, `CheckNewAnswer` or `CheckLastBatch`. It is agnostic to how many batches occurred between the last check and now. To make this easier, I added a common trait, MemorySinkBase, shared by MemorySink and MemorySinkV2 to abstract out some common methods.
    - `assertNumStateRows` has been updated in the same way to be batch-agnostic: it checks the total number of state rows and how many state rows were updated, summing up updates since the last check.
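
Here is a sketch of the new test pattern (it assumes the spark-sql test classpath, where `StreamTest` provides `testStream`/`AddData`/`CheckNewAnswer`; it condenses the updated aggregation test visible in the EventTimeWatermarkSuite diff below).

```scala
import org.apache.spark.sql.execution.streaming.MemoryStream
import org.apache.spark.sql.functions.{count, window}
import org.apache.spark.sql.streaming.StreamTest

class NoDataBatchExampleSuite extends StreamTest {
  import testImplicits._

  test("watermark emits append-mode results without extra input") {
    val inputData = MemoryStream[Int]
    val windowedAggregation = inputData.toDF()
      .withColumn("eventTime", $"value".cast("timestamp"))
      .withWatermark("eventTime", "10 seconds")
      .groupBy(window($"eventTime", "5 seconds") as 'window)
      .agg(count("*") as 'count)
      .select($"window".getField("start").cast("long").as[Long], $"count".as[Long])

    testStream(windowedAggregation)(
      AddData(inputData, 10, 11, 12, 13, 14, 15),
      CheckNewAnswer(),         // watermark still 0, nothing finalized yet
      AddData(inputData, 25),   // advances watermark to 15 seconds; the no-data batch
      CheckNewAnswer((10, 5))   // then emits window [10, 15) without further input
    )
  }
}
```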

## How was this patch tested?
- Changes made to existing tests - Tests have been changed in one of the following patterns:
    - Tests that fed the last input again to force another batch to be executed (so that state would be cleaned up / output generated) were simplified by removing the extra input.
    - Tests using aggregation + watermark had CheckLastBatch replaced with CheckNewAnswer to make them batch-agnostic.
- New tests were added to check that the flag works for streaming aggregation and deduplication.

Author: Tathagata Das <ta...@gmail.com>

Closes #21220 from tdas/SPARK-24157.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/47b5b685
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/47b5b685
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/47b5b685

Branch: refs/heads/master
Commit: 47b5b68528c154d32b3f40f388918836d29462b8
Parents: af4dc50
Author: Tathagata Das <ta...@gmail.com>
Authored: Fri May 4 16:35:24 2018 -0700
Committer: Tathagata Das <ta...@gmail.com>
Committed: Fri May 4 16:35:24 2018 -0700

----------------------------------------------------------------------
 .../org/apache/spark/sql/internal/SQLConf.scala |  11 +
 .../streaming/IncrementalExecution.scala        |  10 +
 .../streaming/MicroBatchExecution.scala         | 231 +++++++--------
 .../execution/streaming/WatermarkTracker.scala  |  73 +++++
 .../spark/sql/execution/streaming/memory.scala  |  17 +-
 .../execution/streaming/sources/memoryV2.scala  |   8 +-
 .../execution/streaming/statefulOperators.scala |  16 +
 .../streaming/sources/ForeachWriterSuite.scala  |   8 +-
 .../spark/sql/streaming/DeduplicateSuite.scala  | 285 ------------------
 .../sql/streaming/EventTimeWatermarkSuite.scala | 112 +++----
 .../sql/streaming/FileStreamSinkSuite.scala     |   7 +-
 .../sql/streaming/StateStoreMetricsTest.scala   |  52 +++-
 .../apache/spark/sql/streaming/StreamTest.scala |  56 +++-
 .../streaming/StreamingDeduplicationSuite.scala | 295 +++++++++++++++++++
 .../sql/streaming/StreamingJoinSuite.scala      |  18 +-
 .../sql/streaming/StreamingQuerySuite.scala     |   2 +-
 16 files changed, 693 insertions(+), 508 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/47b5b685/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 3942240..895e150 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -919,6 +919,14 @@ object SQLConf {
       .timeConf(TimeUnit.MILLISECONDS)
       .createWithDefault(10000L)
 
+  val STREAMING_NO_DATA_MICRO_BATCHES_ENABLED =
+    buildConf("spark.sql.streaming.noDataMicroBatchesEnabled")
+      .doc(
+        "Whether streaming micro-batch engine will execute batches without data " +
+          "for eager state management for stateful streaming queries.")
+      .booleanConf
+      .createWithDefault(true)
+
   val STREAMING_METRICS_ENABLED =
     buildConf("spark.sql.streaming.metricsEnabled")
       .doc("Whether Dropwizard/Codahale metrics will be reported for active streaming queries.")
@@ -1313,6 +1321,9 @@ class SQLConf extends Serializable with Logging {
   def streamingNoDataProgressEventInterval: Long =
     getConf(STREAMING_NO_DATA_PROGRESS_EVENT_INTERVAL)
 
+  def streamingNoDataMicroBatchesEnabled: Boolean =
+    getConf(STREAMING_NO_DATA_MICRO_BATCHES_ENABLED)
+
   def streamingMetricsEnabled: Boolean = getConf(STREAMING_METRICS_ENABLED)
 
   def streamingProgressRetention: Int = getConf(STREAMING_PROGRESS_RETENTION)

http://git-wip-us.apache.org/repos/asf/spark/blob/47b5b685/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala
index 1a83c88..c480b96 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala
@@ -143,4 +143,14 @@ class IncrementalExecution(
 
   /** No need assert supported, as this check has already been done */
   override def assertSupported(): Unit = { }
+
+  /**
+   * Should the MicroBatchExecution run another batch based on this execution and the current
+   * updated metadata.
+   */
+  def shouldRunAnotherBatch(newMetadata: OffsetSeqMetadata): Boolean = {
+    executedPlan.collect {
+      case p: StateStoreWriter => p.shouldRunAnotherBatch(newMetadata)
+    }.exists(_ == true)
+  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/47b5b685/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
index 6e23197..6709e70 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
@@ -61,6 +61,8 @@ class MicroBatchExecution(
     case _ => throw new IllegalStateException(s"Unknown type of trigger: $trigger")
   }
 
+  private val watermarkTracker = new WatermarkTracker()
+
   override lazy val logicalPlan: LogicalPlan = {
     assert(queryExecutionThread eq Thread.currentThread,
       "logicalPlan must be initialized in QueryExecutionThread " +
@@ -128,40 +130,55 @@ class MicroBatchExecution(
    * Repeatedly attempts to run batches as data arrives.
    */
   protected def runActivatedStream(sparkSessionForStream: SparkSession): Unit = {
-    triggerExecutor.execute(() => {
-      startTrigger()
 
+    val noDataBatchesEnabled =
+      sparkSessionForStream.sessionState.conf.streamingNoDataMicroBatchesEnabled
+
+    triggerExecutor.execute(() => {
       if (isActive) {
+        var currentBatchIsRunnable = false // Whether the current batch is runnable / has been run
+        var currentBatchHasNewData = false // Whether the current batch had new data
+
+        startTrigger()
+
         reportTimeTaken("triggerExecution") {
+          // We'll do this initialization only once every start / restart
           if (currentBatchId < 0) {
-            // We'll do this initialization only once
             populateStartOffsets(sparkSessionForStream)
-            sparkSession.sparkContext.setJobDescription(getBatchDescriptionString)
-            logDebug(s"Stream running from $committedOffsets to $availableOffsets")
-          } else {
-            constructNextBatch()
+            logInfo(s"Stream started from $committedOffsets")
           }
-          if (dataAvailable) {
-            currentStatus = currentStatus.copy(isDataAvailable = true)
-            updateStatusMessage("Processing new data")
+
+          // Set this before calling constructNextBatch() so any Spark jobs executed by sources
+          // while getting new data have the correct description
+          sparkSession.sparkContext.setJobDescription(getBatchDescriptionString)
+
+          // Try to construct the next batch. This will return true only if the next batch is
+          // ready and runnable. Note that the current batch may be runnable even without
+          // new data to process as `constructNextBatch` may decide to run a batch for
+          // state cleanup, etc. `isNewDataAvailable` will be updated to reflect whether new data
+          // is available or not.
+          currentBatchIsRunnable = constructNextBatch(noDataBatchesEnabled)
+
+          // Remember whether the current batch has data or not. This will be required later
+          // for bookkeeping after running the batch, when `isNewDataAvailable` will have changed
+          // to false as the batch would have already processed the available data.
+          currentBatchHasNewData = isNewDataAvailable
+
+          currentStatus = currentStatus.copy(isDataAvailable = isNewDataAvailable)
+          if (currentBatchIsRunnable) {
+            if (currentBatchHasNewData) updateStatusMessage("Processing new data")
+            else updateStatusMessage("No new data but cleaning up state")
             runBatch(sparkSessionForStream)
+          } else {
+            updateStatusMessage("Waiting for data to arrive")
           }
         }
-        // Report trigger as finished and construct progress object.
-        finishTrigger(dataAvailable)
-        if (dataAvailable) {
-          // Update committed offsets.
-          commitLog.add(currentBatchId)
-          committedOffsets ++= availableOffsets
-          logDebug(s"batch ${currentBatchId} committed")
-          // We'll increase currentBatchId after we complete processing current batch's data
-          currentBatchId += 1
-          sparkSession.sparkContext.setJobDescription(getBatchDescriptionString)
-        } else {
-          currentStatus = currentStatus.copy(isDataAvailable = false)
-          updateStatusMessage("Waiting for data to arrive")
-          Thread.sleep(pollingDelayMs)
-        }
+
+        finishTrigger(currentBatchHasNewData)  // Must be outside reportTimeTaken so it is recorded
+
+        // If the current batch has been executed, then increment the batch id, else there was
+        // no data to execute the batch
+        if (currentBatchIsRunnable) currentBatchId += 1 else Thread.sleep(pollingDelayMs)
       }
       updateStatusMessage("Waiting for next trigger")
       isActive
@@ -211,6 +228,7 @@ class MicroBatchExecution(
           OffsetSeqMetadata.setSessionConf(metadata, sparkSessionToRunBatches.conf)
           offsetSeqMetadata = OffsetSeqMetadata(
             metadata.batchWatermarkMs, metadata.batchTimestampMs, sparkSessionToRunBatches.conf)
+          watermarkTracker.setWatermark(metadata.batchWatermarkMs)
         }
 
         /* identify the current batch id: if commit log indicates we successfully processed the
@@ -235,7 +253,6 @@ class MicroBatchExecution(
               currentBatchId = latestCommittedBatchId + 1
               committedOffsets ++= availableOffsets
               // Construct a new batch be recomputing availableOffsets
-              constructNextBatch()
             } else if (latestCommittedBatchId < latestBatchId - 1) {
               logWarning(s"Batch completion log latest batch id is " +
                 s"${latestCommittedBatchId}, which is not trailing " +
@@ -243,19 +260,18 @@ class MicroBatchExecution(
             }
           case None => logInfo("no commit log present")
         }
-        logDebug(s"Resuming at batch $currentBatchId with committed offsets " +
+        logInfo(s"Resuming at batch $currentBatchId with committed offsets " +
           s"$committedOffsets and available offsets $availableOffsets")
       case None => // We are starting this stream for the first time.
         logInfo(s"Starting new streaming query.")
         currentBatchId = 0
-        constructNextBatch()
     }
   }
 
   /**
    * Returns true if there is any new data available to be processed.
    */
-  private def dataAvailable: Boolean = {
+  private def isNewDataAvailable: Boolean = {
     availableOffsets.exists {
       case (source, available) =>
         committedOffsets
@@ -266,93 +282,63 @@ class MicroBatchExecution(
   }
 
   /**
-   * Queries all of the sources to see if any new data is available. When there is new data the
-   * batchId counter is incremented and a new log entry is written with the newest offsets.
+   * Attempts to construct a batch according to:
+   *  - Availability of new data
+   *  - Need for timeouts and state cleanups in stateful operators
+   *
+   * Returns true only if the next batch should be executed.
+   *
+   * Here is the high-level logic on how this constructs the next batch.
+   * - Check each source whether new data is available
+   * - Updated the query's metadata and check using the last execution whether there is any need
+   *   to run another batch (for state clean up, etc.)
+   * - If either of the above is true, then construct the next batch by committing to the offset
+   *   log that range of offsets that the next batch will process.
    */
-  private def constructNextBatch(): Unit = {
-    // Check to see what new data is available.
-    val hasNewData = {
-      awaitProgressLock.lock()
-      try {
-        // Generate a map from each unique source to the next available offset.
-        val latestOffsets: Map[BaseStreamingSource, Option[Offset]] = uniqueSources.map {
-          case s: Source =>
-            updateStatusMessage(s"Getting offsets from $s")
-            reportTimeTaken("getOffset") {
-              (s, s.getOffset)
-            }
-          case s: MicroBatchReader =>
-            updateStatusMessage(s"Getting offsets from $s")
-            reportTimeTaken("setOffsetRange") {
-              // Once v1 streaming source execution is gone, we can refactor this away.
-              // For now, we set the range here to get the source to infer the available end offset,
-              // get that offset, and then set the range again when we later execute.
-              s.setOffsetRange(
-                toJava(availableOffsets.get(s).map(off => s.deserializeOffset(off.json))),
-                Optional.empty())
-            }
-
-            val currentOffset = reportTimeTaken("getEndOffset") { s.getEndOffset() }
-            (s, Option(currentOffset))
-        }.toMap
-        availableOffsets ++= latestOffsets.filter { case (_, o) => o.nonEmpty }.mapValues(_.get)
-
-        if (dataAvailable) {
-          true
-        } else {
-          noNewData = true
-          false
+  private def constructNextBatch(noDataBatchesEnables: Boolean): Boolean = withProgressLocked {
+    // If new data is already available that means this method has already been called before
+    // and it must have already committed the offset range of next batch to the offset log.
+    // Hence do nothing, just return true.
+    if (isNewDataAvailable) return true
+
+    // Generate a map from each unique source to the next available offset.
+    val latestOffsets: Map[BaseStreamingSource, Option[Offset]] = uniqueSources.map {
+      case s: Source =>
+        updateStatusMessage(s"Getting offsets from $s")
+        reportTimeTaken("getOffset") {
+          (s, s.getOffset)
         }
-      } finally {
-        awaitProgressLock.unlock()
-      }
-    }
-    if (hasNewData) {
-      var batchWatermarkMs = offsetSeqMetadata.batchWatermarkMs
-      // Update the eventTime watermarks if we find any in the plan.
-      if (lastExecution != null) {
-        lastExecution.executedPlan.collect {
-          case e: EventTimeWatermarkExec => e
-        }.zipWithIndex.foreach {
-          case (e, index) if e.eventTimeStats.value.count > 0 =>
-            logDebug(s"Observed event time stats $index: ${e.eventTimeStats.value}")
-            val newWatermarkMs = e.eventTimeStats.value.max - e.delayMs
-            val prevWatermarkMs = watermarkMsMap.get(index)
-            if (prevWatermarkMs.isEmpty || newWatermarkMs > prevWatermarkMs.get) {
-              watermarkMsMap.put(index, newWatermarkMs)
-            }
-
-          // Populate 0 if we haven't seen any data yet for this watermark node.
-          case (_, index) =>
-            if (!watermarkMsMap.isDefinedAt(index)) {
-              watermarkMsMap.put(index, 0)
-            }
+      case s: MicroBatchReader =>
+        updateStatusMessage(s"Getting offsets from $s")
+        reportTimeTaken("setOffsetRange") {
+          // Once v1 streaming source execution is gone, we can refactor this away.
+          // For now, we set the range here to get the source to infer the available end offset,
+          // get that offset, and then set the range again when we later execute.
+          s.setOffsetRange(
+            toJava(availableOffsets.get(s).map(off => s.deserializeOffset(off.json))),
+            Optional.empty())
         }
 
-        // Update the global watermark to the minimum of all watermark nodes.
-        // This is the safest option, because only the global watermark is fault-tolerant. Making
-        // it the minimum of all individual watermarks guarantees it will never advance past where
-        // any individual watermark operator would be if it were in a plan by itself.
-        if(!watermarkMsMap.isEmpty) {
-          val newWatermarkMs = watermarkMsMap.minBy(_._2)._2
-          if (newWatermarkMs > batchWatermarkMs) {
-            logInfo(s"Updating eventTime watermark to: $newWatermarkMs ms")
-            batchWatermarkMs = newWatermarkMs
-          } else {
-            logDebug(
-              s"Event time didn't move: $newWatermarkMs < " +
-                s"$batchWatermarkMs")
-          }
-        }
-      }
-      offsetSeqMetadata = offsetSeqMetadata.copy(
-        batchWatermarkMs = batchWatermarkMs,
-        batchTimestampMs = triggerClock.getTimeMillis()) // Current batch timestamp in milliseconds
+        val currentOffset = reportTimeTaken("getEndOffset") { s.getEndOffset() }
+        (s, Option(currentOffset))
+    }.toMap
+    availableOffsets ++= latestOffsets.filter { case (_, o) => o.nonEmpty }.mapValues(_.get)
+
+    // Update the query metadata
+    offsetSeqMetadata = offsetSeqMetadata.copy(
+      batchWatermarkMs = watermarkTracker.currentWatermark,
+      batchTimestampMs = triggerClock.getTimeMillis())
+
+    // Check whether next batch should be constructed
+    val lastExecutionRequiresAnotherBatch = noDataBatchesEnables &&
+      Option(lastExecution).exists(_.shouldRunAnotherBatch(offsetSeqMetadata))
+    val shouldConstructNextBatch = isNewDataAvailable || lastExecutionRequiresAnotherBatch
 
+    if (shouldConstructNextBatch) {
+      // Commit the next batch offset range to the offset log
       updateStatusMessage("Writing offsets to log")
       reportTimeTaken("walCommit") {
-        assert(offsetLog.add(
-          currentBatchId,
+        assert(offsetLog.add(currentBatchId,
           availableOffsets.toOffsetSeq(sources, offsetSeqMetadata)),
           s"Concurrent update to the log. Multiple streaming jobs detected for $currentBatchId")
         logInfo(s"Committed offsets for batch $currentBatchId. " +
@@ -373,7 +359,7 @@ class MicroBatchExecution(
                 reader.commit(reader.deserializeOffset(off.json))
             }
           } else {
-            throw new IllegalStateException(s"batch $currentBatchId doesn't exist")
+            throw new IllegalStateException(s"batch ${currentBatchId - 1} doesn't exist")
           }
         }
 
@@ -384,15 +370,12 @@ class MicroBatchExecution(
           commitLog.purge(currentBatchId - minLogEntriesToMaintain)
         }
       }
+      noNewData = false
     } else {
-      awaitProgressLock.lock()
-      try {
-        // Wake up any threads that are waiting for the stream to progress.
-        awaitProgressLockCondition.signalAll()
-      } finally {
-        awaitProgressLock.unlock()
-      }
+      noNewData = true
+      awaitProgressLockCondition.signalAll()
     }
+    shouldConstructNextBatch
   }
 
   /**
@@ -400,6 +383,8 @@ class MicroBatchExecution(
    * @param sparkSessionToRunBatch Isolated [[SparkSession]] to run this batch with.
    */
   private def runBatch(sparkSessionToRunBatch: SparkSession): Unit = {
+    logDebug(s"Running batch $currentBatchId")
+
     // Request unprocessed data from all sources.
     newData = reportTimeTaken("getBatch") {
       availableOffsets.flatMap {
@@ -513,17 +498,17 @@ class MicroBatchExecution(
       }
     }
 
-    awaitProgressLock.lock()
-    try {
-      // Wake up any threads that are waiting for the stream to progress.
+    withProgressLocked {
+      commitLog.add(currentBatchId)
+      committedOffsets ++= availableOffsets
       awaitProgressLockCondition.signalAll()
-    } finally {
-      awaitProgressLock.unlock()
     }
+    watermarkTracker.updateWatermark(lastExecution.executedPlan)
+    logDebug(s"Completed batch ${currentBatchId}")
   }
 
   /** Execute a function while locking the stream from making an progress */
-  private[sql] def withProgressLocked(f: => Unit): Unit = {
+  private[sql] def withProgressLocked[T](f: => T): T = {
     awaitProgressLock.lock()
     try {
       f

http://git-wip-us.apache.org/repos/asf/spark/blob/47b5b685/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/WatermarkTracker.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/WatermarkTracker.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/WatermarkTracker.scala
new file mode 100644
index 0000000..8086566
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/WatermarkTracker.scala
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming
+
+import scala.collection.mutable
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.execution.SparkPlan
+
+class WatermarkTracker extends Logging {
+  private val operatorToWatermarkMap = mutable.HashMap[Int, Long]()
+  private var watermarkMs: Long = 0
+  private var updated = false
+
+  def setWatermark(newWatermarkMs: Long): Unit = synchronized {
+    watermarkMs = newWatermarkMs
+  }
+
+  def updateWatermark(executedPlan: SparkPlan): Unit = synchronized {
+    val watermarkOperators = executedPlan.collect {
+      case e: EventTimeWatermarkExec => e
+    }
+    if (watermarkOperators.isEmpty) return
+
+
+    watermarkOperators.zipWithIndex.foreach {
+      case (e, index) if e.eventTimeStats.value.count > 0 =>
+        logDebug(s"Observed event time stats $index: ${e.eventTimeStats.value}")
+        val newWatermarkMs = e.eventTimeStats.value.max - e.delayMs
+        val prevWatermarkMs = operatorToWatermarkMap.get(index)
+        if (prevWatermarkMs.isEmpty || newWatermarkMs > prevWatermarkMs.get) {
+          operatorToWatermarkMap.put(index, newWatermarkMs)
+        }
+
+      // Populate 0 if we haven't seen any data yet for this watermark node.
+      case (_, index) =>
+        if (!operatorToWatermarkMap.isDefinedAt(index)) {
+          operatorToWatermarkMap.put(index, 0)
+        }
+    }
+
+    // Update the global watermark to the minimum of all watermark nodes.
+    // This is the safest option, because only the global watermark is fault-tolerant. Making
+    // it the minimum of all individual watermarks guarantees it will never advance past where
+    // any individual watermark operator would be if it were in a plan by itself.
+    val newWatermarkMs = operatorToWatermarkMap.minBy(_._2)._2
+    if (newWatermarkMs > watermarkMs) {
+      logInfo(s"Updating eventTime watermark to: $newWatermarkMs ms")
+      watermarkMs = newWatermarkMs
+      updated = true
+    } else {
+      logDebug(s"Event time didn't move: $newWatermarkMs < $watermarkMs")
+      updated = false
+    }
+  }
+
+  def currentWatermark: Long = synchronized { watermarkMs }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/47b5b685/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala
index 628923d..2225827 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala
@@ -222,11 +222,20 @@ class MemoryStreamDataReaderFactory(records: Array[UnsafeRow])
   }
 }
 
+/** A common trait for MemorySinks with methods used for testing */
+trait MemorySinkBase extends BaseStreamingSink {
+  def allData: Seq[Row]
+  def latestBatchData: Seq[Row]
+  def dataSinceBatch(sinceBatchId: Long): Seq[Row]
+  def latestBatchId: Option[Long]
+}
+
 /**
  * A sink that stores the results in memory. This [[Sink]] is primarily intended for use in unit
  * tests and does not provide durability.
  */
-class MemorySink(val schema: StructType, outputMode: OutputMode) extends Sink with Logging {
+class MemorySink(val schema: StructType, outputMode: OutputMode) extends Sink
+  with MemorySinkBase with Logging {
 
   private case class AddedData(batchId: Long, data: Array[Row])
 
@@ -236,7 +245,7 @@ class MemorySink(val schema: StructType, outputMode: OutputMode) extends Sink wi
 
   /** Returns all rows that are stored in this [[Sink]]. */
   def allData: Seq[Row] = synchronized {
-    batches.map(_.data).flatten
+    batches.flatMap(_.data)
   }
 
   def latestBatchId: Option[Long] = synchronized {
@@ -245,6 +254,10 @@ class MemorySink(val schema: StructType, outputMode: OutputMode) extends Sink wi
 
   def latestBatchData: Seq[Row] = synchronized { batches.lastOption.toSeq.flatten(_.data) }
 
+  def dataSinceBatch(sinceBatchId: Long): Seq[Row] = synchronized {
+    batches.filter(_.batchId > sinceBatchId).flatMap(_.data)
+  }
+
   def toDebugString: String = synchronized {
     batches.map { case AddedData(batchId, data) =>
       val dataStr = try data.mkString(" ") catch {

http://git-wip-us.apache.org/repos/asf/spark/blob/47b5b685/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/memoryV2.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/memoryV2.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/memoryV2.scala
index d871d37..0d6c239 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/memoryV2.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/memoryV2.scala
@@ -28,7 +28,7 @@ import org.apache.spark.sql.Row
 import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, Statistics}
 import org.apache.spark.sql.catalyst.streaming.InternalOutputModes.{Append, Complete, Update}
-import org.apache.spark.sql.execution.streaming.Sink
+import org.apache.spark.sql.execution.streaming.{MemorySinkBase, Sink}
 import org.apache.spark.sql.sources.v2.{DataSourceOptions, DataSourceV2, StreamWriteSupport}
 import org.apache.spark.sql.sources.v2.writer._
 import org.apache.spark.sql.sources.v2.writer.streaming.StreamWriter
@@ -39,7 +39,7 @@ import org.apache.spark.sql.types.StructType
  * A sink that stores the results in memory. This [[Sink]] is primarily intended for use in unit
  * tests and does not provide durability.
  */
-class MemorySinkV2 extends DataSourceV2 with StreamWriteSupport with Logging {
+class MemorySinkV2 extends DataSourceV2 with StreamWriteSupport with MemorySinkBase with Logging {
   override def createStreamWriter(
       queryId: String,
       schema: StructType,
@@ -67,6 +67,10 @@ class MemorySinkV2 extends DataSourceV2 with StreamWriteSupport with Logging {
     batches.lastOption.toSeq.flatten(_.data)
   }
 
+  def dataSinceBatch(sinceBatchId: Long): Seq[Row] = synchronized {
+    batches.filter(_.batchId > sinceBatchId).flatMap(_.data)
+  }
+
   def toDebugString: String = synchronized {
     batches.map { case AddedData(batchId, data) =>
       val dataStr = try data.mkString(" ") catch {

http://git-wip-us.apache.org/repos/asf/spark/blob/47b5b685/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala
index c9354ac..1691a63 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala
@@ -126,6 +126,12 @@ trait StateStoreWriter extends StatefulOperator { self: SparkPlan =>
         name -> SQLMetrics.createTimingMetric(sparkContext, desc)
     }.toMap
   }
+
+  /**
+   * Should the MicroBatchExecution run another batch based on this stateful operator and the
+   * current updated metadata.
+   */
+  def shouldRunAnotherBatch(newMetadata: OffsetSeqMetadata): Boolean = false
 }
 
 /** An operator that supports watermark. */
@@ -388,6 +394,12 @@ case class StateStoreSaveExec(
       ClusteredDistribution(keyExpressions, stateInfo.map(_.numPartitions)) :: Nil
     }
   }
+
+  override def shouldRunAnotherBatch(newMetadata: OffsetSeqMetadata): Boolean = {
+    (outputMode.contains(Append) || outputMode.contains(Update)) &&
+      eventTimeWatermark.isDefined &&
+      newMetadata.batchWatermarkMs > eventTimeWatermark.get
+  }
 }
 
 /** Physical operator for executing streaming Deduplicate. */
@@ -454,6 +466,10 @@ case class StreamingDeduplicateExec(
   override def output: Seq[Attribute] = child.output
 
   override def outputPartitioning: Partitioning = child.outputPartitioning
+
+  override def shouldRunAnotherBatch(newMetadata: OffsetSeqMetadata): Boolean = {
+    eventTimeWatermark.isDefined && newMetadata.batchWatermarkMs > eventTimeWatermark.get
+  }
 }
 
 object StreamingDeduplicateExec {

http://git-wip-us.apache.org/repos/asf/spark/blob/47b5b685/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/ForeachWriterSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/ForeachWriterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/ForeachWriterSuite.scala
index 03bf71b..e60c339 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/ForeachWriterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/ForeachWriterSuite.scala
@@ -211,14 +211,12 @@ class ForeachWriterSuite extends StreamTest with SharedSQLContext with BeforeAnd
     try {
       inputData.addData(10, 11, 12)
       query.processAllAvailable()
-      inputData.addData(25) // Advance watermark to 15 seconds
-      query.processAllAvailable()
       inputData.addData(25) // Evict items less than previous watermark
       query.processAllAvailable()
 
       // There should be 3 batches and only does the last batch contain a value.
       val allEvents = ForeachWriterSuite.allEvents()
-      assert(allEvents.size === 3)
+      assert(allEvents.size === 4)
       val expectedEvents = Seq(
         Seq(
           ForeachWriterSuite.Open(partition = 0, version = 0),
@@ -230,6 +228,10 @@ class ForeachWriterSuite extends StreamTest with SharedSQLContext with BeforeAnd
         ),
         Seq(
           ForeachWriterSuite.Open(partition = 0, version = 2),
+          ForeachWriterSuite.Close(None)
+        ),
+        Seq(
+          ForeachWriterSuite.Open(partition = 0, version = 3),
           ForeachWriterSuite.Process(value = 3),
           ForeachWriterSuite.Close(None)
         )

http://git-wip-us.apache.org/repos/asf/spark/blob/47b5b685/sql/core/src/test/scala/org/apache/spark/sql/streaming/DeduplicateSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/DeduplicateSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/DeduplicateSuite.scala
deleted file mode 100644
index 0088b64..0000000
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/DeduplicateSuite.scala
+++ /dev/null
@@ -1,285 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.streaming
-
-import org.scalatest.BeforeAndAfterAll
-
-import org.apache.spark.sql.catalyst.plans.physical.{ClusteredDistribution, HashPartitioning, SinglePartition}
-import org.apache.spark.sql.catalyst.streaming.InternalOutputModes._
-import org.apache.spark.sql.execution.streaming.{MemoryStream, StreamingDeduplicateExec}
-import org.apache.spark.sql.execution.streaming.state.StateStore
-import org.apache.spark.sql.functions._
-
-class DeduplicateSuite extends StateStoreMetricsTest with BeforeAndAfterAll {
-
-  import testImplicits._
-
-  override def afterAll(): Unit = {
-    super.afterAll()
-    StateStore.stop()
-  }
-
-  test("deduplicate with all columns") {
-    val inputData = MemoryStream[String]
-    val result = inputData.toDS().dropDuplicates()
-
-    testStream(result, Append)(
-      AddData(inputData, "a"),
-      CheckLastBatch("a"),
-      assertNumStateRows(total = 1, updated = 1),
-      AddData(inputData, "a"),
-      CheckLastBatch(),
-      assertNumStateRows(total = 1, updated = 0),
-      AddData(inputData, "b"),
-      CheckLastBatch("b"),
-      assertNumStateRows(total = 2, updated = 1)
-    )
-  }
-
-  test("deduplicate with some columns") {
-    val inputData = MemoryStream[(String, Int)]
-    val result = inputData.toDS().dropDuplicates("_1")
-
-    testStream(result, Append)(
-      AddData(inputData, "a" -> 1),
-      CheckLastBatch("a" -> 1),
-      assertNumStateRows(total = 1, updated = 1),
-      AddData(inputData, "a" -> 2), // Dropped
-      CheckLastBatch(),
-      assertNumStateRows(total = 1, updated = 0),
-      AddData(inputData, "b" -> 1),
-      CheckLastBatch("b" -> 1),
-      assertNumStateRows(total = 2, updated = 1)
-    )
-  }
-
-  test("multiple deduplicates") {
-    val inputData = MemoryStream[(String, Int)]
-    val result = inputData.toDS().dropDuplicates().dropDuplicates("_1")
-
-    testStream(result, Append)(
-      AddData(inputData, "a" -> 1),
-      CheckLastBatch("a" -> 1),
-      assertNumStateRows(total = Seq(1L, 1L), updated = Seq(1L, 1L)),
-
-      AddData(inputData, "a" -> 2), // Dropped from the second `dropDuplicates`
-      CheckLastBatch(),
-      assertNumStateRows(total = Seq(1L, 2L), updated = Seq(0L, 1L)),
-
-      AddData(inputData, "b" -> 1),
-      CheckLastBatch("b" -> 1),
-      assertNumStateRows(total = Seq(2L, 3L), updated = Seq(1L, 1L))
-    )
-  }
-
-  test("deduplicate with watermark") {
-    val inputData = MemoryStream[Int]
-    val result = inputData.toDS()
-      .withColumn("eventTime", $"value".cast("timestamp"))
-      .withWatermark("eventTime", "10 seconds")
-      .dropDuplicates()
-      .select($"eventTime".cast("long").as[Long])
-
-    testStream(result, Append)(
-      AddData(inputData, (1 to 5).flatMap(_ => (10 to 15)): _*),
-      CheckLastBatch(10 to 15: _*),
-      assertNumStateRows(total = 6, updated = 6),
-
-      AddData(inputData, 25), // Advance watermark to 15 seconds
-      CheckLastBatch(25),
-      assertNumStateRows(total = 7, updated = 1),
-
-      AddData(inputData, 25), // Drop states less than watermark
-      CheckLastBatch(),
-      assertNumStateRows(total = 1, updated = 0),
-
-      AddData(inputData, 10), // Should not emit anything as data less than watermark
-      CheckLastBatch(),
-      assertNumStateRows(total = 1, updated = 0),
-
-      AddData(inputData, 45), // Advance watermark to 35 seconds
-      CheckLastBatch(45),
-      assertNumStateRows(total = 2, updated = 1),
-
-      AddData(inputData, 45), // Drop states less than watermark
-      CheckLastBatch(),
-      assertNumStateRows(total = 1, updated = 0)
-    )
-  }
-
-  test("deduplicate with aggregate - append mode") {
-    val inputData = MemoryStream[Int]
-    val windowedaggregate = inputData.toDS()
-      .withColumn("eventTime", $"value".cast("timestamp"))
-      .withWatermark("eventTime", "10 seconds")
-      .dropDuplicates()
-      .withWatermark("eventTime", "10 seconds")
-      .groupBy(window($"eventTime", "5 seconds") as 'window)
-      .agg(count("*") as 'count)
-      .select($"window".getField("start").cast("long").as[Long], $"count".as[Long])
-
-    testStream(windowedaggregate)(
-      AddData(inputData, (1 to 5).flatMap(_ => (10 to 15)): _*),
-      CheckLastBatch(),
-      // states in aggregate in [10, 14), [15, 20) (2 windows)
-      // states in deduplicate is 10 to 15
-      assertNumStateRows(total = Seq(2L, 6L), updated = Seq(2L, 6L)),
-
-      AddData(inputData, 25), // Advance watermark to 15 seconds
-      CheckLastBatch(),
-      // states in aggregate in [10, 14), [15, 20) and [25, 30) (3 windows)
-      // states in deduplicate is 10 to 15 and 25
-      assertNumStateRows(total = Seq(3L, 7L), updated = Seq(1L, 1L)),
-
-      AddData(inputData, 25), // Emit items less than watermark and drop their state
-      CheckLastBatch((10 -> 5)), // 5 items (10 to 14) after deduplicate
-      // states in aggregate in [15, 20) and [25, 30) (2 windows, note aggregate uses the end of
-      // window to evict items, so [15, 20) is still in the state store)
-      // states in deduplicate is 25
-      assertNumStateRows(total = Seq(2L, 1L), updated = Seq(0L, 0L)),
-
-      AddData(inputData, 10), // Should not emit anything as data less than watermark
-      CheckLastBatch(),
-      assertNumStateRows(total = Seq(2L, 1L), updated = Seq(0L, 0L)),
-
-      AddData(inputData, 40), // Advance watermark to 30 seconds
-      CheckLastBatch(),
-      // states in aggregate in [15, 20), [25, 30) and [40, 45)
-      // states in deduplicate is 25 and 40,
-      assertNumStateRows(total = Seq(3L, 2L), updated = Seq(1L, 1L)),
-
-      AddData(inputData, 40), // Emit items less than watermark and drop their state
-      CheckLastBatch((15 -> 1), (25 -> 1)),
-      // states in aggregate in [40, 45)
-      // states in deduplicate is 40,
-      assertNumStateRows(total = Seq(1L, 1L), updated = Seq(0L, 0L))
-    )
-  }
-
-  test("deduplicate with aggregate - update mode") {
-    val inputData = MemoryStream[(String, Int)]
-    val result = inputData.toDS()
-      .select($"_1" as "str", $"_2" as "num")
-      .dropDuplicates()
-      .groupBy("str")
-      .agg(sum("num"))
-      .as[(String, Long)]
-
-    testStream(result, Update)(
-      AddData(inputData, "a" -> 1),
-      CheckLastBatch("a" -> 1L),
-      assertNumStateRows(total = Seq(1L, 1L), updated = Seq(1L, 1L)),
-      AddData(inputData, "a" -> 1), // Dropped
-      CheckLastBatch(),
-      assertNumStateRows(total = Seq(1L, 1L), updated = Seq(0L, 0L)),
-      AddData(inputData, "a" -> 2),
-      CheckLastBatch("a" -> 3L),
-      assertNumStateRows(total = Seq(1L, 2L), updated = Seq(1L, 1L)),
-      AddData(inputData, "b" -> 1),
-      CheckLastBatch("b" -> 1L),
-      assertNumStateRows(total = Seq(2L, 3L), updated = Seq(1L, 1L))
-    )
-  }
-
-  test("deduplicate with aggregate - complete mode") {
-    val inputData = MemoryStream[(String, Int)]
-    val result = inputData.toDS()
-      .select($"_1" as "str", $"_2" as "num")
-      .dropDuplicates()
-      .groupBy("str")
-      .agg(sum("num"))
-      .as[(String, Long)]
-
-    testStream(result, Complete)(
-      AddData(inputData, "a" -> 1),
-      CheckLastBatch("a" -> 1L),
-      assertNumStateRows(total = Seq(1L, 1L), updated = Seq(1L, 1L)),
-      AddData(inputData, "a" -> 1), // Dropped
-      CheckLastBatch("a" -> 1L),
-      assertNumStateRows(total = Seq(1L, 1L), updated = Seq(0L, 0L)),
-      AddData(inputData, "a" -> 2),
-      CheckLastBatch("a" -> 3L),
-      assertNumStateRows(total = Seq(1L, 2L), updated = Seq(1L, 1L)),
-      AddData(inputData, "b" -> 1),
-      CheckLastBatch("a" -> 3L, "b" -> 1L),
-      assertNumStateRows(total = Seq(2L, 3L), updated = Seq(1L, 1L))
-    )
-  }
-
-  test("deduplicate with file sink") {
-    withTempDir { output =>
-      withTempDir { checkpointDir =>
-        val outputPath = output.getAbsolutePath
-        val inputData = MemoryStream[String]
-        val result = inputData.toDS().dropDuplicates()
-        val q = result.writeStream
-          .format("parquet")
-          .outputMode(Append)
-          .option("checkpointLocation", checkpointDir.getPath)
-          .start(outputPath)
-        try {
-          inputData.addData("a")
-          q.processAllAvailable()
-          checkDataset(spark.read.parquet(outputPath).as[String], "a")
-
-          inputData.addData("a") // Dropped
-          q.processAllAvailable()
-          checkDataset(spark.read.parquet(outputPath).as[String], "a")
-
-          inputData.addData("b")
-          q.processAllAvailable()
-          checkDataset(spark.read.parquet(outputPath).as[String], "a", "b")
-        } finally {
-          q.stop()
-        }
-      }
-    }
-  }
-
-  test("SPARK-19841: watermarkPredicate should filter based on keys") {
-    val input = MemoryStream[(Int, Int)]
-    val df = input.toDS.toDF("time", "id")
-      .withColumn("time", $"time".cast("timestamp"))
-      .withWatermark("time", "1 second")
-      .dropDuplicates("id", "time") // Change the column positions
-      .select($"id")
-    testStream(df)(
-      AddData(input, 1 -> 1, 1 -> 1, 1 -> 2),
-      CheckLastBatch(1, 2),
-      AddData(input, 1 -> 1, 2 -> 3, 2 -> 4),
-      CheckLastBatch(3, 4),
-      AddData(input, 1 -> 0, 1 -> 1, 3 -> 5, 3 -> 6), // Drop (1 -> 0, 1 -> 1) due to watermark
-      CheckLastBatch(5, 6),
-      AddData(input, 1 -> 0, 4 -> 7), // Drop (1 -> 0) due to watermark
-      CheckLastBatch(7)
-    )
-  }
-
-  test("SPARK-21546: dropDuplicates should ignore watermark when it's not a key") {
-    val input = MemoryStream[(Int, Int)]
-    val df = input.toDS.toDF("id", "time")
-      .withColumn("time", $"time".cast("timestamp"))
-      .withWatermark("time", "1 second")
-      .dropDuplicates("id")
-      .select($"id", $"time".cast("long"))
-    testStream(df)(
-      AddData(input, 1 -> 1, 1 -> 2, 2 -> 2),
-      CheckLastBatch(1 -> 1, 2 -> 2)
-    )
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/47b5b685/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala
index d6bef9c..7e8fde1 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala
@@ -28,6 +28,7 @@ import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.plans.logical.EventTimeWatermark
 import org.apache.spark.sql.execution.streaming._
 import org.apache.spark.sql.functions.{count, window}
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.streaming.OutputMode._
 
 class EventTimeWatermarkSuite extends StreamTest with BeforeAndAfter with Matchers with Logging {
@@ -137,20 +138,12 @@ class EventTimeWatermarkSuite extends StreamTest with BeforeAndAfter with Matche
         assert(e.get("watermark") === formatTimestamp(5))
       },
       AddData(inputData2, 25),
-      CheckAnswer(),
-      assertEventStats { e =>
-        assert(e.get("max") === formatTimestamp(25))
-        assert(e.get("min") === formatTimestamp(25))
-        assert(e.get("avg") === formatTimestamp(25))
-        assert(e.get("watermark") === formatTimestamp(5))
-      },
-      AddData(inputData2, 25),
       CheckAnswer((10, 3)),
       assertEventStats { e =>
         assert(e.get("max") === formatTimestamp(25))
         assert(e.get("min") === formatTimestamp(25))
         assert(e.get("avg") === formatTimestamp(25))
-        assert(e.get("watermark") === formatTimestamp(15))
+        assert(e.get("watermark") === formatTimestamp(5))
       }
     )
   }
@@ -167,15 +160,12 @@ class EventTimeWatermarkSuite extends StreamTest with BeforeAndAfter with Matche
 
     testStream(windowedAggregation)(
       AddData(inputData, 10, 11, 12, 13, 14, 15),
-      CheckLastBatch(),
+      CheckNewAnswer(),
       AddData(inputData, 25),   // Advance watermark to 15 seconds
-      CheckLastBatch(),
-      assertNumStateRows(3),
-      AddData(inputData, 25),   // Emit items less than watermark and drop their state
-      CheckLastBatch((10, 5)),
+      CheckNewAnswer((10, 5)),
       assertNumStateRows(2),
       AddData(inputData, 10),   // Should not emit anything as data less than watermark
-      CheckLastBatch(),
+      CheckNewAnswer(),
       assertNumStateRows(2)
     )
   }
@@ -193,15 +183,15 @@ class EventTimeWatermarkSuite extends StreamTest with BeforeAndAfter with Matche
 
     testStream(windowedAggregation, OutputMode.Update)(
       AddData(inputData, 10, 11, 12, 13, 14, 15),
-      CheckLastBatch((10, 5), (15, 1)),
+      CheckNewAnswer((10, 5), (15, 1)),
       AddData(inputData, 25),     // Advance watermark to 15 seconds
-      CheckLastBatch((25, 1)),
-      assertNumStateRows(3),
+      CheckNewAnswer((25, 1)),
+      assertNumStateRows(2),
       AddData(inputData, 10, 25), // Ignore 10 as its less than watermark
-      CheckLastBatch((25, 2)),
+      CheckNewAnswer((25, 2)),
       assertNumStateRows(2),
       AddData(inputData, 10),     // Should not emit anything as data less than watermark
-      CheckLastBatch(),
+      CheckNewAnswer(),
       assertNumStateRows(2)
     )
   }
@@ -251,56 +241,25 @@ class EventTimeWatermarkSuite extends StreamTest with BeforeAndAfter with Matche
 
     testStream(df)(
       AddData(inputData, 10, 11, 12, 13, 14, 15),
-      CheckLastBatch(),
+      CheckAnswer(),
       AddData(inputData, 25), // Advance watermark to 15 seconds
-      StopStream,
-      StartStream(),
-      CheckLastBatch(),
-      AddData(inputData, 25), // Evict items less than previous watermark.
-      CheckLastBatch((10, 5)),
+      CheckAnswer((10, 5)),
       StopStream,
       AssertOnQuery { q => // purge commit and clear the sink
-        val commit = q.commitLog.getLatest().map(_._1).getOrElse(-1L) + 1L
+        val commit = q.commitLog.getLatest().map(_._1).getOrElse(-1L)
         q.commitLog.purge(commit)
         q.sink.asInstanceOf[MemorySink].clear()
         true
       },
       StartStream(),
-      CheckLastBatch((10, 5)), // Recompute last batch and re-evict timestamp 10
-      AddData(inputData, 30), // Advance watermark to 20 seconds
-      CheckLastBatch(),
+      AddData(inputData, 10, 27, 30), // Advance watermark to 20 seconds, 10 should be ignored
+      CheckAnswer((15, 1)),
       StopStream,
-      StartStream(), // Watermark should still be 15 seconds
-      AddData(inputData, 17),
-      CheckLastBatch(), // We still do not see next batch
-      AddData(inputData, 30), // Advance watermark to 20 seconds
-      CheckLastBatch(),
-      AddData(inputData, 30), // Evict items less than previous watermark.
-      CheckLastBatch((15, 2)) // Ensure we see next window
-    )
-  }
-
-  test("dropping old data") {
-    val inputData = MemoryStream[Int]
-
-    val windowedAggregation = inputData.toDF()
-        .withColumn("eventTime", $"value".cast("timestamp"))
-        .withWatermark("eventTime", "10 seconds")
-        .groupBy(window($"eventTime", "5 seconds") as 'window)
-        .agg(count("*") as 'count)
-        .select($"window".getField("start").cast("long").as[Long], $"count".as[Long])
-
-    testStream(windowedAggregation)(
-      AddData(inputData, 10, 11, 12),
-      CheckAnswer(),
-      AddData(inputData, 25),     // Advance watermark to 15 seconds
-      CheckAnswer(),
-      AddData(inputData, 25),     // Evict items less than previous watermark.
-      CheckAnswer((10, 3)),
-      AddData(inputData, 10),     // 10 is later than 15 second watermark
-      CheckAnswer((10, 3)),
-      AddData(inputData, 25),
-      CheckAnswer((10, 3))        // Should not emit an incorrect partial result.
+      StartStream(),
+      AddData(inputData, 17), // Watermark should still be 20 seconds, 17 should be ignored
+      CheckAnswer((15, 1)),
+      AddData(inputData, 40), // Advance watermark to 30 seconds, emit first data 25
+      CheckNewAnswer((25, 2))
     )
   }
 
@@ -421,8 +380,6 @@ class EventTimeWatermarkSuite extends StreamTest with BeforeAndAfter with Matche
       AddData(inputData, 10),
       CheckAnswer(),
       AddData(inputData, 25), // Advance watermark to 15 seconds
-      CheckAnswer(),
-      AddData(inputData, 25), // Evict items less than previous watermark.
       CheckAnswer((10, 1))
     )
   }
@@ -501,8 +458,35 @@ class EventTimeWatermarkSuite extends StreamTest with BeforeAndAfter with Matche
     }
   }
 
+  test("test no-data flag") {
+    val flagKey = SQLConf.STREAMING_NO_DATA_MICRO_BATCHES_ENABLED.key
+
+    def testWithFlag(flag: Boolean): Unit = withClue(s"with $flagKey = $flag") {
+      val inputData = MemoryStream[Int]
+      val windowedAggregation = inputData.toDF()
+        .withColumn("eventTime", $"value".cast("timestamp"))
+        .withWatermark("eventTime", "10 seconds")
+        .groupBy(window($"eventTime", "5 seconds") as 'window)
+        .agg(count("*") as 'count)
+        .select($"window".getField("start").cast("long").as[Long], $"count".as[Long])
+
+      testStream(windowedAggregation)(
+        StartStream(additionalConfs = Map(flagKey -> flag.toString)),
+        AddData(inputData, 10, 11, 12, 13, 14, 15),
+        CheckNewAnswer(),
+        AddData(inputData, 25), // Advance watermark to 15 seconds
+        // Check if there is new answer if flag is set, no new answer otherwise
+        if (flag) CheckNewAnswer((10, 5)) else CheckNewAnswer()
+      )
+    }
+
+    testWithFlag(true)
+    testWithFlag(false)
+  }
+
   private def assertNumStateRows(numTotalRows: Long): AssertOnQuery = AssertOnQuery { q =>
-    val progressWithData = q.recentProgress.filter(_.numInputRows > 0).lastOption.get
+    q.processAllAvailable()
+    val progressWithData = q.recentProgress.lastOption.get
     assert(progressWithData.stateOperators(0).numRowsTotal === numTotalRows)
     true
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/47b5b685/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala
index cf41d7e..ed53def 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala
@@ -279,13 +279,10 @@ class FileStreamSinkSuite extends StreamTest {
       check() // nothing emitted yet
 
       addTimestamp(104, 123) // watermark = 90 before this, watermark = 123 - 10 = 113 after this
-      check() // nothing emitted yet
+      check((100L, 105L) -> 2L)  // no-data-batch emits results on 100-105,
 
       addTimestamp(140) // wm = 113 before this, emit results on 100-105, wm = 130 after this
-      check((100L, 105L) -> 2L)
-
-      addTimestamp(150) // wm = 130s before this, emit results on 120-125, wm = 150 after this
-      check((100L, 105L) -> 2L, (120L, 125L) -> 1L)
+      check((100L, 105L) -> 2L, (120L, 125L) -> 1L)  // no-data-batch emits results on 120-125
 
     } finally {
       if (query != null) {

http://git-wip-us.apache.org/repos/asf/spark/blob/47b5b685/sql/core/src/test/scala/org/apache/spark/sql/streaming/StateStoreMetricsTest.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StateStoreMetricsTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StateStoreMetricsTest.scala
index 368c460..e45f9d3 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StateStoreMetricsTest.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StateStoreMetricsTest.scala
@@ -17,20 +17,58 @@
 
 package org.apache.spark.sql.streaming
 
+import org.apache.spark.sql.execution.streaming.StreamExecution
+
 trait StateStoreMetricsTest extends StreamTest {
 
+  private var lastCheckedRecentProgressIndex = -1
+  private var lastQuery: StreamExecution = null
+
+  override def beforeEach(): Unit = {
+    super.beforeEach()
+    lastCheckedRecentProgressIndex = -1
+  }
+
   def assertNumStateRows(total: Seq[Long], updated: Seq[Long]): AssertOnQuery =
     AssertOnQuery(s"Check total state rows = $total, updated state rows = $updated") { q =>
-      val progressWithData = q.recentProgress.filter(_.numInputRows > 0).lastOption.get
-      assert(
-        progressWithData.stateOperators.map(_.numRowsTotal) === total,
-        "incorrect total rows")
-      assert(
-        progressWithData.stateOperators.map(_.numRowsUpdated) === updated,
-        "incorrect updates rows")
+      val recentProgress = q.recentProgress
+      require(recentProgress.nonEmpty, "No progress made, cannot check num state rows")
+      require(recentProgress.length < spark.sessionState.conf.streamingProgressRetention,
+        "This test assumes that all progresses are present in q.recentProgress but " +
+          "some may have been dropped due to retention limits")
+
+      if (q.ne(lastQuery)) lastCheckedRecentProgressIndex = -1
+      lastQuery = q
+
+      val numStateOperators = recentProgress.last.stateOperators.length
+      val progressesSinceLastCheck = recentProgress
+        .slice(lastCheckedRecentProgressIndex + 1, recentProgress.length)
+        .filter(_.stateOperators.length == numStateOperators)
+
+      val allNumUpdatedRowsSinceLastCheck =
+        progressesSinceLastCheck.map(_.stateOperators.map(_.numRowsUpdated))
+
+      lazy val debugString = "recent progresses:\n" +
+        progressesSinceLastCheck.map(_.prettyJson).mkString("\n\n")
+
+      val numTotalRows = recentProgress.last.stateOperators.map(_.numRowsTotal)
+      assert(numTotalRows === total, s"incorrect total rows, $debugString")
+
+      val numUpdatedRows = arraySum(allNumUpdatedRowsSinceLastCheck, numStateOperators)
+      assert(numUpdatedRows === updated, s"incorrect updated rows, $debugString")
+
+      lastCheckedRecentProgressIndex = recentProgress.length - 1
       true
     }
 
   def assertNumStateRows(total: Long, updated: Long): AssertOnQuery =
     assertNumStateRows(Seq(total), Seq(updated))
+
+  def arraySum(arraySeq: Seq[Array[Long]], arrayLength: Int): Seq[Long] = {
+    if (arraySeq.isEmpty) return Seq.fill(arrayLength)(0L)
+
+    assert(arraySeq.forall(_.length == arrayLength),
+      "Arrays are of different lengths:\n" + arraySeq.map(_.toSeq).mkString("\n"))
+    (0 until arrayLength).map { index => arraySeq.map(_.apply(index)).sum }
+  }
 }
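
assertNumStateRows is now batch-agnostic: it checks numRowsTotal against the latest progress only, but sums numRowsUpdated element-wise across every progress recorded since the previous check, which is exactly what arraySum computes. A standalone sketch of that summation (the sample values are made up):

    // Standalone sketch of arraySum's element-wise summation. Each inner array holds
    // the per-state-operator numRowsUpdated of one progress since the last check.
    val perProgressUpdates: Seq[Array[Long]] = Seq(Array(1L, 6L), Array(0L, 0L), Array(1L, 1L))
    val numStateOperators = 2
    val summedUpdates: Seq[Long] =
      (0 until numStateOperators).map(i => perProgressUpdates.map(_.apply(i)).sum)
    assert(summedUpdates == Seq(2L, 7L))  // per-operator totals across all batches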

http://git-wip-us.apache.org/repos/asf/spark/blob/47b5b685/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
index af0268f..9d139a9 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
@@ -44,6 +44,7 @@ import org.apache.spark.sql.execution.streaming._
 import org.apache.spark.sql.execution.streaming.continuous.{ContinuousExecution, EpochCoordinatorRef, IncrementAndGetEpoch}
 import org.apache.spark.sql.execution.streaming.sources.MemorySinkV2
 import org.apache.spark.sql.execution.streaming.state.StateStore
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.streaming.StreamingQueryListener._
 import org.apache.spark.sql.test.SharedSQLContext
 import org.apache.spark.util.{Clock, SystemClock, Utils}
@@ -192,7 +193,7 @@ trait StreamTest extends QueryTest with SharedSQLContext with TimeLimits with Be
   case class CheckAnswerRowsContains(expectedAnswer: Seq[Row], lastOnly: Boolean = false)
     extends StreamAction with StreamMustBeRunning {
     override def toString: String = s"$operatorName: ${expectedAnswer.mkString(",")}"
-    private def operatorName = if (lastOnly) "CheckLastBatch" else "CheckAnswer"
+    private def operatorName = if (lastOnly) "CheckLastBatchContains" else "CheckAnswerContains"
   }
 
   case class CheckAnswerRowsByFunc(
@@ -202,6 +203,23 @@ trait StreamTest extends QueryTest with SharedSQLContext with TimeLimits with Be
     private def operatorName = if (lastOnly) "CheckLastBatchByFunc" else "CheckAnswerByFunc"
   }
 
+  case class CheckNewAnswerRows(expectedAnswer: Seq[Row])
+    extends StreamAction with StreamMustBeRunning {
+    override def toString: String = s"$operatorName: ${expectedAnswer.mkString(",")}"
+
+    private def operatorName = "CheckNewAnswer"
+  }
+
+  object CheckNewAnswer {
+    def apply(): CheckNewAnswerRows = CheckNewAnswerRows(Seq.empty)
+
+    def apply[A: Encoder](data: A, moreData: A*): CheckNewAnswerRows = {
+      val encoder = encoderFor[A]
+      val toExternalRow = RowEncoder(encoder.schema).resolveAndBind()
+      CheckNewAnswerRows((data +: moreData).map(d => toExternalRow.fromRow(encoder.toRow(d))))
+    }
+  }
+
   /** Stops the stream. It must currently be running. */
   case object StopStream extends StreamAction with StreamMustBeRunning
 
@@ -435,13 +453,24 @@ trait StreamTest extends QueryTest with SharedSQLContext with TimeLimits with Be
          """.stripMargin)
     }
 
-    def fetchStreamAnswer(currentStream: StreamExecution, lastOnly: Boolean) = {
+    var lastFetchedMemorySinkLastBatchId: Long = -1
+
+    def fetchStreamAnswer(
+        currentStream: StreamExecution,
+        lastOnly: Boolean = false,
+        sinceLastFetchOnly: Boolean = false) = {
+      verify(
+        !(lastOnly && sinceLastFetchOnly), "both lastOnly and sinceLastFetchOnly cannot be true")
       verify(currentStream != null, "stream not running")
 
       // Block until all data added has been processed for all the source
       awaiting.foreach { case (sourceIndex, offset) =>
         failAfter(streamingTimeout) {
           currentStream.awaitOffset(sourceIndex, offset)
+          // Make sure all processing, including no-data batches, has been executed
+          if (!currentStream.triggerClock.isInstanceOf[StreamManualClock]) {
+            currentStream.processAllAvailable()
+          }
         }
       }
 
@@ -463,14 +492,21 @@ trait StreamTest extends QueryTest with SharedSQLContext with TimeLimits with Be
         }
       }
 
-      val (latestBatchData, allData) = sink match {
-        case s: MemorySink => (s.latestBatchData, s.allData)
-        case s: MemorySinkV2 => (s.latestBatchData, s.allData)
-      }
-      try if (lastOnly) latestBatchData else allData catch {
+      val rows = try {
+        if (sinceLastFetchOnly) {
+          if (sink.latestBatchId.getOrElse(-1L) < lastFetchedMemorySinkLastBatchId) {
+            failTest("MemorySink was probably cleared since last fetch. Use CheckAnswer instead.")
+          }
+          sink.dataSinceBatch(lastFetchedMemorySinkLastBatchId)
+        } else {
+          if (lastOnly) sink.latestBatchData else sink.allData
+        }
+      } catch {
         case e: Exception =>
           failTest("Exception while getting data from sink", e)
       }
+      lastFetchedMemorySinkLastBatchId = sink.latestBatchId.getOrElse(-1L)
+      rows
     }
 
     def executeAction(action: StreamAction): Unit = {
@@ -704,6 +740,12 @@ trait StreamTest extends QueryTest with SharedSQLContext with TimeLimits with Be
           } catch {
             case e: Throwable => failTest(e.toString)
           }
+
+        case CheckNewAnswerRows(expectedAnswer) =>
+          val sparkAnswer = fetchStreamAnswer(currentStream, sinceLastFetchOnly = true)
+          QueryTest.sameRows(expectedAnswer, sparkAnswer).foreach {
+            error => failTest(error)
+          }
       }
       pos += 1
     }
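
CheckNewAnswer compares against only the rows the MemorySink received since the previous fetch, so a test no longer has to know how many batches (including no-data batches) ran in between. A rough usage sketch (the query itself is illustrative, not from this patch):

    // Illustrative usage inside a StreamTest-derived suite; the map(_ * 2) query is made up.
    val input = MemoryStream[Int]
    testStream(input.toDS().map(_ * 2))(
      AddData(input, 1, 2),
      CheckNewAnswer(2, 4),   // rows produced since the last check, however many batches ran
      AddData(input, 3),
      CheckNewAnswer(6)       // unaffected by intervening no-data batches
    )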

http://git-wip-us.apache.org/repos/asf/spark/blob/47b5b685/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingDeduplicationSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingDeduplicationSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingDeduplicationSuite.scala
new file mode 100644
index 0000000..42ffd47
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingDeduplicationSuite.scala
@@ -0,0 +1,295 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.streaming
+
+import org.scalatest.BeforeAndAfterAll
+
+import org.apache.spark.sql.catalyst.plans.physical.{ClusteredDistribution, HashPartitioning, SinglePartition}
+import org.apache.spark.sql.catalyst.streaming.InternalOutputModes._
+import org.apache.spark.sql.execution.streaming.{MemoryStream, StreamingDeduplicateExec}
+import org.apache.spark.sql.execution.streaming.state.StateStore
+import org.apache.spark.sql.functions._
+import org.apache.spark.sql.internal.SQLConf
+
+class StreamingDeduplicationSuite extends StateStoreMetricsTest with BeforeAndAfterAll {
+
+  import testImplicits._
+
+  override def afterAll(): Unit = {
+    super.afterAll()
+    StateStore.stop()
+  }
+
+  test("deduplicate with all columns") {
+    val inputData = MemoryStream[String]
+    val result = inputData.toDS().dropDuplicates()
+
+    testStream(result, Append)(
+      AddData(inputData, "a"),
+      CheckLastBatch("a"),
+      assertNumStateRows(total = 1, updated = 1),
+      AddData(inputData, "a"),
+      CheckLastBatch(),
+      assertNumStateRows(total = 1, updated = 0),
+      AddData(inputData, "b"),
+      CheckLastBatch("b"),
+      assertNumStateRows(total = 2, updated = 1)
+    )
+  }
+
+  test("deduplicate with some columns") {
+    val inputData = MemoryStream[(String, Int)]
+    val result = inputData.toDS().dropDuplicates("_1")
+
+    testStream(result, Append)(
+      AddData(inputData, "a" -> 1),
+      CheckLastBatch("a" -> 1),
+      assertNumStateRows(total = 1, updated = 1),
+      AddData(inputData, "a" -> 2), // Dropped
+      CheckLastBatch(),
+      assertNumStateRows(total = 1, updated = 0),
+      AddData(inputData, "b" -> 1),
+      CheckLastBatch("b" -> 1),
+      assertNumStateRows(total = 2, updated = 1)
+    )
+  }
+
+  test("multiple deduplicates") {
+    val inputData = MemoryStream[(String, Int)]
+    val result = inputData.toDS().dropDuplicates().dropDuplicates("_1")
+
+    testStream(result, Append)(
+      AddData(inputData, "a" -> 1),
+      CheckLastBatch("a" -> 1),
+      assertNumStateRows(total = Seq(1L, 1L), updated = Seq(1L, 1L)),
+
+      AddData(inputData, "a" -> 2), // Dropped from the second `dropDuplicates`
+      CheckLastBatch(),
+      assertNumStateRows(total = Seq(1L, 2L), updated = Seq(0L, 1L)),
+
+      AddData(inputData, "b" -> 1),
+      CheckLastBatch("b" -> 1),
+      assertNumStateRows(total = Seq(2L, 3L), updated = Seq(1L, 1L))
+    )
+  }
+
+  test("deduplicate with watermark") {
+    val inputData = MemoryStream[Int]
+    val result = inputData.toDS()
+      .withColumn("eventTime", $"value".cast("timestamp"))
+      .withWatermark("eventTime", "10 seconds")
+      .dropDuplicates()
+      .select($"eventTime".cast("long").as[Long])
+
+    testStream(result, Append)(
+      AddData(inputData, (1 to 5).flatMap(_ => (10 to 15)): _*),
+      CheckAnswer(10 to 15: _*),
+      assertNumStateRows(total = 6, updated = 6),
+
+      AddData(inputData, 25), // Advance watermark to 15 secs, no-data-batch drops rows <= 15
+      CheckNewAnswer(25),
+      assertNumStateRows(total = 1, updated = 1),
+
+      AddData(inputData, 10), // Should not emit anything as data less than watermark
+      CheckNewAnswer(),
+      assertNumStateRows(total = 1, updated = 0),
+
+      AddData(inputData, 45), // Advance watermark to 35 seconds, no-data-batch drops row 25
+      CheckNewAnswer(45),
+      assertNumStateRows(total = 1, updated = 1)
+    )
+  }
+
+  test("deduplicate with aggregate - append mode") {
+    val inputData = MemoryStream[Int]
+    val windowedaggregate = inputData.toDS()
+      .withColumn("eventTime", $"value".cast("timestamp"))
+      .withWatermark("eventTime", "10 seconds")
+      .dropDuplicates()
+      .withWatermark("eventTime", "10 seconds")
+      .groupBy(window($"eventTime", "5 seconds") as 'window)
+      .agg(count("*") as 'count)
+      .select($"window".getField("start").cast("long").as[Long], $"count".as[Long])
+
+    testStream(windowedaggregate)(
+      AddData(inputData, (1 to 5).flatMap(_ => (10 to 15)): _*),
+      CheckLastBatch(),
+      // states in aggregate are for windows [10, 15) and [15, 20) (2 windows)
+      // states in deduplicate are the values 10 to 15
+      assertNumStateRows(total = Seq(2L, 6L), updated = Seq(2L, 6L)),
+
+      AddData(inputData, 25), // Advance watermark to 15 seconds
+      CheckLastBatch((10 -> 5)), // 5 items (10 to 14) after deduplicate, emitted with no-data-batch
+      // states in aggregate are for [15, 20) and [25, 30); no-data-batch removed [10, 15)
+      // state in deduplicate is 25; no-data-batch removed 10 to 15
+      assertNumStateRows(total = Seq(2L, 1L), updated = Seq(1L, 1L)),
+
+      AddData(inputData, 10), // Should not emit anything as data less than watermark
+      CheckLastBatch(),
+      assertNumStateRows(total = Seq(2L, 1L), updated = Seq(0L, 0L)),
+
+      AddData(inputData, 40), // Advance watermark to 30 seconds
+      CheckLastBatch((15 -> 1), (25 -> 1)),
+      // states in aggregate is [40, 45); no-data-batch removed [15, 20) and [25, 30)
+      // states in deduplicate is 40; no-data-batch removed 25
+      assertNumStateRows(total = Seq(1L, 1L), updated = Seq(1L, 1L))
+    )
+  }
+
+  test("deduplicate with aggregate - update mode") {
+    val inputData = MemoryStream[(String, Int)]
+    val result = inputData.toDS()
+      .select($"_1" as "str", $"_2" as "num")
+      .dropDuplicates()
+      .groupBy("str")
+      .agg(sum("num"))
+      .as[(String, Long)]
+
+    testStream(result, Update)(
+      AddData(inputData, "a" -> 1),
+      CheckLastBatch("a" -> 1L),
+      assertNumStateRows(total = Seq(1L, 1L), updated = Seq(1L, 1L)),
+      AddData(inputData, "a" -> 1), // Dropped
+      CheckLastBatch(),
+      assertNumStateRows(total = Seq(1L, 1L), updated = Seq(0L, 0L)),
+      AddData(inputData, "a" -> 2),
+      CheckLastBatch("a" -> 3L),
+      assertNumStateRows(total = Seq(1L, 2L), updated = Seq(1L, 1L)),
+      AddData(inputData, "b" -> 1),
+      CheckLastBatch("b" -> 1L),
+      assertNumStateRows(total = Seq(2L, 3L), updated = Seq(1L, 1L))
+    )
+  }
+
+  test("deduplicate with aggregate - complete mode") {
+    val inputData = MemoryStream[(String, Int)]
+    val result = inputData.toDS()
+      .select($"_1" as "str", $"_2" as "num")
+      .dropDuplicates()
+      .groupBy("str")
+      .agg(sum("num"))
+      .as[(String, Long)]
+
+    testStream(result, Complete)(
+      AddData(inputData, "a" -> 1),
+      CheckLastBatch("a" -> 1L),
+      assertNumStateRows(total = Seq(1L, 1L), updated = Seq(1L, 1L)),
+      AddData(inputData, "a" -> 1), // Dropped
+      CheckLastBatch("a" -> 1L),
+      assertNumStateRows(total = Seq(1L, 1L), updated = Seq(0L, 0L)),
+      AddData(inputData, "a" -> 2),
+      CheckLastBatch("a" -> 3L),
+      assertNumStateRows(total = Seq(1L, 2L), updated = Seq(1L, 1L)),
+      AddData(inputData, "b" -> 1),
+      CheckLastBatch("a" -> 3L, "b" -> 1L),
+      assertNumStateRows(total = Seq(2L, 3L), updated = Seq(1L, 1L))
+    )
+  }
+
+  test("deduplicate with file sink") {
+    withTempDir { output =>
+      withTempDir { checkpointDir =>
+        val outputPath = output.getAbsolutePath
+        val inputData = MemoryStream[String]
+        val result = inputData.toDS().dropDuplicates()
+        val q = result.writeStream
+          .format("parquet")
+          .outputMode(Append)
+          .option("checkpointLocation", checkpointDir.getPath)
+          .start(outputPath)
+        try {
+          inputData.addData("a")
+          q.processAllAvailable()
+          checkDataset(spark.read.parquet(outputPath).as[String], "a")
+
+          inputData.addData("a") // Dropped
+          q.processAllAvailable()
+          checkDataset(spark.read.parquet(outputPath).as[String], "a")
+
+          inputData.addData("b")
+          q.processAllAvailable()
+          checkDataset(spark.read.parquet(outputPath).as[String], "a", "b")
+        } finally {
+          q.stop()
+        }
+      }
+    }
+  }
+
+  test("SPARK-19841: watermarkPredicate should filter based on keys") {
+    val input = MemoryStream[(Int, Int)]
+    val df = input.toDS.toDF("time", "id")
+      .withColumn("time", $"time".cast("timestamp"))
+      .withWatermark("time", "1 second")
+      .dropDuplicates("id", "time") // Change the column positions
+      .select($"id")
+    testStream(df)(
+      AddData(input, 1 -> 1, 1 -> 1, 1 -> 2),
+      CheckAnswer(1, 2),
+      AddData(input, 1 -> 1, 2 -> 3, 2 -> 4),
+      CheckNewAnswer(3, 4),
+      AddData(input, 1 -> 0, 1 -> 1, 3 -> 5, 3 -> 6), // Drop (1 -> 0, 1 -> 1) due to watermark
+      CheckNewAnswer(5, 6),
+      AddData(input, 1 -> 0, 4 -> 7), // Drop (1 -> 0) due to watermark
+      CheckNewAnswer(7)
+    )
+  }
+
+  test("SPARK-21546: dropDuplicates should ignore watermark when it's not a key") {
+    val input = MemoryStream[(Int, Int)]
+    val df = input.toDS.toDF("id", "time")
+      .withColumn("time", $"time".cast("timestamp"))
+      .withWatermark("time", "1 second")
+      .dropDuplicates("id")
+      .select($"id", $"time".cast("long"))
+    testStream(df)(
+      AddData(input, 1 -> 1, 1 -> 2, 2 -> 2),
+      CheckAnswer(1 -> 1, 2 -> 2)
+    )
+  }
+
+  test("test no-data flag") {
+    val flagKey = SQLConf.STREAMING_NO_DATA_MICRO_BATCHES_ENABLED.key
+
+    def testWithFlag(flag: Boolean): Unit = withClue(s"with $flagKey = $flag") {
+      val inputData = MemoryStream[Int]
+      val result = inputData.toDS()
+        .withColumn("eventTime", $"value".cast("timestamp"))
+        .withWatermark("eventTime", "10 seconds")
+        .dropDuplicates()
+        .select($"eventTime".cast("long").as[Long])
+
+      testStream(result, Append)(
+        StartStream(additionalConfs = Map(flagKey -> flag.toString)),
+        AddData(inputData, 10, 11, 12, 13, 14, 15),
+        CheckAnswer(10, 11, 12, 13, 14, 15),
+        assertNumStateRows(total = 6, updated = 6),
+
+        AddData(inputData, 25), // Advance watermark to 15 seconds
+        CheckNewAnswer(25),
+        { // State should have been cleaned if flag is set, otherwise should not have been cleaned
+          if (flag) assertNumStateRows(total = 1, updated = 1)
+          else assertNumStateRows(total = 7, updated = 1)
+        }
+      )
+    }
+
+    testWithFlag(true)
+    testWithFlag(false)
+  }
+}
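
For reference, the state accounting in the watermark tests above comes down to simple arithmetic: the watermark is the max event time seen so far minus the declared delay, and the no-data batch then evicts dedup keys at or below it. A small sketch of the numbers in "deduplicate with watermark":

    // Illustrative arithmetic only, mirroring the comments in the test above.
    val delaySeconds = 10L
    def watermark(maxEventTimeSeconds: Long): Long = maxEventTimeSeconds - delaySeconds

    assert(watermark(15L) == 5L)    // after the first batch: keys 10..15 all survive
    assert(watermark(25L) == 15L)   // after adding 25: keys 10..15 are evicted, only 25 remains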

http://git-wip-us.apache.org/repos/asf/spark/blob/47b5b685/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingJoinSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingJoinSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingJoinSuite.scala
index 11bdd13..da8f960 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingJoinSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingJoinSuite.scala
@@ -192,7 +192,7 @@ class StreamingInnerJoinSuite extends StreamTest with StateStoreMetricsTest with
       CheckLastBatch((1, 5, 11)),
       AddData(rightInput, (1, 10)),
       CheckLastBatch(), // no match as neither 5, nor 10 from leftTime is less than rightTime 10 - 5
-      assertNumStateRows(total = 3, updated = 1),
+      assertNumStateRows(total = 3, updated = 3),
 
       // Increase event time watermark to 20s by adding data with time = 30s on both inputs
       AddData(leftInput, (1, 3), (1, 30)),
@@ -276,14 +276,14 @@ class StreamingInnerJoinSuite extends StreamTest with StateStoreMetricsTest with
       CheckAnswer(),
       AddData(rightInput, (1, 14), (1, 15), (1, 25), (1, 26), (1, 30), (1, 31)),
       CheckLastBatch((1, 20, 15), (1, 20, 25), (1, 20, 26), (1, 20, 30)),
-      assertNumStateRows(total = 7, updated = 6),
+      assertNumStateRows(total = 7, updated = 7),
 
       // If rightTime = 60, then it matches only leftTime = [50, 65]
       AddData(rightInput, (1, 60)),
       CheckLastBatch(),                // matches with nothing on the left
       AddData(leftInput, (1, 49), (1, 50), (1, 65), (1, 66)),
       CheckLastBatch((1, 50, 60), (1, 65, 60)),
-      assertNumStateRows(total = 12, updated = 4),
+      assertNumStateRows(total = 12, updated = 5),
 
       // Event time watermark = min(left: 66 - delay 20 = 46, right: 60 - delay 30 = 30) = 30
       // Left state value watermark = 30 - 10 = slightly less than 20 (since condition has <=)
@@ -573,7 +573,7 @@ class StreamingOuterJoinSuite extends StreamTest with StateStoreMetricsTest with
       // nulls won't show up until the next batch after the watermark advances.
       MultiAddData(leftInput, 21)(rightInput, 22),
       CheckLastBatch(),
-      assertNumStateRows(total = 12, updated = 2),
+      assertNumStateRows(total = 12, updated = 12),
       AddData(leftInput, 22),
       CheckLastBatch(Row(22, 30, 44, 66), Row(1, 10, 2, null), Row(2, 10, 4, null)),
       assertNumStateRows(total = 3, updated = 1)
@@ -591,7 +591,7 @@ class StreamingOuterJoinSuite extends StreamTest with StateStoreMetricsTest with
       // nulls won't show up until the next batch after the watermark advances.
       MultiAddData(leftInput, 21)(rightInput, 22),
       CheckLastBatch(),
-      assertNumStateRows(total = 12, updated = 2),
+      assertNumStateRows(total = 12, updated = 12),
       AddData(leftInput, 22),
       CheckLastBatch(Row(22, 30, 44, 66), Row(6, 10, null, 18), Row(7, 10, null, 21)),
       assertNumStateRows(total = 3, updated = 1)
@@ -630,7 +630,7 @@ class StreamingOuterJoinSuite extends StreamTest with StateStoreMetricsTest with
         CheckLastBatch((1, 1, 5, 10)),
         AddData(rightInput, (1, 11)),
         CheckLastBatch(), // no match as left time is too low
-        assertNumStateRows(total = 5, updated = 1),
+        assertNumStateRows(total = 5, updated = 5),
 
         // Increase event time watermark to 20s by adding data with time = 30s on both inputs
         AddData(leftInput, (1, 7), (1, 30)),
@@ -668,7 +668,7 @@ class StreamingOuterJoinSuite extends StreamTest with StateStoreMetricsTest with
       CheckLastBatch(Row(1, 10, 2, null), Row(2, 10, 4, null), Row(3, 10, 6, null)),
       MultiAddData(leftInput, 20)(rightInput, 21),
       CheckLastBatch(),
-      assertNumStateRows(total = 5, updated = 2),
+      assertNumStateRows(total = 5, updated = 5),  // 1...3 added, but 20 and 21 not added
       AddData(rightInput, 20),
       CheckLastBatch(
         Row(20, 30, 40, 60)),
@@ -678,7 +678,7 @@ class StreamingOuterJoinSuite extends StreamTest with StateStoreMetricsTest with
       CheckLastBatch((40, 50, 80, 120), (41, 50, 82, 123)),
       MultiAddData(leftInput, 70)(rightInput, 71),
       CheckLastBatch(),
-      assertNumStateRows(total = 6, updated = 2),
+      assertNumStateRows(total = 6, updated = 6),  // all inputs added since last check
       AddData(rightInput, 70),
       CheckLastBatch((70, 80, 140, 210)),
       assertNumStateRows(total = 3, updated = 1),
@@ -687,7 +687,7 @@ class StreamingOuterJoinSuite extends StreamTest with StateStoreMetricsTest with
       CheckLastBatch(),
       MultiAddData(leftInput, 1000)(rightInput, 1001),
       CheckLastBatch(),
-      assertNumStateRows(total = 8, updated = 2),
+      assertNumStateRows(total = 8, updated = 5),  // 101...103 added, but 1000 and 1001 not added
       AddData(rightInput, 1000),
       CheckLastBatch(
         Row(1000, 1010, 2000, 3000),

http://git-wip-us.apache.org/repos/asf/spark/blob/47b5b685/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
index 390d67d..0cb2375 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
@@ -334,7 +334,7 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging wi
 
         assert(progress.sources.length === 1)
         assert(progress.sources(0).description contains "MemoryStream")
-        assert(progress.sources(0).startOffset === null)
+        assert(progress.sources(0).startOffset === "0")
         assert(progress.sources(0).endOffset !== null)
         assert(progress.sources(0).processedRowsPerSecond === 4.0)  // 2 rows processed in 500 ms
 

