You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ku...@apache.org on 2019/07/04 13:02:49 UTC

[flink] branch master updated: [FLINK-12665][table-planner-blink] Introduce MiniBatchIntervalInferRule and watermark assigner operators

This is an automated email from the ASF dual-hosted git repository.

kurt pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 4fbf323  [FLINK-12665][table-planner-blink] Introduce MiniBatchIntervalInferRule and watermark assigner operators
4fbf323 is described below

commit 4fbf32356a1df34b9ed5ce0c484dfc9695cd7c4f
Author: godfrey he <go...@163.com>
AuthorDate: Thu Jul 4 21:02:37 2019 +0800

    [FLINK-12665][table-planner-blink] Introduce MiniBatchIntervalInferRule and watermark assigner operators
    
    This closes #8562
---
 .../util/AbstractStreamOperatorTestHarness.java    |   6 +
 .../stream/StreamExecWatermarkAssigner.scala       |  93 +++-
 .../flink/table/plan/optimize/RelNodeBlock.scala   |   9 +
 .../StreamCommonSubGraphBasedOptimizer.scala       |  84 ++-
 ...> FlinkMiniBatchIntervalTraitInitProgram.scala} |  31 +-
 .../plan/optimize/program/FlinkStreamProgram.scala |   8 +
 .../optimize/program/StreamOptimizeContext.scala   |   7 +
 .../table/plan/rules/FlinkStreamRuleSets.scala     |   9 +
 .../stream/MiniBatchIntervalInferRule.scala        | 129 +++++
 .../stream/StreamExecWatermarkAssignerRule.scala   |  64 +++
 .../table/plan/trait/MiniBatchIntervalTrait.scala  |  11 +-
 .../flink/table/plan/util/FlinkRelOptUtil.scala    |  70 +++
 .../apache/flink/table/api/stream/ExplainTest.xml  | 256 ++++++++-
 .../stream/RetractionRulesWithTwoStageAggTest.xml  |  12 +-
 .../plan/stream/sql/MiniBatchIntervalInferTest.xml | 572 +++++++++++++++++++++
 .../plan/stream/sql/ModifiedMonotonicityTest.xml   |   9 +-
 .../table/plan/stream/sql/agg/AggregateTest.xml    |  18 +-
 .../plan/stream/sql/agg/DistinctAggregateTest.xml  | 204 +++++---
 .../stream/sql/agg/IncrementalAggregateTest.xml    |  51 +-
 .../plan/stream/sql/agg/TwoStageAggregateTest.xml  |  21 +-
 .../flink/table/api/stream/ExplainTest.scala       |  43 +-
 .../stream/sql/MiniBatchIntervalInferTest.scala    | 331 ++++++++++++
 .../table/plan/stream/sql/agg/AggregateTest.scala  |   2 -
 .../table/plan/util/FlinkRelOptUtilTest.scala      |  64 +++
 .../table/runtime/stream/sql/AggregateITCase.scala |  35 ++
 .../table/runtime/utils/StreamingTestBase.scala    |  24 +
 .../apache/flink/table/util/TableTestBase.scala    |  32 +-
 .../apache/flink/table/api/TableConfigOptions.java |   8 +
 .../MiniBatchAssignerOperator.java                 | 105 ++++
 .../MiniBatchedWatermarkAssignerOperator.java      | 176 +++++++
 .../WatermarkAssignerOperator.java                 | 177 +++++++
 .../MiniBatchAssignerOperatorTest.java             | 125 +++++
 .../MiniBatchedWatermarkAssignerOperatorTest.java  | 127 +++++
 .../WatermarkAssignerOperatorTest.java             | 165 ++++++
 .../WatermarkAssignerOperatorTestBase.java         |  64 +++
 35 files changed, 2956 insertions(+), 186 deletions(-)

diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
index ce561d4..fc24956 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
@@ -59,6 +59,7 @@ import org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.streamstatus.StreamStatus;
 import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
 import org.apache.flink.streaming.runtime.tasks.StreamTask;
 import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService;
@@ -566,6 +567,11 @@ public class AbstractStreamOperatorTestHarness<OUT> implements AutoCloseable {
 		}
 	}
 
+	@VisibleForTesting
+	public StreamStatus getStreamStatus() {
+		return mockTask.getStreamStatusMaintainer().getStreamStatus();
+	}
+
 	private class MockOutput implements Output<StreamRecord<OUT>> {
 
 		private TypeSerializer<OUT> outputSerializer;
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/stream/StreamExecWatermarkAssigner.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/stream/StreamExecWatermarkAssigner.scala
index b7b47d8..4847d03 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/stream/StreamExecWatermarkAssigner.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/stream/StreamExecWatermarkAssigner.scala
@@ -18,21 +18,27 @@
 
 package org.apache.flink.table.plan.nodes.physical.stream
 
+import org.apache.flink.api.dag.Transformation
+import org.apache.flink.streaming.api.transformations.OneInputTransformation
 import org.apache.flink.table.api.{StreamTableEnvironment, TableConfigOptions, TableException}
+import org.apache.flink.table.calcite.{FlinkContext, FlinkTypeFactory}
 import org.apache.flink.table.dataformat.BaseRow
 import org.apache.flink.table.plan.`trait`.{MiniBatchIntervalTraitDef, MiniBatchMode}
 import org.apache.flink.table.plan.nodes.calcite.WatermarkAssigner
 import org.apache.flink.table.plan.nodes.exec.{ExecNode, StreamExecNode}
-import org.apache.flink.table.plan.optimize.program.FlinkOptimizeContext
+import org.apache.flink.table.runtime.watermarkassigner.{MiniBatchAssignerOperator, MiniBatchedWatermarkAssignerOperator, WatermarkAssignerOperator}
+import org.apache.flink.table.typeutils.BaseRowTypeInfo
 import org.apache.flink.util.Preconditions
+
 import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
 import org.apache.calcite.rel.{RelNode, RelWriter}
-import java.util
 
-import org.apache.flink.api.dag.Transformation
+import java.util
+import java.util.Calendar
 
 import scala.collection.JavaConversions._
 
+
 /**
   * Stream physical RelNode for [[WatermarkAssigner]].
   */
@@ -68,22 +74,23 @@ class StreamExecWatermarkAssigner(
   override def explainTerms(pw: RelWriter): RelWriter = {
     val miniBatchInterval = traits.getTrait(MiniBatchIntervalTraitDef.INSTANCE).getMiniBatchInterval
 
-    val value = miniBatchInterval.mode match {
-      case MiniBatchMode.None =>
-        // 1. operator requiring watermark, but minibatch is not enabled
-        // 2. redundant watermark definition in DDL
-        // 3. existing window, and window minibatch is disabled.
-        "None"
-      case MiniBatchMode.ProcTime =>
-        val config = cluster.getPlanner.getContext.asInstanceOf[FlinkOptimizeContext].getTableConfig
-        val miniBatchLatency = config.getConf.getLong(
-          TableConfigOptions.SQL_EXEC_MINIBATCH_ALLOW_LATENCY)
-        Preconditions.checkArgument(miniBatchLatency > 0,
-          "MiniBatch latency must be greater that 0.", null)
-        s"Proctime, ${miniBatchLatency}ms"
-      case MiniBatchMode.RowTime =>
-        s"Rowtime, ${miniBatchInterval.interval}ms"
-      case o => throw new TableException(s"Unsupported mode: $o")
+    val value = if (miniBatchInterval.mode == MiniBatchMode.None ||
+      miniBatchInterval.interval == 0) {
+      // 1. redundant watermark definition in DDL
+      // 2. existing window aggregate
+      // 3. operator requiring watermark, but minibatch is not enabled
+      "None"
+    } else if (miniBatchInterval.mode == MiniBatchMode.ProcTime) {
+      val tableConfig = cluster.getPlanner.getContext.asInstanceOf[FlinkContext].getTableConfig
+      val miniBatchLatency = tableConfig.getConf.getLong(
+        TableConfigOptions.SQL_EXEC_MINIBATCH_ALLOW_LATENCY)
+      Preconditions.checkArgument(miniBatchLatency > 0,
+        "MiniBatch latency must be greater that 0.", null)
+      s"Proctime, ${miniBatchLatency}ms"
+    } else if (miniBatchInterval.mode == MiniBatchMode.RowTime) {
+      s"Rowtime, ${miniBatchInterval.interval}ms"
+    } else {
+      throw new TableException(s"Unsupported mode: $miniBatchInterval")
     }
     super.explainTerms(pw).item("miniBatchInterval", value)
   }
@@ -102,7 +109,53 @@ class StreamExecWatermarkAssigner(
 
   override protected def translateToPlanInternal(
       tableEnv: StreamTableEnvironment): Transformation[BaseRow] = {
-    throw new TableException("Implements this")
+    val inputTransformation = getInputNodes.get(0).translateToPlan(tableEnv)
+      .asInstanceOf[Transformation[BaseRow]]
+
+    val inferredInterval = getTraitSet.getTrait(
+      MiniBatchIntervalTraitDef.INSTANCE).getMiniBatchInterval
+    val idleTimeout = tableEnv.getConfig.getConf.getLong(
+      TableConfigOptions.SQL_EXEC_SOURCE_IDLE_TIMEOUT)
+
+    val (operator, opName) = if (inferredInterval.mode == MiniBatchMode.None ||
+      inferredInterval.interval == 0) {
+      require(rowtimeFieldIndex.isDefined, "rowtimeFieldIndex should not be None")
+      require(watermarkDelay.isDefined, "watermarkDelay should not be None")
+      // 1. redundant watermark definition in DDL
+      // 2. existing window aggregate
+      // 3. operator requiring watermark, but minibatch is not enabled
+      val op = new WatermarkAssignerOperator(rowtimeFieldIndex.get, watermarkDelay.get, idleTimeout)
+      val opName =
+        s"WatermarkAssigner(rowtime: ${rowtimeFieldIndex.get}, offset: ${watermarkDelay.get})"
+      (op, opName)
+    } else if (inferredInterval.mode == MiniBatchMode.ProcTime) {
+      val op = new MiniBatchAssignerOperator(inferredInterval.interval)
+      val opName = s"MiniBatchAssigner(intervalMs: ${inferredInterval.interval})"
+      (op, opName)
+    } else {
+      require(rowtimeFieldIndex.isDefined, "rowtimeFieldIndex should not be None")
+      require(watermarkDelay.isDefined, "watermarkDelay should not be None")
+      // get the timezone offset.
+      val tzOffset = tableEnv.getConfig.getTimeZone.getOffset(Calendar.ZONE_OFFSET)
+      val op = new MiniBatchedWatermarkAssignerOperator(
+        rowtimeFieldIndex.get,
+        watermarkDelay.get,
+        tzOffset,
+        idleTimeout,
+        inferredInterval.interval)
+      val opName = s"MiniBatchedWatermarkAssigner(rowtime: ${rowtimeFieldIndex.get}," +
+        s" offset: ${watermarkDelay.get}, intervalMs: ${inferredInterval.interval})"
+      (op, opName)
+    }
+
+    val outputRowTypeInfo = BaseRowTypeInfo.of(FlinkTypeFactory.toLogicalRowType(getRowType))
+    val transformation = new OneInputTransformation[BaseRow, BaseRow](
+      inputTransformation,
+      opName,
+      operator,
+      outputRowTypeInfo,
+      inputTransformation.getParallelism)
+    transformation
   }
 
 }
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/optimize/RelNodeBlock.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/optimize/RelNodeBlock.scala
index 29b069d..6bacb15 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/optimize/RelNodeBlock.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/optimize/RelNodeBlock.scala
@@ -19,6 +19,7 @@
 package org.apache.flink.table.plan.optimize
 
 import org.apache.flink.table.api.{PlannerConfigOptions, TableConfig}
+import org.apache.flink.table.plan.`trait`.MiniBatchInterval
 import org.apache.flink.table.plan.nodes.calcite.Sink
 import org.apache.flink.table.plan.reuse.SubplanReuser.{SubplanReuseContext, SubplanReuseShuttle}
 import org.apache.flink.table.plan.rules.logical.WindowPropertiesRules
@@ -123,6 +124,8 @@ class RelNodeBlock(val outputNode: RelNode) {
 
   private var updateAsRetract: Boolean = false
 
+  private var miniBatchInterval: MiniBatchInterval = MiniBatchInterval.NONE
+
   def addChild(block: RelNodeBlock): Unit = childBlocks += block
 
   def children: Seq[RelNodeBlock] = childBlocks.toSeq
@@ -148,6 +151,12 @@ class RelNodeBlock(val outputNode: RelNode) {
 
   def isUpdateAsRetraction: Boolean = updateAsRetract
 
+  def setMiniBatchInterval(miniBatchInterval: MiniBatchInterval): Unit = {
+    this.miniBatchInterval = miniBatchInterval
+  }
+
+  def getMiniBatchInterval: MiniBatchInterval = miniBatchInterval
+
   def getChildBlock(node: RelNode): Option[RelNodeBlock] = {
     val find = children.filter(_.outputNode.equals(node))
     if (find.isEmpty) {
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/optimize/StreamCommonSubGraphBasedOptimizer.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/optimize/StreamCommonSubGraphBasedOptimizer.scala
index d97ec3e..c19bd88 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/optimize/StreamCommonSubGraphBasedOptimizer.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/optimize/StreamCommonSubGraphBasedOptimizer.scala
@@ -18,15 +18,16 @@
 
 package org.apache.flink.table.plan.optimize
 
-import org.apache.flink.table.api.{StreamTableEnvironment, TableConfig}
+import org.apache.flink.table.api.{StreamTableEnvironment, TableConfig, TableConfigOptions}
 import org.apache.flink.table.catalog.FunctionCatalog
-import org.apache.flink.table.plan.`trait`.{AccMode, AccModeTraitDef, UpdateAsRetractionTraitDef}
+import org.apache.flink.table.plan.`trait`.{AccMode, AccModeTraitDef, MiniBatchInterval, MiniBatchIntervalTrait, MiniBatchIntervalTraitDef, MiniBatchMode, UpdateAsRetractionTraitDef}
 import org.apache.flink.table.plan.metadata.FlinkRelMetadataQuery
 import org.apache.flink.table.plan.nodes.calcite.Sink
 import org.apache.flink.table.plan.nodes.physical.stream.{StreamExecDataStreamScan, StreamExecIntermediateTableScan, StreamPhysicalRel}
 import org.apache.flink.table.plan.optimize.program.{FlinkStreamProgram, StreamOptimizeContext}
 import org.apache.flink.table.plan.schema.IntermediateRelTable
 import org.apache.flink.table.plan.stats.FlinkStatistic
+import org.apache.flink.table.plan.util.FlinkRelOptUtil
 import org.apache.flink.table.sinks.{DataStreamTableSink, RetractStreamTableSink}
 import org.apache.flink.util.Preconditions
 
@@ -60,6 +61,17 @@ class StreamCommonSubGraphBasedOptimizer(tEnv: StreamTableEnvironment)
           o.getTraitSet.getTrait(UpdateAsRetractionTraitDef.INSTANCE).sendsUpdatesAsRetractions
       }
       sinkBlock.setUpdateAsRetraction(retractionFromRoot)
+      val miniBatchInterval: MiniBatchInterval = if (tEnv.getConfig.getConf.contains(
+        TableConfigOptions.SQL_EXEC_MINIBATCH_ALLOW_LATENCY)) {
+        val miniBatchLatency = tEnv.getConfig.getConf.getLong(
+          TableConfigOptions.SQL_EXEC_MINIBATCH_ALLOW_LATENCY)
+        Preconditions.checkArgument(miniBatchLatency > 0,
+          "MiniBatch Latency must be greater than 0.", null)
+        MiniBatchInterval(miniBatchLatency, MiniBatchMode.ProcTime)
+      }  else {
+        MiniBatchIntervalTrait.NONE.getMiniBatchInterval
+      }
+      sinkBlock.setMiniBatchInterval(miniBatchInterval)
     }
 
     if (sinkBlocks.size == 1) {
@@ -70,16 +82,17 @@ class StreamCommonSubGraphBasedOptimizer(tEnv: StreamTableEnvironment)
       val optimizedTree = optimizeTree(
         block.getPlan,
         block.isUpdateAsRetraction,
+        block.getMiniBatchInterval,
         isSinkBlock = true)
       block.setOptimizedPlan(optimizedTree)
       return sinkBlocks
     }
 
-    // infer updateAsRetraction property for all input blocks
-    sinkBlocks.foreach(b => inferUpdateAsRetraction(
-      b, b.isUpdateAsRetraction, isSinkBlock = true))
-    // propagate updateAsRetraction to all input blocks
-    sinkBlocks.foreach(propagateTraits)
+    // infer updateAsRetraction property and miniBatchInterval property for all input blocks
+    sinkBlocks.foreach(b => inferTraits(
+      b, b.isUpdateAsRetraction, b.getMiniBatchInterval, isSinkBlock = true))
+    // propagate updateAsRetraction property and miniBatchInterval property to all input blocks
+    sinkBlocks.foreach(propagateTraits(_, isSinkBlock = true))
     // clear the intermediate result
     sinkBlocks.foreach(resetIntermediateResult)
     // optimize recursively RelNodeBlock
@@ -102,6 +115,7 @@ class StreamCommonSubGraphBasedOptimizer(tEnv: StreamTableEnvironment)
         val optimizedTree = optimizeTree(
           s,
           updatesAsRetraction = block.isUpdateAsRetraction,
+          miniBatchInterval = block.getMiniBatchInterval,
           isSinkBlock = true)
         block.setOptimizedPlan(optimizedTree)
 
@@ -109,6 +123,7 @@ class StreamCommonSubGraphBasedOptimizer(tEnv: StreamTableEnvironment)
         val optimizedPlan = optimizeTree(
           o,
           updatesAsRetraction = block.isUpdateAsRetraction,
+          miniBatchInterval = block.getMiniBatchInterval,
           isSinkBlock = isSinkBlock)
         val isAccRetract = optimizedPlan.getTraitSet
           .getTrait(AccModeTraitDef.INSTANCE).getAccMode == AccMode.AccRetract
@@ -126,12 +141,14 @@ class StreamCommonSubGraphBasedOptimizer(tEnv: StreamTableEnvironment)
     *
     * @param relNode The root node of the relational expression tree.
     * @param updatesAsRetraction True if request updates as retraction messages.
+    * @param miniBatchInterval mini-batch interval of the block.
     * @param isSinkBlock True if the given block is sink block.
     * @return The optimized [[RelNode]] tree
     */
   private def optimizeTree(
       relNode: RelNode,
       updatesAsRetraction: Boolean,
+      miniBatchInterval: MiniBatchInterval,
       isSinkBlock: Boolean): RelNode = {
 
     val config = tEnv.getConfig
@@ -147,29 +164,37 @@ class StreamCommonSubGraphBasedOptimizer(tEnv: StreamTableEnvironment)
 
       override def getRexBuilder: RexBuilder = tEnv.getRelBuilder.getRexBuilder
 
-      override def needFinalTimeIndicatorConversion: Boolean = true
-
       override def updateAsRetraction: Boolean = updatesAsRetraction
+
+      def getMiniBatchInterval: MiniBatchInterval = miniBatchInterval
+
+      override def needFinalTimeIndicatorConversion: Boolean = true
     })
   }
 
   /**
-    * Infer UpdateAsRetraction property for each block.
+    * Infer UpdateAsRetraction property and MiniBatchInterval property for each block.
     * NOTES: this method should not change the original RelNode tree.
     *
     * @param block              The [[RelNodeBlock]] instance.
     * @param retractionFromRoot Whether the sink need update as retraction messages.
+    * @param miniBatchInterval  mini-batch interval of the block.
     * @param isSinkBlock        True if the given block is sink block.
     */
-  private def inferUpdateAsRetraction(
+  private def inferTraits(
       block: RelNodeBlock,
       retractionFromRoot: Boolean,
+      miniBatchInterval: MiniBatchInterval,
       isSinkBlock: Boolean): Unit = {
 
     block.children.foreach {
       child =>
         if (child.getNewOutputNode.isEmpty) {
-          inferUpdateAsRetraction(child, retractionFromRoot = false, isSinkBlock = false)
+          inferTraits(
+            child,
+            retractionFromRoot = false,
+            miniBatchInterval = MiniBatchInterval.NONE,
+            isSinkBlock = false)
         }
     }
 
@@ -178,12 +203,12 @@ class StreamCommonSubGraphBasedOptimizer(tEnv: StreamTableEnvironment)
       case n: Sink =>
         require(isSinkBlock)
         val optimizedPlan = optimizeTree(
-          n, retractionFromRoot, isSinkBlock = true)
+          n, retractionFromRoot, miniBatchInterval, isSinkBlock = true)
         block.setOptimizedPlan(optimizedPlan)
 
       case o =>
         val optimizedPlan = optimizeTree(
-          o, retractionFromRoot, isSinkBlock = isSinkBlock)
+          o, retractionFromRoot, miniBatchInterval, isSinkBlock = isSinkBlock)
         val name = createUniqueIntermediateRelTableName
         val intermediateRelTable = createIntermediateRelTable(optimizedPlan, isAccRetract = false)
         val newTableScan = wrapIntermediateRelTableToTableScan(intermediateRelTable, name)
@@ -194,38 +219,55 @@ class StreamCommonSubGraphBasedOptimizer(tEnv: StreamTableEnvironment)
   }
 
   /**
-    * Propagate updateAsRetraction to all input blocks.
+    * Propagate updateAsRetraction property and miniBatchInterval property to all input blocks.
     *
     * @param block The [[RelNodeBlock]] instance.
+    * @param isSinkBlock True if the given block is sink block.
     */
-  private def propagateTraits(block: RelNodeBlock): Unit = {
+  private def propagateTraits(block: RelNodeBlock, isSinkBlock: Boolean): Unit = {
 
     // process current block
-    def shipTraits(rel: RelNode, updateAsRetraction: Boolean): Unit = {
+    def shipTraits(
+        rel: RelNode,
+        updateAsRetraction: Boolean,
+        miniBatchInterval: MiniBatchInterval): Unit = {
       rel match {
         case _: StreamExecDataStreamScan | _: StreamExecIntermediateTableScan =>
           val scan = rel.asInstanceOf[TableScan]
           val retractionTrait = scan.getTraitSet.getTrait(UpdateAsRetractionTraitDef.INSTANCE)
+          val miniBatchIntervalTrait = scan.getTraitSet.getTrait(MiniBatchIntervalTraitDef.INSTANCE)
           val tableName = scan.getTable.getQualifiedName.mkString(".")
           val inputBlocks = block.children.filter(b => tableName.equals(b.getOutputTableName))
           Preconditions.checkArgument(inputBlocks.size <= 1)
           if (inputBlocks.size == 1) {
+            val mergedInterval = if (isSinkBlock) {
+              // traits of sinkBlock have already been
+              // initialized before first round of optimization.
+              miniBatchIntervalTrait.getMiniBatchInterval
+            } else {
+              FlinkRelOptUtil.mergeMiniBatchInterval(
+                miniBatchIntervalTrait.getMiniBatchInterval, miniBatchInterval)
+            }
+            val newInterval = FlinkRelOptUtil.mergeMiniBatchInterval(
+              inputBlocks.head.getMiniBatchInterval,mergedInterval)
+            inputBlocks.head.setMiniBatchInterval(newInterval)
+
             if (retractionTrait.sendsUpdatesAsRetractions || updateAsRetraction) {
               inputBlocks.head.setUpdateAsRetraction(true)
             }
           }
         case ser: StreamPhysicalRel => ser.getInputs.foreach { e =>
           if (ser.needsUpdatesAsRetraction(e) || (updateAsRetraction && !ser.consumesRetractions)) {
-            shipTraits(e, updateAsRetraction = true)
+            shipTraits(e, updateAsRetraction = true, miniBatchInterval)
           } else {
-            shipTraits(e, updateAsRetraction = false)
+            shipTraits(e, updateAsRetraction = false, miniBatchInterval)
           }
         }
       }
     }
 
-    shipTraits(block.getOptimizedPlan, block.isUpdateAsRetraction)
-    block.children.foreach(propagateTraits)
+    shipTraits(block.getOptimizedPlan, block.isUpdateAsRetraction, block.getMiniBatchInterval)
+    block.children.foreach(propagateTraits(_, isSinkBlock = false))
   }
 
   /**
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/optimize/program/StreamOptimizeContext.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/optimize/program/FlinkMiniBatchIntervalTraitInitProgram.scala
similarity index 52%
copy from flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/optimize/program/StreamOptimizeContext.scala
copy to flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/optimize/program/FlinkMiniBatchIntervalTraitInitProgram.scala
index 2e2a722..d7eb500 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/optimize/program/StreamOptimizeContext.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/optimize/program/FlinkMiniBatchIntervalTraitInitProgram.scala
@@ -18,29 +18,18 @@
 
 package org.apache.flink.table.plan.optimize.program
 
-import org.apache.calcite.rex.RexBuilder
+import org.apache.flink.table.plan.`trait`.MiniBatchIntervalTrait
+
+import org.apache.calcite.rel.RelNode
 
 /**
-  * A OptimizeContext allows to obtain stream table environment information when optimizing.
+  * A FlinkOptimizeProgram that does some initialization be for MiniBatch Interval inference.
   */
-trait StreamOptimizeContext extends FlinkOptimizeContext {
-
-  /**
-    * Gets the Calcite [[RexBuilder]] defined in [[org.apache.flink.table.api.TableEnvironment]].
-    */
-  def getRexBuilder: RexBuilder
-
-  /**
-    * Returns true if the sink requests updates as retraction messages
-    * defined in
-    * [[org.apache.flink.table.plan.optimize.StreamCommonSubGraphBasedOptimizer.optimize]].
-    */
-  def updateAsRetraction: Boolean
-
-  /**
-    * Returns true if the output node needs final TimeIndicator conversion
-    * defined in [[org.apache.flink.table.api.TableEnvironment.optimize]].
-    */
-  def needFinalTimeIndicatorConversion: Boolean
+class FlinkMiniBatchIntervalTraitInitProgram extends FlinkOptimizeProgram[StreamOptimizeContext] {
 
+  override def optimize(input: RelNode, context: StreamOptimizeContext): RelNode = {
+    val updatedTraitSet = input.getTraitSet.plus(
+      new MiniBatchIntervalTrait(context.getMiniBatchInterval))
+    input.copy(updatedTraitSet, input.getInputs)
+  }
 }
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/optimize/program/FlinkStreamProgram.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/optimize/program/FlinkStreamProgram.scala
index b64ff31..4589aaa 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/optimize/program/FlinkStreamProgram.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/optimize/program/FlinkStreamProgram.scala
@@ -188,6 +188,14 @@ object FlinkStreamProgram {
             .setHepMatchOrder(HepMatchOrder.BOTTOM_UP)
             .add(FlinkStreamRuleSets.RETRACTION_RULES)
             .build(), "retraction rules")
+        .addProgram(new FlinkMiniBatchIntervalTraitInitProgram,
+          "Initialization for mini-batch interval inference")
+        .addProgram(
+          FlinkHepRuleSetProgramBuilder.newBuilder
+            .setHepRulesExecutionType(HEP_RULES_EXECUTION_TYPE.RULE_SEQUENCE)
+            .setHepMatchOrder(HepMatchOrder.TOP_DOWN)
+            .add(FlinkStreamRuleSets.MINI_BATCH_RULES)
+            .build(), "mini-batch interval rules")
         .addProgram(
           FlinkHepRuleSetProgramBuilder.newBuilder
             .setHepRulesExecutionType(HEP_RULES_EXECUTION_TYPE.RULE_COLLECTION)
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/optimize/program/StreamOptimizeContext.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/optimize/program/StreamOptimizeContext.scala
index 2e2a722..d1bf96e 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/optimize/program/StreamOptimizeContext.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/optimize/program/StreamOptimizeContext.scala
@@ -18,6 +18,8 @@
 
 package org.apache.flink.table.plan.optimize.program
 
+import org.apache.flink.table.plan.`trait`.MiniBatchInterval
+
 import org.apache.calcite.rex.RexBuilder
 
 /**
@@ -38,6 +40,11 @@ trait StreamOptimizeContext extends FlinkOptimizeContext {
   def updateAsRetraction: Boolean
 
   /**
+    * Returns the mini-batch interval that sink requests.
+    */
+  def getMiniBatchInterval: MiniBatchInterval
+
+  /**
     * Returns true if the output node needs final TimeIndicator conversion
     * defined in [[org.apache.flink.table.api.TableEnvironment.optimize]].
     */
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/FlinkStreamRuleSets.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/FlinkStreamRuleSets.scala
index ca94b8e..f5cc836 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/FlinkStreamRuleSets.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/FlinkStreamRuleSets.scala
@@ -341,6 +341,7 @@ object FlinkStreamRuleSets {
     StreamExecDataStreamScanRule.INSTANCE,
     StreamExecTableSourceScanRule.INSTANCE,
     StreamExecIntermediateTableScanRule.INSTANCE,
+    StreamExecWatermarkAssignerRule.INSTANCE,
     StreamExecValuesRule.INSTANCE,
     // calc
     StreamExecCalcRule.INSTANCE,
@@ -385,6 +386,14 @@ object FlinkStreamRuleSets {
   )
 
   /**
+    * RuleSet related to watermark assignment.
+    */
+  val MINI_BATCH_RULES: RuleSet = RuleSets.ofList(
+    // watermark interval infer rule
+    MiniBatchIntervalInferRule.INSTANCE
+  )
+
+  /**
     * RuleSet to optimize plans after stream exec execution.
     */
   val PHYSICAL_REWRITE: RuleSet = RuleSets.ofList(
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/physical/stream/MiniBatchIntervalInferRule.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/physical/stream/MiniBatchIntervalInferRule.scala
new file mode 100644
index 0000000..8078de2
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/physical/stream/MiniBatchIntervalInferRule.scala
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.rules.physical.stream
+
+import org.apache.flink.table.api.TableConfigOptions
+import org.apache.flink.table.plan.`trait`.{MiniBatchInterval, MiniBatchIntervalTrait, MiniBatchIntervalTraitDef, MiniBatchMode}
+import org.apache.flink.table.plan.nodes.physical.stream.{StreamExecDataStreamScan, StreamExecGroupWindowAggregate, StreamExecTableSourceScan, StreamExecWatermarkAssigner, StreamPhysicalRel}
+import org.apache.flink.table.plan.util.FlinkRelOptUtil
+
+import org.apache.calcite.plan.RelOptRule._
+import org.apache.calcite.plan.hep.HepRelVertex
+import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall}
+import org.apache.calcite.rel.RelNode
+
+import scala.collection.JavaConversions._
+
+/**
+  * Planner rule that infers the mini-batch interval of watermark assigner.
+  *
+  * This rule could handle the following two kinds of operator:
+  * 1. supports operators which supports mini-batch and does not require watermark, e.g.
+  * group aggregate. In this case, [[StreamExecWatermarkAssigner]] with Protime mode will be
+  * created if not exist, and the interval value will be set as
+  * [[TableConfigOptions.SQL_EXEC_MINIBATCH_ALLOW_LATENCY]].
+  * 2. supports operators which requires watermark, e.g. window join, window aggregate.
+  * In this case, [[StreamExecWatermarkAssigner]] already exists, and its MiniBatchIntervalTrait
+  * will be updated as the merged intervals from its outputs.
+  * Currently, mini-batched window aggregate is not supported, and will be introduced later.
+  *
+  * NOTES: This rule only supports HepPlanner with TOP_DOWN match order.
+  */
+class MiniBatchIntervalInferRule extends RelOptRule(
+  operand(classOf[StreamPhysicalRel], any()),
+  "MiniBatchIntervalInferRule") {
+
+  /**
+    * Get all children RelNodes of a RelNode.
+    *
+    * @param parent The parent RelNode
+    * @return All child nodes
+    */
+  def getInputs(parent: RelNode): Seq[RelNode] = {
+    parent.getInputs.map(_.asInstanceOf[HepRelVertex].getCurrentRel)
+  }
+
+  override def onMatch(call: RelOptRuleCall): Unit = {
+    val rel: StreamPhysicalRel = call.rel(0)
+    val miniBatchIntervalTrait = rel.getTraitSet.getTrait(MiniBatchIntervalTraitDef.INSTANCE)
+    val inputs = getInputs(rel)
+    val config = FlinkRelOptUtil.getTableConfigFromContext(rel)
+    val miniBatchEnabled = config.getConf.contains(
+      TableConfigOptions.SQL_EXEC_MINIBATCH_ALLOW_LATENCY)
+
+    val updatedTrait = rel match {
+      case _: StreamExecGroupWindowAggregate =>
+        // TODO introduce mini-batch window aggregate later
+        MiniBatchIntervalTrait.NO_MINIBATCH
+
+      case _: StreamExecWatermarkAssigner => MiniBatchIntervalTrait.NONE
+
+      case _ => if (rel.requireWatermark && miniBatchEnabled) {
+        val mergedInterval = FlinkRelOptUtil.mergeMiniBatchInterval(
+          miniBatchIntervalTrait.getMiniBatchInterval, MiniBatchInterval(0, MiniBatchMode.RowTime))
+        new MiniBatchIntervalTrait(mergedInterval)
+      } else {
+        miniBatchIntervalTrait
+      }
+    }
+
+    // propagate parent's MiniBatchInterval to children.
+    val updatedInputs = inputs.map { input =>
+      val originTrait = input.getTraitSet.getTrait(MiniBatchIntervalTraitDef.INSTANCE)
+      val newChild = if (originTrait != updatedTrait) {
+        /**
+          * calc new MiniBatchIntervalTrait according parent's miniBatchInterval
+          * and the child's original miniBatchInterval.
+          */
+        val mergedMiniBatchInterval = FlinkRelOptUtil.mergeMiniBatchInterval(
+          originTrait.getMiniBatchInterval, updatedTrait.getMiniBatchInterval)
+        val inferredTrait = new MiniBatchIntervalTrait(mergedMiniBatchInterval)
+        input.copy(input.getTraitSet.plus(inferredTrait), input.getInputs)
+      } else {
+        input
+      }
+
+      // add mini-batch watermark assigner node.
+      if (isTableSourceScan(newChild) &&
+        newChild.getTraitSet.getTrait(MiniBatchIntervalTraitDef.INSTANCE)
+          .getMiniBatchInterval.mode == MiniBatchMode.ProcTime) {
+        StreamExecWatermarkAssigner.createIngestionTimeWatermarkAssigner(
+          newChild.getCluster,
+          newChild.getTraitSet,
+          newChild.copy(newChild.getTraitSet.plus(MiniBatchIntervalTrait.NONE), newChild.getInputs))
+      } else {
+        newChild
+      }
+    }
+    // update parent if a child was updated
+    if (inputs != updatedInputs) {
+      val newRel = rel.copy(rel.getTraitSet, updatedInputs)
+      call.transformTo(newRel)
+    }
+  }
+
+  private def isTableSourceScan(node: RelNode): Boolean = node match {
+    case _: StreamExecDataStreamScan | _: StreamExecTableSourceScan => true
+    case _ => false
+  }
+}
+
+object MiniBatchIntervalInferRule {
+  val INSTANCE: RelOptRule = new MiniBatchIntervalInferRule
+}
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/physical/stream/StreamExecWatermarkAssignerRule.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/physical/stream/StreamExecWatermarkAssignerRule.scala
new file mode 100644
index 0000000..a7fcafe
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/physical/stream/StreamExecWatermarkAssignerRule.scala
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.plan.rules.physical.stream
+
+import org.apache.flink.table.api.TableException
+import org.apache.flink.table.plan.nodes.FlinkConventions
+import org.apache.flink.table.plan.nodes.logical.FlinkLogicalWatermarkAssigner
+import org.apache.flink.table.plan.nodes.physical.stream.StreamExecWatermarkAssigner
+
+import org.apache.calcite.plan.{RelOptRule, RelTraitSet}
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.convert.ConverterRule
+
+/**
+  * Rule that converts [[FlinkLogicalWatermarkAssigner]] to [[StreamExecWatermarkAssigner]].
+  */
+class StreamExecWatermarkAssignerRule
+  extends ConverterRule(
+    classOf[FlinkLogicalWatermarkAssigner],
+    FlinkConventions.LOGICAL,
+    FlinkConventions.STREAM_PHYSICAL,
+    "StreamExecWatermarkAssignerRule") {
+
+  override def convert(rel: RelNode): RelNode = {
+    val watermarkAssigner = rel.asInstanceOf[FlinkLogicalWatermarkAssigner]
+    val convertInput = RelOptRule.convert(
+      watermarkAssigner.getInput, FlinkConventions.STREAM_PHYSICAL)
+    val traitSet: RelTraitSet = rel.getTraitSet.replace(FlinkConventions.STREAM_PHYSICAL)
+
+    if (watermarkAssigner.rowtimeFieldIndex.isEmpty) {
+      throw new TableException("rowtimeFieldIndex should not be empty")
+    }
+
+    if (watermarkAssigner.watermarkDelay.isEmpty) {
+      throw new TableException("watermarkDelay should not be empty")
+    }
+
+    StreamExecWatermarkAssigner.createRowTimeWatermarkAssigner(
+      watermarkAssigner.getCluster,
+      traitSet,
+      convertInput,
+      watermarkAssigner.rowtimeFieldIndex.get,
+      watermarkAssigner.watermarkDelay.get)
+  }
+}
+
+object StreamExecWatermarkAssignerRule {
+  val INSTANCE = new StreamExecWatermarkAssignerRule
+}
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/trait/MiniBatchIntervalTrait.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/trait/MiniBatchIntervalTrait.scala
index 49fcc7f..984115e 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/trait/MiniBatchIntervalTrait.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/trait/MiniBatchIntervalTrait.scala
@@ -56,17 +56,24 @@ class MiniBatchIntervalTrait(miniBatchInterval: MiniBatchInterval) extends RelTr
 }
 
 /**
-  * @param interval interval of minibatch
-  * @param mode type of minibatch: rowtime/proctime
+  * @param interval interval of mini-batch
+  * @param mode type of mini-batch: rowtime/proctime
   */
 case class MiniBatchInterval(interval: Long, mode: MiniBatchMode)
 
 object MiniBatchInterval {
+  // default none value.
   val NONE = MiniBatchInterval(0L, MiniBatchMode.None)
+  // specific for cases when there exists nodes require watermark but mini-batch interval of
+  // watermark is disabled by force, e.g. existing window aggregate.
+  // The difference between NONE AND NO_MINIBATCH is when merging with other miniBatchInterval,
+  // NONE case yields other miniBatchInterval, while NO_MINIBATCH case yields NO_MINIBATCH.
+  val NO_MINIBATCH = MiniBatchInterval(-1L, MiniBatchMode.None)
 }
 
 object MiniBatchIntervalTrait {
   val NONE = new MiniBatchIntervalTrait(MiniBatchInterval.NONE)
+  val NO_MINIBATCH = new MiniBatchIntervalTrait(MiniBatchInterval.NO_MINIBATCH)
 }
 
 /**
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/util/FlinkRelOptUtil.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/util/FlinkRelOptUtil.scala
index d5c7487..8a01174 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/util/FlinkRelOptUtil.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/util/FlinkRelOptUtil.scala
@@ -19,7 +19,9 @@ package org.apache.flink.table.plan.util
 
 import org.apache.flink.table.api.{PlannerConfigOptions, TableConfig}
 import org.apache.flink.table.calcite.{FlinkContext, FlinkPlannerImpl, FlinkTypeFactory}
+import org.apache.flink.table.plan.`trait`.{MiniBatchInterval, MiniBatchMode}
 import org.apache.flink.table.{JBoolean, JByte, JDouble, JFloat, JLong, JShort}
+
 import com.google.common.collect.{ImmutableList, Lists}
 import org.apache.calcite.config.NullCollation
 import org.apache.calcite.plan.RelOptUtil.InputFinder
@@ -35,6 +37,7 @@ import org.apache.calcite.sql.`type`.SqlTypeName._
 import org.apache.calcite.tools.RelBuilder
 import org.apache.calcite.util.mapping.Mappings
 import org.apache.calcite.util.{ImmutableBitSet, Pair, Util}
+import org.apache.commons.math3.util.ArithmeticUtils
 
 import java.io.{PrintWriter, StringWriter}
 import java.math.BigDecimal
@@ -721,4 +724,71 @@ object FlinkRelOptUtil {
     }
   }
 
+  /**
+    * Merge two MiniBatchInterval as a new one.
+    *
+    * The Merge Logic:  MiniBatchMode: (R: rowtime, P: proctime, N: None), I: Interval
+    * Possible values:
+    * - (R, I = 0): operators that require watermark (window excluded).
+    * - (R, I > 0): window / operators that require watermark with minibatch enabled.
+    * - (R, I = -1): existing window aggregate
+    * - (P, I > 0): unbounded agg with minibatch enabled.
+    * - (N, I = 0): no operator requires watermark, minibatch disabled
+    * ------------------------------------------------
+    * |    A        |    B        |   merged result
+    * ------------------------------------------------
+    * | R, I_1 == 0 | R, I_2      |  R, gcd(I_1, I_2)
+    * ------------------------------------------------
+    * | R, I_1 == 0 | P, I_2      |  R, I_2
+    * ------------------------------------------------
+    * | R, I_1 > 0  | R, I_2      |  R, gcd(I_1, I_2)
+    * ------------------------------------------------
+    * | R, I_1 > 0  | P, I_2      |  R, I_1
+    * ------------------------------------------------
+    * | R, I_1 = -1 | R, I_2      |  R, I_1
+    * ------------------------------------------------
+    * | R, I_1 = -1 | P, I_2      |  R, I_1
+    * ------------------------------------------------
+    * | P, I_1      | R, I_2 == 0 |  R, I_1
+    * ------------------------------------------------
+    * | P, I_1      | R, I_2 > 0  |  R, I_2
+    * ------------------------------------------------
+    * | P, I_1      | P, I_2 > 0  |  P, I_1
+    * ------------------------------------------------
+    */
+  def mergeMiniBatchInterval(
+      interval1: MiniBatchInterval,
+      interval2: MiniBatchInterval): MiniBatchInterval = {
+    if (interval1 == MiniBatchInterval.NO_MINIBATCH ||
+      interval2 == MiniBatchInterval.NO_MINIBATCH) {
+      return MiniBatchInterval.NO_MINIBATCH
+    }
+    interval1.mode match {
+      case MiniBatchMode.None => interval2
+      case MiniBatchMode.RowTime =>
+        interval2.mode match {
+          case MiniBatchMode.None => interval1
+          case MiniBatchMode.RowTime =>
+            val gcd = ArithmeticUtils.gcd(interval1.interval, interval2.interval)
+            MiniBatchInterval(gcd, MiniBatchMode.RowTime)
+          case MiniBatchMode.ProcTime =>
+            if (interval1.interval == 0) {
+              MiniBatchInterval(interval2.interval, MiniBatchMode.RowTime)
+            } else {
+              interval1
+            }
+        }
+      case MiniBatchMode.ProcTime =>
+        interval2.mode match {
+          case MiniBatchMode.None | MiniBatchMode.ProcTime => interval1
+          case MiniBatchMode.RowTime =>
+            if (interval2.interval > 0) {
+              interval2
+            } else {
+              MiniBatchInterval(interval1.interval, MiniBatchMode.RowTime)
+            }
+        }
+    }
+  }
+
 }
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/api/stream/ExplainTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/api/stream/ExplainTest.xml
index 5398418..652d833 100644
--- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/api/stream/ExplainTest.xml
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/api/stream/ExplainTest.xml
@@ -534,6 +534,42 @@ Sink(fields=[a, b, c])
 ]]>
     </Resource>
   </TestCase>
+  <TestCase name="testExplainWithSingleSink[extended=true]">
+    <Resource name="explain">
+      <![CDATA[== Abstract Syntax Tree ==
+LogicalSink(fields=[a, b, c])
++- LogicalProject(a=[$0], b=[$1], c=[$2])
+   +- LogicalFilter(condition=[>($0, 10)])
+      +- LogicalTableScan(table=[[default_catalog, default_database, MyTable1]])
+
+== Optimized Logical Plan ==
+Sink(fields=[a, b, c], updateAsRetraction=[false], accMode=[Acc]): rowcount = , cumulative cost = {rows, cpu, io, network, memory}
++- Calc(select=[a, b, c], where=[>(a, 10)], updateAsRetraction=[false], accMode=[Acc]): rowcount = , cumulative cost = {rows, cpu, io, network, memory}
+   +- DataStreamScan(table=[[default_catalog, default_database, MyTable1]], fields=[a, b, c], updateAsRetraction=[false], accMode=[Acc]): rowcount = , cumulative cost = {rows, cpu, io, network, memory}
+
+== Physical Execution Plan ==
+ : Data Source
+	content : collect elements with CollectionInputFormat
+
+	 : Operator
+		content : SourceConversion(table:Buffer(default_catalog, default_database, MyTable1), fields:(a, b, c))
+		ship_strategy : FORWARD
+
+		 : Operator
+			content : Calc(where: (a > 10), select: (a, b, c))
+			ship_strategy : FORWARD
+
+			 : Operator
+				content : SinkConversionToRow
+				ship_strategy : FORWARD
+
+				 : Data Sink
+					content : Sink: TestingAppendTableSink
+					ship_strategy : FORWARD
+
+]]>
+    </Resource>
+  </TestCase>
   <TestCase name="testExplainWithSort[extended=false]">
     <Resource name="explain">
       <![CDATA[== Abstract Syntax Tree ==
@@ -588,39 +624,118 @@ SortLimit(orderBy=[a ASC], offset=[0], fetch=[5], updateAsRetraction=[false], ac
 ]]>
     </Resource>
   </TestCase>
-  <TestCase name="testExplainWithSingleSink[extended=true]">
+  <TestCase name="testMiniBatchIntervalInfer[extended=true]">
     <Resource name="explain">
       <![CDATA[== Abstract Syntax Tree ==
-LogicalSink(fields=[a, b, c])
-+- LogicalProject(a=[$0], b=[$1], c=[$2])
-   +- LogicalFilter(condition=[>($0, 10)])
-      +- LogicalTableScan(table=[[default_catalog, default_database, MyTable1]])
+LogicalSink(fields=[a, b])
++- LogicalProject(id1=[$0], EXPR$1=[$2])
+   +- LogicalAggregate(group=[{0, 1}], EXPR$1=[CONCAT_AGG($2, $3)])
+      +- LogicalProject(id1=[$0], $f1=[TUMBLE($1, 8000:INTERVAL SECOND)], $f2=[_UTF-16LE'#'], text=[$2])
+         +- LogicalProject(id1=[$0], ts=[$2], text=[$1])
+            +- LogicalFilter(condition=[AND(=($0, $3), >($2, -($7, 300000:INTERVAL MINUTE)), <($2, +($7, 180000:INTERVAL MINUTE)))])
+               +- LogicalJoin(condition=[true], joinType=[inner])
+                  :- LogicalWatermarkAssigner(fields=[id1, text, rowtime], rowtimeField=[rowtime], watermarkDelay=[0])
+                  :  +- LogicalTableScan(table=[[default_catalog, default_database, T1]])
+                  +- LogicalWatermarkAssigner(fields=[id2, cnt, name, goods, rowtime], rowtimeField=[rowtime], watermarkDelay=[0])
+                     +- LogicalTableScan(table=[[default_catalog, default_database, T2]])
+
+LogicalSink(fields=[a, b])
++- LogicalProject(id1=[$0], EXPR$1=[$2])
+   +- LogicalAggregate(group=[{0, 1}], EXPR$1=[CONCAT_AGG($2, $3)])
+      +- LogicalProject(id1=[$0], $f1=[HOP($1, 12000:INTERVAL SECOND, 6000:INTERVAL SECOND)], $f2=[_UTF-16LE'*'], text=[$2])
+         +- LogicalProject(id1=[$0], ts=[$2], text=[$1])
+            +- LogicalFilter(condition=[AND(=($0, $3), >($2, -($7, 300000:INTERVAL MINUTE)), <($2, +($7, 180000:INTERVAL MINUTE)))])
+               +- LogicalJoin(condition=[true], joinType=[inner])
+                  :- LogicalWatermarkAssigner(fields=[id1, text, rowtime], rowtimeField=[rowtime], watermarkDelay=[0])
+                  :  +- LogicalTableScan(table=[[default_catalog, default_database, T1]])
+                  +- LogicalWatermarkAssigner(fields=[id2, cnt, name, goods, rowtime], rowtimeField=[rowtime], watermarkDelay=[0])
+                     +- LogicalTableScan(table=[[default_catalog, default_database, T2]])
 
 == Optimized Logical Plan ==
-Sink(fields=[a, b, c], updateAsRetraction=[false], accMode=[Acc]): rowcount = , cumulative cost = {rows, cpu, io, network, memory}
-+- Calc(select=[a, b, c], where=[>(a, 10)], updateAsRetraction=[false], accMode=[Acc]): rowcount = , cumulative cost = {rows, cpu, io, network, memory}
-   +- DataStreamScan(table=[[default_catalog, default_database, MyTable1]], fields=[a, b, c], updateAsRetraction=[false], accMode=[Acc]): rowcount = , cumulative cost = {rows, cpu, io, network, memory}
+Calc(select=[id1, rowtime AS ts, text], updateAsRetraction=[true], accMode=[Acc], reuse_id=[1]): rowcount = , cumulative cost = {rows, cpu, io, network, memory}
++- WindowJoin(joinType=[InnerJoin], windowBounds=[isRowTime=true, leftLowerBound=-299999, leftUpperBound=179999, leftTimeIndex=2, rightTimeIndex=4], where=[AND(=(id1, id2), >(CAST(rowtime), -(CAST(rowtime0), 300000:INTERVAL MINUTE)), <(CAST(rowtime), +(CAST(rowtime0), 180000:INTERVAL MINUTE)))], select=[id1, text, rowtime, id2, cnt, name, goods, rowtime0], updateAsRetraction=[true], accMode=[Acc]): rowcount = , cumulative cost = {rows, cpu, io, network, memory}
+   :- Exchange(distribution=[hash[id1]], updateAsRetraction=[true], accMode=[Acc]): rowcount = , cumulative cost = {rows, cpu, io, network, memory}
+   :  +- WatermarkAssigner(fields=[id1, text, rowtime], rowtimeField=[rowtime], watermarkDelay=[0], miniBatchInterval=[None], updateAsRetraction=[true], accMode=[Acc]): rowcount = , cumulative cost = {rows, cpu, io, network, memory}
+   :     +- DataStreamScan(table=[[default_catalog, default_database, T1]], fields=[id1, text, rowtime], updateAsRetraction=[true], accMode=[Acc]): rowcount = , cumulative cost = {rows, cpu, io, network, memory}
+   +- Exchange(distribution=[hash[id2]], updateAsRetraction=[true], accMode=[Acc]): rowcount = , cumulative cost = {rows, cpu, io, network, memory}
+      +- WatermarkAssigner(fields=[id2, cnt, name, goods, rowtime], rowtimeField=[rowtime], watermarkDelay=[0], miniBatchInterval=[None], updateAsRetraction=[true], accMode=[Acc]): rowcount = , cumulative cost = {rows, cpu, io, network, memory}
+         +- DataStreamScan(table=[[default_catalog, default_database, T2]], fields=[id2, cnt, name, goods, rowtime], updateAsRetraction=[true], accMode=[Acc]): rowcount = , cumulative cost = {rows, cpu, io, network, memory}
+
+Sink(fields=[a, b], updateAsRetraction=[false], accMode=[Acc]): rowcount = , cumulative cost = {rows, cpu, io, network, memory}
++- GroupWindowAggregate(groupBy=[id1], window=[TumblingGroupWindow], select=[id1, CONCAT_AGG($f2, text) AS EXPR$1], updateAsRetraction=[false], accMode=[Acc]): rowcount = , cumulative cost = {rows, cpu, io, network, memory}
+   +- Exchange(distribution=[hash[id1]], updateAsRetraction=[true], accMode=[Acc]): rowcount = , cumulative cost = {rows, cpu, io, network, memory}
+      +- Calc(select=[id1, ts, _UTF-16LE'#' AS $f2, text], updateAsRetraction=[true], accMode=[Acc]): rowcount = , cumulative cost = {rows, cpu, io, network, memory}
+         +- Reused(reference_id=[1])
+
+Sink(fields=[a, b], updateAsRetraction=[false], accMode=[Acc]): rowcount = , cumulative cost = {rows, cpu, io, network, memory}
++- GroupWindowAggregate(groupBy=[id1], window=[SlidingGroupWindow('w$, ts, 6000, 12000)], select=[id1, CONCAT_AGG($f2, text) AS EXPR$1], updateAsRetraction=[false], accMode=[Acc]): rowcount = , cumulative cost = {rows, cpu, io, network, memory}
+   +- Exchange(distribution=[hash[id1]], updateAsRetraction=[true], accMode=[Acc]): rowcount = , cumulative cost = {rows, cpu, io, network, memory}
+      +- Calc(select=[id1, ts, _UTF-16LE'*' AS $f2, text], updateAsRetraction=[true], accMode=[Acc]): rowcount = , cumulative cost = {rows, cpu, io, network, memory}
+         +- Reused(reference_id=[1])
 
 == Physical Execution Plan ==
  : Data Source
 	content : collect elements with CollectionInputFormat
 
+ : Data Source
+	content : collect elements with CollectionInputFormat
+
 	 : Operator
-		content : SourceConversion(table:Buffer(default_catalog, default_database, MyTable1), fields:(a, b, c))
+		content : SourceConversion(table:Buffer(default_catalog, default_database, T1), fields:(id1, text, rowtime))
 		ship_strategy : FORWARD
 
 		 : Operator
-			content : Calc(where: (a > 10), select: (a, b, c))
+			content : WatermarkAssigner(rowtime: 2, offset: 0)
 			ship_strategy : FORWARD
 
 			 : Operator
-				content : SinkConversionToRow
+				content : SourceConversion(table:Buffer(default_catalog, default_database, T2), fields:(id2, cnt, name, goods, rowtime))
 				ship_strategy : FORWARD
 
-				 : Data Sink
-					content : Sink: TestingAppendTableSink
+				 : Operator
+					content : WatermarkAssigner(rowtime: 4, offset: 0)
 					ship_strategy : FORWARD
 
+					 : Operator
+						content : Co-Process
+						ship_strategy : HASH
+
+						 : Operator
+							content : Calc(select: (id1, rowtime AS ts, text))
+							ship_strategy : FORWARD
+
+							 : Operator
+								content : Calc(select: (id1, ts, _UTF-16LE'#' AS $f2, text))
+								ship_strategy : FORWARD
+
+								 : Operator
+									content : window: (TumblingGroupWindow), groupBy: (id1), select: (id1, CONCAT_AGG($f2, text) AS EXPR$1)
+									ship_strategy : HASH
+
+									 : Operator
+										content : SinkConversionToRow
+										ship_strategy : FORWARD
+
+										 : Operator
+											content : Calc(select: (id1, ts, _UTF-16LE'*' AS $f2, text))
+											ship_strategy : FORWARD
+
+											 : Operator
+												content : window: (SlidingGroupWindow('w$, ts, 6000, 12000)), groupBy: (id1), select: (id1, CONCAT_AGG($f2, text) AS EXPR$1)
+												ship_strategy : HASH
+
+												 : Operator
+													content : SinkConversionToRow
+													ship_strategy : FORWARD
+
+													 : Data Sink
+														content : Sink: TestingAppendTableSink
+														ship_strategy : FORWARD
+
+														 : Data Sink
+															content : Sink: TestingAppendTableSink
+															ship_strategy : FORWARD
+
 ]]>
     </Resource>
   </TestCase>
@@ -656,4 +771,119 @@ Union(all=[true], union=[a, b, c])
 ]]>
     </Resource>
   </TestCase>
+  <TestCase name="testMiniBatchIntervalInfer[extended=false]">
+    <Resource name="explain">
+      <![CDATA[== Abstract Syntax Tree ==
+LogicalSink(fields=[a, b])
++- LogicalProject(id1=[$0], EXPR$1=[$2])
+   +- LogicalAggregate(group=[{0, 1}], EXPR$1=[CONCAT_AGG($2, $3)])
+      +- LogicalProject(id1=[$0], $f1=[TUMBLE($1, 8000:INTERVAL SECOND)], $f2=[_UTF-16LE'#'], text=[$2])
+         +- LogicalProject(id1=[$0], ts=[$2], text=[$1])
+            +- LogicalFilter(condition=[AND(=($0, $3), >($2, -($7, 300000:INTERVAL MINUTE)), <($2, +($7, 180000:INTERVAL MINUTE)))])
+               +- LogicalJoin(condition=[true], joinType=[inner])
+                  :- LogicalWatermarkAssigner(fields=[id1, text, rowtime], rowtimeField=[rowtime], watermarkDelay=[0])
+                  :  +- LogicalTableScan(table=[[default_catalog, default_database, T1]])
+                  +- LogicalWatermarkAssigner(fields=[id2, cnt, name, goods, rowtime], rowtimeField=[rowtime], watermarkDelay=[0])
+                     +- LogicalTableScan(table=[[default_catalog, default_database, T2]])
+
+LogicalSink(fields=[a, b])
++- LogicalProject(id1=[$0], EXPR$1=[$2])
+   +- LogicalAggregate(group=[{0, 1}], EXPR$1=[CONCAT_AGG($2, $3)])
+      +- LogicalProject(id1=[$0], $f1=[HOP($1, 12000:INTERVAL SECOND, 6000:INTERVAL SECOND)], $f2=[_UTF-16LE'*'], text=[$2])
+         +- LogicalProject(id1=[$0], ts=[$2], text=[$1])
+            +- LogicalFilter(condition=[AND(=($0, $3), >($2, -($7, 300000:INTERVAL MINUTE)), <($2, +($7, 180000:INTERVAL MINUTE)))])
+               +- LogicalJoin(condition=[true], joinType=[inner])
+                  :- LogicalWatermarkAssigner(fields=[id1, text, rowtime], rowtimeField=[rowtime], watermarkDelay=[0])
+                  :  +- LogicalTableScan(table=[[default_catalog, default_database, T1]])
+                  +- LogicalWatermarkAssigner(fields=[id2, cnt, name, goods, rowtime], rowtimeField=[rowtime], watermarkDelay=[0])
+                     +- LogicalTableScan(table=[[default_catalog, default_database, T2]])
+
+== Optimized Logical Plan ==
+Calc(select=[id1, rowtime AS ts, text], reuse_id=[1])
++- WindowJoin(joinType=[InnerJoin], windowBounds=[isRowTime=true, leftLowerBound=-299999, leftUpperBound=179999, leftTimeIndex=2, rightTimeIndex=4], where=[AND(=(id1, id2), >(CAST(rowtime), -(CAST(rowtime0), 300000:INTERVAL MINUTE)), <(CAST(rowtime), +(CAST(rowtime0), 180000:INTERVAL MINUTE)))], select=[id1, text, rowtime, id2, cnt, name, goods, rowtime0])
+   :- Exchange(distribution=[hash[id1]])
+   :  +- WatermarkAssigner(fields=[id1, text, rowtime], rowtimeField=[rowtime], watermarkDelay=[0], miniBatchInterval=[None])
+   :     +- DataStreamScan(table=[[default_catalog, default_database, T1]], fields=[id1, text, rowtime])
+   +- Exchange(distribution=[hash[id2]])
+      +- WatermarkAssigner(fields=[id2, cnt, name, goods, rowtime], rowtimeField=[rowtime], watermarkDelay=[0], miniBatchInterval=[None])
+         +- DataStreamScan(table=[[default_catalog, default_database, T2]], fields=[id2, cnt, name, goods, rowtime])
+
+Sink(fields=[a, b])
++- GroupWindowAggregate(groupBy=[id1], window=[TumblingGroupWindow], select=[id1, CONCAT_AGG($f2, text) AS EXPR$1])
+   +- Exchange(distribution=[hash[id1]])
+      +- Calc(select=[id1, ts, _UTF-16LE'#' AS $f2, text])
+         +- Reused(reference_id=[1])
+
+Sink(fields=[a, b])
++- GroupWindowAggregate(groupBy=[id1], window=[SlidingGroupWindow('w$, ts, 6000, 12000)], select=[id1, CONCAT_AGG($f2, text) AS EXPR$1])
+   +- Exchange(distribution=[hash[id1]])
+      +- Calc(select=[id1, ts, _UTF-16LE'*' AS $f2, text])
+         +- Reused(reference_id=[1])
+
+== Physical Execution Plan ==
+ : Data Source
+	content : collect elements with CollectionInputFormat
+
+ : Data Source
+	content : collect elements with CollectionInputFormat
+
+	 : Operator
+		content : SourceConversion(table:Buffer(default_catalog, default_database, T1), fields:(id1, text, rowtime))
+		ship_strategy : FORWARD
+
+		 : Operator
+			content : WatermarkAssigner(rowtime: 2, offset: 0)
+			ship_strategy : FORWARD
+
+			 : Operator
+				content : SourceConversion(table:Buffer(default_catalog, default_database, T2), fields:(id2, cnt, name, goods, rowtime))
+				ship_strategy : FORWARD
+
+				 : Operator
+					content : WatermarkAssigner(rowtime: 4, offset: 0)
+					ship_strategy : FORWARD
+
+					 : Operator
+						content : Co-Process
+						ship_strategy : HASH
+
+						 : Operator
+							content : Calc(select: (id1, rowtime AS ts, text))
+							ship_strategy : FORWARD
+
+							 : Operator
+								content : Calc(select: (id1, ts, _UTF-16LE'#' AS $f2, text))
+								ship_strategy : FORWARD
+
+								 : Operator
+									content : window: (TumblingGroupWindow), groupBy: (id1), select: (id1, CONCAT_AGG($f2, text) AS EXPR$1)
+									ship_strategy : HASH
+
+									 : Operator
+										content : SinkConversionToRow
+										ship_strategy : FORWARD
+
+										 : Operator
+											content : Calc(select: (id1, ts, _UTF-16LE'*' AS $f2, text))
+											ship_strategy : FORWARD
+
+											 : Operator
+												content : window: (SlidingGroupWindow('w$, ts, 6000, 12000)), groupBy: (id1), select: (id1, CONCAT_AGG($f2, text) AS EXPR$1)
+												ship_strategy : HASH
+
+												 : Operator
+													content : SinkConversionToRow
+													ship_strategy : FORWARD
+
+													 : Data Sink
+														content : Sink: TestingAppendTableSink
+														ship_strategy : FORWARD
+
+														 : Data Sink
+															content : Sink: TestingAppendTableSink
+															ship_strategy : FORWARD
+
+]]>
+    </Resource>
+  </TestCase>
 </Root>
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/rules/physical/stream/RetractionRulesWithTwoStageAggTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/rules/physical/stream/RetractionRulesWithTwoStageAggTest.xml
index 27d03a1..8e3ed14 100644
--- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/rules/physical/stream/RetractionRulesWithTwoStageAggTest.xml
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/rules/physical/stream/RetractionRulesWithTwoStageAggTest.xml
@@ -47,9 +47,11 @@ GlobalGroupAggregate(groupBy=[cnt], select=[cnt, COUNT_RETRACT(count$0) AS frequ
          :  +- GlobalGroupAggregate(groupBy=[word], select=[word, COUNT(count$0) AS cnt], updateAsRetraction=[true], accMode=[AccRetract])
          :     +- Exchange(distribution=[hash[word]], updateAsRetraction=[true], accMode=[Acc])
          :        +- LocalGroupAggregate(groupBy=[word], select=[word, COUNT(number) AS count$0], updateAsRetraction=[true], accMode=[Acc])
-         :           +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(word, number)]]], fields=[word, number], updateAsRetraction=[true], accMode=[Acc])
+         :           +- WatermarkAssigner(fields=[word, number], miniBatchInterval=[Proctime, 1000ms], updateAsRetraction=[true], accMode=[Acc])
+         :              +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(word, number)]]], fields=[word, number], updateAsRetraction=[true], accMode=[Acc])
          +- Calc(select=[cnt], updateAsRetraction=[true], accMode=[Acc])
-            +- TableSourceScan(table=[[default_catalog, default_database, MyTable2, source: [TestTableSource(word, cnt)]]], fields=[word, cnt], updateAsRetraction=[true], accMode=[Acc])
+            +- WatermarkAssigner(fields=[word, cnt], miniBatchInterval=[Proctime, 1000ms], updateAsRetraction=[true], accMode=[Acc])
+               +- TableSourceScan(table=[[default_catalog, default_database, MyTable2, source: [TestTableSource(word, cnt)]]], fields=[word, cnt], updateAsRetraction=[true], accMode=[Acc])
 ]]>
     </Resource>
   </TestCase>
@@ -70,7 +72,8 @@ Calc(select=[EXPR$0], updateAsRetraction=[false], accMode=[Acc])
 +- GlobalGroupAggregate(groupBy=[word], select=[word, COUNT(count$0) AS EXPR$0], updateAsRetraction=[false], accMode=[Acc])
    +- Exchange(distribution=[hash[word]], updateAsRetraction=[true], accMode=[Acc])
       +- LocalGroupAggregate(groupBy=[word], select=[word, COUNT(number) AS count$0], updateAsRetraction=[true], accMode=[Acc])
-         +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(word, number)]]], fields=[word, number], updateAsRetraction=[true], accMode=[Acc])
+         +- WatermarkAssigner(fields=[word, number], miniBatchInterval=[Proctime, 1000ms], updateAsRetraction=[true], accMode=[Acc])
+            +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(word, number)]]], fields=[word, number], updateAsRetraction=[true], accMode=[Acc])
 ]]>
     </Resource>
   </TestCase>
@@ -99,7 +102,8 @@ GlobalGroupAggregate(groupBy=[cnt], select=[cnt, COUNT_RETRACT(count1$0) AS freq
          +- GlobalGroupAggregate(groupBy=[word], select=[word, COUNT(count$0) AS cnt], updateAsRetraction=[true], accMode=[AccRetract])
             +- Exchange(distribution=[hash[word]], updateAsRetraction=[true], accMode=[Acc])
                +- LocalGroupAggregate(groupBy=[word], select=[word, COUNT(number) AS count$0], updateAsRetraction=[true], accMode=[Acc])
-                  +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(word, number)]]], fields=[word, number], updateAsRetraction=[true], accMode=[Acc])
+                  +- WatermarkAssigner(fields=[word, number], miniBatchInterval=[Proctime, 1000ms], updateAsRetraction=[true], accMode=[Acc])
+                     +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(word, number)]]], fields=[word, number], updateAsRetraction=[true], accMode=[Acc])
 ]]>
     </Resource>
   </TestCase>
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/stream/sql/MiniBatchIntervalInferTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/stream/sql/MiniBatchIntervalInferTest.xml
new file mode 100644
index 0000000..e6b7c43
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/stream/sql/MiniBatchIntervalInferTest.xml
@@ -0,0 +1,572 @@
+<?xml version="1.0" ?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to you under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+<Root>
+  <TestCase name="testMiniBatchOnly">
+    <Resource name="sql">
+      <![CDATA[SELECT b, COUNT(DISTINCT a), MAX(b), SUM(c) FROM MyTable1 GROUP BY b]]>
+    </Resource>
+    <Resource name="planBefore">
+      <![CDATA[
+LogicalAggregate(group=[{0}], EXPR$1=[COUNT(DISTINCT $1)], EXPR$2=[MAX($0)], EXPR$3=[SUM($2)])
++- LogicalProject(b=[$1], a=[$0], c=[$2])
+   +- LogicalTableScan(table=[[default_catalog, default_database, MyTable1]])
+]]>
+    </Resource>
+    <Resource name="planAfter">
+      <![CDATA[
+GlobalGroupAggregate(groupBy=[b], select=[b, COUNT(distinct$0 count$0) AS EXPR$1, MAX(max$1) AS EXPR$2, SUM(sum$2) AS EXPR$3])
++- Exchange(distribution=[hash[b]])
+   +- LocalGroupAggregate(groupBy=[b], select=[b, COUNT(distinct$0 a) AS count$0, MAX(b) AS max$1, SUM(c) AS sum$2, DISTINCT(a) AS distinct$0])
+      +- Calc(select=[b, a, c])
+         +- WatermarkAssigner(fields=[a, b, c, proctime, rowtime], miniBatchInterval=[Proctime, 1000ms])
+            +- DataStreamScan(table=[[default_catalog, default_database, MyTable1]], fields=[a, b, c, proctime, rowtime])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testMultiOperatorNeedsWatermark1">
+    <Resource name="sql">
+      <![CDATA[
+ SELECT
+   b, COUNT(a),
+   TUMBLE_START(rt, INTERVAL '5' SECOND),
+   TUMBLE_END(rt, INTERVAL '5' SECOND)
+ FROM (
+   SELECT t1.a as a, t1.b as b, t1.rowtime as rt
+   FROM
+     LeftT as t1 JOIN RightT as t2
+   ON
+     t1.a = t2.a AND t1.rowtime BETWEEN t2.rowtime - INTERVAL '5' SECOND AND
+     t2.rowtime + INTERVAL '10' SECOND
+ )
+ GROUP BY b,TUMBLE(rt, INTERVAL '5' SECOND)
+      ]]>
+    </Resource>
+    <Resource name="planBefore">
+      <![CDATA[
+LogicalProject(b=[$0], EXPR$1=[$2], EXPR$2=[TUMBLE_START($1)], EXPR$3=[TUMBLE_END($1)])
++- LogicalAggregate(group=[{0, 1}], EXPR$1=[COUNT($2)])
+   +- LogicalProject(b=[$1], $f1=[TUMBLE($4, 5000:INTERVAL SECOND)], a=[$0])
+      +- LogicalJoin(condition=[AND(=($0, $5), >=($4, -($9, 5000:INTERVAL SECOND)), <=($4, +($9, 10000:INTERVAL SECOND)))], joinType=[inner])
+         :- LogicalWatermarkAssigner(fields=[a, b, c, proctime, rowtime], rowtimeField=[rowtime], watermarkDelay=[0])
+         :  +- LogicalTableScan(table=[[default_catalog, default_database, MyTable1]])
+         +- LogicalWatermarkAssigner(fields=[a, b, c, proctime, rowtime], rowtimeField=[rowtime], watermarkDelay=[0])
+            +- LogicalTableScan(table=[[default_catalog, default_database, MyTable2]])
+]]>
+    </Resource>
+    <Resource name="planAfter">
+      <![CDATA[
+Calc(select=[b, EXPR$1, w$start AS EXPR$2, w$end AS EXPR$3])
++- GroupWindowAggregate(groupBy=[b], window=[TumblingGroupWindow], properties=[w$start, w$end, w$rowtime, w$proctime], select=[b, COUNT(a) AS EXPR$1, start('w$) AS w$start, end('w$) AS w$end, rowtime('w$) AS w$rowtime, proctime('w$) AS w$proctime])
+   +- Exchange(distribution=[hash[b]])
+      +- Calc(select=[b, rowtime, a])
+         +- WindowJoin(joinType=[InnerJoin], windowBounds=[isRowTime=true, leftLowerBound=-5000, leftUpperBound=10000, leftTimeIndex=2, rightTimeIndex=1], where=[AND(=(a, a0), >=(rowtime, -(rowtime0, 5000:INTERVAL SECOND)), <=(rowtime, +(rowtime0, 10000:INTERVAL SECOND)))], select=[a, b, rowtime, a0, rowtime0])
+            :- Exchange(distribution=[hash[a]])
+            :  +- Calc(select=[a, b, rowtime])
+            :     +- WatermarkAssigner(fields=[a, b, c, proctime, rowtime], rowtimeField=[rowtime], watermarkDelay=[0], miniBatchInterval=[None])
+            :        +- DataStreamScan(table=[[default_catalog, default_database, MyTable1]], fields=[a, b, c, proctime, rowtime])
+            +- Exchange(distribution=[hash[a]])
+               +- Calc(select=[a, rowtime])
+                  +- WatermarkAssigner(fields=[a, b, c, proctime, rowtime], rowtimeField=[rowtime], watermarkDelay=[0], miniBatchInterval=[None])
+                     +- DataStreamScan(table=[[default_catalog, default_database, MyTable2]], fields=[a, b, c, proctime, rowtime])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testMultiOperatorNeedsWatermark2">
+    <Resource name="sql">
+      <![CDATA[
+ SELECT b, COUNT(a)
+ OVER (PARTITION BY b ORDER BY rt ROWS BETWEEN 5 preceding AND CURRENT ROW)
+ FROM (
+  SELECT t1.a as a, t1.b as b, t1.rt as rt
+  FROM
+  (
+    SELECT b,
+     COUNT(a) as a,
+     TUMBLE_ROWTIME(rowtime, INTERVAL '5' SECOND) as rt
+    FROM LeftT
+    GROUP BY b, TUMBLE(rowtime, INTERVAL '5' SECOND)
+  ) as t1
+  JOIN
+  (
+    SELECT b,
+     COUNT(a) as a,
+     HOP_ROWTIME(rowtime, INTERVAL '5' SECOND, INTERVAL '6' SECOND) as rt
+    FROM RightT
+    GROUP BY b, HOP(rowtime, INTERVAL '5' SECOND, INTERVAL '6' SECOND)
+  ) as t2
+  ON
+    t1.a = t2.a AND t1.rt BETWEEN t2.rt - INTERVAL '5' SECOND AND
+    t2.rt + INTERVAL '10' SECOND
+ )
+      ]]>
+    </Resource>
+    <Resource name="planBefore">
+      <![CDATA[
+LogicalProject(b=[$0], EXPR$1=[COUNT($1) OVER (PARTITION BY $0 ORDER BY $2 NULLS FIRST ROWS BETWEEN 5 PRECEDING AND CURRENT ROW)])
++- LogicalJoin(condition=[AND(=($1, $4), >=($2, -($5, 5000:INTERVAL SECOND)), <=($2, +($5, 10000:INTERVAL SECOND)))], joinType=[inner])
+   :- LogicalProject(b=[$0], a=[$2], rt=[TUMBLE_ROWTIME($1)])
+   :  +- LogicalAggregate(group=[{0, 1}], a=[COUNT($2)])
+   :     +- LogicalProject(b=[$1], $f1=[TUMBLE($4, 5000:INTERVAL SECOND)], a=[$0])
+   :        +- LogicalWatermarkAssigner(fields=[a, b, c, proctime, rowtime], rowtimeField=[rowtime], watermarkDelay=[0])
+   :           +- LogicalTableScan(table=[[default_catalog, default_database, MyTable1]])
+   +- LogicalProject(b=[$0], a=[$2], rt=[HOP_ROWTIME($1)])
+      +- LogicalAggregate(group=[{0, 1}], a=[COUNT($2)])
+         +- LogicalProject(b=[$1], $f1=[HOP($4, 5000:INTERVAL SECOND, 6000:INTERVAL SECOND)], a=[$0])
+            +- LogicalWatermarkAssigner(fields=[a, b, c, proctime, rowtime], rowtimeField=[rowtime], watermarkDelay=[0])
+               +- LogicalTableScan(table=[[default_catalog, default_database, MyTable2]])
+]]>
+    </Resource>
+    <Resource name="planAfter">
+      <![CDATA[
+Calc(select=[b, w0$o0 AS $1])
++- OverAggregate(partitionBy=[b], orderBy=[rt ASC], window=[ ROWS BETWEEN 5 PRECEDING AND CURRENT ROW], select=[b, a, rt, COUNT(a) AS w0$o0])
+   +- Exchange(distribution=[hash[b]])
+      +- Calc(select=[b, a, rt])
+         +- WindowJoin(joinType=[InnerJoin], windowBounds=[isRowTime=true, leftLowerBound=-5000, leftUpperBound=10000, leftTimeIndex=2, rightTimeIndex=2], where=[AND(=(a, a0), >=(rt, -(rt0, 5000:INTERVAL SECOND)), <=(rt, +(rt0, 10000:INTERVAL SECOND)))], select=[b, a, rt, b0, a0, rt0])
+            :- Exchange(distribution=[hash[a]])
+            :  +- Calc(select=[b, a, w$rowtime AS rt])
+            :     +- GroupWindowAggregate(groupBy=[b], window=[TumblingGroupWindow], properties=[w$start, w$end, w$rowtime, w$proctime], select=[b, COUNT(a) AS a, start('w$) AS w$start, end('w$) AS w$end, rowtime('w$) AS w$rowtime, proctime('w$) AS w$proctime])
+            :        +- Exchange(distribution=[hash[b]])
+            :           +- Calc(select=[b, rowtime, a])
+            :              +- WatermarkAssigner(fields=[a, b, c, proctime, rowtime], rowtimeField=[rowtime], watermarkDelay=[0], miniBatchInterval=[None])
+            :                 +- DataStreamScan(table=[[default_catalog, default_database, MyTable1]], fields=[a, b, c, proctime, rowtime])
+            +- Exchange(distribution=[hash[a]])
+               +- Calc(select=[b, a, w$rowtime AS rt])
+                  +- GroupWindowAggregate(groupBy=[b], window=[SlidingGroupWindow('w$, rowtime, 6000, 5000)], properties=[w$start, w$end, w$rowtime, w$proctime], select=[b, COUNT(a) AS a, start('w$) AS w$start, end('w$) AS w$end, rowtime('w$) AS w$rowtime, proctime('w$) AS w$proctime])
+                     +- Exchange(distribution=[hash[b]])
+                        +- Calc(select=[b, rowtime, a])
+                           +- WatermarkAssigner(fields=[a, b, c, proctime, rowtime], rowtimeField=[rowtime], watermarkDelay=[0], miniBatchInterval=[None])
+                              +- DataStreamScan(table=[[default_catalog, default_database, MyTable2]], fields=[a, b, c, proctime, rowtime])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testMultiOperatorNeedsWatermark3">
+    <Resource name="sql">
+      <![CDATA[
+  SELECT t1.a, t1.b
+  FROM (
+    SELECT a, COUNT(b) as b FROM MyTable1 GROUP BY a
+  ) as t1
+  JOIN (
+    SELECT b, COUNT(a) as a
+    FROM (
+      SELECT b, COUNT(a) as a,
+         HOP_ROWTIME(rowtime, INTERVAL '5' SECOND, INTERVAL '6' SECOND) as rt
+      FROM RightT
+      GROUP BY b, HOP(rowtime, INTERVAL '5' SECOND, INTERVAL '6' SECOND)
+    )
+    GROUP BY b
+  ) as t2
+  ON t1.a = t2.a
+      ]]>
+    </Resource>
+    <Resource name="planBefore">
+      <![CDATA[
+LogicalProject(a=[$0], b=[$1])
++- LogicalJoin(condition=[=($2, $4)], joinType=[inner])
+   :- LogicalProject(a=[$0], b=[$1], a0=[CAST($0):BIGINT])
+   :  +- LogicalAggregate(group=[{0}], b=[COUNT($1)])
+   :     +- LogicalProject(a=[$0], b=[$1])
+   :        +- LogicalTableScan(table=[[default_catalog, default_database, MyTable1]])
+   +- LogicalAggregate(group=[{0}], a=[COUNT()])
+      +- LogicalProject(b=[$0], a=[$2])
+         +- LogicalAggregate(group=[{0, 1}], a=[COUNT($2)])
+            +- LogicalProject(b=[$1], $f1=[HOP($4, 5000:INTERVAL SECOND, 6000:INTERVAL SECOND)], a=[$0])
+               +- LogicalWatermarkAssigner(fields=[a, b, c, proctime, rowtime], rowtimeField=[rowtime], watermarkDelay=[0])
+                  +- LogicalTableScan(table=[[default_catalog, default_database, MyTable2]])
+]]>
+    </Resource>
+    <Resource name="planAfter">
+      <![CDATA[
+Calc(select=[a, b])
++- Join(joinType=[InnerJoin], where=[=(a0, a1)], select=[a, b, a0, b0, a1], leftInputSpec=[HasUniqueKey], rightInputSpec=[HasUniqueKey])
+   :- Exchange(distribution=[hash[a0]])
+   :  +- Calc(select=[a, b, CAST(a) AS a0])
+   :     +- GlobalGroupAggregate(groupBy=[a], select=[a, COUNT(count$0) AS b])
+   :        +- Exchange(distribution=[hash[a]])
+   :           +- LocalGroupAggregate(groupBy=[a], select=[a, COUNT(b) AS count$0])
+   :              +- Calc(select=[a, b])
+   :                 +- WatermarkAssigner(fields=[a, b, c, proctime, rowtime], miniBatchInterval=[Proctime, 6000ms])
+   :                    +- DataStreamScan(table=[[default_catalog, default_database, MyTable1]], fields=[a, b, c, proctime, rowtime])
+   +- Exchange(distribution=[hash[a]])
+      +- GlobalGroupAggregate(groupBy=[b], select=[b, COUNT(count1$0) AS a])
+         +- Exchange(distribution=[hash[b]])
+            +- LocalGroupAggregate(groupBy=[b], select=[b, COUNT(*) AS count1$0])
+               +- GroupWindowAggregate(groupBy=[b], window=[SlidingGroupWindow('w$, rowtime, 6000, 5000)], select=[b, COUNT(a) AS a])
+                  +- Exchange(distribution=[hash[b]])
+                     +- Calc(select=[b, rowtime, a])
+                        +- WatermarkAssigner(fields=[a, b, c, proctime, rowtime], rowtimeField=[rowtime], watermarkDelay=[0], miniBatchInterval=[None])
+                           +- DataStreamScan(table=[[default_catalog, default_database, MyTable2]], fields=[a, b, c, proctime, rowtime])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testMultipleWindowAggregates">
+    <Resource name="explain">
+      <![CDATA[== Abstract Syntax Tree ==
+LogicalSink(fields=[a, b])
++- LogicalProject(id1=[$1], EXPR$1=[$2])
+   +- LogicalAggregate(group=[{0, 1}], EXPR$1=[CONCAT_AGG($2, $3)])
+      +- LogicalProject($f0=[HOP($2, 12000:INTERVAL SECOND, 4000:INTERVAL SECOND)], id1=[$0], $f2=[_UTF-16LE'*'], text=[$1])
+         +- LogicalProject(id1=[$1], text=[$2], ts=[TUMBLE_ROWTIME($0)])
+            +- LogicalAggregate(group=[{0, 1}], text=[CONCAT_AGG($2, $3)])
+               +- LogicalProject($f0=[TUMBLE($1, 6000:INTERVAL SECOND)], id1=[$0], $f2=[_UTF-16LE'#'], text=[$2])
+                  +- LogicalProject(id1=[$0], ts=[$1], text=[$2])
+                     +- LogicalFilter(condition=[AND(=($0, $3), >($1, -($4, 300000:INTERVAL MINUTE)), <($1, +($4, 180000:INTERVAL MINUTE)))])
+                        +- LogicalJoin(condition=[true], joinType=[inner])
+                           :- LogicalWatermarkAssigner(fields=[id1, rowtime, text], rowtimeField=[rowtime], watermarkDelay=[0])
+                           :  +- LogicalTableScan(table=[[default_catalog, default_database, T1]])
+                           +- LogicalWatermarkAssigner(fields=[id2, rowtime, cnt, name, goods], rowtimeField=[rowtime], watermarkDelay=[0])
+                              +- LogicalTableScan(table=[[default_catalog, default_database, T2]])
+
+LogicalSink(fields=[a, b])
++- LogicalProject(id1=[$1], EXPR$1=[$2])
+   +- LogicalAggregate(group=[{0, 1}], EXPR$1=[CONCAT_AGG($2, $3)])
+      +- LogicalProject($f0=[TUMBLE($1, 9000:INTERVAL SECOND)], id1=[$0], $f2=[_UTF-16LE'-'], text=[$2])
+         +- LogicalProject(id1=[$0], ts=[$1], text=[$2])
+            +- LogicalFilter(condition=[AND(=($0, $3), >($1, -($4, 300000:INTERVAL MINUTE)), <($1, +($4, 180000:INTERVAL MINUTE)))])
+               +- LogicalJoin(condition=[true], joinType=[inner])
+                  :- LogicalWatermarkAssigner(fields=[id1, rowtime, text], rowtimeField=[rowtime], watermarkDelay=[0])
+                  :  +- LogicalTableScan(table=[[default_catalog, default_database, T1]])
+                  +- LogicalWatermarkAssigner(fields=[id2, rowtime, cnt, name, goods], rowtimeField=[rowtime], watermarkDelay=[0])
+                     +- LogicalTableScan(table=[[default_catalog, default_database, T2]])
+
+LogicalSink(fields=[a, b])
++- LogicalAggregate(group=[{0}], EXPR$1=[COUNT($1)])
+   +- LogicalProject(id1=[$0], text=[$1])
+      +- LogicalProject(id1=[$1], text=[$2], ts=[TUMBLE_ROWTIME($0)])
+         +- LogicalAggregate(group=[{0, 1}], text=[CONCAT_AGG($2, $3)])
+            +- LogicalProject($f0=[TUMBLE($1, 6000:INTERVAL SECOND)], id1=[$0], $f2=[_UTF-16LE'#'], text=[$2])
+               +- LogicalProject(id1=[$0], ts=[$1], text=[$2])
+                  +- LogicalFilter(condition=[AND(=($0, $3), >($1, -($4, 300000:INTERVAL MINUTE)), <($1, +($4, 180000:INTERVAL MINUTE)))])
+                     +- LogicalJoin(condition=[true], joinType=[inner])
+                        :- LogicalWatermarkAssigner(fields=[id1, rowtime, text], rowtimeField=[rowtime], watermarkDelay=[0])
+                        :  +- LogicalTableScan(table=[[default_catalog, default_database, T1]])
+                        +- LogicalWatermarkAssigner(fields=[id2, rowtime, cnt, name, goods], rowtimeField=[rowtime], watermarkDelay=[0])
+                           +- LogicalTableScan(table=[[default_catalog, default_database, T2]])
+
+== Optimized Logical Plan ==
+Calc(select=[id1, rowtime AS ts, text], reuse_id=[1])
++- WindowJoin(joinType=[InnerJoin], windowBounds=[isRowTime=true, leftLowerBound=-299999, leftUpperBound=179999, leftTimeIndex=1, rightTimeIndex=1], where=[AND(=(id1, id2), >(CAST(rowtime), -(CAST(rowtime0), 300000:INTERVAL MINUTE)), <(CAST(rowtime), +(CAST(rowtime0), 180000:INTERVAL MINUTE)))], select=[id1, rowtime, text, id2, rowtime0, cnt, name, goods])
+   :- Exchange(distribution=[hash[id1]])
+   :  +- WatermarkAssigner(fields=[id1, rowtime, text], rowtimeField=[rowtime], watermarkDelay=[0], miniBatchInterval=[None])
+   :     +- DataStreamScan(table=[[default_catalog, default_database, T1]], fields=[id1, rowtime, text])
+   +- Exchange(distribution=[hash[id2]])
+      +- WatermarkAssigner(fields=[id2, rowtime, cnt, name, goods], rowtimeField=[rowtime], watermarkDelay=[0], miniBatchInterval=[None])
+         +- DataStreamScan(table=[[default_catalog, default_database, T2]], fields=[id2, rowtime, cnt, name, goods])
+
+GroupWindowAggregate(groupBy=[id1], window=[TumblingGroupWindow], properties=[w$start, w$end, w$rowtime, w$proctime], select=[id1, CONCAT_AGG($f2, text) AS text, start('w$) AS w$start, end('w$) AS w$end, rowtime('w$) AS w$rowtime, proctime('w$) AS w$proctime], reuse_id=[2])
++- Exchange(distribution=[hash[id1]])
+   +- Calc(select=[ts, id1, _UTF-16LE'#' AS $f2, text])
+      +- Reused(reference_id=[1])
+
+Sink(fields=[a, b])
++- GroupWindowAggregate(groupBy=[id1], window=[SlidingGroupWindow('w$, ts, 4000, 12000)], select=[id1, CONCAT_AGG($f2, text) AS EXPR$1])
+   +- Exchange(distribution=[hash[id1]])
+      +- Calc(select=[w$rowtime AS ts, id1, _UTF-16LE'*' AS $f2, text])
+         +- Reused(reference_id=[2])
+
+Sink(fields=[a, b])
++- GroupWindowAggregate(groupBy=[id1], window=[TumblingGroupWindow], select=[id1, CONCAT_AGG($f2, text) AS EXPR$1])
+   +- Exchange(distribution=[hash[id1]])
+      +- Calc(select=[ts, id1, _UTF-16LE'-' AS $f2, text])
+         +- Reused(reference_id=[1])
+
+Sink(fields=[a, b])
++- GlobalGroupAggregate(groupBy=[id1], select=[id1, COUNT(count$0) AS EXPR$1])
+   +- Exchange(distribution=[hash[id1]])
+      +- LocalGroupAggregate(groupBy=[id1], select=[id1, COUNT(text) AS count$0])
+         +- Calc(select=[id1, text])
+            +- Reused(reference_id=[2])
+
+== Physical Execution Plan ==
+ : Data Source
+	content : collect elements with CollectionInputFormat
+
+ : Data Source
+	content : collect elements with CollectionInputFormat
+
+	 : Operator
+		content : SourceConversion(table:Buffer(default_catalog, default_database, T1), fields:(id1, rowtime, text))
+		ship_strategy : FORWARD
+
+		 : Operator
+			content : WatermarkAssigner(rowtime: 1, offset: 0)
+			ship_strategy : FORWARD
+
+			 : Operator
+				content : SourceConversion(table:Buffer(default_catalog, default_database, T2), fields:(id2, rowtime, cnt, name, goods))
+				ship_strategy : FORWARD
+
+				 : Operator
+					content : WatermarkAssigner(rowtime: 1, offset: 0)
+					ship_strategy : FORWARD
+
+					 : Operator
+						content : Co-Process
+						ship_strategy : HASH
+
+						 : Operator
+							content : Calc(select: (id1, rowtime AS ts, text))
+							ship_strategy : FORWARD
+
+							 : Operator
+								content : Calc(select: (ts, id1, _UTF-16LE'#' AS $f2, text))
+								ship_strategy : FORWARD
+
+								 : Operator
+									content : window: (TumblingGroupWindow), groupBy: (id1), select: (id1, CONCAT_AGG($f2, text) AS text, start('w$) AS w$start, end('w$) AS w$end, rowtime('w$) AS w$rowtime, proctime('w$) AS w$proctime)
+									ship_strategy : HASH
+
+									 : Operator
+										content : Calc(select: (w$rowtime AS ts, id1, _UTF-16LE'*' AS $f2, text))
+										ship_strategy : FORWARD
+
+										 : Operator
+											content : window: (SlidingGroupWindow('w$, ts, 4000, 12000)), groupBy: (id1), select: (id1, CONCAT_AGG($f2, text) AS EXPR$1)
+											ship_strategy : HASH
+
+											 : Operator
+												content : SinkConversionToRow
+												ship_strategy : FORWARD
+
+												 : Operator
+													content : Calc(select: (ts, id1, _UTF-16LE'-' AS $f2, text))
+													ship_strategy : FORWARD
+
+													 : Operator
+														content : window: (TumblingGroupWindow), groupBy: (id1), select: (id1, CONCAT_AGG($f2, text) AS EXPR$1)
+														ship_strategy : HASH
+
+														 : Operator
+															content : SinkConversionToRow
+															ship_strategy : FORWARD
+
+															 : Operator
+																content : Calc(select: (id1, text))
+																ship_strategy : FORWARD
+
+																 : Operator
+																	content : LocalGroupAggregate
+																	ship_strategy : FORWARD
+
+																	 : Operator
+																		content : GlobalGroupAggregate
+																		ship_strategy : HASH
+
+																		 : Operator
+																			content : SinkConversionToTuple2
+																			ship_strategy : FORWARD
+
+																			 : Operator
+																				content : Map
+																				ship_strategy : FORWARD
+
+																				 : Data Sink
+																					content : Sink: TestingAppendTableSink
+																					ship_strategy : FORWARD
+
+																					 : Data Sink
+																						content : Sink: TestingAppendTableSink
+																						ship_strategy : FORWARD
+
+																						 : Data Sink
+																							content : Sink: TestingRetractTableSink
+																							ship_strategy : FORWARD
+
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testRedundantWatermarkDefinition">
+    <Resource name="sql">
+      <![CDATA[SELECT b, COUNT(DISTINCT a), MAX(b), SUM(c) FROM MyTable3 GROUP BY b]]>
+    </Resource>
+    <Resource name="planBefore">
+      <![CDATA[
+LogicalAggregate(group=[{0}], EXPR$1=[COUNT(DISTINCT $1)], EXPR$2=[MAX($0)], EXPR$3=[SUM($2)])
++- LogicalProject(b=[$1], a=[$0], c=[$2])
+   +- LogicalWatermarkAssigner(fields=[a, b, c, proctime, rowtime], rowtimeField=[rowtime], watermarkDelay=[0])
+      +- LogicalTableScan(table=[[default_catalog, default_database, MyTable1]])
+]]>
+    </Resource>
+    <Resource name="planAfter">
+      <![CDATA[
+GlobalGroupAggregate(groupBy=[b], select=[b, COUNT(distinct$0 count$0) AS EXPR$1, MAX(max$1) AS EXPR$2, SUM(sum$2) AS EXPR$3])
++- Exchange(distribution=[hash[b]])
+   +- LocalGroupAggregate(groupBy=[b], select=[b, COUNT(distinct$0 a) AS count$0, MAX(b) AS max$1, SUM(c) AS sum$2, DISTINCT(a) AS distinct$0])
+      +- Calc(select=[b, a, c])
+         +- WatermarkAssigner(fields=[a, b, c, proctime, rowtime], rowtimeField=[rowtime], watermarkDelay=[0], miniBatchInterval=[Proctime, 1000ms])
+            +- DataStreamScan(table=[[default_catalog, default_database, MyTable1]], fields=[a, b, c, proctime, rowtime])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testRowtimeRowsOverWithMiniBatch">
+    <Resource name="sql">
+      <![CDATA[
+ SELECT cnt, COUNT(c)
+ FROM (
+   SELECT c, COUNT(a)
+   OVER (PARTITION BY c ORDER BY rowtime ROWS BETWEEN 5 preceding AND CURRENT ROW) as cnt
+   FROM MyTable3
+ )
+ GROUP BY cnt
+      ]]>
+    </Resource>
+    <Resource name="planBefore">
+      <![CDATA[
+LogicalAggregate(group=[{0}], EXPR$1=[COUNT($1)])
++- LogicalProject(cnt=[COUNT($0) OVER (PARTITION BY $2 ORDER BY $4 NULLS FIRST ROWS BETWEEN 5 PRECEDING AND CURRENT ROW)], c=[$2])
+   +- LogicalWatermarkAssigner(fields=[a, b, c, proctime, rowtime], rowtimeField=[rowtime], watermarkDelay=[0])
+      +- LogicalTableScan(table=[[default_catalog, default_database, MyTable1]])
+]]>
+    </Resource>
+    <Resource name="planAfter">
+      <![CDATA[
+GlobalGroupAggregate(groupBy=[cnt], select=[cnt, COUNT(count$0) AS EXPR$1])
++- Exchange(distribution=[hash[cnt]])
+   +- LocalGroupAggregate(groupBy=[cnt], select=[cnt, COUNT(c) AS count$0])
+      +- Calc(select=[w0$o0 AS cnt, c])
+         +- OverAggregate(partitionBy=[c], orderBy=[rowtime ASC], window=[ ROWS BETWEEN 5 PRECEDING AND CURRENT ROW], select=[a, c, rowtime, COUNT(a) AS w0$o0])
+            +- Exchange(distribution=[hash[c]])
+               +- Calc(select=[a, c, rowtime])
+                  +- WatermarkAssigner(fields=[a, b, c, proctime, rowtime], rowtimeField=[rowtime], watermarkDelay=[0], miniBatchInterval=[Rowtime, 1000ms])
+                     +- DataStreamScan(table=[[default_catalog, default_database, MyTable1]], fields=[a, b, c, proctime, rowtime])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testWindowJoinWithMiniBatch">
+    <Resource name="sql">
+      <![CDATA[
+ SELECT b, COUNT(a)
+ FROM (
+   SELECT t1.a as a, t1.b as b
+   FROM
+     LeftT as t1 JOIN RightT as t2
+   ON
+     t1.a = t2.a AND t1.rowtime BETWEEN t2.rowtime - INTERVAL '5' SECOND AND
+     t2.rowtime + INTERVAL '10' SECOND
+ )
+ GROUP BY b
+      ]]>
+    </Resource>
+    <Resource name="planBefore">
+      <![CDATA[
+LogicalAggregate(group=[{0}], EXPR$1=[COUNT($1)])
++- LogicalProject(b=[$1], a=[$0])
+   +- LogicalJoin(condition=[AND(=($0, $5), >=($4, -($9, 5000:INTERVAL SECOND)), <=($4, +($9, 10000:INTERVAL SECOND)))], joinType=[inner])
+      :- LogicalWatermarkAssigner(fields=[a, b, c, proctime, rowtime], rowtimeField=[rowtime], watermarkDelay=[0])
+      :  +- LogicalTableScan(table=[[default_catalog, default_database, MyTable1]])
+      +- LogicalWatermarkAssigner(fields=[a, b, c, proctime, rowtime], rowtimeField=[rowtime], watermarkDelay=[0])
+         +- LogicalTableScan(table=[[default_catalog, default_database, MyTable2]])
+]]>
+    </Resource>
+    <Resource name="planAfter">
+      <![CDATA[
+GlobalGroupAggregate(groupBy=[b], select=[b, COUNT(count$0) AS EXPR$1])
++- Exchange(distribution=[hash[b]])
+   +- LocalGroupAggregate(groupBy=[b], select=[b, COUNT(a) AS count$0])
+      +- Calc(select=[b, a])
+         +- WindowJoin(joinType=[InnerJoin], windowBounds=[isRowTime=true, leftLowerBound=-5000, leftUpperBound=10000, leftTimeIndex=2, rightTimeIndex=1], where=[AND(=(a, a0), >=(rowtime, -(rowtime0, 5000:INTERVAL SECOND)), <=(rowtime, +(rowtime0, 10000:INTERVAL SECOND)))], select=[a, b, rowtime, a0, rowtime0])
+            :- Exchange(distribution=[hash[a]])
+            :  +- Calc(select=[a, b, rowtime])
+            :     +- WatermarkAssigner(fields=[a, b, c, proctime, rowtime], rowtimeField=[rowtime], watermarkDelay=[0], miniBatchInterval=[Rowtime, 1000ms])
+            :        +- DataStreamScan(table=[[default_catalog, default_database, MyTable1]], fields=[a, b, c, proctime, rowtime])
+            +- Exchange(distribution=[hash[a]])
+               +- Calc(select=[a, rowtime])
+                  +- WatermarkAssigner(fields=[a, b, c, proctime, rowtime], rowtimeField=[rowtime], watermarkDelay=[0], miniBatchInterval=[Rowtime, 1000ms])
+                     +- DataStreamScan(table=[[default_catalog, default_database, MyTable2]], fields=[a, b, c, proctime, rowtime])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testWindowCascade">
+    <Resource name="sql">
+      <![CDATA[
+ SELECT b,
+   SUM(cnt)
+ FROM (
+   SELECT b,
+     COUNT(a) as cnt,
+     TUMBLE_ROWTIME(rowtime, INTERVAL '10' SECOND) as rt
+   FROM MyTable3
+   GROUP BY b, TUMBLE(rowtime, INTERVAL '10' SECOND)
+ )
+ GROUP BY b, TUMBLE(rt, INTERVAL '5' SECOND)
+      ]]>
+    </Resource>
+    <Resource name="planBefore">
+      <![CDATA[
+LogicalProject(b=[$0], EXPR$1=[$2])
++- LogicalAggregate(group=[{0, 1}], EXPR$1=[SUM($2)])
+   +- LogicalProject(b=[$0], $f1=[TUMBLE(TUMBLE_ROWTIME($1), 5000:INTERVAL SECOND)], cnt=[$2])
+      +- LogicalAggregate(group=[{0, 1}], cnt=[COUNT($2)])
+         +- LogicalProject(b=[$1], $f1=[TUMBLE($4, 10000:INTERVAL SECOND)], a=[$0])
+            +- LogicalWatermarkAssigner(fields=[a, b, c, proctime, rowtime], rowtimeField=[rowtime], watermarkDelay=[0])
+               +- LogicalTableScan(table=[[default_catalog, default_database, MyTable1]])
+]]>
+    </Resource>
+    <Resource name="planAfter">
+      <![CDATA[
+GroupWindowAggregate(groupBy=[b], window=[TumblingGroupWindow], select=[b, $SUM0(cnt) AS EXPR$1])
++- Exchange(distribution=[hash[b]])
+   +- Calc(select=[b, w$rowtime AS $f1, cnt])
+      +- GroupWindowAggregate(groupBy=[b], window=[TumblingGroupWindow], properties=[w$start, w$end, w$rowtime, w$proctime], select=[b, COUNT(a) AS cnt, start('w$) AS w$start, end('w$) AS w$end, rowtime('w$) AS w$rowtime, proctime('w$) AS w$proctime])
+         +- Exchange(distribution=[hash[b]])
+            +- Calc(select=[b, rowtime, a])
+               +- WatermarkAssigner(fields=[a, b, c, proctime, rowtime], rowtimeField=[rowtime], watermarkDelay=[0], miniBatchInterval=[None])
+                  +- DataStreamScan(table=[[default_catalog, default_database, MyTable1]], fields=[a, b, c, proctime, rowtime])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testWindowWithEarlyFire">
+    <Resource name="sql">
+      <![CDATA[
+ SELECT b, SUM(cnt)
+ FROM (
+   SELECT b,
+     COUNT(a) as cnt,
+     HOP_START(rowtime, INTERVAL '5' SECOND, INTERVAL '6' SECOND) as w_start,
+     HOP_END(rowtime, INTERVAL '5' SECOND, INTERVAL '6' SECOND) as w_end
+   FROM MyTable3
+   GROUP BY b, HOP(rowtime, INTERVAL '5' SECOND, INTERVAL '6' SECOND)
+ )
+ GROUP BY b
+      ]]>
+    </Resource>
+    <Resource name="planBefore">
+      <![CDATA[
+LogicalAggregate(group=[{0}], EXPR$1=[SUM($1)])
++- LogicalProject(b=[$0], cnt=[$2])
+   +- LogicalAggregate(group=[{0, 1}], cnt=[COUNT($2)])
+      +- LogicalProject(b=[$1], $f1=[HOP($4, 5000:INTERVAL SECOND, 6000:INTERVAL SECOND)], a=[$0])
+         +- LogicalWatermarkAssigner(fields=[a, b, c, proctime, rowtime], rowtimeField=[rowtime], watermarkDelay=[0])
+            +- LogicalTableScan(table=[[default_catalog, default_database, MyTable1]])
+]]>
+    </Resource>
+    <Resource name="planAfter">
+      <![CDATA[
+GlobalGroupAggregate(groupBy=[b], select=[b, $SUM0_RETRACT(sum$0) AS EXPR$1])
++- Exchange(distribution=[hash[b]])
+   +- LocalGroupAggregate(groupBy=[b], select=[b, $SUM0_RETRACT(cnt) AS sum$0, COUNT_RETRACT(*) AS count1$1])
+      +- GroupWindowAggregate(groupBy=[b], window=[SlidingGroupWindow('w$, rowtime, 6000, 5000)], select=[b, COUNT(a) AS cnt], emit=[early delay 500 millisecond])
+         +- Exchange(distribution=[hash[b]])
+            +- Calc(select=[b, rowtime, a])
+               +- WatermarkAssigner(fields=[a, b, c, proctime, rowtime], rowtimeField=[rowtime], watermarkDelay=[0], miniBatchInterval=[None])
+                  +- DataStreamScan(table=[[default_catalog, default_database, MyTable1]], fields=[a, b, c, proctime, rowtime])
+]]>
+    </Resource>
+  </TestCase>
+</Root>
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/stream/sql/ModifiedMonotonicityTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/stream/sql/ModifiedMonotonicityTest.xml
index caf9de2..f87775a 100644
--- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/stream/sql/ModifiedMonotonicityTest.xml
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/stream/sql/ModifiedMonotonicityTest.xml
@@ -37,7 +37,8 @@ GlobalGroupAggregate(groupBy=[a1], select=[a1, MAX(max$0) AS EXPR$1], updateAsRe
          +- GlobalGroupAggregate(groupBy=[a1, a2], select=[a1, a2, MAX(max$0) AS a3], updateAsRetraction=[true], accMode=[AccRetract])
             +- Exchange(distribution=[hash[a1, a2]], updateAsRetraction=[true], accMode=[Acc])
                +- LocalGroupAggregate(groupBy=[a1, a2], select=[a1, a2, MAX(a3) AS max$0], updateAsRetraction=[true], accMode=[Acc])
-                  +- DataStreamScan(table=[[default_catalog, default_database, A]], fields=[a1, a2, a3], updateAsRetraction=[true], accMode=[Acc])
+                  +- WatermarkAssigner(fields=[a1, a2, a3], miniBatchInterval=[Proctime, 100ms], updateAsRetraction=[true], accMode=[Acc])
+                     +- DataStreamScan(table=[[default_catalog, default_database, A]], fields=[a1, a2, a3], updateAsRetraction=[true], accMode=[Acc])
 ]]>
     </Resource>
   </TestCase>
@@ -108,7 +109,8 @@ GlobalGroupAggregate(groupBy=[a1], select=[a1, MIN_RETRACT(min$0) AS EXPR$1], up
          +- GlobalGroupAggregate(groupBy=[a1, a2], select=[a1, a2, MAX(max$0) AS a3], updateAsRetraction=[true], accMode=[AccRetract])
             +- Exchange(distribution=[hash[a1, a2]], updateAsRetraction=[true], accMode=[Acc])
                +- LocalGroupAggregate(groupBy=[a1, a2], select=[a1, a2, MAX(a3) AS max$0], updateAsRetraction=[true], accMode=[Acc])
-                  +- DataStreamScan(table=[[default_catalog, default_database, A]], fields=[a1, a2, a3], updateAsRetraction=[true], accMode=[Acc])
+                  +- WatermarkAssigner(fields=[a1, a2, a3], miniBatchInterval=[Proctime, 100ms], updateAsRetraction=[true], accMode=[Acc])
+                     +- DataStreamScan(table=[[default_catalog, default_database, A]], fields=[a1, a2, a3], updateAsRetraction=[true], accMode=[Acc])
 ]]>
     </Resource>
   </TestCase>
@@ -133,7 +135,8 @@ GlobalGroupAggregate(select=[MIN(min$0) AS EXPR$0], updateAsRetraction=[false],
          +- GlobalGroupAggregate(groupBy=[a1, a2], select=[a1, a2, MIN(min$0) AS a3], updateAsRetraction=[true], accMode=[AccRetract])
             +- Exchange(distribution=[hash[a1, a2]], updateAsRetraction=[true], accMode=[Acc])
                +- LocalGroupAggregate(groupBy=[a1, a2], select=[a1, a2, MIN(a3) AS min$0], updateAsRetraction=[true], accMode=[Acc])
-                  +- DataStreamScan(table=[[default_catalog, default_database, A]], fields=[a1, a2, a3], updateAsRetraction=[true], accMode=[Acc])
+                  +- WatermarkAssigner(fields=[a1, a2, a3], miniBatchInterval=[Proctime, 100ms], updateAsRetraction=[true], accMode=[Acc])
+                     +- DataStreamScan(table=[[default_catalog, default_database, A]], fields=[a1, a2, a3], updateAsRetraction=[true], accMode=[Acc])
 ]]>
     </Resource>
   </TestCase>
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/stream/sql/agg/AggregateTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/stream/sql/agg/AggregateTest.xml
index d03ec5f..206f59f 100644
--- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/stream/sql/agg/AggregateTest.xml
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/stream/sql/agg/AggregateTest.xml
@@ -43,8 +43,10 @@ GlobalGroupAggregate(groupBy=[a], select=[a, SUM(sum$0) AS EXPR$1, COUNT(distinc
 +- Exchange(distribution=[hash[a]])
    +- LocalGroupAggregate(groupBy=[a], select=[a, SUM(b) AS sum$0, COUNT(distinct$0 c) AS count$1, DISTINCT(c) AS distinct$0])
       +- Union(all=[true], union=[a, b, c])
-         :- TableSourceScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
-         +- TableSourceScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+         :- WatermarkAssigner(fields=[a, b, c], miniBatchInterval=[Proctime, 1000ms])
+         :  +- TableSourceScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+         +- WatermarkAssigner(fields=[a, b, c], miniBatchInterval=[Proctime, 1000ms])
+            +- TableSourceScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
 ]]>
     </Resource>
   </TestCase>
@@ -131,7 +133,8 @@ GlobalGroupAggregate(groupBy=[a], select=[a, SUM(sum$0) AS EXPR$1, COUNT(distinc
 +- Exchange(distribution=[hash[a]])
    +- LocalGroupAggregate(groupBy=[a], select=[a, SUM(b) FILTER $f2 AS sum$0, COUNT(distinct$0 c) FILTER $f4 AS count$1, COUNT(distinct$0 c) FILTER $f5 AS count$2, MAX(b) AS max$3, DISTINCT(c) AS distinct$0])
       +- Calc(select=[a, b, IS TRUE(=(c, _UTF-16LE'A':VARCHAR(2147483647) CHARACTER SET "UTF-16LE")) AS $f2, c, IS TRUE(d) AS $f4, IS TRUE(=(b, 1:BIGINT)) AS $f5])
-         +- TableSourceScan(table=[[default_catalog, default_database, T, source: [TestTableSource(a, b, c, d)]]], fields=[a, b, c, d])
+         +- WatermarkAssigner(fields=[a, b, c, d], miniBatchInterval=[Proctime, 1000ms])
+            +- TableSourceScan(table=[[default_catalog, default_database, T, source: [TestTableSource(a, b, c, d)]]], fields=[a, b, c, d])
 ]]>
     </Resource>
   </TestCase>
@@ -151,7 +154,8 @@ LogicalAggregate(group=[{0}], EXPR$1=[COUNT(DISTINCT $1)], EXPR$2=[MAX($0)], EXP
 GlobalGroupAggregate(groupBy=[b], select=[b, COUNT(distinct$0 count$0) AS EXPR$1, MAX(max$1) AS EXPR$2, SUM(sum$2) AS EXPR$3])
 +- Exchange(distribution=[hash[b]])
    +- LocalGroupAggregate(groupBy=[b], select=[b, COUNT(distinct$0 a) AS count$0, MAX(b) AS max$1, SUM(c) AS sum$2, DISTINCT(a) AS distinct$0])
-      +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+      +- WatermarkAssigner(fields=[a, b, c], miniBatchInterval=[Proctime, 1000ms])
+         +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
 ]]>
     </Resource>
   </TestCase>
@@ -259,8 +263,10 @@ GlobalGroupAggregate(groupBy=[a], select=[a, SUM(sum$0) AS EXPR$1, COUNT(distinc
 +- Exchange(distribution=[hash[a]])
    +- LocalGroupAggregate(groupBy=[a], select=[a, SUM(b) AS sum$0, COUNT(distinct$0 c) AS count$1, DISTINCT(c) AS distinct$0])
       +- Union(all=[true], union=[a, b, c])
-         :- TableSourceScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
-         +- TableSourceScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+         :- WatermarkAssigner(fields=[a, b, c], miniBatchInterval=[Proctime, 1000ms])
+         :  +- TableSourceScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+         +- WatermarkAssigner(fields=[a, b, c], miniBatchInterval=[Proctime, 1000ms])
+            +- TableSourceScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
 ]]>
     </Resource>
   </TestCase>
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/stream/sql/agg/DistinctAggregateTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/stream/sql/agg/DistinctAggregateTest.xml
index 592cb9d..e9ec761 100644
--- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/stream/sql/agg/DistinctAggregateTest.xml
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/stream/sql/agg/DistinctAggregateTest.xml
@@ -40,7 +40,8 @@ LogicalAggregate(group=[{0}], EXPR$1=[COUNT(DISTINCT $1) FILTER $2], EXPR$2=[SUM
 GroupAggregate(groupBy=[a], select=[a, COUNT(DISTINCT b) FILTER $f2 AS EXPR$1, SUM(b) FILTER $f3 AS EXPR$2, SUM(b) FILTER $f2 AS EXPR$3])
 +- Exchange(distribution=[hash[a]])
    +- Calc(select=[a, b, IS TRUE(<>(b, 2:BIGINT)) AS $f2, IS TRUE(<>(b, 5:BIGINT)) AS $f3])
-      +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+      +- WatermarkAssigner(fields=[a, b, c], miniBatchInterval=[Proctime, 1000ms])
+         +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
 ]]>
     </Resource>
   </TestCase>
@@ -69,7 +70,8 @@ GlobalGroupAggregate(groupBy=[a], select=[a, COUNT(distinct$0 count$0) AS EXPR$1
 +- Exchange(distribution=[hash[a]])
    +- LocalGroupAggregate(groupBy=[a], select=[a, COUNT(distinct$0 b) FILTER $f2 AS count$0, SUM(b) FILTER $f3 AS sum$1, SUM(b) FILTER $f2 AS sum$2, DISTINCT(b) AS distinct$0])
       +- Calc(select=[a, b, IS TRUE(<>(b, 2:BIGINT)) AS $f2, IS TRUE(<>(b, 5:BIGINT)) AS $f3])
-         +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+         +- WatermarkAssigner(fields=[a, b, c], miniBatchInterval=[Proctime, 1000ms])
+            +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
 ]]>
     </Resource>
   </TestCase>
@@ -101,7 +103,8 @@ GroupAggregate(groupBy=[a], partialFinalType=[FINAL], select=[a, $SUM0_RETRACT($
          +- Calc(select=[a, b, $f2, $f3, $f4, AND(=($e, 0), $f2) AS $g_0, AND(=($e, 1), $f3) AS $g_1, AND(=($e, 1), $f2) AS $g_10])
             +- Expand(projects=[{a=[$0], b=[$1], $f2=[$2], $f3=[$3], $f4=[$4], $e=[0]}, {a=[$0], b=[$1], $f2=[$2], $f3=[$3], $f4=[null], $e=[1]}])
                +- Calc(select=[a, b, IS TRUE(<>(b, 2:BIGINT)) AS $f2, IS TRUE(<>(b, 5:BIGINT)) AS $f3, MOD(HASH_CODE(b), 1024) AS $f4])
-                  +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+                  +- WatermarkAssigner(fields=[a, b, c], miniBatchInterval=[Proctime, 1000ms])
+                     +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
 ]]>
     </Resource>
   </TestCase>
@@ -135,7 +138,8 @@ GlobalGroupAggregate(groupBy=[a], partialFinalType=[FINAL], select=[a, $SUM0_RET
                +- Calc(select=[a, b, $f2, $f3, $f4, AND(=($e, 0), $f2) AS $g_0, AND(=($e, 1), $f3) AS $g_1, AND(=($e, 1), $f2) AS $g_10])
                   +- Expand(projects=[{a=[$0], b=[$1], $f2=[$2], $f3=[$3], $f4=[$4], $e=[0]}, {a=[$0], b=[$1], $f2=[$2], $f3=[$3], $f4=[null], $e=[1]}])
                      +- Calc(select=[a, b, IS TRUE(<>(b, 2:BIGINT)) AS $f2, IS TRUE(<>(b, 5:BIGINT)) AS $f3, MOD(HASH_CODE(b), 1024) AS $f4])
-                        +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+                        +- WatermarkAssigner(fields=[a, b, c], miniBatchInterval=[Proctime, 1000ms])
+                           +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
 ]]>
     </Resource>
   </TestCase>
@@ -169,7 +173,8 @@ GroupAggregate(groupBy=[b], select=[b, FIRST_VALUE_RETRACT(c) AS EXPR$1, LAST_VA
       +- GroupAggregate(groupBy=[a], select=[a, COUNT(DISTINCT b) AS b, MAX(b) AS c], updateAsRetraction=[true], accMode=[AccRetract])
          +- Exchange(distribution=[hash[a]], updateAsRetraction=[true], accMode=[Acc])
             +- Calc(select=[a, b], updateAsRetraction=[true], accMode=[Acc])
-               +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c], updateAsRetraction=[true], accMode=[Acc])
+               +- WatermarkAssigner(fields=[a, b, c], miniBatchInterval=[Proctime, 1000ms], updateAsRetraction=[true], accMode=[Acc])
+                  +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c], updateAsRetraction=[true], accMode=[Acc])
 ]]>
     </Resource>
   </TestCase>
@@ -204,7 +209,8 @@ GroupAggregate(groupBy=[b], select=[b, FIRST_VALUE_RETRACT(c) AS EXPR$1, LAST_VA
          +- Exchange(distribution=[hash[a]], updateAsRetraction=[true], accMode=[Acc])
             +- LocalGroupAggregate(groupBy=[a], select=[a, COUNT(distinct$0 b) AS count$0, MAX(b) AS max$1, DISTINCT(b) AS distinct$0], updateAsRetraction=[true], accMode=[Acc])
                +- Calc(select=[a, b], updateAsRetraction=[true], accMode=[Acc])
-                  +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c], updateAsRetraction=[true], accMode=[Acc])
+                  +- WatermarkAssigner(fields=[a, b, c], miniBatchInterval=[Proctime, 1000ms], updateAsRetraction=[true], accMode=[Acc])
+                     +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c], updateAsRetraction=[true], accMode=[Acc])
 ]]>
     </Resource>
   </TestCase>
@@ -242,7 +248,8 @@ GroupAggregate(groupBy=[b], partialFinalType=[FINAL], select=[b, FIRST_VALUE_RET
                   +- GroupAggregate(groupBy=[a, $f2], partialFinalType=[PARTIAL], select=[a, $f2, COUNT(DISTINCT b) AS $f2_0, MAX(b) AS $f3], updateAsRetraction=[true], accMode=[AccRetract])
                      +- Exchange(distribution=[hash[a, $f2]], updateAsRetraction=[true], accMode=[Acc])
                         +- Calc(select=[a, b, MOD(HASH_CODE(b), 1024) AS $f2], updateAsRetraction=[true], accMode=[Acc])
-                           +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c], updateAsRetraction=[true], accMode=[Acc])
+                           +- WatermarkAssigner(fields=[a, b, c], miniBatchInterval=[Proctime, 1000ms], updateAsRetraction=[true], accMode=[Acc])
+                              +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c], updateAsRetraction=[true], accMode=[Acc])
 ]]>
     </Resource>
   </TestCase>
@@ -282,7 +289,8 @@ GroupAggregate(groupBy=[b], partialFinalType=[FINAL], select=[b, FIRST_VALUE_RET
                         +- Exchange(distribution=[hash[a, $f2]], updateAsRetraction=[true], accMode=[Acc])
                            +- LocalGroupAggregate(groupBy=[a, $f2], partialFinalType=[PARTIAL], select=[a, $f2, COUNT(distinct$0 b) AS count$0, MAX(b) AS max$1, DISTINCT(b) AS distinct$0], updateAsRetraction=[true], accMode=[Acc])
                               +- Calc(select=[a, b, MOD(HASH_CODE(b), 1024) AS $f2], updateAsRetraction=[true], accMode=[Acc])
-                                 +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c], updateAsRetraction=[true], accMode=[Acc])
+                                 +- WatermarkAssigner(fields=[a, b, c], miniBatchInterval=[Proctime, 1000ms], updateAsRetraction=[true], accMode=[Acc])
+                                    +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c], updateAsRetraction=[true], accMode=[Acc])
 ]]>
     </Resource>
   </TestCase>
@@ -313,7 +321,8 @@ GroupAggregate(groupBy=[c], select=[c, MIN_RETRACT(b) AS EXPR$1, MAX_RETRACT(b)
 +- Exchange(distribution=[hash[c]], updateAsRetraction=[true], accMode=[AccRetract])
    +- GroupAggregate(groupBy=[a], select=[a, AVG(b) AS b, MAX(c) AS c], updateAsRetraction=[true], accMode=[AccRetract])
       +- Exchange(distribution=[hash[a]], updateAsRetraction=[true], accMode=[Acc])
-         +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c], updateAsRetraction=[true], accMode=[Acc])
+         +- WatermarkAssigner(fields=[a, b, c], miniBatchInterval=[Proctime, 1000ms], updateAsRetraction=[true], accMode=[Acc])
+            +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c], updateAsRetraction=[true], accMode=[Acc])
 ]]>
     </Resource>
   </TestCase>
@@ -346,7 +355,8 @@ GlobalGroupAggregate(groupBy=[c], select=[c, MIN_RETRACT(min$0) AS EXPR$1, MAX_R
       +- GlobalGroupAggregate(groupBy=[a], select=[a, AVG((sum$0, count$1)) AS b, MAX(max$2) AS c], updateAsRetraction=[true], accMode=[AccRetract])
          +- Exchange(distribution=[hash[a]], updateAsRetraction=[true], accMode=[Acc])
             +- LocalGroupAggregate(groupBy=[a], select=[a, AVG(b) AS (sum$0, count$1), MAX(c) AS max$2], updateAsRetraction=[true], accMode=[Acc])
-               +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c], updateAsRetraction=[true], accMode=[Acc])
+               +- WatermarkAssigner(fields=[a, b, c], miniBatchInterval=[Proctime, 1000ms], updateAsRetraction=[true], accMode=[Acc])
+                  +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c], updateAsRetraction=[true], accMode=[Acc])
 ]]>
     </Resource>
   </TestCase>
@@ -382,7 +392,8 @@ GroupAggregate(groupBy=[c], partialFinalType=[FINAL], select=[c, MIN_RETRACT($f3
                +- Calc(select=[a, b, c, MOD(HASH_CODE(a), 1024) AS $f3, MOD(HASH_CODE(b), 1024) AS $f4], updateAsRetraction=[true], accMode=[AccRetract])
                   +- GroupAggregate(groupBy=[a], select=[a, AVG(b) AS b, MAX(c) AS c], updateAsRetraction=[true], accMode=[AccRetract])
                      +- Exchange(distribution=[hash[a]], updateAsRetraction=[true], accMode=[Acc])
-                        +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c], updateAsRetraction=[true], accMode=[Acc])
+                        +- WatermarkAssigner(fields=[a, b, c], miniBatchInterval=[Proctime, 1000ms], updateAsRetraction=[true], accMode=[Acc])
+                           +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c], updateAsRetraction=[true], accMode=[Acc])
 ]]>
     </Resource>
   </TestCase>
@@ -421,7 +432,8 @@ GlobalGroupAggregate(groupBy=[c], partialFinalType=[FINAL], select=[c, MIN_RETRA
                         +- GlobalGroupAggregate(groupBy=[a], select=[a, AVG((sum$0, count$1)) AS b, MAX(max$2) AS c], updateAsRetraction=[true], accMode=[AccRetract])
                            +- Exchange(distribution=[hash[a]], updateAsRetraction=[true], accMode=[Acc])
                               +- LocalGroupAggregate(groupBy=[a], select=[a, AVG(b) AS (sum$0, count$1), MAX(c) AS max$2], updateAsRetraction=[true], accMode=[Acc])
-                                 +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c], updateAsRetraction=[true], accMode=[Acc])
+                                 +- WatermarkAssigner(fields=[a, b, c], miniBatchInterval=[Proctime, 1000ms], updateAsRetraction=[true], accMode=[Acc])
+                                    +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c], updateAsRetraction=[true], accMode=[Acc])
 ]]>
     </Resource>
   </TestCase>
@@ -441,7 +453,8 @@ LogicalAggregate(group=[{}], EXPR$0=[COUNT(DISTINCT $0)], EXPR$1=[SUM(DISTINCT $
 GroupAggregate(select=[COUNT(DISTINCT a) AS EXPR$0, SUM(DISTINCT b) AS EXPR$1])
 +- Exchange(distribution=[single])
    +- Calc(select=[a, b])
-      +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+      +- WatermarkAssigner(fields=[a, b, c], miniBatchInterval=[Proctime, 1000ms])
+         +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
 ]]>
     </Resource>
   </TestCase>
@@ -462,7 +475,8 @@ GlobalGroupAggregate(select=[COUNT(distinct$0 count$0) AS EXPR$0, SUM(distinct$1
 +- Exchange(distribution=[single])
    +- LocalGroupAggregate(select=[COUNT(distinct$0 a) AS count$0, SUM(distinct$1 b) AS sum$1, DISTINCT(a) AS distinct$0, DISTINCT(b) AS distinct$1])
       +- Calc(select=[a, b])
-         +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+         +- WatermarkAssigner(fields=[a, b, c], miniBatchInterval=[Proctime, 1000ms])
+            +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
 ]]>
     </Resource>
   </TestCase>
@@ -486,7 +500,8 @@ GroupAggregate(partialFinalType=[FINAL], select=[$SUM0_RETRACT($f2_0) AS $f0, SU
          +- Calc(select=[a, b, $f2, $f3, =($e, 1) AS $g_1, =($e, 2) AS $g_2])
             +- Expand(projects=[{a=[$0], b=[$1], $f2=[$2], $f3=[null], $e=[1]}, {a=[$0], b=[$1], $f2=[null], $f3=[$3], $e=[2]}])
                +- Calc(select=[a, b, MOD(HASH_CODE(a), 1024) AS $f2, MOD(HASH_CODE(b), 1024) AS $f3])
-                  +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+                  +- WatermarkAssigner(fields=[a, b, c], miniBatchInterval=[Proctime, 1000ms])
+                     +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
 ]]>
     </Resource>
   </TestCase>
@@ -512,7 +527,8 @@ GlobalGroupAggregate(partialFinalType=[FINAL], select=[$SUM0_RETRACT(sum$0) AS $
                +- Calc(select=[a, b, $f2, $f3, =($e, 1) AS $g_1, =($e, 2) AS $g_2])
                   +- Expand(projects=[{a=[$0], b=[$1], $f2=[$2], $f3=[null], $e=[1]}, {a=[$0], b=[$1], $f2=[null], $f3=[$3], $e=[2]}])
                      +- Calc(select=[a, b, MOD(HASH_CODE(a), 1024) AS $f2, MOD(HASH_CODE(b), 1024) AS $f3])
-                        +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+                        +- WatermarkAssigner(fields=[a, b, c], miniBatchInterval=[Proctime, 1000ms])
+                           +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
 ]]>
     </Resource>
   </TestCase>
@@ -543,7 +559,8 @@ GroupAggregate(groupBy=[c], select=[c, MIN_RETRACT(b) AS EXPR$1, MAX_RETRACT(b)
 +- Exchange(distribution=[hash[c]])
    +- GroupAggregate(groupBy=[a], select=[a, AVG(b) AS b, MAX(c) AS c])
       +- Exchange(distribution=[hash[a]])
-         +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+         +- WatermarkAssigner(fields=[a, b, c], miniBatchInterval=[Proctime, 1000ms])
+            +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
 ]]>
     </Resource>
   </TestCase>
@@ -576,7 +593,8 @@ GlobalGroupAggregate(groupBy=[c], select=[c, MIN_RETRACT(min$0) AS EXPR$1, MAX_R
       +- GlobalGroupAggregate(groupBy=[a], select=[a, AVG((sum$0, count$1)) AS b, MAX(max$2) AS c])
          +- Exchange(distribution=[hash[a]])
             +- LocalGroupAggregate(groupBy=[a], select=[a, AVG(b) AS (sum$0, count$1), MAX(c) AS max$2])
-               +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+               +- WatermarkAssigner(fields=[a, b, c], miniBatchInterval=[Proctime, 1000ms])
+                  +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
 ]]>
     </Resource>
   </TestCase>
@@ -612,7 +630,8 @@ GroupAggregate(groupBy=[c], partialFinalType=[FINAL], select=[c, MIN_RETRACT($f3
                +- Calc(select=[a, b, c, MOD(HASH_CODE(a), 1024) AS $f3, MOD(HASH_CODE(b), 1024) AS $f4])
                   +- GroupAggregate(groupBy=[a], select=[a, AVG(b) AS b, MAX(c) AS c])
                      +- Exchange(distribution=[hash[a]])
-                        +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+                        +- WatermarkAssigner(fields=[a, b, c], miniBatchInterval=[Proctime, 1000ms])
+                           +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
 ]]>
     </Resource>
   </TestCase>
@@ -651,7 +670,8 @@ GlobalGroupAggregate(groupBy=[c], partialFinalType=[FINAL], select=[c, MIN_RETRA
                         +- GlobalGroupAggregate(groupBy=[a], select=[a, AVG((sum$0, count$1)) AS b, MAX(max$2) AS c])
                            +- Exchange(distribution=[hash[a]])
                               +- LocalGroupAggregate(groupBy=[a], select=[a, AVG(b) AS (sum$0, count$1), MAX(c) AS max$2])
-                                 +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+                                 +- WatermarkAssigner(fields=[a, b, c], miniBatchInterval=[Proctime, 1000ms])
+                                    +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
 ]]>
     </Resource>
   </TestCase>
@@ -670,7 +690,8 @@ LogicalAggregate(group=[{0}], EXPR$1=[CONCAT_AGG($1)], EXPR$2=[COUNT(DISTINCT $2
       <![CDATA[
 GroupAggregate(groupBy=[a], select=[a, CONCAT_AGG(c) AS EXPR$1, COUNT(DISTINCT b) AS EXPR$2])
 +- Exchange(distribution=[hash[a]])
-   +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+   +- WatermarkAssigner(fields=[a, b, c], miniBatchInterval=[Proctime, 1000ms])
+      +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
 ]]>
     </Resource>
   </TestCase>
@@ -690,7 +711,8 @@ LogicalAggregate(group=[{0}], EXPR$1=[CONCAT_AGG($1)], EXPR$2=[COUNT(DISTINCT $2
 GlobalGroupAggregate(groupBy=[a], select=[a, CONCAT_AGG((accDelimiter$0, concatAcc$1)) AS EXPR$1, COUNT(distinct$0 count$2) AS EXPR$2])
 +- Exchange(distribution=[hash[a]])
    +- LocalGroupAggregate(groupBy=[a], select=[a, CONCAT_AGG(c) AS (accDelimiter$0, concatAcc$1), COUNT(distinct$0 b) AS count$2, DISTINCT(b) AS distinct$0])
-      +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+      +- WatermarkAssigner(fields=[a, b, c], miniBatchInterval=[Proctime, 1000ms])
+         +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
 ]]>
     </Resource>
   </TestCase>
@@ -714,7 +736,8 @@ GroupAggregate(groupBy=[a], partialFinalType=[FINAL], select=[a, CONCAT_AGG_RETR
          +- Calc(select=[a, b, c, $f3, =($e, 1) AS $g_1, =($e, 0) AS $g_0])
             +- Expand(projects=[{a=[$0], b=[$1], c=[$2], $f3=[$3], $e=[0]}, {a=[$0], b=[$1], c=[$2], $f3=[null], $e=[1]}])
                +- Calc(select=[a, b, c, MOD(HASH_CODE(b), 1024) AS $f3])
-                  +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+                  +- WatermarkAssigner(fields=[a, b, c], miniBatchInterval=[Proctime, 1000ms])
+                     +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
 ]]>
     </Resource>
   </TestCase>
@@ -740,7 +763,8 @@ GlobalGroupAggregate(groupBy=[a], partialFinalType=[FINAL], select=[a, CONCAT_AG
                +- Calc(select=[a, b, c, $f3, =($e, 1) AS $g_1, =($e, 0) AS $g_0])
                   +- Expand(projects=[{a=[$0], b=[$1], c=[$2], $f3=[$3], $e=[0]}, {a=[$0], b=[$1], c=[$2], $f3=[null], $e=[1]}])
                      +- Calc(select=[a, b, c, MOD(HASH_CODE(b), 1024) AS $f3])
-                        +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+                        +- WatermarkAssigner(fields=[a, b, c], miniBatchInterval=[Proctime, 1000ms])
+                           +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
 ]]>
     </Resource>
   </TestCase>
@@ -760,7 +784,8 @@ LogicalAggregate(group=[{}], EXPR$0=[COUNT(DISTINCT $0)])
 GroupAggregate(select=[COUNT(DISTINCT c) AS EXPR$0])
 +- Exchange(distribution=[single])
    +- Calc(select=[c])
-      +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+      +- WatermarkAssigner(fields=[a, b, c], miniBatchInterval=[Proctime, 1000ms])
+         +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
 ]]>
     </Resource>
   </TestCase>
@@ -781,7 +806,8 @@ GlobalGroupAggregate(select=[COUNT(distinct$0 count$0) AS EXPR$0])
 +- Exchange(distribution=[single])
    +- LocalGroupAggregate(select=[COUNT(distinct$0 c) AS count$0, DISTINCT(c) AS distinct$0])
       +- Calc(select=[c])
-         +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+         +- WatermarkAssigner(fields=[a, b, c], miniBatchInterval=[Proctime, 1000ms])
+            +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
 ]]>
     </Resource>
   </TestCase>
@@ -803,7 +829,8 @@ GroupAggregate(partialFinalType=[FINAL], select=[$SUM0_RETRACT($f1_0) AS $f0])
    +- GroupAggregate(groupBy=[$f1], partialFinalType=[PARTIAL], select=[$f1, COUNT(DISTINCT c) AS $f1_0])
       +- Exchange(distribution=[hash[$f1]])
          +- Calc(select=[c, MOD(HASH_CODE(c), 1024) AS $f1])
-            +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+            +- WatermarkAssigner(fields=[a, b, c], miniBatchInterval=[Proctime, 1000ms])
+               +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
 ]]>
     </Resource>
   </TestCase>
@@ -827,7 +854,8 @@ GlobalGroupAggregate(partialFinalType=[FINAL], select=[$SUM0_RETRACT(sum$0) AS $
          +- Exchange(distribution=[hash[$f1]])
             +- LocalGroupAggregate(groupBy=[$f1], partialFinalType=[PARTIAL], select=[$f1, COUNT(distinct$0 c) AS count$0, DISTINCT(c) AS distinct$0])
                +- Calc(select=[c, MOD(HASH_CODE(c), 1024) AS $f1])
-                  +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+                  +- WatermarkAssigner(fields=[a, b, c], miniBatchInterval=[Proctime, 1000ms])
+                     +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
 ]]>
     </Resource>
   </TestCase>
@@ -851,7 +879,8 @@ LogicalAggregate(group=[{0}], EXPR$1=[COUNT(DISTINCT $1)], EXPR$2=[SUM($2)], EXP
 Calc(select=[a, EXPR$1, CASE(=($f3, 0), null:BIGINT, EXPR$2) AS EXPR$2, /(CAST(CASE(=($f3, 0), null:BIGINT, EXPR$2)), $f3) AS EXPR$3, EXPR$4, EXPR$5, $f3 AS EXPR$6, EXPR$7])
 +- GroupAggregate(groupBy=[a], select=[a, COUNT(DISTINCT c) AS EXPR$1, $SUM0(b) AS EXPR$2, COUNT(b) AS $f3, MAX(b) AS EXPR$4, MIN(b) AS EXPR$5, COUNT(*) AS EXPR$7])
    +- Exchange(distribution=[hash[a]])
-      +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+      +- WatermarkAssigner(fields=[a, b, c], miniBatchInterval=[Proctime, 1000ms])
+         +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
 ]]>
     </Resource>
   </TestCase>
@@ -876,7 +905,8 @@ Calc(select=[a, EXPR$1, CASE(=($f3, 0), null:BIGINT, EXPR$2) AS EXPR$2, /(CAST(C
 +- GlobalGroupAggregate(groupBy=[a], select=[a, COUNT(distinct$0 count$0) AS EXPR$1, $SUM0(sum$1) AS EXPR$2, COUNT(count$2) AS $f3, MAX(max$3) AS EXPR$4, MIN(min$4) AS EXPR$5, COUNT(count1$5) AS EXPR$7])
    +- Exchange(distribution=[hash[a]])
       +- LocalGroupAggregate(groupBy=[a], select=[a, COUNT(distinct$0 c) AS count$0, $SUM0(b) AS sum$1, COUNT(b) AS count$2, MAX(b) AS max$3, MIN(b) AS min$4, COUNT(*) AS count1$5, DISTINCT(c) AS distinct$0])
-         +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+         +- WatermarkAssigner(fields=[a, b, c], miniBatchInterval=[Proctime, 1000ms])
+            +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
 ]]>
     </Resource>
   </TestCase>
@@ -905,7 +935,8 @@ Calc(select=[a, EXPR$1, CASE(=($f3, 0), null:BIGINT, EXPR$2) AS EXPR$2, /(CAST(C
             +- Calc(select=[a, b, c, $f3, $f4, =($e, 2) AS $g_2, =($e, 3) AS $g_3, =($e, 1) AS $g_1])
                +- Expand(projects=[{a=[$0], b=[$1], c=[$2], $f3=[$3], $f4=[null], $e=[1]}, {a=[$0], b=[$1], c=[$2], $f3=[null], $f4=[$4], $e=[2]}, {a=[$0], b=[$1], c=[$2], $f3=[null], $f4=[null], $e=[3]}])
                   +- Calc(select=[a, b, c, MOD(HASH_CODE(b), 1024) AS $f3, MOD(HASH_CODE(c), 1024) AS $f4])
-                     +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+                     +- WatermarkAssigner(fields=[a, b, c], miniBatchInterval=[Proctime, 1000ms])
+                        +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
 ]]>
     </Resource>
   </TestCase>
@@ -936,7 +967,8 @@ Calc(select=[a, EXPR$1, CASE(=($f3, 0), null:BIGINT, EXPR$2) AS EXPR$2, /(CAST(C
                   +- Calc(select=[a, b, c, $f3, $f4, =($e, 2) AS $g_2, =($e, 3) AS $g_3, =($e, 1) AS $g_1])
                      +- Expand(projects=[{a=[$0], b=[$1], c=[$2], $f3=[$3], $f4=[null], $e=[1]}, {a=[$0], b=[$1], c=[$2], $f3=[null], $f4=[$4], $e=[2]}, {a=[$0], b=[$1], c=[$2], $f3=[null], $f4=[null], $e=[3]}])
                         +- Calc(select=[a, b, c, MOD(HASH_CODE(b), 1024) AS $f3, MOD(HASH_CODE(c), 1024) AS $f4])
-                           +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+                           +- WatermarkAssigner(fields=[a, b, c], miniBatchInterval=[Proctime, 1000ms])
+                              +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
 ]]>
     </Resource>
   </TestCase>
@@ -956,7 +988,8 @@ LogicalAggregate(group=[{0}], EXPR$1=[COUNT(DISTINCT $1)], EXPR$2=[SUM($1)], EXP
 GroupAggregate(groupBy=[a], select=[a, COUNT(DISTINCT b) AS EXPR$1, SUM(b) AS EXPR$2, AVG(b) AS EXPR$3])
 +- Exchange(distribution=[hash[a]])
    +- Calc(select=[a, b])
-      +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+      +- WatermarkAssigner(fields=[a, b, c], miniBatchInterval=[Proctime, 1000ms])
+         +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
 ]]>
     </Resource>
   </TestCase>
@@ -977,7 +1010,8 @@ GlobalGroupAggregate(groupBy=[a], select=[a, COUNT(distinct$0 count$0) AS EXPR$1
 +- Exchange(distribution=[hash[a]])
    +- LocalGroupAggregate(groupBy=[a], select=[a, COUNT(distinct$0 b) AS count$0, SUM(b) AS sum$1, AVG(b) AS (sum$2, count$3), DISTINCT(b) AS distinct$0])
       +- Calc(select=[a, b])
-         +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+         +- WatermarkAssigner(fields=[a, b, c], miniBatchInterval=[Proctime, 1000ms])
+            +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
 ]]>
     </Resource>
   </TestCase>
@@ -1002,7 +1036,8 @@ Calc(select=[a, $f1, $f2, CAST(/($f3, $f4)) AS $f3])
             +- Calc(select=[a, b, $f2, =($e, 0) AS $g_0, =($e, 1) AS $g_1])
                +- Expand(projects=[{a=[$0], b=[$1], $f2=[$2], $e=[0]}, {a=[$0], b=[$1], $f2=[null], $e=[1]}])
                   +- Calc(select=[a, b, MOD(HASH_CODE(b), 1024) AS $f2])
-                     +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+                     +- WatermarkAssigner(fields=[a, b, c], miniBatchInterval=[Proctime, 1000ms])
+                        +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
 ]]>
     </Resource>
   </TestCase>
@@ -1029,7 +1064,8 @@ Calc(select=[a, $f1, $f2, CAST(/($f3, $f4)) AS $f3])
                   +- Calc(select=[a, b, $f2, =($e, 0) AS $g_0, =($e, 1) AS $g_1])
                      +- Expand(projects=[{a=[$0], b=[$1], $f2=[$2], $e=[0]}, {a=[$0], b=[$1], $f2=[null], $e=[1]}])
                         +- Calc(select=[a, b, MOD(HASH_CODE(b), 1024) AS $f2])
-                           +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+                           +- WatermarkAssigner(fields=[a, b, c], miniBatchInterval=[Proctime, 1000ms])
+                              +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
 ]]>
     </Resource>
   </TestCase>
@@ -1049,7 +1085,8 @@ LogicalAggregate(group=[{0}], EXPR$1=[COUNT(DISTINCT $1)])
 GroupAggregate(groupBy=[a], select=[a, COUNT(DISTINCT c) AS EXPR$1])
 +- Exchange(distribution=[hash[a]])
    +- Calc(select=[a, c])
-      +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+      +- WatermarkAssigner(fields=[a, b, c], miniBatchInterval=[Proctime, 1000ms])
+         +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
 ]]>
     </Resource>
   </TestCase>
@@ -1070,7 +1107,8 @@ GlobalGroupAggregate(groupBy=[a], select=[a, COUNT(distinct$0 count$0) AS EXPR$1
 +- Exchange(distribution=[hash[a]])
    +- LocalGroupAggregate(groupBy=[a], select=[a, COUNT(distinct$0 c) AS count$0, DISTINCT(c) AS distinct$0])
       +- Calc(select=[a, c])
-         +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+         +- WatermarkAssigner(fields=[a, b, c], miniBatchInterval=[Proctime, 1000ms])
+            +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
 ]]>
     </Resource>
   </TestCase>
@@ -1092,7 +1130,8 @@ GroupAggregate(groupBy=[a], partialFinalType=[FINAL], select=[a, $SUM0_RETRACT($
    +- GroupAggregate(groupBy=[a, $f2], partialFinalType=[PARTIAL], select=[a, $f2, COUNT(DISTINCT c) AS $f2_0])
       +- Exchange(distribution=[hash[a, $f2]])
          +- Calc(select=[a, c, MOD(HASH_CODE(c), 1024) AS $f2])
-            +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+            +- WatermarkAssigner(fields=[a, b, c], miniBatchInterval=[Proctime, 1000ms])
+               +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
 ]]>
     </Resource>
   </TestCase>
@@ -1116,7 +1155,8 @@ GlobalGroupAggregate(groupBy=[a], partialFinalType=[FINAL], select=[a, $SUM0_RET
          +- Exchange(distribution=[hash[a, $f2]])
             +- LocalGroupAggregate(groupBy=[a, $f2], partialFinalType=[PARTIAL], select=[a, $f2, COUNT(distinct$0 c) AS count$0, DISTINCT(c) AS distinct$0])
                +- Calc(select=[a, c, MOD(HASH_CODE(c), 1024) AS $f2])
-                  +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+                  +- WatermarkAssigner(fields=[a, b, c], miniBatchInterval=[Proctime, 1000ms])
+                     +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
 ]]>
     </Resource>
   </TestCase>
@@ -1147,7 +1187,8 @@ GroupAggregate(groupBy=[a], select=[a, COUNT_RETRACT(DISTINCT b) AS EXPR$1, COUN
    +- Calc(select=[a, b, 1 AS $f2], updateAsRetraction=[true], accMode=[AccRetract])
       +- GroupAggregate(groupBy=[c], select=[c, AVG(a) AS a, AVG(b) AS b], updateAsRetraction=[true], accMode=[AccRetract])
          +- Exchange(distribution=[hash[c]], updateAsRetraction=[true], accMode=[Acc])
-            +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c], updateAsRetraction=[true], accMode=[Acc])
+            +- WatermarkAssigner(fields=[a, b, c], miniBatchInterval=[Proctime, 1000ms], updateAsRetraction=[true], accMode=[Acc])
+               +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c], updateAsRetraction=[true], accMode=[Acc])
 ]]>
     </Resource>
   </TestCase>
@@ -1180,7 +1221,8 @@ GlobalGroupAggregate(groupBy=[a], select=[a, COUNT_RETRACT(distinct$0 count$0) A
          +- GlobalGroupAggregate(groupBy=[c], select=[c, AVG((sum$0, count$1)) AS a, AVG((sum$2, count$3)) AS b], updateAsRetraction=[true], accMode=[AccRetract])
             +- Exchange(distribution=[hash[c]], updateAsRetraction=[true], accMode=[Acc])
                +- LocalGroupAggregate(groupBy=[c], select=[c, AVG(a) AS (sum$0, count$1), AVG(b) AS (sum$2, count$3)], updateAsRetraction=[true], accMode=[Acc])
-                  +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c], updateAsRetraction=[true], accMode=[Acc])
+                  +- WatermarkAssigner(fields=[a, b, c], miniBatchInterval=[Proctime, 1000ms], updateAsRetraction=[true], accMode=[Acc])
+                     +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c], updateAsRetraction=[true], accMode=[Acc])
 ]]>
     </Resource>
   </TestCase>
@@ -1215,7 +1257,8 @@ GroupAggregate(groupBy=[a], partialFinalType=[FINAL], select=[a, $SUM0_RETRACT($
                +- Calc(select=[a, b, 1 AS $f2, MOD(HASH_CODE(b), 1024) AS $f3], updateAsRetraction=[true], accMode=[AccRetract])
                   +- GroupAggregate(groupBy=[c], select=[c, AVG(a) AS a, AVG(b) AS b], updateAsRetraction=[true], accMode=[AccRetract])
                      +- Exchange(distribution=[hash[c]], updateAsRetraction=[true], accMode=[Acc])
-                        +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c], updateAsRetraction=[true], accMode=[Acc])
+                        +- WatermarkAssigner(fields=[a, b, c], miniBatchInterval=[Proctime, 1000ms], updateAsRetraction=[true], accMode=[Acc])
+                           +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c], updateAsRetraction=[true], accMode=[Acc])
 ]]>
     </Resource>
   </TestCase>
@@ -1253,7 +1296,8 @@ GlobalGroupAggregate(groupBy=[a], partialFinalType=[FINAL], select=[a, $SUM0_RET
                         +- GlobalGroupAggregate(groupBy=[c], select=[c, AVG((sum$0, count$1)) AS a, AVG((sum$2, count$3)) AS b], updateAsRetraction=[true], accMode=[AccRetract])
                            +- Exchange(distribution=[hash[c]], updateAsRetraction=[true], accMode=[Acc])
                               +- LocalGroupAggregate(groupBy=[c], select=[c, AVG(a) AS (sum$0, count$1), AVG(b) AS (sum$2, count$3)], updateAsRetraction=[true], accMode=[Acc])
-                                 +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c], updateAsRetraction=[true], accMode=[Acc])
+                                 +- WatermarkAssigner(fields=[a, b, c], miniBatchInterval=[Proctime, 1000ms], updateAsRetraction=[true], accMode=[Acc])
+                                    +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c], updateAsRetraction=[true], accMode=[Acc])
 ]]>
     </Resource>
   </TestCase>
@@ -1272,7 +1316,8 @@ LogicalAggregate(group=[{0}], EXPR$1=[FIRST_VALUE($1)], EXPR$2=[COUNT(DISTINCT $
       <![CDATA[
 GroupAggregate(groupBy=[a], select=[a, FIRST_VALUE(c) AS EXPR$1, COUNT(DISTINCT b) AS EXPR$2])
 +- Exchange(distribution=[hash[a]])
-   +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+   +- WatermarkAssigner(fields=[a, b, c], miniBatchInterval=[Proctime, 1000ms])
+      +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
 ]]>
     </Resource>
   </TestCase>
@@ -1291,7 +1336,8 @@ LogicalAggregate(group=[{0}], EXPR$1=[FIRST_VALUE($1)], EXPR$2=[COUNT(DISTINCT $
       <![CDATA[
 GroupAggregate(groupBy=[a], select=[a, FIRST_VALUE(c) AS EXPR$1, COUNT(DISTINCT b) AS EXPR$2])
 +- Exchange(distribution=[hash[a]])
-   +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+   +- WatermarkAssigner(fields=[a, b, c], miniBatchInterval=[Proctime, 1000ms])
+      +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
 ]]>
     </Resource>
   </TestCase>
@@ -1315,7 +1361,8 @@ GroupAggregate(groupBy=[a], partialFinalType=[FINAL], select=[a, FIRST_VALUE_RET
          +- Calc(select=[a, b, c, $f3, $f4, =($e, 2) AS $g_2, =($e, 1) AS $g_1])
             +- Expand(projects=[{a=[$0], b=[$1], c=[$2], $f3=[$3], $f4=[null], $e=[1]}, {a=[$0], b=[$1], c=[$2], $f3=[null], $f4=[$4], $e=[2]}])
                +- Calc(select=[a, b, c, MOD(HASH_CODE(b), 1024) AS $f3, MOD(HASH_CODE(c), 1024) AS $f4])
-                  +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+                  +- WatermarkAssigner(fields=[a, b, c], miniBatchInterval=[Proctime, 1000ms])
+                     +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
 ]]>
     </Resource>
   </TestCase>
@@ -1339,7 +1386,8 @@ GroupAggregate(groupBy=[a], partialFinalType=[FINAL], select=[a, FIRST_VALUE_RET
          +- Calc(select=[a, b, c, $f3, $f4, =($e, 2) AS $g_2, =($e, 1) AS $g_1])
             +- Expand(projects=[{a=[$0], b=[$1], c=[$2], $f3=[$3], $f4=[null], $e=[1]}, {a=[$0], b=[$1], c=[$2], $f3=[null], $f4=[$4], $e=[2]}])
                +- Calc(select=[a, b, c, MOD(HASH_CODE(b), 1024) AS $f3, MOD(HASH_CODE(c), 1024) AS $f4])
-                  +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+                  +- WatermarkAssigner(fields=[a, b, c], miniBatchInterval=[Proctime, 1000ms])
+                     +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
 ]]>
     </Resource>
   </TestCase>
@@ -1358,7 +1406,8 @@ LogicalAggregate(group=[{0}], EXPR$1=[FIRST_VALUE($1, $2)], EXPR$2=[COUNT(DISTIN
       <![CDATA[
 GroupAggregate(groupBy=[a], select=[a, FIRST_VALUE(c, b) AS EXPR$1, COUNT(DISTINCT b) AS EXPR$2])
 +- Exchange(distribution=[hash[a]])
-   +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+   +- WatermarkAssigner(fields=[a, b, c], miniBatchInterval=[Proctime, 1000ms])
+      +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
 ]]>
     </Resource>
   </TestCase>
@@ -1377,7 +1426,8 @@ LogicalAggregate(group=[{0}], EXPR$1=[FIRST_VALUE($1, $2)], EXPR$2=[COUNT(DISTIN
       <![CDATA[
 GroupAggregate(groupBy=[a], select=[a, FIRST_VALUE(c, b) AS EXPR$1, COUNT(DISTINCT b) AS EXPR$2])
 +- Exchange(distribution=[hash[a]])
-   +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+   +- WatermarkAssigner(fields=[a, b, c], miniBatchInterval=[Proctime, 1000ms])
+      +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
 ]]>
     </Resource>
   </TestCase>
@@ -1396,7 +1446,8 @@ LogicalAggregate(group=[{0}], EXPR$1=[FIRST_VALUE($1, $2)], EXPR$2=[COUNT(DISTIN
       <![CDATA[
 GroupAggregate(groupBy=[a], select=[a, FIRST_VALUE(c, b) AS EXPR$1, COUNT(DISTINCT b) AS EXPR$2])
 +- Exchange(distribution=[hash[a]])
-   +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+   +- WatermarkAssigner(fields=[a, b, c], miniBatchInterval=[Proctime, 1000ms])
+      +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
 ]]>
     </Resource>
   </TestCase>
@@ -1415,7 +1466,8 @@ LogicalAggregate(group=[{0}], EXPR$1=[FIRST_VALUE($1, $2)], EXPR$2=[COUNT(DISTIN
       <![CDATA[
 GroupAggregate(groupBy=[a], select=[a, FIRST_VALUE(c, b) AS EXPR$1, COUNT(DISTINCT b) AS EXPR$2])
 +- Exchange(distribution=[hash[a]])
-   +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+   +- WatermarkAssigner(fields=[a, b, c], miniBatchInterval=[Proctime, 1000ms])
+      +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
 ]]>
     </Resource>
   </TestCase>
@@ -1434,7 +1486,8 @@ LogicalAggregate(group=[{0}], EXPR$1=[LAST_VALUE($1)], EXPR$2=[COUNT(DISTINCT $2
       <![CDATA[
 GroupAggregate(groupBy=[a], select=[a, LAST_VALUE(c) AS EXPR$1, COUNT(DISTINCT b) AS EXPR$2])
 +- Exchange(distribution=[hash[a]])
-   +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+   +- WatermarkAssigner(fields=[a, b, c], miniBatchInterval=[Proctime, 1000ms])
+      +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
 ]]>
     </Resource>
   </TestCase>
@@ -1453,7 +1506,8 @@ LogicalAggregate(group=[{0}], EXPR$1=[LAST_VALUE($1)], EXPR$2=[COUNT(DISTINCT $2
       <![CDATA[
 GroupAggregate(groupBy=[a], select=[a, LAST_VALUE(c) AS EXPR$1, COUNT(DISTINCT b) AS EXPR$2])
 +- Exchange(distribution=[hash[a]])
-   +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+   +- WatermarkAssigner(fields=[a, b, c], miniBatchInterval=[Proctime, 1000ms])
+      +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
 ]]>
     </Resource>
   </TestCase>
@@ -1477,7 +1531,8 @@ GroupAggregate(groupBy=[a], partialFinalType=[FINAL], select=[a, LAST_VALUE_RETR
          +- Calc(select=[a, b, c, $f3, $f4, =($e, 2) AS $g_2, =($e, 1) AS $g_1])
             +- Expand(projects=[{a=[$0], b=[$1], c=[$2], $f3=[$3], $f4=[null], $e=[1]}, {a=[$0], b=[$1], c=[$2], $f3=[null], $f4=[$4], $e=[2]}])
                +- Calc(select=[a, b, c, MOD(HASH_CODE(b), 1024) AS $f3, MOD(HASH_CODE(c), 1024) AS $f4])
-                  +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+                  +- WatermarkAssigner(fields=[a, b, c], miniBatchInterval=[Proctime, 1000ms])
+                     +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
 ]]>
     </Resource>
   </TestCase>
@@ -1501,7 +1556,8 @@ GroupAggregate(groupBy=[a], partialFinalType=[FINAL], select=[a, LAST_VALUE_RETR
          +- Calc(select=[a, b, c, $f3, $f4, =($e, 2) AS $g_2, =($e, 1) AS $g_1])
             +- Expand(projects=[{a=[$0], b=[$1], c=[$2], $f3=[$3], $f4=[null], $e=[1]}, {a=[$0], b=[$1], c=[$2], $f3=[null], $f4=[$4], $e=[2]}])
                +- Calc(select=[a, b, c, MOD(HASH_CODE(b), 1024) AS $f3, MOD(HASH_CODE(c), 1024) AS $f4])
-                  +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+                  +- WatermarkAssigner(fields=[a, b, c], miniBatchInterval=[Proctime, 1000ms])
+                     +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
 ]]>
     </Resource>
   </TestCase>
@@ -1520,7 +1576,8 @@ LogicalAggregate(group=[{0}], EXPR$1=[LAST_VALUE($1, $2)], EXPR$2=[COUNT(DISTINC
       <![CDATA[
 GroupAggregate(groupBy=[a], select=[a, LAST_VALUE(c, b) AS EXPR$1, COUNT(DISTINCT b) AS EXPR$2])
 +- Exchange(distribution=[hash[a]])
-   +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+   +- WatermarkAssigner(fields=[a, b, c], miniBatchInterval=[Proctime, 1000ms])
+      +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
 ]]>
     </Resource>
   </TestCase>
@@ -1539,7 +1596,8 @@ LogicalAggregate(group=[{0}], EXPR$1=[LAST_VALUE($1, $2)], EXPR$2=[COUNT(DISTINC
       <![CDATA[
 GroupAggregate(groupBy=[a], select=[a, LAST_VALUE(c, b) AS EXPR$1, COUNT(DISTINCT b) AS EXPR$2])
 +- Exchange(distribution=[hash[a]])
-   +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+   +- WatermarkAssigner(fields=[a, b, c], miniBatchInterval=[Proctime, 1000ms])
+      +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
 ]]>
     </Resource>
   </TestCase>
@@ -1558,7 +1616,8 @@ LogicalAggregate(group=[{0}], EXPR$1=[LAST_VALUE($1, $2)], EXPR$2=[COUNT(DISTINC
       <![CDATA[
 GroupAggregate(groupBy=[a], select=[a, LAST_VALUE(c, b) AS EXPR$1, COUNT(DISTINCT b) AS EXPR$2])
 +- Exchange(distribution=[hash[a]])
-   +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+   +- WatermarkAssigner(fields=[a, b, c], miniBatchInterval=[Proctime, 1000ms])
+      +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
 ]]>
     </Resource>
   </TestCase>
@@ -1577,7 +1636,8 @@ LogicalAggregate(group=[{0}], EXPR$1=[LAST_VALUE($1, $2)], EXPR$2=[COUNT(DISTINC
       <![CDATA[
 GroupAggregate(groupBy=[a], select=[a, LAST_VALUE(c, b) AS EXPR$1, COUNT(DISTINCT b) AS EXPR$2])
 +- Exchange(distribution=[hash[a]])
-   +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+   +- WatermarkAssigner(fields=[a, b, c], miniBatchInterval=[Proctime, 1000ms])
+      +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
 ]]>
     </Resource>
   </TestCase>
@@ -1599,7 +1659,8 @@ LogicalAggregate(group=[{0}], EXPR$1=[COUNT(DISTINCT $1)], EXPR$2=[MAX($2)])
       <![CDATA[
 GroupAggregate(groupBy=[a], select=[a, COUNT(DISTINCT b) AS EXPR$1, MAX(c) AS EXPR$2])
 +- Exchange(distribution=[hash[a]])
-   +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+   +- WatermarkAssigner(fields=[a, b, c], miniBatchInterval=[Proctime, 1000ms])
+      +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
 ]]>
     </Resource>
   </TestCase>
@@ -1622,7 +1683,8 @@ LogicalAggregate(group=[{0}], EXPR$1=[COUNT(DISTINCT $1)], EXPR$2=[MAX($2)])
 GlobalGroupAggregate(groupBy=[a], select=[a, COUNT(distinct$0 count$0) AS EXPR$1, MAX(max$1) AS EXPR$2])
 +- Exchange(distribution=[hash[a]])
    +- LocalGroupAggregate(groupBy=[a], select=[a, COUNT(distinct$0 b) AS count$0, MAX(c) AS max$1, DISTINCT(b) AS distinct$0])
-      +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+      +- WatermarkAssigner(fields=[a, b, c], miniBatchInterval=[Proctime, 1000ms])
+         +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
 ]]>
     </Resource>
   </TestCase>
@@ -1649,7 +1711,8 @@ GroupAggregate(groupBy=[a], partialFinalType=[FINAL], select=[a, $SUM0_RETRACT($
          +- Calc(select=[a, b, c, $f3, $f4, =($e, 1) AS $g_1, =($e, 2) AS $g_2])
             +- Expand(projects=[{a=[$0], b=[$1], c=[$2], $f3=[$3], $f4=[null], $e=[1]}, {a=[$0], b=[$1], c=[$2], $f3=[null], $f4=[$4], $e=[2]}])
                +- Calc(select=[a, b, c, MOD(HASH_CODE(b), 1024) AS $f3, MOD(HASH_CODE(c), 1024) AS $f4])
-                  +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+                  +- WatermarkAssigner(fields=[a, b, c], miniBatchInterval=[Proctime, 1000ms])
+                     +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
 ]]>
     </Resource>
   </TestCase>
@@ -1671,7 +1734,8 @@ GroupAggregate(groupBy=[a], partialFinalType=[FINAL], select=[a, $SUM0_RETRACT($
    +- GroupAggregate(groupBy=[a], partialFinalType=[PARTIAL], select=[a, COUNT(DISTINCT a) AS $f1, COUNT(b) AS $f2])
       +- Exchange(distribution=[hash[a]])
          +- Calc(select=[a, b])
-            +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+            +- WatermarkAssigner(fields=[a, b, c], miniBatchInterval=[Proctime, 1000ms])
+               +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
 ]]>
     </Resource>
   </TestCase>
@@ -1700,7 +1764,8 @@ GlobalGroupAggregate(groupBy=[a], partialFinalType=[FINAL], select=[a, $SUM0_RET
                +- Calc(select=[a, b, c, $f3, $f4, =($e, 1) AS $g_1, =($e, 2) AS $g_2])
                   +- Expand(projects=[{a=[$0], b=[$1], c=[$2], $f3=[$3], $f4=[null], $e=[1]}, {a=[$0], b=[$1], c=[$2], $f3=[null], $f4=[$4], $e=[2]}])
                      +- Calc(select=[a, b, c, MOD(HASH_CODE(b), 1024) AS $f3, MOD(HASH_CODE(c), 1024) AS $f4])
-                        +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+                        +- WatermarkAssigner(fields=[a, b, c], miniBatchInterval=[Proctime, 1000ms])
+                           +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
 ]]>
     </Resource>
   </TestCase>
@@ -1721,7 +1786,8 @@ GlobalGroupAggregate(groupBy=[a], select=[a, COUNT(distinct$0 count$0) AS EXPR$1
 +- Exchange(distribution=[hash[a]])
    +- LocalGroupAggregate(groupBy=[a], select=[a, COUNT(distinct$0 a) AS count$0, COUNT(b) AS count$1, DISTINCT(a) AS distinct$0])
       +- Calc(select=[a, b])
-         +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+         +- WatermarkAssigner(fields=[a, b, c], miniBatchInterval=[Proctime, 1000ms])
+            +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
 ]]>
     </Resource>
   </TestCase>
@@ -1745,7 +1811,8 @@ GlobalGroupAggregate(groupBy=[a], partialFinalType=[FINAL], select=[a, $SUM0_RET
          +- Exchange(distribution=[hash[a]])
             +- LocalGroupAggregate(groupBy=[a], partialFinalType=[PARTIAL], select=[a, COUNT(distinct$0 a) AS count$0, COUNT(b) AS count$1, DISTINCT(a) AS distinct$0])
                +- Calc(select=[a, b])
-                  +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+                  +- WatermarkAssigner(fields=[a, b, c], miniBatchInterval=[Proctime, 1000ms])
+                     +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
 ]]>
     </Resource>
   </TestCase>
@@ -1765,7 +1832,8 @@ LogicalAggregate(group=[{0}], EXPR$1=[COUNT(DISTINCT $0)], EXPR$2=[COUNT($1)])
 GroupAggregate(groupBy=[a], select=[a, COUNT(DISTINCT a) AS EXPR$1, COUNT(b) AS EXPR$2])
 +- Exchange(distribution=[hash[a]])
    +- Calc(select=[a, b])
-      +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+      +- WatermarkAssigner(fields=[a, b, c], miniBatchInterval=[Proctime, 1000ms])
+         +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
 ]]>
     </Resource>
   </TestCase>
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/stream/sql/agg/IncrementalAggregateTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/stream/sql/agg/IncrementalAggregateTest.xml
index b4b5e82..a415121 100644
--- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/stream/sql/agg/IncrementalAggregateTest.xml
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/stream/sql/agg/IncrementalAggregateTest.xml
@@ -45,7 +45,8 @@ GlobalGroupAggregate(groupBy=[a], partialFinalType=[FINAL], select=[a, $SUM0(cou
             +- Calc(select=[a, b, $f2, $f3, $f4, AND(=($e, 0), $f2) AS $g_0, AND(=($e, 1), $f3) AS $g_1, AND(=($e, 1), $f2) AS $g_10])
                +- Expand(projects=[{a=[$0], b=[$1], $f2=[$2], $f3=[$3], $f4=[$4], $e=[0]}, {a=[$0], b=[$1], $f2=[$2], $f3=[$3], $f4=[null], $e=[1]}])
                   +- Calc(select=[a, b, IS TRUE(<>(b, 2:BIGINT)) AS $f2, IS TRUE(<>(b, 5:BIGINT)) AS $f3, MOD(HASH_CODE(b), 1024) AS $f4])
-                     +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+                     +- WatermarkAssigner(fields=[a, b, c], miniBatchInterval=[Proctime, 1000ms])
+                        +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
 ]]>
     </Resource>
   </TestCase>
@@ -84,7 +85,8 @@ GroupAggregate(groupBy=[b], partialFinalType=[FINAL], select=[b, FIRST_VALUE_RET
                      +- Exchange(distribution=[hash[a, $f2]], updateAsRetraction=[true], accMode=[Acc])
                         +- LocalGroupAggregate(groupBy=[a, $f2], partialFinalType=[PARTIAL], select=[a, $f2, COUNT(distinct$0 b) AS count$0, MAX(b) AS max$1, DISTINCT(b) AS distinct$0], updateAsRetraction=[true], accMode=[Acc])
                            +- Calc(select=[a, b, MOD(HASH_CODE(b), 1024) AS $f2], updateAsRetraction=[true], accMode=[Acc])
-                              +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c], updateAsRetraction=[true], accMode=[Acc])
+                              +- WatermarkAssigner(fields=[a, b, c], miniBatchInterval=[Proctime, 1000ms], updateAsRetraction=[true], accMode=[Acc])
+                                 +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c], updateAsRetraction=[true], accMode=[Acc])
 ]]>
     </Resource>
   </TestCase>
@@ -122,7 +124,8 @@ GlobalGroupAggregate(groupBy=[c], partialFinalType=[FINAL], select=[c, MIN_RETRA
                      +- GlobalGroupAggregate(groupBy=[a], select=[a, AVG((sum$0, count$1)) AS b, MAX(max$2) AS c], updateAsRetraction=[true], accMode=[AccRetract])
                         +- Exchange(distribution=[hash[a]], updateAsRetraction=[true], accMode=[Acc])
                            +- LocalGroupAggregate(groupBy=[a], select=[a, AVG(b) AS (sum$0, count$1), MAX(c) AS max$2], updateAsRetraction=[true], accMode=[Acc])
-                              +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c], updateAsRetraction=[true], accMode=[Acc])
+                              +- WatermarkAssigner(fields=[a, b, c], miniBatchInterval=[Proctime, 1000ms], updateAsRetraction=[true], accMode=[Acc])
+                                 +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c], updateAsRetraction=[true], accMode=[Acc])
 ]]>
     </Resource>
   </TestCase>
@@ -147,7 +150,8 @@ GlobalGroupAggregate(partialFinalType=[FINAL], select=[$SUM0(count$0) AS $f0, SU
             +- Calc(select=[a, b, $f2, $f3, =($e, 1) AS $g_1, =($e, 2) AS $g_2])
                +- Expand(projects=[{a=[$0], b=[$1], $f2=[$2], $f3=[null], $e=[1]}, {a=[$0], b=[$1], $f2=[null], $f3=[$3], $e=[2]}])
                   +- Calc(select=[a, b, MOD(HASH_CODE(a), 1024) AS $f2, MOD(HASH_CODE(b), 1024) AS $f3])
-                     +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+                     +- WatermarkAssigner(fields=[a, b, c], miniBatchInterval=[Proctime, 1000ms])
+                        +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
 ]]>
     </Resource>
   </TestCase>
@@ -185,7 +189,8 @@ GlobalGroupAggregate(groupBy=[c], partialFinalType=[FINAL], select=[c, MIN_RETRA
                      +- GlobalGroupAggregate(groupBy=[a], select=[a, AVG((sum$0, count$1)) AS b, MAX(max$2) AS c])
                         +- Exchange(distribution=[hash[a]])
                            +- LocalGroupAggregate(groupBy=[a], select=[a, AVG(b) AS (sum$0, count$1), MAX(c) AS max$2])
-                              +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+                              +- WatermarkAssigner(fields=[a, b, c], miniBatchInterval=[Proctime, 1000ms])
+                                 +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
 ]]>
     </Resource>
   </TestCase>
@@ -210,7 +215,8 @@ GlobalGroupAggregate(groupBy=[a], partialFinalType=[FINAL], select=[a, CONCAT_AG
             +- Calc(select=[a, b, c, $f3, =($e, 1) AS $g_1, =($e, 0) AS $g_0])
                +- Expand(projects=[{a=[$0], b=[$1], c=[$2], $f3=[$3], $e=[0]}, {a=[$0], b=[$1], c=[$2], $f3=[null], $e=[1]}])
                   +- Calc(select=[a, b, c, MOD(HASH_CODE(b), 1024) AS $f3])
-                     +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+                     +- WatermarkAssigner(fields=[a, b, c], miniBatchInterval=[Proctime, 1000ms])
+                        +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
 ]]>
     </Resource>
   </TestCase>
@@ -233,7 +239,8 @@ GlobalGroupAggregate(partialFinalType=[FINAL], select=[$SUM0(count$0) AS $f0])
       +- Exchange(distribution=[hash[$f1]])
          +- LocalGroupAggregate(groupBy=[$f1], partialFinalType=[PARTIAL], select=[$f1, COUNT(distinct$0 c) AS count$0, DISTINCT(c) AS distinct$0])
             +- Calc(select=[c, MOD(HASH_CODE(c), 1024) AS $f1])
-               +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+               +- WatermarkAssigner(fields=[a, b, c], miniBatchInterval=[Proctime, 1000ms])
+                  +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
 ]]>
     </Resource>
   </TestCase>
@@ -263,7 +270,8 @@ Calc(select=[a, EXPR$1, CASE(=($f3, 0), null:BIGINT, EXPR$2) AS EXPR$2, /(CAST(C
                +- Calc(select=[a, b, c, $f3, $f4, =($e, 2) AS $g_2, =($e, 3) AS $g_3, =($e, 1) AS $g_1])
                   +- Expand(projects=[{a=[$0], b=[$1], c=[$2], $f3=[$3], $f4=[null], $e=[1]}, {a=[$0], b=[$1], c=[$2], $f3=[null], $f4=[$4], $e=[2]}, {a=[$0], b=[$1], c=[$2], $f3=[null], $f4=[null], $e=[3]}])
                      +- Calc(select=[a, b, c, MOD(HASH_CODE(b), 1024) AS $f3, MOD(HASH_CODE(c), 1024) AS $f4])
-                        +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+                        +- WatermarkAssigner(fields=[a, b, c], miniBatchInterval=[Proctime, 1000ms])
+                           +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
 ]]>
     </Resource>
   </TestCase>
@@ -289,7 +297,8 @@ Calc(select=[a, $f1, $f2, CAST(/($f3, $f4)) AS $f3])
                +- Calc(select=[a, b, $f2, =($e, 0) AS $g_0, =($e, 1) AS $g_1])
                   +- Expand(projects=[{a=[$0], b=[$1], $f2=[$2], $e=[0]}, {a=[$0], b=[$1], $f2=[null], $e=[1]}])
                      +- Calc(select=[a, b, MOD(HASH_CODE(b), 1024) AS $f2])
-                        +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+                        +- WatermarkAssigner(fields=[a, b, c], miniBatchInterval=[Proctime, 1000ms])
+                           +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
 ]]>
     </Resource>
   </TestCase>
@@ -312,7 +321,8 @@ GlobalGroupAggregate(groupBy=[a], partialFinalType=[FINAL], select=[a, $SUM0(cou
       +- Exchange(distribution=[hash[a, $f2]])
          +- LocalGroupAggregate(groupBy=[a, $f2], partialFinalType=[PARTIAL], select=[a, $f2, COUNT(distinct$0 c) AS count$0, DISTINCT(c) AS distinct$0])
             +- Calc(select=[a, c, MOD(HASH_CODE(c), 1024) AS $f2])
-               +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+               +- WatermarkAssigner(fields=[a, b, c], miniBatchInterval=[Proctime, 1000ms])
+                  +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
 ]]>
     </Resource>
   </TestCase>
@@ -349,7 +359,8 @@ GlobalGroupAggregate(groupBy=[a], partialFinalType=[FINAL], select=[a, $SUM0_RET
                      +- GlobalGroupAggregate(groupBy=[c], select=[c, AVG((sum$0, count$1)) AS a, AVG((sum$2, count$3)) AS b], updateAsRetraction=[true], accMode=[AccRetract])
                         +- Exchange(distribution=[hash[c]], updateAsRetraction=[true], accMode=[Acc])
                            +- LocalGroupAggregate(groupBy=[c], select=[c, AVG(a) AS (sum$0, count$1), AVG(b) AS (sum$2, count$3)], updateAsRetraction=[true], accMode=[Acc])
-                              +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c], updateAsRetraction=[true], accMode=[Acc])
+                              +- WatermarkAssigner(fields=[a, b, c], miniBatchInterval=[Proctime, 1000ms], updateAsRetraction=[true], accMode=[Acc])
+                                 +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c], updateAsRetraction=[true], accMode=[Acc])
 ]]>
     </Resource>
   </TestCase>
@@ -373,7 +384,8 @@ GroupAggregate(groupBy=[a], partialFinalType=[FINAL], select=[a, FIRST_VALUE_RET
          +- Calc(select=[a, b, c, $f3, $f4, =($e, 2) AS $g_2, =($e, 1) AS $g_1])
             +- Expand(projects=[{a=[$0], b=[$1], c=[$2], $f3=[$3], $f4=[null], $e=[1]}, {a=[$0], b=[$1], c=[$2], $f3=[null], $f4=[$4], $e=[2]}])
                +- Calc(select=[a, b, c, MOD(HASH_CODE(b), 1024) AS $f3, MOD(HASH_CODE(c), 1024) AS $f4])
-                  +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+                  +- WatermarkAssigner(fields=[a, b, c], miniBatchInterval=[Proctime, 1000ms])
+                     +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
 ]]>
     </Resource>
   </TestCase>
@@ -392,7 +404,8 @@ LogicalAggregate(group=[{0}], EXPR$1=[FIRST_VALUE($1, $2)], EXPR$2=[COUNT(DISTIN
       <![CDATA[
 GroupAggregate(groupBy=[a], select=[a, FIRST_VALUE(c, b) AS EXPR$1, COUNT(DISTINCT b) AS EXPR$2])
 +- Exchange(distribution=[hash[a]])
-   +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+   +- WatermarkAssigner(fields=[a, b, c], miniBatchInterval=[Proctime, 1000ms])
+      +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
 ]]>
     </Resource>
   </TestCase>
@@ -416,7 +429,8 @@ GroupAggregate(groupBy=[a], partialFinalType=[FINAL], select=[a, LAST_VALUE_RETR
          +- Calc(select=[a, b, c, $f3, $f4, =($e, 2) AS $g_2, =($e, 1) AS $g_1])
             +- Expand(projects=[{a=[$0], b=[$1], c=[$2], $f3=[$3], $f4=[null], $e=[1]}, {a=[$0], b=[$1], c=[$2], $f3=[null], $f4=[$4], $e=[2]}])
                +- Calc(select=[a, b, c, MOD(HASH_CODE(b), 1024) AS $f3, MOD(HASH_CODE(c), 1024) AS $f4])
-                  +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+                  +- WatermarkAssigner(fields=[a, b, c], miniBatchInterval=[Proctime, 1000ms])
+                     +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
 ]]>
     </Resource>
   </TestCase>
@@ -435,7 +449,8 @@ LogicalAggregate(group=[{0}], EXPR$1=[LAST_VALUE($1, $2)], EXPR$2=[COUNT(DISTINC
       <![CDATA[
 GroupAggregate(groupBy=[a], select=[a, LAST_VALUE(c, b) AS EXPR$1, COUNT(DISTINCT b) AS EXPR$2])
 +- Exchange(distribution=[hash[a]])
-   +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+   +- WatermarkAssigner(fields=[a, b, c], miniBatchInterval=[Proctime, 1000ms])
+      +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
 ]]>
     </Resource>
   </TestCase>
@@ -463,7 +478,8 @@ GlobalGroupAggregate(groupBy=[a], partialFinalType=[FINAL], select=[a, $SUM0(cou
             +- Calc(select=[a, b, c, $f3, $f4, =($e, 1) AS $g_1, =($e, 2) AS $g_2])
                +- Expand(projects=[{a=[$0], b=[$1], c=[$2], $f3=[$3], $f4=[null], $e=[1]}, {a=[$0], b=[$1], c=[$2], $f3=[null], $f4=[$4], $e=[2]}])
                   +- Calc(select=[a, b, c, MOD(HASH_CODE(b), 1024) AS $f3, MOD(HASH_CODE(c), 1024) AS $f4])
-                     +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+                     +- WatermarkAssigner(fields=[a, b, c], miniBatchInterval=[Proctime, 1000ms])
+                        +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
 ]]>
     </Resource>
   </TestCase>
@@ -486,7 +502,8 @@ GlobalGroupAggregate(groupBy=[a], partialFinalType=[FINAL], select=[a, $SUM0(cou
       +- Exchange(distribution=[hash[a]])
          +- LocalGroupAggregate(groupBy=[a], partialFinalType=[PARTIAL], select=[a, COUNT(distinct$0 a) AS count$0, COUNT(b) AS count$1, DISTINCT(a) AS distinct$0])
             +- Calc(select=[a, b])
-               +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+               +- WatermarkAssigner(fields=[a, b, c], miniBatchInterval=[Proctime, 1000ms])
+                  +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
 ]]>
     </Resource>
   </TestCase>
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/stream/sql/agg/TwoStageAggregateTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/stream/sql/agg/TwoStageAggregateTest.xml
index 2458103..e249131 100644
--- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/stream/sql/agg/TwoStageAggregateTest.xml
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/stream/sql/agg/TwoStageAggregateTest.xml
@@ -35,7 +35,8 @@ Calc(select=[EXPR$0])
    +- Exchange(distribution=[hash[b]])
       +- LocalGroupAggregate(groupBy=[b], select=[b, AVG(a) AS (sum$0, count$1)])
          +- Calc(select=[b, a])
-            +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+            +- WatermarkAssigner(fields=[a, b, c], miniBatchInterval=[Proctime, 1000ms])
+               +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
 ]]>
     </Resource>
   </TestCase>
@@ -56,7 +57,8 @@ GlobalGroupAggregate(select=[AVG((sum$0, count$1)) AS EXPR$0]), rowType=[RecordT
 +- Exchange(distribution=[single]), rowType=[RecordType(DOUBLE sum$0, BIGINT count$1)]
    +- LocalGroupAggregate(select=[AVG($f0) AS (sum$0, count$1)]), rowType=[RecordType(DOUBLE sum$0, BIGINT count$1)]
       +- Calc(select=[CAST(a) AS $f0]), rowType=[RecordType(DOUBLE $f0)]
-         +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c]), rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c)]
+         +- WatermarkAssigner(fields=[a, b, c], miniBatchInterval=[Proctime, 1000ms]), rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c)]
+            +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c]), rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c)]
 ]]>
     </Resource>
   </TestCase>
@@ -79,7 +81,8 @@ Calc(select=[EXPR$0])
    +- Exchange(distribution=[hash[b]])
       +- LocalGroupAggregate(groupBy=[b], select=[b, COUNT(a) AS count$0])
          +- Calc(select=[b, a])
-            +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+            +- WatermarkAssigner(fields=[a, b, c], miniBatchInterval=[Proctime, 1000ms])
+               +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
 ]]>
     </Resource>
   </TestCase>
@@ -102,7 +105,8 @@ Calc(select=[EXPR$0, EXPR$1])
    +- Exchange(distribution=[hash[d]])
       +- LocalGroupAggregate(groupBy=[d], select=[d, MIN(c) AS min$0, AVG(a) AS (sum$1, count$2)])
          +- Calc(select=[+(b, 3) AS d, c, a])
-            +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+            +- WatermarkAssigner(fields=[a, b, c], miniBatchInterval=[Proctime, 1000ms])
+               +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
 ]]>
     </Resource>
   </TestCase>
@@ -123,7 +127,8 @@ GlobalGroupAggregate(select=[COUNT(count$0) AS EXPR$0])
 +- Exchange(distribution=[single])
    +- LocalGroupAggregate(select=[COUNT(a) AS count$0])
       +- Calc(select=[a])
-         +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+         +- WatermarkAssigner(fields=[a, b, c], miniBatchInterval=[Proctime, 1000ms])
+            +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
 ]]>
     </Resource>
   </TestCase>
@@ -147,7 +152,8 @@ Calc(select=[CAST(2:BIGINT) AS b, EXPR$1])
    +- Exchange(distribution=[hash[b]])
       +- LocalGroupAggregate(groupBy=[b], select=[b, SUM(a) AS sum$0])
          +- Calc(select=[CAST(2:BIGINT) AS b, a], where=[=(b, 2:BIGINT)])
-            +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+            +- WatermarkAssigner(fields=[a, b, c], miniBatchInterval=[Proctime, 1000ms])
+               +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
 ]]>
     </Resource>
   </TestCase>
@@ -170,7 +176,8 @@ Calc(select=[4 AS four, EXPR$1])
    +- Exchange(distribution=[hash[b, four]])
       +- LocalGroupAggregate(groupBy=[b, four], select=[b, four, SUM(a) AS sum$0])
          +- Calc(select=[b, 4 AS four, a])
-            +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+            +- WatermarkAssigner(fields=[a, b, c], miniBatchInterval=[Proctime, 1000ms])
+               +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
 ]]>
     </Resource>
   </TestCase>
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/api/stream/ExplainTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/api/stream/ExplainTest.scala
index d0f6460..6c10d5d 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/api/stream/ExplainTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/api/stream/ExplainTest.scala
@@ -19,7 +19,7 @@
 package org.apache.flink.table.api.stream
 
 import org.apache.flink.api.scala._
-import org.apache.flink.table.api.TableException
+import org.apache.flink.table.api.TableConfigOptions
 import org.apache.flink.table.types.logical.{BigIntType, IntType, VarCharType}
 import org.apache.flink.table.util.TableTestBase
 
@@ -98,6 +98,47 @@ class ExplainTest(extended: Boolean) extends TableTestBase {
     util.verifyExplain(extended)
   }
 
+  @Test
+  def testMiniBatchIntervalInfer(): Unit = {
+    // Test emit latency propagate among RelNodeBlocks
+    util.addDataStream[(Int, String)]("T1", 'id1, 'text, 'rowtime)
+    util.addDataStream[(Int, String, Int, String, Long)](
+      "T2", 'id2, 'cnt, 'name, 'goods, 'rowtime)
+    util.addTableWithWatermark("T3", util.tableEnv.scan("T1"), "rowtime", 0)
+    util.addTableWithWatermark("T4", util.tableEnv.scan("T2"), "rowtime", 0)
+    util.tableEnv.getConfig.getConf.setLong(
+      TableConfigOptions.SQL_EXEC_MINIBATCH_ALLOW_LATENCY, 3000L)
+    val table = util.tableEnv.sqlQuery(
+      """
+        |SELECT id1, T3.rowtime AS ts, text
+        |  FROM T3, T4
+        |WHERE id1 = id2
+        |      AND T3.rowtime > T4.rowtime - INTERVAL '5' MINUTE
+        |      AND T3.rowtime < T4.rowtime + INTERVAL '3' MINUTE
+      """.stripMargin)
+    util.tableEnv.registerTable("TempTable", table)
+
+    val table1 = util.tableEnv.sqlQuery(
+      """
+        |SELECT id1, CONCAT_AGG('#', text)
+        |FROM TempTable
+        |GROUP BY id1, TUMBLE(ts, INTERVAL '8' SECOND)
+      """.stripMargin)
+    val appendSink1 = util.createAppendTableSink(Array("a", "b"), Array(INT, STRING))
+    util.tableEnv.writeToSink(table1, appendSink1)
+
+    val table2 = util.tableEnv.sqlQuery(
+      """
+        |SELECT id1, CONCAT_AGG('*', text)
+        |FROM TempTable
+        |GROUP BY id1, HOP(ts, INTERVAL '12' SECOND, INTERVAL '6' SECOND)
+      """.stripMargin)
+    val appendSink2 = util.createAppendTableSink(Array("a", "b"), Array(INT, STRING))
+    util.tableEnv.writeToSink(table2, appendSink2)
+
+    util.verifyExplain(extended)
+  }
+
 }
 
 object ExplainTest {
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/stream/sql/MiniBatchIntervalInferTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/stream/sql/MiniBatchIntervalInferTest.scala
new file mode 100644
index 0000000..66da915
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/stream/sql/MiniBatchIntervalInferTest.scala
@@ -0,0 +1,331 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.stream.sql
+
+import org.apache.flink.api.common.time.Time
+import org.apache.flink.api.scala._
+import org.apache.flink.table.api.TableConfigOptions
+import org.apache.flink.table.types.logical.{BigIntType, IntType, VarCharType}
+import org.apache.flink.table.util.TableTestBase
+
+import org.junit.{Before, Test}
+
+class MiniBatchIntervalInferTest extends TableTestBase {
+  private val util = streamTestUtil()
+
+  val STRING = new VarCharType(VarCharType.MAX_LENGTH)
+  val LONG = new BigIntType()
+  val INT = new IntType()
+
+  @Before
+  def setup(): Unit = {
+    util.addDataStream[(Int, String, Long)]("MyTable1", 'a, 'b, 'c, 'proctime, 'rowtime)
+    util.addDataStream[(Int, String, Long)]("MyTable2", 'a, 'b, 'c, 'proctime, 'rowtime)
+  }
+
+  @Test
+  def testMiniBatchOnly(): Unit = {
+    util.tableEnv.getConfig.getConf
+        .setLong(TableConfigOptions.SQL_EXEC_MINIBATCH_ALLOW_LATENCY, 1000L)
+    val sql = "SELECT b, COUNT(DISTINCT a), MAX(b), SUM(c) FROM MyTable1 GROUP BY b"
+    util.verifyPlan(sql)
+  }
+
+  @Test
+  def testRedundantWatermarkDefinition(): Unit = {
+    util.tableEnv.getConfig.getConf
+        .setLong(TableConfigOptions.SQL_EXEC_MINIBATCH_ALLOW_LATENCY, 1000L)
+    util.addTableWithWatermark("MyTable3", util.tableEnv.scan("MyTable1"), "rowtime", 0)
+    val sql = "SELECT b, COUNT(DISTINCT a), MAX(b), SUM(c) FROM MyTable3 GROUP BY b"
+    util.verifyPlan(sql)
+  }
+
+  @Test
+  def testWindowWithEarlyFire(): Unit = {
+    util.tableEnv.getConfig.getConf
+        .setLong(TableConfigOptions.SQL_EXEC_MINIBATCH_ALLOW_LATENCY, 1000L)
+    util.tableEnv.getConfig.withEarlyFireInterval(Time.milliseconds(500))
+    util.addTableWithWatermark("MyTable3", util.tableEnv.scan("MyTable1"), "rowtime", 0)
+    val sql =
+      """
+        | SELECT b, SUM(cnt)
+        | FROM (
+        |   SELECT b,
+        |     COUNT(a) as cnt,
+        |     HOP_START(rowtime, INTERVAL '5' SECOND, INTERVAL '6' SECOND) as w_start,
+        |     HOP_END(rowtime, INTERVAL '5' SECOND, INTERVAL '6' SECOND) as w_end
+        |   FROM MyTable3
+        |   GROUP BY b, HOP(rowtime, INTERVAL '5' SECOND, INTERVAL '6' SECOND)
+        | )
+        | GROUP BY b
+      """.stripMargin
+    util.verifyPlan(sql)
+  }
+
+  @Test
+  def testWindowCascade(): Unit = {
+    util.tableEnv.getConfig.getConf
+        .setLong(TableConfigOptions.SQL_EXEC_MINIBATCH_ALLOW_LATENCY, 3000L)
+    util.addTableWithWatermark("MyTable3", util.tableEnv.scan("MyTable1"), "rowtime", 0)
+    val sql =
+      """
+        | SELECT b,
+        |   SUM(cnt)
+        | FROM (
+        |   SELECT b,
+        |     COUNT(a) as cnt,
+        |     TUMBLE_ROWTIME(rowtime, INTERVAL '10' SECOND) as rt
+        |   FROM MyTable3
+        |   GROUP BY b, TUMBLE(rowtime, INTERVAL '10' SECOND)
+        | )
+        | GROUP BY b, TUMBLE(rt, INTERVAL '5' SECOND)
+      """.stripMargin
+    util.verifyPlan(sql)
+  }
+
+  @Test
+  def testWindowJoinWithMiniBatch(): Unit = {
+    util.addTableWithWatermark("LeftT", util.tableEnv.scan("MyTable1"), "rowtime", 0)
+    util.addTableWithWatermark("RightT", util.tableEnv.scan("MyTable2"), "rowtime", 0)
+    util.tableEnv.getConfig.getConf.setLong(
+      TableConfigOptions.SQL_EXEC_MINIBATCH_ALLOW_LATENCY, 1000L)
+
+    val sql =
+      """
+        | SELECT b, COUNT(a)
+        | FROM (
+        |   SELECT t1.a as a, t1.b as b
+        |   FROM
+        |     LeftT as t1 JOIN RightT as t2
+        |   ON
+        |     t1.a = t2.a AND t1.rowtime BETWEEN t2.rowtime - INTERVAL '5' SECOND AND
+        |     t2.rowtime + INTERVAL '10' SECOND
+        | )
+        | GROUP BY b
+      """.stripMargin
+    util.verifyPlan(sql)
+  }
+
+  @Test
+  def testRowtimeRowsOverWithMiniBatch(): Unit = {
+    util.addTableWithWatermark("MyTable3", util.tableEnv.scan("MyTable1"), "rowtime", 0)
+    util.tableEnv.getConfig.getConf.setLong(
+      TableConfigOptions.SQL_EXEC_MINIBATCH_ALLOW_LATENCY, 1000L)
+
+    val sql =
+      """
+        | SELECT cnt, COUNT(c)
+        | FROM (
+        |   SELECT c, COUNT(a)
+        |   OVER (PARTITION BY c ORDER BY rowtime ROWS BETWEEN 5 preceding AND CURRENT ROW) as cnt
+        |   FROM MyTable3
+        | )
+        | GROUP BY cnt
+      """.stripMargin
+
+    util.verifyPlan(sql)
+  }
+
+  @Test(expected = classOf[NullPointerException])
+  // TODO remove the exception after TableImpl implements createTemporalTableFunction
+  def testTemporalTableFunctionJoinWithMiniBatch(): Unit = {
+    util.addTableWithWatermark("Orders", util.tableEnv.scan("MyTable1"), "rowtime", 0)
+    util.addTableWithWatermark("RatesHistory", util.tableEnv.scan("MyTable2"), "rowtime", 0)
+
+    util.tableEnv.getConfig.getConf.setLong(
+      TableConfigOptions.SQL_EXEC_MINIBATCH_ALLOW_LATENCY, 1000L)
+
+    util.addFunction(
+      "Rates",
+      util.tableEnv.scan("RatesHistory").createTemporalTableFunction("rowtime", "b"))
+
+    val sqlQuery =
+      """
+        | SELECT r_a, COUNT(o_a)
+        | FROM (
+        |   SELECT o.a as o_a, r.a as r_a
+        |   FROM Orders As o,
+        |   LATERAL TABLE (Rates(o.rowtime)) as r
+        |   WHERE o.b = r.b
+        | )
+        | GROUP BY r_a
+      """.stripMargin
+
+    util.verifyPlan(sqlQuery)
+  }
+
+  @Test
+  def testMultiOperatorNeedsWatermark1(): Unit = {
+    // infer result: miniBatchInterval=[Rowtime, 0ms]
+    util.addTableWithWatermark("LeftT", util.tableEnv.scan("MyTable1"), "rowtime", 0)
+    util.addTableWithWatermark("RightT", util.tableEnv.scan("MyTable2"), "rowtime", 0)
+    util.tableEnv.getConfig.getConf.setLong(
+      TableConfigOptions.SQL_EXEC_MINIBATCH_ALLOW_LATENCY, 1000L)
+
+    val sql =
+      """
+        | SELECT
+        |   b, COUNT(a),
+        |   TUMBLE_START(rt, INTERVAL '5' SECOND),
+        |   TUMBLE_END(rt, INTERVAL '5' SECOND)
+        | FROM (
+        |   SELECT t1.a as a, t1.b as b, t1.rowtime as rt
+        |   FROM
+        |     LeftT as t1 JOIN RightT as t2
+        |   ON
+        |     t1.a = t2.a AND t1.rowtime BETWEEN t2.rowtime - INTERVAL '5' SECOND AND
+        |     t2.rowtime + INTERVAL '10' SECOND
+        | )
+        | GROUP BY b,TUMBLE(rt, INTERVAL '5' SECOND)
+      """.stripMargin
+    util.verifyPlan(sql)
+  }
+
+  @Test
+  def testMultiOperatorNeedsWatermark2(): Unit = {
+    util.addTableWithWatermark("LeftT", util.tableEnv.scan("MyTable1"), "rowtime", 0)
+    util.addTableWithWatermark("RightT", util.tableEnv.scan("MyTable2"), "rowtime", 0)
+    util.tableEnv.getConfig.getConf.setLong(
+      TableConfigOptions.SQL_EXEC_MINIBATCH_ALLOW_LATENCY, 6000L)
+
+    val sql =
+      """
+        | SELECT b, COUNT(a)
+        | OVER (PARTITION BY b ORDER BY rt ROWS BETWEEN 5 preceding AND CURRENT ROW)
+        | FROM (
+        |  SELECT t1.a as a, t1.b as b, t1.rt as rt
+        |  FROM
+        |  (
+        |    SELECT b,
+        |     COUNT(a) as a,
+        |     TUMBLE_ROWTIME(rowtime, INTERVAL '5' SECOND) as rt
+        |    FROM LeftT
+        |    GROUP BY b, TUMBLE(rowtime, INTERVAL '5' SECOND)
+        |  ) as t1
+        |  JOIN
+        |  (
+        |    SELECT b,
+        |     COUNT(a) as a,
+        |     HOP_ROWTIME(rowtime, INTERVAL '5' SECOND, INTERVAL '6' SECOND) as rt
+        |    FROM RightT
+        |    GROUP BY b, HOP(rowtime, INTERVAL '5' SECOND, INTERVAL '6' SECOND)
+        |  ) as t2
+        |  ON
+        |    t1.a = t2.a AND t1.rt BETWEEN t2.rt - INTERVAL '5' SECOND AND
+        |    t2.rt + INTERVAL '10' SECOND
+        | )
+      """.stripMargin
+    util.verifyPlan(sql)
+  }
+
+  @Test
+  def testMultiOperatorNeedsWatermark3(): Unit = {
+    util.addTableWithWatermark("RightT", util.tableEnv.scan("MyTable2"), "rowtime", 0)
+    util.tableEnv.getConfig.getConf.setLong(
+      TableConfigOptions.SQL_EXEC_MINIBATCH_ALLOW_LATENCY, 6000L)
+
+    val sql =
+      """
+        |  SELECT t1.a, t1.b
+        |  FROM (
+        |    SELECT a, COUNT(b) as b FROM MyTable1 GROUP BY a
+        |  ) as t1
+        |  JOIN (
+        |    SELECT b, COUNT(a) as a
+        |    FROM (
+        |      SELECT b, COUNT(a) as a,
+        |         HOP_ROWTIME(rowtime, INTERVAL '5' SECOND, INTERVAL '6' SECOND) as rt
+        |      FROM RightT
+        |      GROUP BY b, HOP(rowtime, INTERVAL '5' SECOND, INTERVAL '6' SECOND)
+        |    )
+        |    GROUP BY b
+        |  ) as t2
+        |  ON t1.a = t2.a
+      """.stripMargin
+    util.verifyPlan(sql)
+  }
+
+  /**
+    * Test watermarkInterval trait infer among optimize block
+    */
+  @Test
+  def testMultipleWindowAggregates(): Unit = {
+    util.addDataStream[(Int, Long, String)]("T1", 'id1, 'rowtime, 'text)
+    util.addDataStream[(Int, Long, Int, String, String)]("T2", 'id2, 'rowtime, 'cnt, 'name, 'goods)
+    util.addTableWithWatermark("T3", util.tableEnv.scan("T1"), "rowtime", 0)
+    util.addTableWithWatermark("T4", util.tableEnv.scan("T2"), "rowtime", 0)
+
+    util.tableEnv.getConfig.getConf.setLong(
+      TableConfigOptions.SQL_EXEC_MINIBATCH_ALLOW_LATENCY, 500L)
+    util.tableEnv.getConfig.getConf.setLong(
+      TableConfigOptions.SQL_EXEC_MINIBATCH_SIZE, 300L)
+
+    val table1 = util.tableEnv.sqlQuery(
+      """
+        |SELECT id1, T3.rowtime AS ts, text
+        |  FROM T3, T4
+        |WHERE id1 = id2
+        |      AND T3.rowtime > T4.rowtime - INTERVAL '5' MINUTE
+        |      AND T3.rowtime < T4.rowtime + INTERVAL '3' MINUTE
+      """.stripMargin)
+    util.tableEnv.registerTable("TempTable1", table1)
+
+    val table2 = util.tableEnv.sqlQuery(
+      """
+        |SELECT id1,
+        |    CONCAT_AGG('#', text) as text,
+        |    TUMBLE_ROWTIME(ts, INTERVAL '6' SECOND) as ts
+        |FROM TempTable1
+        |GROUP BY TUMBLE(ts, INTERVAL '6' SECOND), id1
+      """.stripMargin)
+    util.tableEnv.registerTable("TempTable2", table2)
+
+  val table3 = util.tableEnv.sqlQuery(
+      """
+        |SELECT id1,
+        |    CONCAT_AGG('*', text)
+        |FROM TempTable2
+        |GROUP BY HOP(ts, INTERVAL '12' SECOND, INTERVAL '4' SECOND), id1
+      """.stripMargin)
+    val appendSink1 = util.createAppendTableSink(Array("a", "b"), Array(INT, STRING))
+    util.tableEnv.writeToSink(table3, appendSink1)
+
+    val table4 = util.tableEnv.sqlQuery(
+      """
+        |SELECT id1,
+        |    CONCAT_AGG('-', text)
+        |FROM TempTable1
+        |GROUP BY TUMBLE(ts, INTERVAL '9' SECOND), id1
+      """.stripMargin)
+    val appendSink2 = util.createAppendTableSink(Array("a", "b"), Array(INT, STRING))
+    util.tableEnv.writeToSink(table4, appendSink2)
+
+    val table5 = util.tableEnv.sqlQuery(
+      """
+        |SELECT id1,
+        |    COUNT(text)
+        |FROM TempTable2
+        |GROUP BY id1
+      """.stripMargin)
+    val appendSink3 = util.createRetractTableSink(Array("a", "b"), Array(INT, LONG))
+    util.tableEnv.writeToSink(table5, appendSink3)
+
+    util.verifyExplain()
+  }
+}
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/stream/sql/agg/AggregateTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/stream/sql/agg/AggregateTest.scala
index b515b98..5125df0 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/stream/sql/agg/AggregateTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/stream/sql/agg/AggregateTest.scala
@@ -62,7 +62,6 @@ class AggregateTest extends TableTestBase {
   def testAggWithMiniBatch(): Unit = {
     util.tableEnv.getConfig.getConf.setLong(
       TableConfigOptions.SQL_EXEC_MINIBATCH_ALLOW_LATENCY, 1000L)
-    // TODO supports MiniBatch
     util.verifyPlan("SELECT b, COUNT(DISTINCT a), MAX(b), SUM(c)  FROM MyTable GROUP BY b")
   }
 
@@ -70,7 +69,6 @@ class AggregateTest extends TableTestBase {
   def testAggAfterUnionWithMiniBatch(): Unit = {
     util.tableEnv.getConfig.getConf.setLong(
       TableConfigOptions.SQL_EXEC_MINIBATCH_ALLOW_LATENCY, 1000L)
-    // TODO supports MiniBatch
     val query =
       """
         |SELECT a, sum(b), count(distinct c)
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/util/FlinkRelOptUtilTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/util/FlinkRelOptUtilTest.scala
index ed34436..94f99d4 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/util/FlinkRelOptUtilTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/util/FlinkRelOptUtilTest.scala
@@ -21,6 +21,7 @@ import org.apache.flink.api.scala._
 import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
 import org.apache.flink.table.api.TableConfig
 import org.apache.flink.table.api.scala.{StreamTableEnvironment, _}
+import org.apache.flink.table.plan.`trait`.{MiniBatchInterval, MiniBatchMode}
 import org.apache.flink.table.util.TableTestUtil
 
 import org.apache.calcite.sql.SqlExplainLevel
@@ -74,4 +75,67 @@ class FlinkRelOptUtilTest {
     assertEquals(expected2.trim, FlinkRelOptUtil.toString(rel, SqlExplainLevel.NO_ATTRIBUTES).trim)
   }
 
+  @Test
+  def testMergeRowTimeAndNone(): Unit = {
+    val none = MiniBatchInterval.NONE
+    val rowtime = MiniBatchInterval(1000L, MiniBatchMode.RowTime)
+    val mergedResult = FlinkRelOptUtil.mergeMiniBatchInterval(none, rowtime)
+    assertEquals(rowtime, mergedResult)
+  }
+
+  @Test
+  def testMergeProcTimeAndNone(): Unit = {
+    val none = MiniBatchInterval.NONE
+    val proctime = MiniBatchInterval(1000L, MiniBatchMode.ProcTime)
+    val mergedResult = FlinkRelOptUtil.mergeMiniBatchInterval(none, proctime)
+    assertEquals(proctime, mergedResult)
+  }
+
+  @Test
+  def testMergeRowTimeTAndProcTime1(): Unit = {
+    val rowtime = MiniBatchInterval(4000L, MiniBatchMode.RowTime)
+    val proctime = MiniBatchInterval(1000L, MiniBatchMode.ProcTime)
+    val mergedResult = FlinkRelOptUtil.mergeMiniBatchInterval(rowtime, proctime)
+    assertEquals(rowtime, mergedResult)
+  }
+
+  @Test
+  def testMergeRowTimeTAndProcTime2(): Unit = {
+    val rowtime = MiniBatchInterval(0L, MiniBatchMode.RowTime)
+    val proctime = MiniBatchInterval(1000L, MiniBatchMode.ProcTime)
+    val mergedResult = FlinkRelOptUtil.mergeMiniBatchInterval(rowtime, proctime)
+    assertEquals(MiniBatchInterval(1000L, MiniBatchMode.RowTime), mergedResult)
+  }
+
+  @Test
+  def testMergeRowTimeAndRowtime(): Unit = {
+    val rowtime1 = MiniBatchInterval(3000L, MiniBatchMode.RowTime)
+    val rowtime2 = MiniBatchInterval(5000L, MiniBatchMode.RowTime)
+    val mergedResult = FlinkRelOptUtil.mergeMiniBatchInterval(rowtime1, rowtime2)
+    assertEquals(MiniBatchInterval(1000L, MiniBatchMode.RowTime), mergedResult)
+  }
+
+  @Test
+  def testMergeWithNoneMiniBatch(): Unit = {
+    assertEquals(MiniBatchInterval.NO_MINIBATCH,
+      FlinkRelOptUtil.mergeMiniBatchInterval(
+        MiniBatchInterval.NO_MINIBATCH, MiniBatchInterval.NONE))
+    assertEquals(MiniBatchInterval.NO_MINIBATCH,
+      FlinkRelOptUtil.mergeMiniBatchInterval(
+        MiniBatchInterval.NONE, MiniBatchInterval.NO_MINIBATCH))
+    assertEquals(MiniBatchInterval.NO_MINIBATCH,
+      FlinkRelOptUtil.mergeMiniBatchInterval(
+        MiniBatchInterval.NO_MINIBATCH, MiniBatchInterval.NO_MINIBATCH))
+    val rowtime = MiniBatchInterval(3000L, MiniBatchMode.RowTime)
+    assertEquals(MiniBatchInterval.NO_MINIBATCH,
+      FlinkRelOptUtil.mergeMiniBatchInterval(MiniBatchInterval.NO_MINIBATCH, rowtime))
+    assertEquals(MiniBatchInterval.NO_MINIBATCH,
+      FlinkRelOptUtil.mergeMiniBatchInterval(rowtime, MiniBatchInterval.NO_MINIBATCH))
+    val proctime = MiniBatchInterval(1000L, MiniBatchMode.ProcTime)
+    assertEquals(MiniBatchInterval.NO_MINIBATCH,
+      FlinkRelOptUtil.mergeMiniBatchInterval(MiniBatchInterval.NO_MINIBATCH, proctime))
+    assertEquals(MiniBatchInterval.NO_MINIBATCH,
+      FlinkRelOptUtil.mergeMiniBatchInterval(proctime, MiniBatchInterval.NO_MINIBATCH))
+  }
+
 }
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/stream/sql/AggregateITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/stream/sql/AggregateITCase.scala
index b3d7f3f..ee00a17 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/stream/sql/AggregateITCase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/stream/sql/AggregateITCase.scala
@@ -30,6 +30,7 @@ import org.apache.flink.table.runtime.batch.sql.agg.{MyPojoAggFunction, VarArgsA
 import org.apache.flink.table.runtime.utils.StreamingWithAggTestBase.AggMode
 import org.apache.flink.table.runtime.utils.StreamingWithMiniBatchTestBase.MiniBatchMode
 import org.apache.flink.table.runtime.utils.StreamingWithStateTestBase.StateBackendMode
+import org.apache.flink.table.runtime.utils.TimeTestUtil.TimestampAndWatermarkWithOffset
 import org.apache.flink.table.runtime.utils.UserDefinedFunctionTestUtils._
 import org.apache.flink.table.runtime.utils.{StreamingWithAggTestBase, TestData, TestingRetractSink}
 import org.apache.flink.table.typeutils.BigDecimalTypeInfo
@@ -558,6 +559,40 @@ class AggregateITCase(
   }
 
   @Test
+  def testWindowWithUnboundedAgg(): Unit = {
+    val t = failingDataSource(TestData.tupleData5.map {
+      case (a, b, c, d, e) => (b, a, c, d, e)
+    }).assignTimestampsAndWatermarks(
+      new TimestampAndWatermarkWithOffset[(Long, Int, Int, String, Long)](0L))
+        .toTable(tEnv, 'rowtime, 'a, 'c, 'd, 'e)
+    tEnv.registerTable("MyTable", t)
+    val sourceTable = tEnv.scan("MyTable")
+    addTableWithWatermark("MyTable1", sourceTable, "rowtime", 0)
+
+    val innerSql =
+      """
+        |SELECT a,
+        |   SUM(DISTINCT e) b,
+        |   MIN(DISTINCT e) c,
+        |   COUNT(DISTINCT e) d
+        |FROM MyTable1
+        |GROUP BY a, TUMBLE(rowtime, INTERVAL '0.005' SECOND)
+      """.stripMargin
+
+    val sqlQuery = "SELECT c, MAX(a), COUNT(DISTINCT d) FROM (" + innerSql + ") GROUP BY c"
+
+    val results = tEnv.sqlQuery(sqlQuery).toRetractStream[Row]
+    val sink = new TestingRetractSink
+    results.addSink(sink)
+    env.execute()
+
+    val expected = List(
+      "1,5,3",
+      "2,5,2")
+    assertEquals(expected.sorted, sink.getRetractResults.sorted)
+  }
+
+  @Test
   def testConcatAggWithNullData(): Unit = {
     val dataWithNull = List(
       (1, 1, null),
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/utils/StreamingTestBase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/utils/StreamingTestBase.scala
index 1335e89..ea617c3 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/utils/StreamingTestBase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/utils/StreamingTestBase.scala
@@ -20,7 +20,11 @@ package org.apache.flink.table.runtime.utils
 
 import org.apache.flink.streaming.api.TimeCharacteristic
 import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
+import org.apache.flink.table.api.{Table, TableException}
 import org.apache.flink.table.api.scala.StreamTableEnvironment
+import org.apache.flink.table.operations.PlannerQueryOperation
+import org.apache.flink.table.plan.nodes.calcite.LogicalWatermarkAssigner
+import org.apache.flink.table.util.TableTestUtil
 import org.apache.flink.test.util.AbstractTestBase
 
 import org.junit.rules.{ExpectedException, TemporaryFolder}
@@ -53,4 +57,24 @@ class StreamingTestBase extends AbstractTestBase {
     this.tEnv = StreamTableEnvironment.create(env)
   }
 
+  def addTableWithWatermark(
+      tableName: String,
+      sourceTable: Table,
+      rowtimeField: String,
+      offset: Long): Unit = {
+    val sourceRel = TableTestUtil.toRelNode(sourceTable)
+    val rowtimeFieldIdx = sourceRel.getRowType.getFieldNames.indexOf(rowtimeField)
+    if (rowtimeFieldIdx < 0) {
+      throw new TableException(s"$rowtimeField does not exist, please check it")
+    }
+    val watermarkAssigner = new LogicalWatermarkAssigner(
+      sourceRel.getCluster,
+      sourceRel.getTraitSet,
+      sourceRel,
+      Some(rowtimeFieldIdx),
+      Option(offset)
+    )
+    val queryOperation = new PlannerQueryOperation(watermarkAssigner)
+    tEnv.registerTable(tableName, tEnv.createTable(queryOperation))
+  }
 }
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/util/TableTestBase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/util/TableTestBase.scala
index 51d16ce..7380fb7 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/util/TableTestBase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/util/TableTestBase.scala
@@ -32,7 +32,8 @@ import org.apache.flink.table.calcite.CalciteConfig
 import org.apache.flink.table.catalog.{CatalogManager, GenericInMemoryCatalog}
 import org.apache.flink.table.dataformat.BaseRow
 import org.apache.flink.table.functions.{AggregateFunction, ScalarFunction, TableFunction}
-import org.apache.flink.table.operations.RichTableSourceQueryOperation
+import org.apache.flink.table.operations.{PlannerQueryOperation, RichTableSourceQueryOperation}
+import org.apache.flink.table.plan.nodes.calcite.LogicalWatermarkAssigner
 import org.apache.flink.table.plan.nodes.exec.ExecNode
 import org.apache.flink.table.plan.optimize.program.{FlinkBatchProgram, FlinkStreamProgram}
 import org.apache.flink.table.plan.stats.FlinkStatistic
@@ -531,6 +532,35 @@ case class StreamTableTestUtil(
     tableEnv.scan(name)
   }
 
+  /**
+    * Register a table with specific row time field and offset.
+    *
+    * @param tableName table name
+    * @param sourceTable table to register
+    * @param rowtimeField row time field
+    * @param offset offset to the row time field value
+    */
+  def addTableWithWatermark(
+      tableName: String,
+      sourceTable: Table,
+      rowtimeField: String,
+      offset: Long): Unit = {
+    val sourceRel = TableTestUtil.toRelNode(sourceTable)
+    val rowtimeFieldIdx = sourceRel.getRowType.getFieldNames.indexOf(rowtimeField)
+    if (rowtimeFieldIdx < 0) {
+      throw new TableException(s"$rowtimeField does not exist, please check it")
+    }
+    val watermarkAssigner = new LogicalWatermarkAssigner(
+      sourceRel.getCluster,
+      sourceRel.getTraitSet,
+      sourceRel,
+      Some(rowtimeFieldIdx),
+      Option(offset)
+    )
+    val queryOperation = new PlannerQueryOperation(watermarkAssigner)
+    tableEnv.registerTable(tableName, tableEnv.createTable(queryOperation))
+  }
+
   def verifyPlanWithTrait(): Unit = {
     doVerifyPlan(
       SqlExplainLevel.EXPPLAN_ATTRIBUTES,
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/api/TableConfigOptions.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/api/TableConfigOptions.java
index 2f4b262..c966c0b 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/api/TableConfigOptions.java
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/api/TableConfigOptions.java
@@ -38,6 +38,14 @@ public class TableConfigOptions {
 					.withDescription("Whether support values source input. The reason for disabling this " +
 									"feature is that checkpoint will not work properly when source finished.");
 
+	public static final ConfigOption<Long> SQL_EXEC_SOURCE_IDLE_TIMEOUT =
+			key("sql.exec.source.idle.timeout.ms")
+					.defaultValue(-1L)
+					.withDescription("When a source do not receive any elements for the timeout time, " +
+							"it will be marked as temporarily idle. This allows downstream " +
+							"tasks to advance their watermarks without the need to wait for " +
+							"watermarks from this source while it is idle.");
+
 	// ------------------------------------------------------------------------
 	//  Sort Options
 	// ------------------------------------------------------------------------
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/watermarkassigner/MiniBatchAssignerOperator.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/watermarkassigner/MiniBatchAssignerOperator.java
new file mode 100644
index 0000000..8d2e5e2
--- /dev/null
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/watermarkassigner/MiniBatchAssignerOperator.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.watermarkassigner;
+
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.ChainingStrategy;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.tasks.ProcessingTimeCallback;
+import org.apache.flink.table.dataformat.BaseRow;
+
+/**
+ * A stream operator that emits mini-batch marker in a given period.
+ * NOTE: the mini-batch marker uses watermark instead
+ */
+public class MiniBatchAssignerOperator extends AbstractStreamOperator<BaseRow>
+	implements OneInputStreamOperator<BaseRow, BaseRow>, ProcessingTimeCallback {
+
+	private static final long serialVersionUID = 1L;
+
+	private final long intervalMs;
+
+	private transient long currentWatermark;
+
+	public MiniBatchAssignerOperator(long intervalMs) {
+		this.intervalMs = intervalMs;
+		this.chainingStrategy = ChainingStrategy.ALWAYS;
+	}
+
+	@Override
+	public void open() throws Exception {
+		super.open();
+
+		currentWatermark = 0;
+
+		long now = getProcessingTimeService().getCurrentProcessingTime();
+		getProcessingTimeService().registerTimer(now + intervalMs, this);
+
+		// report marker metric
+		getRuntimeContext().getMetricGroup().gauge("currentBatch", (Gauge<Long>) () -> currentWatermark);
+	}
+
+	@Override
+	public void processElement(StreamRecord<BaseRow> element) throws Exception {
+		long now = getProcessingTimeService().getCurrentProcessingTime();
+		long currentBatch = now - now % intervalMs;
+		if (currentBatch > currentWatermark) {
+			currentWatermark = currentBatch;
+			// emit
+			output.emitWatermark(new Watermark(currentBatch));
+		}
+		output.collect(element);
+	}
+
+	@Override
+	public void onProcessingTime(long timestamp) throws Exception {
+		long now = getProcessingTimeService().getCurrentProcessingTime();
+		long currentBatch = now - now % intervalMs;
+		if (currentBatch > currentWatermark) {
+			currentWatermark = currentBatch;
+			// emit
+			output.emitWatermark(new Watermark(currentBatch));
+		}
+		getProcessingTimeService().registerTimer(currentBatch + intervalMs, this);
+	}
+
+	/**
+	 * Override the base implementation to completely ignore watermarks propagated from
+	 * upstream (we rely only on the {@link AssignerWithPeriodicWatermarks} to emit
+	 * watermarks from here).
+	 */
+	@Override
+	public void processWatermark(Watermark mark) throws Exception {
+		// if we receive a Long.MAX_VALUE watermark we forward it since it is used
+		// to signal the end of input and to not block watermark progress downstream
+		if (mark.getTimestamp() == Long.MAX_VALUE && currentWatermark != Long.MAX_VALUE) {
+			currentWatermark = Long.MAX_VALUE;
+			output.emitWatermark(mark);
+		}
+	}
+
+	@Override
+	public void close() throws Exception {
+		super.close();
+	}
+}
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/watermarkassigner/MiniBatchedWatermarkAssignerOperator.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/watermarkassigner/MiniBatchedWatermarkAssignerOperator.java
new file mode 100644
index 0000000..c45235a
--- /dev/null
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/watermarkassigner/MiniBatchedWatermarkAssignerOperator.java
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.watermarkassigner;
+
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.ChainingStrategy;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.streamstatus.StreamStatus;
+import org.apache.flink.streaming.runtime.streamstatus.StreamStatusMaintainer;
+import org.apache.flink.streaming.runtime.tasks.ProcessingTimeCallback;
+import org.apache.flink.table.dataformat.BaseRow;
+import org.apache.flink.util.Preconditions;
+
+/**
+ * A stream operator that extracts timestamps from stream elements and
+ * generates watermarks with specified emit latency.
+ */
+public class MiniBatchedWatermarkAssignerOperator
+	extends AbstractStreamOperator<BaseRow>
+	implements OneInputStreamOperator<BaseRow, BaseRow>, ProcessingTimeCallback {
+
+	private final int rowtimeFieldIndex;
+
+	private final long watermarkDelay;
+
+	// timezone offset.
+	private final long tzOffset;
+
+	private final long idleTimeout;
+
+	private long watermarkInterval;
+
+	private transient long currentWatermark;
+
+	private transient long expectedWatermark;
+
+	private transient long lastRecordTime;
+
+	private transient StreamStatusMaintainer streamStatusMaintainer;
+
+	public MiniBatchedWatermarkAssignerOperator(
+		int rowtimeFieldIndex,
+		long watermarkDelay,
+		long tzOffset,
+		long idleTimeout,
+		long watermarkInterval) {
+		this.rowtimeFieldIndex = rowtimeFieldIndex;
+		this.watermarkDelay = watermarkDelay;
+		this.tzOffset = tzOffset;
+		this.chainingStrategy = ChainingStrategy.ALWAYS;
+		this.watermarkInterval = watermarkInterval;
+
+		if (idleTimeout != -1) {
+			Preconditions.checkArgument(
+				idleTimeout >= 1,
+				"The idle timeout cannot be smaller than 1 ms.");
+		}
+		this.idleTimeout = idleTimeout;
+	}
+
+	@Override
+	public void open() throws Exception {
+		super.open();
+
+		Preconditions.checkArgument(watermarkInterval > 0,
+			"The inferred emit latency should be larger than 0");
+
+		// timezone watermarkDelay should be considered when calculating watermark start time.
+		currentWatermark = 0;
+		expectedWatermark = getMiniBatchStart(currentWatermark, tzOffset, watermarkInterval)
+			+ watermarkInterval - 1;
+
+		if (idleTimeout >= 1) {
+			this.lastRecordTime = getProcessingTimeService().getCurrentProcessingTime();
+			this.streamStatusMaintainer = getContainingTask().getStreamStatusMaintainer();
+			getProcessingTimeService().registerTimer(lastRecordTime + idleTimeout, this);
+		}
+	}
+
+	@Override
+	public void processElement(StreamRecord<BaseRow> element) throws Exception {
+		if (idleTimeout != -1) {
+			// mark the channel active
+			streamStatusMaintainer.toggleStreamStatus(StreamStatus.ACTIVE);
+			lastRecordTime = getProcessingTimeService().getCurrentProcessingTime();
+		}
+		BaseRow row = element.getValue();
+		if (row.isNullAt(rowtimeFieldIndex)) {
+			throw new RuntimeException("RowTime field should not be null," +
+				" please convert it to a non-null long value.");
+		}
+		long wm = row.getLong(rowtimeFieldIndex) - watermarkDelay;
+		currentWatermark = Math.max(currentWatermark, wm);
+		// forward element
+		output.collect(element);
+
+		if (currentWatermark >= expectedWatermark) {
+			output.emitWatermark(new Watermark(currentWatermark));
+			long start = getMiniBatchStart(currentWatermark, tzOffset, watermarkInterval);
+			long end = start + watermarkInterval - 1;
+			expectedWatermark = end > currentWatermark ? end : end + watermarkInterval;
+		}
+	}
+
+	@Override
+	public void onProcessingTime(long timestamp) throws Exception {
+		if (idleTimeout != -1) {
+			final long currentTime = getProcessingTimeService().getCurrentProcessingTime();
+			if (currentTime - lastRecordTime > idleTimeout) {
+				// mark the channel as idle to ignore watermarks from this channel
+				streamStatusMaintainer.toggleStreamStatus(StreamStatus.IDLE);
+			}
+		}
+
+		// register next timer
+		long now = getProcessingTimeService().getCurrentProcessingTime();
+		getProcessingTimeService().registerTimer(now + watermarkInterval, this);
+	}
+
+	/**
+	 * Override the base implementation to completely ignore watermarks propagated from
+	 * upstream (we rely only on the {@link MiniBatchedWatermarkAssignerOperator} to emit
+	 * watermarks from here).
+	 */
+	@Override
+	public void processWatermark(Watermark mark) throws Exception {
+		// if we receive a Long.MAX_VALUE watermark we forward it since it is used
+		// to signal the end of input and to not block watermark progress downstream
+		if (mark.getTimestamp() == Long.MAX_VALUE && currentWatermark != Long.MAX_VALUE) {
+			if (idleTimeout != -1) {
+				// mark the channel active
+				streamStatusMaintainer.toggleStreamStatus(StreamStatus.ACTIVE);
+			}
+			currentWatermark = Long.MAX_VALUE;
+			output.emitWatermark(mark);
+		}
+	}
+
+	public void endInput() throws Exception {
+		processWatermark(Watermark.MAX_WATERMARK);
+	}
+
+	@Override
+	public void close() throws Exception {
+		endInput(); // TODO after introduce endInput
+		super.close();
+	}
+
+	// ------------------------------------------------------------------------
+	//  Utilities
+	// ------------------------------------------------------------------------
+	/**
+	 * Method to get the mini-batch start for a watermark.
+	 */
+	public static long getMiniBatchStart(long watermark, long tzOffset, long interval) {
+		return watermark - (watermark - tzOffset + interval) % interval;
+	}
+}
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/watermarkassigner/WatermarkAssignerOperator.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/watermarkassigner/WatermarkAssignerOperator.java
new file mode 100644
index 0000000..ca3c0c5
--- /dev/null
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/watermarkassigner/WatermarkAssignerOperator.java
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.watermarkassigner;
+
+import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.ChainingStrategy;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.streamstatus.StreamStatus;
+import org.apache.flink.streaming.runtime.streamstatus.StreamStatusMaintainer;
+import org.apache.flink.streaming.runtime.tasks.ProcessingTimeCallback;
+import org.apache.flink.table.dataformat.BaseRow;
+import org.apache.flink.util.Preconditions;
+
+/**
+ * A stream operator that extracts timestamps from stream elements and
+ * generates periodic watermarks.
+ */
+public class WatermarkAssignerOperator
+	extends AbstractStreamOperator<BaseRow>
+	implements OneInputStreamOperator<BaseRow, BaseRow>, ProcessingTimeCallback {
+
+	private static final long serialVersionUID = 1L;
+
+	private final int rowtimeFieldIndex;
+
+	private final long watermarkDelay;
+
+	private final long idleTimeout;
+
+	private transient long watermarkInterval;
+
+	private transient long currentWatermark;
+
+	private transient long currentMaxTimestamp;
+
+	private transient long lastRecordTime;
+
+	private transient StreamStatusMaintainer streamStatusMaintainer;
+
+	/**
+	 * Create a watermark assigner operator.
+	 * @param rowtimeFieldIndex  the field index to extract event timestamp
+	 * @param watermarkDelay    the delay by which watermarks are behind the maximum observed timestamp.
+	 * @param idleTimeout   (-1 if idleness checking is disabled)
+	 */
+	public WatermarkAssignerOperator(int rowtimeFieldIndex, long watermarkDelay, long idleTimeout) {
+		this.rowtimeFieldIndex = rowtimeFieldIndex;
+		this.watermarkDelay = watermarkDelay;
+
+		if (idleTimeout != -1) {
+			Preconditions.checkArgument(
+				idleTimeout >= 1,
+				"The idle timeout cannot be smaller than 1 ms.");
+		}
+
+		this.idleTimeout = idleTimeout;
+		this.chainingStrategy = ChainingStrategy.ALWAYS;
+	}
+
+	@Override
+	public void open() throws Exception {
+		super.open();
+
+		// watermark and timestamp should start from 0
+		this.currentWatermark = 0;
+		this.currentMaxTimestamp = 0;
+		this.watermarkInterval = getExecutionConfig().getAutoWatermarkInterval();
+		this.lastRecordTime = getProcessingTimeService().getCurrentProcessingTime();
+		this.streamStatusMaintainer = getContainingTask().getStreamStatusMaintainer();
+
+		if (watermarkInterval > 0) {
+			long now = getProcessingTimeService().getCurrentProcessingTime();
+			getProcessingTimeService().registerTimer(now + watermarkInterval, this);
+		}
+	}
+
+	@Override
+	public void processElement(StreamRecord<BaseRow> element) throws Exception {
+		if (idleTimeout != -1) {
+			// mark the channel active
+			streamStatusMaintainer.toggleStreamStatus(StreamStatus.ACTIVE);
+			lastRecordTime = getProcessingTimeService().getCurrentProcessingTime();
+		}
+		BaseRow row = element.getValue();
+		if (row.isNullAt(rowtimeFieldIndex)) {
+			throw new RuntimeException("RowTime field should not be null," +
+					" please convert it to a non-null long value.");
+		}
+		long ts = row.getLong(rowtimeFieldIndex);
+		currentMaxTimestamp = Math.max(currentMaxTimestamp, ts);
+		// forward element
+		output.collect(element);
+
+		// eagerly emit watermark to avoid period timer not called
+		// current_ts - last_ts > interval
+		if (currentMaxTimestamp - (currentWatermark + watermarkDelay) > watermarkInterval) {
+			advanceWatermark();
+		}
+	}
+
+	private void advanceWatermark() {
+		long newWatermark = currentMaxTimestamp - watermarkDelay;
+		if (newWatermark > currentWatermark) {
+			currentWatermark = newWatermark;
+			// emit watermark
+			output.emitWatermark(new Watermark(newWatermark));
+		}
+	}
+
+	@Override
+	public void onProcessingTime(long timestamp) throws Exception {
+		advanceWatermark();
+
+		if (idleTimeout != -1) {
+			final long currentTime = getProcessingTimeService().getCurrentProcessingTime();
+			if (currentTime - lastRecordTime > idleTimeout) {
+				// mark the channel as idle to ignore watermarks from this channel
+				streamStatusMaintainer.toggleStreamStatus(StreamStatus.IDLE);
+			}
+		}
+
+		// register next timer
+		long now = getProcessingTimeService().getCurrentProcessingTime();
+		getProcessingTimeService().registerTimer(now + watermarkInterval, this);
+	}
+
+	/**
+	 * Override the base implementation to completely ignore watermarks propagated from
+	 * upstream (we rely only on the {@link AssignerWithPeriodicWatermarks} to emit
+	 * watermarks from here).
+	 */
+	@Override
+	public void processWatermark(Watermark mark) throws Exception {
+		// if we receive a Long.MAX_VALUE watermark we forward it since it is used
+		// to signal the end of input and to not block watermark progress downstream
+		if (mark.getTimestamp() == Long.MAX_VALUE && currentWatermark != Long.MAX_VALUE) {
+			if (idleTimeout != -1) {
+				// mark the channel active
+				streamStatusMaintainer.toggleStreamStatus(StreamStatus.ACTIVE);
+			}
+			currentWatermark = Long.MAX_VALUE;
+			output.emitWatermark(mark);
+		}
+	}
+
+	public void endInput() throws Exception {
+		processWatermark(Watermark.MAX_WATERMARK);
+	}
+
+	@Override
+	public void close() throws Exception {
+		endInput(); // TODO after introduce endInput
+		super.close();
+
+		// emit a final watermark
+		advanceWatermark();
+	}
+}
diff --git a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/watermarkassigner/MiniBatchAssignerOperatorTest.java b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/watermarkassigner/MiniBatchAssignerOperatorTest.java
new file mode 100644
index 0000000..e653ba3
--- /dev/null
+++ b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/watermarkassigner/MiniBatchAssignerOperatorTest.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.watermarkassigner;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
+import org.apache.flink.table.dataformat.BaseRow;
+import org.apache.flink.table.dataformat.GenericRow;
+
+import org.junit.Test;
+
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+/**
+ * Tests of {@link MiniBatchAssignerOperator}.
+ */
+public class MiniBatchAssignerOperatorTest extends WatermarkAssignerOperatorTestBase {
+
+	@Test
+	public void testMiniBatchAssignerOperator() throws Exception {
+		final MiniBatchAssignerOperator operator = new MiniBatchAssignerOperator(100);
+
+		OneInputStreamOperatorTestHarness<BaseRow, BaseRow> testHarness =
+				new OneInputStreamOperatorTestHarness<>(operator);
+
+		long currentTime = 0;
+
+		testHarness.open();
+
+		testHarness.processElement(new StreamRecord<>(GenericRow.of(1L)));
+		testHarness.processElement(new StreamRecord<>(GenericRow.of(2L)));
+		testHarness.processWatermark(new Watermark(2)); // this watermark should be ignored
+		testHarness.processElement(new StreamRecord<>(GenericRow.of(3L)));
+		testHarness.processElement(new StreamRecord<>(GenericRow.of(4L)));
+
+		{
+			ConcurrentLinkedQueue<Object> output = testHarness.getOutput();
+			long currentElement = 1L;
+			long lastWatermark = 0L;
+
+			while (true) {
+				if (output.size() > 0) {
+					Object next = output.poll();
+					assertNotNull(next);
+					Tuple2<Long, Long> update = validateElement(next, currentElement, lastWatermark);
+					long nextElementValue = update.f0;
+					lastWatermark = update.f1;
+					if (next instanceof Watermark) {
+						assertEquals(100, lastWatermark);
+						break;
+					} else {
+						assertEquals(currentElement, nextElementValue - 1);
+						currentElement += 1;
+						assertEquals(0, lastWatermark);
+					}
+				} else {
+					currentTime = currentTime + 10;
+					testHarness.setProcessingTime(currentTime);
+				}
+			}
+
+			output.clear();
+		}
+
+		testHarness.processElement(new StreamRecord<>(GenericRow.of(4L)));
+		testHarness.processElement(new StreamRecord<>(GenericRow.of(5L)));
+		testHarness.processElement(new StreamRecord<>(GenericRow.of(6L)));
+		testHarness.processElement(new StreamRecord<>(GenericRow.of(7L)));
+		testHarness.processElement(new StreamRecord<>(GenericRow.of(8L)));
+
+		{
+			ConcurrentLinkedQueue<Object> output = testHarness.getOutput();
+			long currentElement = 4L;
+			long lastWatermark = 100L;
+
+			while (true) {
+				if (output.size() > 0) {
+					Object next = output.poll();
+					assertNotNull(next);
+					Tuple2<Long, Long> update = validateElement(next, currentElement, lastWatermark);
+					long nextElementValue = update.f0;
+					lastWatermark = update.f1;
+					if (next instanceof Watermark) {
+						assertEquals(200, lastWatermark);
+						break;
+					} else {
+						assertEquals(currentElement, nextElementValue - 1);
+						currentElement += 1;
+						assertEquals(100, lastWatermark);
+					}
+				} else {
+					currentTime = currentTime + 10;
+					testHarness.setProcessingTime(currentTime);
+				}
+			}
+
+			output.clear();
+		}
+
+		testHarness.processWatermark(new Watermark(Long.MAX_VALUE));
+		assertEquals(Long.MAX_VALUE, ((Watermark) testHarness.getOutput().poll()).getTimestamp());
+	}
+
+}
diff --git a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/watermarkassigner/MiniBatchedWatermarkAssignerOperatorTest.java b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/watermarkassigner/MiniBatchedWatermarkAssignerOperatorTest.java
new file mode 100644
index 0000000..4965fe3
--- /dev/null
+++ b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/watermarkassigner/MiniBatchedWatermarkAssignerOperatorTest.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.watermarkassigner;
+
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.streamstatus.StreamStatus;
+import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
+import org.apache.flink.table.dataformat.BaseRow;
+import org.apache.flink.table.dataformat.GenericRow;
+
+import org.junit.Test;
+
+import java.util.List;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+import static junit.framework.TestCase.assertTrue;
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests of {@link MiniBatchedWatermarkAssignerOperator}.
+ */
+public class MiniBatchedWatermarkAssignerOperatorTest extends WatermarkAssignerOperatorTestBase {
+
+	@Test
+	public void testMiniBatchedWatermarkAssignerWithIdleSource() throws Exception {
+		// with timeout 1000 ms
+		final MiniBatchedWatermarkAssignerOperator operator = new MiniBatchedWatermarkAssignerOperator(
+				0, 1, 0, 1000, 50);
+		OneInputStreamOperatorTestHarness<BaseRow, BaseRow> testHarness =
+				new OneInputStreamOperatorTestHarness<>(operator);
+		testHarness.open();
+
+		testHarness.processElement(new StreamRecord<>(GenericRow.of(1L)));
+		testHarness.processElement(new StreamRecord<>(GenericRow.of(2L)));
+		testHarness.processWatermark(new Watermark(2)); // this watermark should be ignored
+		testHarness.processElement(new StreamRecord<>(GenericRow.of(3L)));
+		testHarness.processElement(new StreamRecord<>(GenericRow.of(4L)));
+		// this watermark excess expected watermark, should emit a watermark of 49
+		testHarness.processElement(new StreamRecord<>(GenericRow.of(50L)));
+
+		ConcurrentLinkedQueue<Object> output = testHarness.getOutput();
+		List<Watermark> watermarks = extractWatermarks(output);
+		assertEquals(1, watermarks.size());
+		assertEquals(new Watermark(49), watermarks.get(0));
+		assertEquals(StreamStatus.ACTIVE, testHarness.getStreamStatus());
+		output.clear();
+
+		testHarness.setProcessingTime(1001);
+		assertEquals(StreamStatus.IDLE, testHarness.getStreamStatus());
+
+		testHarness.processElement(new StreamRecord<>(GenericRow.of(51L)));
+		assertEquals(StreamStatus.ACTIVE, testHarness.getStreamStatus());
+
+		// process time will not trigger to emit watermark
+		testHarness.setProcessingTime(1060);
+		output = testHarness.getOutput();
+		watermarks = extractWatermarks(output);
+		assertTrue(watermarks.isEmpty());
+		output.clear();
+
+		testHarness.processElement(new StreamRecord<>(GenericRow.of(100L)));
+		output = testHarness.getOutput();
+		watermarks = extractWatermarks(output);
+		assertEquals(1, watermarks.size());
+		assertEquals(new Watermark(99), watermarks.get(0));
+	}
+
+	@Test
+	public void testMiniBatchedWatermarkAssignerOperator() throws Exception {
+		final MiniBatchedWatermarkAssignerOperator operator = new MiniBatchedWatermarkAssignerOperator(0, 1, 0, -1, 50);
+
+		OneInputStreamOperatorTestHarness<BaseRow, BaseRow> testHarness =
+				new OneInputStreamOperatorTestHarness<>(operator);
+
+		testHarness.open();
+
+		testHarness.processElement(new StreamRecord<>(GenericRow.of(1L)));
+		testHarness.processElement(new StreamRecord<>(GenericRow.of(2L)));
+		testHarness.processWatermark(new Watermark(2)); // this watermark should be ignored
+		testHarness.processElement(new StreamRecord<>(GenericRow.of(3L)));
+		testHarness.processElement(new StreamRecord<>(GenericRow.of(4L)));
+		// this watermark excess expected watermark, should emit a watermark of 49
+		testHarness.processElement(new StreamRecord<>(GenericRow.of(50L)));
+
+		ConcurrentLinkedQueue<Object> output = testHarness.getOutput();
+		List<Watermark> watermarks = extractWatermarks(output);
+		assertEquals(1, watermarks.size());
+		assertEquals(new Watermark(49), watermarks.get(0));
+		output.clear();
+
+		testHarness.setProcessingTime(1001);
+		output = testHarness.getOutput();
+		watermarks = extractWatermarks(output);
+		assertTrue(watermarks.isEmpty());
+		output.clear();
+
+		testHarness.processElement(new StreamRecord<>(GenericRow.of(99L)));
+		output = testHarness.getOutput();
+		watermarks = extractWatermarks(output);
+		assertTrue(watermarks.isEmpty());
+		output.clear();
+
+		testHarness.processElement(new StreamRecord<>(GenericRow.of(100L)));
+		output = testHarness.getOutput();
+		watermarks = extractWatermarks(output);
+		assertEquals(1, watermarks.size());
+		assertEquals(new Watermark(99), watermarks.get(0));
+		output.clear();
+	}
+}
diff --git a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/watermarkassigner/WatermarkAssignerOperatorTest.java b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/watermarkassigner/WatermarkAssignerOperatorTest.java
new file mode 100644
index 0000000..9e5347b
--- /dev/null
+++ b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/watermarkassigner/WatermarkAssignerOperatorTest.java
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.watermarkassigner;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.streamstatus.StreamStatus;
+import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
+import org.apache.flink.table.dataformat.BaseRow;
+import org.apache.flink.table.dataformat.GenericRow;
+
+import org.junit.Test;
+
+import java.util.List;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+import static junit.framework.TestCase.assertTrue;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+/**
+ * Tests of {@link WatermarkAssignerOperator}.
+ */
+public class WatermarkAssignerOperatorTest extends WatermarkAssignerOperatorTestBase {
+
+	@Test
+	public void testWatermarkAssignerWithIdleSource() throws Exception {
+		// with timeout 1000 ms
+		final WatermarkAssignerOperator operator = new WatermarkAssignerOperator(0, 1, 1000);
+		OneInputStreamOperatorTestHarness<BaseRow, BaseRow> testHarness =
+			new OneInputStreamOperatorTestHarness<>(operator);
+		testHarness.getExecutionConfig().setAutoWatermarkInterval(50);
+		testHarness.open();
+
+		testHarness.processElement(new StreamRecord<>(GenericRow.of(1L)));
+		testHarness.processElement(new StreamRecord<>(GenericRow.of(2L)));
+		testHarness.processWatermark(new Watermark(2)); // this watermark should be ignored
+		testHarness.processElement(new StreamRecord<>(GenericRow.of(3L)));
+		testHarness.processElement(new StreamRecord<>(GenericRow.of(4L)));
+
+		// trigger watermark emit
+		testHarness.setProcessingTime(51);
+		ConcurrentLinkedQueue<Object> output = testHarness.getOutput();
+		List<Watermark> watermarks = extractWatermarks(output);
+		assertEquals(1, watermarks.size());
+		assertEquals(new Watermark(3), watermarks.get(0));
+		assertEquals(StreamStatus.ACTIVE, testHarness.getStreamStatus());
+		output.clear();
+
+		testHarness.setProcessingTime(1001);
+		assertEquals(StreamStatus.IDLE, testHarness.getStreamStatus());
+
+		testHarness.processElement(new StreamRecord<>(GenericRow.of(4L)));
+		testHarness.processElement(new StreamRecord<>(GenericRow.of(5L)));
+		testHarness.processElement(new StreamRecord<>(GenericRow.of(6L)));
+		testHarness.processElement(new StreamRecord<>(GenericRow.of(7L)));
+		testHarness.processElement(new StreamRecord<>(GenericRow.of(8L)));
+
+		assertEquals(StreamStatus.ACTIVE, testHarness.getStreamStatus());
+		testHarness.setProcessingTime(1060);
+		output = testHarness.getOutput();
+		watermarks = extractWatermarks(output);
+		assertEquals(1, watermarks.size());
+		assertEquals(new Watermark(7), watermarks.get(0));
+	}
+
+	@Test
+	public void testWatermarkAssignerOperator() throws Exception {
+		final WatermarkAssignerOperator operator = new WatermarkAssignerOperator(0, 1, -1);
+
+		OneInputStreamOperatorTestHarness<BaseRow, BaseRow> testHarness =
+			new OneInputStreamOperatorTestHarness<>(operator);
+
+		testHarness.getExecutionConfig().setAutoWatermarkInterval(50);
+
+		long currentTime = 0;
+
+		testHarness.open();
+
+		testHarness.processElement(new StreamRecord<>(GenericRow.of(1L)));
+		testHarness.processElement(new StreamRecord<>(GenericRow.of(2L)));
+		testHarness.processWatermark(new Watermark(2)); // this watermark should be ignored
+		testHarness.processElement(new StreamRecord<>(GenericRow.of(3L)));
+		testHarness.processElement(new StreamRecord<>(GenericRow.of(4L)));
+
+		// validate first part of the sequence. we poll elements until our
+		// watermark updates to "3", which must be the result of the "4" element.
+		{
+			ConcurrentLinkedQueue<Object> output = testHarness.getOutput();
+			long nextElementValue = 1L;
+			long lastWatermark = -1L;
+
+			while (lastWatermark < 3) {
+				if (output.size() > 0) {
+					Object next = output.poll();
+					assertNotNull(next);
+					Tuple2<Long, Long> update = validateElement(next, nextElementValue, lastWatermark);
+					nextElementValue = update.f0;
+					lastWatermark = update.f1;
+
+					// check the invariant
+					assertTrue(lastWatermark < nextElementValue);
+				} else {
+					currentTime = currentTime + 10;
+					testHarness.setProcessingTime(currentTime);
+				}
+			}
+
+			output.clear();
+		}
+
+		testHarness.processElement(new StreamRecord<>(GenericRow.of(4L)));
+		testHarness.processElement(new StreamRecord<>(GenericRow.of(5L)));
+		testHarness.processElement(new StreamRecord<>(GenericRow.of(6L)));
+		testHarness.processElement(new StreamRecord<>(GenericRow.of(7L)));
+		testHarness.processElement(new StreamRecord<>(GenericRow.of(8L)));
+
+		// validate the next part of the sequence. we poll elements until our
+		// watermark updates to "7", which must be the result of the "8" element.
+		{
+			ConcurrentLinkedQueue<Object> output = testHarness.getOutput();
+			long nextElementValue = 4L;
+			long lastWatermark = 2L;
+
+			while (lastWatermark < 7) {
+				if (output.size() > 0) {
+					Object next = output.poll();
+					assertNotNull(next);
+					Tuple2<Long, Long> update = validateElement(next, nextElementValue, lastWatermark);
+					nextElementValue = update.f0;
+					lastWatermark = update.f1;
+
+					// check the invariant
+					assertTrue(lastWatermark < nextElementValue);
+				} else {
+					currentTime = currentTime + 10;
+					testHarness.setProcessingTime(currentTime);
+				}
+			}
+
+			output.clear();
+		}
+
+		testHarness.processWatermark(new Watermark(Long.MAX_VALUE));
+		assertEquals(Long.MAX_VALUE, ((Watermark) testHarness.getOutput().poll()).getTimestamp());
+	}
+
+}
diff --git a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/watermarkassigner/WatermarkAssignerOperatorTestBase.java b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/watermarkassigner/WatermarkAssignerOperatorTestBase.java
new file mode 100644
index 0000000..f29b4af
--- /dev/null
+++ b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/watermarkassigner/WatermarkAssignerOperatorTestBase.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.watermarkassigner;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.table.dataformat.BaseRow;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+import static junit.framework.TestCase.assertTrue;
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Base class for watermark assigner operator test.
+ */
+public class WatermarkAssignerOperatorTestBase {
+
+	protected Tuple2<Long, Long> validateElement(Object element, long nextElementValue, long currentWatermark) {
+		if (element instanceof StreamRecord) {
+			@SuppressWarnings("unchecked")
+			StreamRecord<BaseRow> record = (StreamRecord<BaseRow>) element;
+			assertEquals(nextElementValue, record.getValue().getLong(0));
+			return new Tuple2<>(nextElementValue + 1, currentWatermark);
+		}
+		else if (element instanceof Watermark) {
+			long wt = ((Watermark) element).getTimestamp();
+			assertTrue(wt > currentWatermark);
+			return new Tuple2<>(nextElementValue, wt);
+		}
+		else {
+			throw new IllegalArgumentException("unrecognized element: " + element);
+		}
+	}
+
+	protected List<Watermark> extractWatermarks(Collection<Object> collection) {
+		List<Watermark> watermarks = new ArrayList<>();
+		for (Object obj : collection) {
+			if (obj instanceof Watermark) {
+				watermarks.add((Watermark) obj);
+			}
+		}
+		return watermarks;
+	}
+}