You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ja...@apache.org on 2020/08/05 04:31:50 UTC

[flink] branch master updated: [FLINK-18769][table-planner-blink] Fix MiniBatch doesn't work with FLIP-95 source (#13038)

This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 94b2388  [FLINK-18769][table-planner-blink] Fix MiniBatch doesn't work with FLIP-95 source (#13038)
94b2388 is described below

commit 94b23885ca34927e37334fce51b930933cfd79dd
Author: Jark Wu <ja...@apache.org>
AuthorDate: Wed Aug 5 12:30:38 2020 +0800

    [FLINK-18769][table-planner-blink] Fix MiniBatch doesn't work with FLIP-95 source (#13038)
    
    This closes #13038
---
 .../kafka/table/KafkaChangelogTableITCase.java     |  63 ++--
 .../stream/MiniBatchIntervalInferRule.scala        |   5 +-
 .../factories/TestValuesRuntimeFunctions.java      |  48 +--
 .../plan/stream/sql/MiniBatchIntervalInferTest.xml | 374 +++++++++++----------
 .../stream/sql/MiniBatchIntervalInferTest.scala    | 117 +++++--
 5 files changed, 352 insertions(+), 255 deletions(-)

diff --git a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaChangelogTableITCase.java b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaChangelogTableITCase.java
index 7b9b069..0ac07ae 100644
--- a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaChangelogTableITCase.java
+++ b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaChangelogTableITCase.java
@@ -21,6 +21,7 @@ package org.apache.flink.streaming.connectors.kafka.table;
 import org.apache.flink.api.common.restartstrategy.RestartStrategies;
 import org.apache.flink.api.common.serialization.SerializationSchema;
 import org.apache.flink.api.common.serialization.SimpleStringSchema;
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.datastream.DataStreamSource;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase;
@@ -28,9 +29,9 @@ import org.apache.flink.streaming.connectors.kafka.KafkaTestBaseWithFlink;
 import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner;
 import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
 import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.TableResult;
 import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
 import org.apache.flink.table.planner.factories.TestValuesTableFactory;
-import org.apache.flink.table.planner.runtime.utils.TableEnvUtil;
 
 import org.junit.Before;
 import org.junit.Test;
@@ -40,12 +41,13 @@ import java.io.IOException;
 import java.net.URL;
 import java.nio.file.Files;
 import java.nio.file.Path;
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.Collections;
 import java.util.List;
 import java.util.Properties;
 
-import static org.apache.flink.streaming.connectors.kafka.table.KafkaTableTestBase.isCausedByJobFinished;
-import static org.hamcrest.Matchers.containsInAnyOrder;
-import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertEquals;
 
 /**
  * IT cases for Kafka with changelog format for Table API & SQL.
@@ -57,6 +59,7 @@ public class KafkaChangelogTableITCase extends KafkaTestBaseWithFlink {
 
 	@Before
 	public void setup() {
+		TestValuesTableFactory.clearAllData();
 		env = StreamExecutionEnvironment.getExecutionEnvironment();
 		tEnv = StreamTableEnvironment.create(
 			env,
@@ -77,6 +80,13 @@ public class KafkaChangelogTableITCase extends KafkaTestBaseWithFlink {
 		final String topic = "changelog_topic";
 		createTestTopic(topic, 1, 1);
 
+		// enables MiniBatch processing to verify MiniBatch + FLIP-95, see FLINK-18769
+		Configuration tableConf = tEnv.getConfig().getConfiguration();
+		tableConf.setString("table.exec.mini-batch.enabled", "true");
+		tableConf.setString("table.exec.mini-batch.allow-latency", "1s");
+		tableConf.setString("table.exec.mini-batch.size", "5000");
+		tableConf.setString("table.optimizer.agg-phase-strategy", "TWO_PHASE");
+
 		// ---------- Write the Debezium json into Kafka -------------------
 		List<String> lines = readLines("debezium-data-schema-exclude.txt");
 		DataStreamSource<String> stream = env.fromCollection(lines);
@@ -116,24 +126,12 @@ public class KafkaChangelogTableITCase extends KafkaTestBaseWithFlink {
 			" PRIMARY KEY (name) NOT ENFORCED" +
 			") WITH (" +
 			" 'connector' = 'values'," +
-			" 'sink-insert-only' = 'false'," +
-			" 'sink-expected-messages-num' = '20'" +
+			" 'sink-insert-only' = 'false'" +
 			")";
 		tEnv.executeSql(sourceDDL);
 		tEnv.executeSql(sinkDDL);
-
-		try {
-			TableEnvUtil.execInsertSqlAndWaitResult(
-				tEnv,
-				"INSERT INTO sink SELECT name, SUM(weight) FROM debezium_source GROUP BY name");
-		} catch (Throwable t) {
-			// we have to use a specific exception to indicate the job is finished,
-			// because the registered Kafka source is infinite.
-			if (!isCausedByJobFinished(t)) {
-				// re-throw
-				throw t;
-			}
-		}
+		TableResult tableResult = tEnv.executeSql(
+			"INSERT INTO sink SELECT name, SUM(weight) FROM debezium_source GROUP BY name");
 
 		// Debezium captures change data on the `products` table:
 		//
@@ -179,15 +177,15 @@ public class KafkaChangelogTableITCase extends KafkaTestBaseWithFlink {
 		// | 110 | jacket             | new water resistent white wind breaker                  |    0.5 |
 		// +-----+--------------------+---------------------------------------------------------+--------+
 
-		String[] expected = new String[]{
+		List<String> expected = Arrays.asList(
 			"scooter,3.140", "car battery,8.100", "12-pack drill bits,0.800",
-			"hammer,2.625", "rocks,5.100", "jacket,0.600", "spare tire,22.200"};
+			"hammer,2.625", "rocks,5.100", "jacket,0.600", "spare tire,22.200");
 
-		List<String> actual = TestValuesTableFactory.getResults("sink");
-		assertThat(actual, containsInAnyOrder(expected));
+		waitingExpectedResults("sink", expected, Duration.ofSeconds(10));
 
 		// ------------- cleanup -------------------
 
+		tableResult.getJobClient().get().cancel().get(); // stop the job
 		deleteTestTopic(topic);
 	}
 
@@ -201,4 +199,23 @@ public class KafkaChangelogTableITCase extends KafkaTestBaseWithFlink {
 		Path path = new File(url.getFile()).toPath();
 		return Files.readAllLines(path);
 	}
+
+	private static void waitingExpectedResults(String sinkName, List<String> expected, Duration timeout) throws InterruptedException {
+		long now = System.currentTimeMillis();
+		long stop = now + timeout.toMillis();
+		Collections.sort(expected);
+		while (System.currentTimeMillis() < stop) {
+			List<String> actual = TestValuesTableFactory.getResults(sinkName);
+			Collections.sort(actual);
+			if (expected.equals(actual)) {
+				return;
+			}
+			Thread.sleep(100);
+		}
+
+		// timeout, assert again
+		List<String> actual = TestValuesTableFactory.getResults(sinkName);
+		Collections.sort(actual);
+		assertEquals(expected, actual);
+	}
 }
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/MiniBatchIntervalInferRule.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/MiniBatchIntervalInferRule.scala
index 3971744..714b249 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/MiniBatchIntervalInferRule.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/MiniBatchIntervalInferRule.scala
@@ -20,7 +20,7 @@ package org.apache.flink.table.planner.plan.rules.physical.stream
 
 import org.apache.flink.table.api.config.ExecutionConfigOptions
 import org.apache.flink.table.planner.plan.`trait`.{MiniBatchInterval, MiniBatchIntervalTrait, MiniBatchIntervalTraitDef, MiniBatchMode}
-import org.apache.flink.table.planner.plan.nodes.physical.stream.{StreamExecDataStreamScan, StreamExecGroupWindowAggregate, StreamExecMiniBatchAssigner, StreamExecLegacyTableSourceScan, StreamExecWatermarkAssigner, StreamPhysicalRel}
+import org.apache.flink.table.planner.plan.nodes.physical.stream.{StreamExecDataStreamScan, StreamExecGroupWindowAggregate, StreamExecLegacyTableSourceScan, StreamExecMiniBatchAssigner, StreamExecTableSourceScan, StreamExecWatermarkAssigner, StreamPhysicalRel}
 import org.apache.flink.table.planner.plan.utils.FlinkRelOptUtil
 import org.apache.calcite.plan.RelOptRule._
 import org.apache.calcite.plan.hep.HepRelVertex
@@ -122,7 +122,8 @@ class MiniBatchIntervalInferRule extends RelOptRule(
       .getMiniBatchInterval
       .mode
     node match {
-      case _: StreamExecDataStreamScan | _: StreamExecLegacyTableSourceScan =>
+      case _: StreamExecDataStreamScan | _: StreamExecLegacyTableSourceScan |
+           _: StreamExecTableSourceScan =>
         // append minibatch node if the mode is not NONE and reach a source leaf node
         mode == MiniBatchMode.RowTime || mode == MiniBatchMode.ProcTime
       case _: StreamExecWatermarkAssigner  =>
diff --git a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/factories/TestValuesRuntimeFunctions.java b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/factories/TestValuesRuntimeFunctions.java
index b2cc256..e4ede00 100644
--- a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/factories/TestValuesRuntimeFunctions.java
+++ b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/factories/TestValuesRuntimeFunctions.java
@@ -65,40 +65,42 @@ final class TestValuesRuntimeFunctions {
 
 	static List<String> getRawResults(String tableName) {
 		List<String> result = new ArrayList<>();
-		if (globalRawResult.containsKey(tableName)) {
-			globalRawResult.get(tableName)
-				.values()
-				.forEach(result::addAll);
-		} else {
-			throw new IllegalArgumentException("Can't find result for the table '" + tableName + "'.");
+		synchronized (TestValuesTableFactory.class) {
+			if (globalRawResult.containsKey(tableName)) {
+				globalRawResult.get(tableName)
+					.values()
+					.forEach(result::addAll);
+			}
 		}
 		return result;
 	}
 
 	static List<String> getResults(String tableName) {
 		List<String> result = new ArrayList<>();
-		if (globalUpsertResult.containsKey(tableName)) {
-			globalUpsertResult.get(tableName)
-				.values()
-				.forEach(map -> result.addAll(map.values()));
-		} else if (globalRetractResult.containsKey(tableName)) {
-			globalRetractResult.get(tableName)
-				.values()
-				.forEach(result::addAll);
-		} else if (globalRawResult.containsKey(tableName)) {
-			getRawResults(tableName).stream()
-				.map(s -> s.substring(3, s.length() - 1)) // removes the +I(...) wrapper
-				.forEach(result::add);
-		} else {
-			throw new IllegalArgumentException("Can't find result for the table '" + tableName + "'.");
+		synchronized (TestValuesTableFactory.class) {
+			if (globalUpsertResult.containsKey(tableName)) {
+				globalUpsertResult.get(tableName)
+					.values()
+					.forEach(map -> result.addAll(map.values()));
+			} else if (globalRetractResult.containsKey(tableName)) {
+				globalRetractResult.get(tableName)
+					.values()
+					.forEach(result::addAll);
+			} else if (globalRawResult.containsKey(tableName)) {
+				getRawResults(tableName).stream()
+					.map(s -> s.substring(3, s.length() - 1)) // removes the +I(...) wrapper
+					.forEach(result::add);
+			}
 		}
 		return result;
 	}
 
 	static void clearResults() {
-		globalRawResult.clear();
-		globalUpsertResult.clear();
-		globalRetractResult.clear();
+		synchronized (TestValuesTableFactory.class) {
+			globalRawResult.clear();
+			globalUpsertResult.clear();
+			globalRetractResult.clear();
+		}
 	}
 
 	// ------------------------------------------------------------------------------------------
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/MiniBatchIntervalInferTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/MiniBatchIntervalInferTest.xml
index 124fad3..8456d01 100644
--- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/MiniBatchIntervalInferTest.xml
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/MiniBatchIntervalInferTest.xml
@@ -16,6 +16,56 @@ See the License for the specific language governing permissions and
 limitations under the License.
 -->
 <Root>
+  <TestCase name="testIntervalJoinWithMiniBatch">
+    <Resource name="sql">
+      <![CDATA[
+ SELECT b, COUNT(a)
+ FROM (
+   SELECT t1.a as a, t1.b as b
+   FROM
+     wmTable1 as t1 JOIN wmTable2 as t2
+   ON
+     t1.a = t2.a AND t1.rowtime BETWEEN t2.rowtime - INTERVAL '5' SECOND AND
+     t2.rowtime + INTERVAL '10' SECOND
+ )
+ GROUP BY b
+      ]]>
+    </Resource>
+    <Resource name="planBefore">
+      <![CDATA[
+LogicalAggregate(group=[{0}], EXPR$1=[COUNT($1)])
++- LogicalProject(b=[$1], a=[$0])
+   +- LogicalJoin(condition=[AND(=($0, $5), >=($4, -($9, 5000:INTERVAL SECOND)), <=($4, +($9, 10000:INTERVAL SECOND)))], joinType=[inner])
+      :- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[$4])
+      :  +- LogicalProject(a=[$0], b=[$1], c=[$2], proctime=[PROCTIME()], rowtime=[$3])
+      :     +- LogicalTableScan(table=[[default_catalog, default_database, wmTable1]])
+      +- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[$4])
+         +- LogicalProject(a=[$0], b=[$1], c=[$2], proctime=[PROCTIME()], rowtime=[$3])
+            +- LogicalTableScan(table=[[default_catalog, default_database, wmTable2]])
+]]>
+    </Resource>
+    <Resource name="planAfter">
+      <![CDATA[
+GlobalGroupAggregate(groupBy=[b], select=[b, COUNT(count$0) AS EXPR$1])
++- Exchange(distribution=[hash[b]])
+   +- LocalGroupAggregate(groupBy=[b], select=[b, COUNT(a) AS count$0])
+      +- Calc(select=[b, a])
+         +- IntervalJoin(joinType=[InnerJoin], windowBounds=[isRowTime=true, leftLowerBound=-5000, leftUpperBound=10000, leftTimeIndex=2, rightTimeIndex=1], where=[AND(=(a, a0), >=(rowtime, -(rowtime0, 5000:INTERVAL SECOND)), <=(rowtime, +(rowtime0, 10000:INTERVAL SECOND)))], select=[a, b, rowtime, a0, rowtime0])
+            :- Exchange(distribution=[hash[a]])
+            :  +- Calc(select=[a, b, rowtime])
+            :     +- MiniBatchAssigner(interval=[1000ms], mode=[RowTime])
+            :        +- WatermarkAssigner(rowtime=[rowtime], watermark=[rowtime])
+            :           +- Calc(select=[a, b, c, PROCTIME() AS proctime, rowtime])
+            :              +- TableSourceScan(table=[[default_catalog, default_database, wmTable1]], fields=[a, b, c, rowtime])
+            +- Exchange(distribution=[hash[a]])
+               +- Calc(select=[a, rowtime])
+                  +- MiniBatchAssigner(interval=[1000ms], mode=[RowTime])
+                     +- WatermarkAssigner(rowtime=[rowtime], watermark=[rowtime])
+                        +- Calc(select=[a, b, c, PROCTIME() AS proctime, rowtime])
+                           +- TableSourceScan(table=[[default_catalog, default_database, wmTable2]], fields=[a, b, c, rowtime])
+]]>
+    </Resource>
+  </TestCase>
   <TestCase name="testMiniBatchOnDataStreamWithRowTime">
     <Resource name="sql">
       <![CDATA[
@@ -60,9 +110,30 @@ LogicalAggregate(group=[{0}], EXPR$1=[COUNT(DISTINCT $1)], EXPR$2=[MAX($0)], EXP
 GlobalGroupAggregate(groupBy=[b], select=[b, COUNT(distinct$0 count$0) AS EXPR$1, MAX(max$1) AS EXPR$2, SUM(sum$2) AS EXPR$3])
 +- Exchange(distribution=[hash[b]])
    +- LocalGroupAggregate(groupBy=[b], select=[b, COUNT(distinct$0 a) AS count$0, MAX(b) AS max$1, SUM(c) AS sum$2, DISTINCT(a) AS distinct$0])
+      +- MiniBatchAssigner(interval=[1000ms], mode=[ProcTime])
+         +- TableSourceScan(table=[[default_catalog, default_database, MyTable1, project=[b, a, c]]], fields=[b, a, c])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testMiniBatchOnlyForDataStream">
+    <Resource name="sql">
+      <![CDATA[SELECT b, COUNT(DISTINCT a), MAX(b), SUM(c) FROM MyDataStream1 GROUP BY b]]>
+    </Resource>
+    <Resource name="planBefore">
+      <![CDATA[
+LogicalAggregate(group=[{0}], EXPR$1=[COUNT(DISTINCT $1)], EXPR$2=[MAX($0)], EXPR$3=[SUM($2)])
++- LogicalProject(b=[$1], a=[$0], c=[$2])
+   +- LogicalTableScan(table=[[default_catalog, default_database, MyDataStream1]])
+]]>
+    </Resource>
+    <Resource name="planAfter">
+      <![CDATA[
+GlobalGroupAggregate(groupBy=[b], select=[b, COUNT(distinct$0 count$0) AS EXPR$1, MAX(max$1) AS EXPR$2, SUM(sum$2) AS EXPR$3])
++- Exchange(distribution=[hash[b]])
+   +- LocalGroupAggregate(groupBy=[b], select=[b, COUNT(distinct$0 a) AS count$0, MAX(b) AS max$1, SUM(c) AS sum$2, DISTINCT(a) AS distinct$0])
       +- Calc(select=[b, a, c])
          +- MiniBatchAssigner(interval=[1000ms], mode=[ProcTime])
-            +- DataStreamScan(table=[[default_catalog, default_database, MyTable1]], fields=[a, b, c, proctime, rowtime])
+            +- DataStreamScan(table=[[default_catalog, default_database, MyDataStream1]], fields=[a, b, c, proctime, rowtime])
 ]]>
     </Resource>
   </TestCase>
@@ -76,7 +147,7 @@ GlobalGroupAggregate(groupBy=[b], select=[b, COUNT(distinct$0 count$0) AS EXPR$1
  FROM (
    SELECT t1.a as a, t1.b as b, t1.rowtime as rt
    FROM
-     LeftT as t1 JOIN RightT as t2
+     wmTable1 as t1 JOIN wmTable2 as t2
    ON
      t1.a = t2.a AND t1.rowtime BETWEEN t2.rowtime - INTERVAL '5' SECOND AND
      t2.rowtime + INTERVAL '10' SECOND
@@ -90,10 +161,12 @@ LogicalProject(b=[$0], EXPR$1=[$2], EXPR$2=[TUMBLE_START($1)], EXPR$3=[TUMBLE_EN
 +- LogicalAggregate(group=[{0, 1}], EXPR$1=[COUNT($2)])
    +- LogicalProject(b=[$1], $f1=[$TUMBLE($4, 5000:INTERVAL SECOND)], a=[$0])
       +- LogicalJoin(condition=[AND(=($0, $5), >=($4, -($9, 5000:INTERVAL SECOND)), <=($4, +($9, 10000:INTERVAL SECOND)))], joinType=[inner])
-         :- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($4, 0:INTERVAL MILLISECOND)])
-         :  +- LogicalTableScan(table=[[default_catalog, default_database, MyTable1]])
-         +- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($4, 0:INTERVAL MILLISECOND)])
-            +- LogicalTableScan(table=[[default_catalog, default_database, MyTable2]])
+         :- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[$4])
+         :  +- LogicalProject(a=[$0], b=[$1], c=[$2], proctime=[PROCTIME()], rowtime=[$3])
+         :     +- LogicalTableScan(table=[[default_catalog, default_database, wmTable1]])
+         +- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[$4])
+            +- LogicalProject(a=[$0], b=[$1], c=[$2], proctime=[PROCTIME()], rowtime=[$3])
+               +- LogicalTableScan(table=[[default_catalog, default_database, wmTable2]])
 ]]>
     </Resource>
     <Resource name="planAfter">
@@ -105,12 +178,14 @@ Calc(select=[b, EXPR$1, w$start AS EXPR$2, w$end AS EXPR$3])
          +- IntervalJoin(joinType=[InnerJoin], windowBounds=[isRowTime=true, leftLowerBound=-5000, leftUpperBound=10000, leftTimeIndex=2, rightTimeIndex=1], where=[AND(=(a, a0), >=(rowtime, -(rowtime0, 5000:INTERVAL SECOND)), <=(rowtime, +(rowtime0, 10000:INTERVAL SECOND)))], select=[a, b, rowtime, a0, rowtime0])
             :- Exchange(distribution=[hash[a]])
             :  +- Calc(select=[a, b, rowtime])
-            :     +- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime, 0:INTERVAL MILLISECOND)])
-            :        +- DataStreamScan(table=[[default_catalog, default_database, MyTable1]], fields=[a, b, c, proctime, rowtime])
+            :     +- WatermarkAssigner(rowtime=[rowtime], watermark=[rowtime])
+            :        +- Calc(select=[a, b, c, PROCTIME() AS proctime, rowtime])
+            :           +- TableSourceScan(table=[[default_catalog, default_database, wmTable1]], fields=[a, b, c, rowtime])
             +- Exchange(distribution=[hash[a]])
                +- Calc(select=[a, rowtime])
-                  +- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime, 0:INTERVAL MILLISECOND)])
-                     +- DataStreamScan(table=[[default_catalog, default_database, MyTable2]], fields=[a, b, c, proctime, rowtime])
+                  +- WatermarkAssigner(rowtime=[rowtime], watermark=[rowtime])
+                     +- Calc(select=[a, b, c, PROCTIME() AS proctime, rowtime])
+                        +- TableSourceScan(table=[[default_catalog, default_database, wmTable2]], fields=[a, b, c, rowtime])
 ]]>
     </Resource>
   </TestCase>
@@ -126,7 +201,7 @@ Calc(select=[b, EXPR$1, w$start AS EXPR$2, w$end AS EXPR$3])
     SELECT b,
      COUNT(a) as a,
      TUMBLE_ROWTIME(rowtime, INTERVAL '5' SECOND) as rt
-    FROM LeftT
+    FROM wmTable1
     GROUP BY b, TUMBLE(rowtime, INTERVAL '5' SECOND)
   ) as t1
   JOIN
@@ -134,7 +209,7 @@ Calc(select=[b, EXPR$1, w$start AS EXPR$2, w$end AS EXPR$3])
     SELECT b,
      COUNT(a) as a,
      HOP_ROWTIME(rowtime, INTERVAL '5' SECOND, INTERVAL '6' SECOND) as rt
-    FROM RightT
+    FROM wmTable2
     GROUP BY b, HOP(rowtime, INTERVAL '5' SECOND, INTERVAL '6' SECOND)
   ) as t2
   ON
@@ -150,13 +225,15 @@ LogicalProject(b=[$0], EXPR$1=[COUNT($1) OVER (PARTITION BY $0 ORDER BY $2 NULLS
    :- LogicalProject(b=[$0], a=[$2], rt=[TUMBLE_ROWTIME($1)])
    :  +- LogicalAggregate(group=[{0, 1}], a=[COUNT($2)])
    :     +- LogicalProject(b=[$1], $f1=[$TUMBLE($4, 5000:INTERVAL SECOND)], a=[$0])
-   :        +- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($4, 0:INTERVAL MILLISECOND)])
-   :           +- LogicalTableScan(table=[[default_catalog, default_database, MyTable1]])
+   :        +- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[$4])
+   :           +- LogicalProject(a=[$0], b=[$1], c=[$2], proctime=[PROCTIME()], rowtime=[$3])
+   :              +- LogicalTableScan(table=[[default_catalog, default_database, wmTable1]])
    +- LogicalProject(b=[$0], a=[$2], rt=[HOP_ROWTIME($1)])
       +- LogicalAggregate(group=[{0, 1}], a=[COUNT($2)])
          +- LogicalProject(b=[$1], $f1=[HOP($4, 5000:INTERVAL SECOND, 6000:INTERVAL SECOND)], a=[$0])
-            +- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($4, 0:INTERVAL MILLISECOND)])
-               +- LogicalTableScan(table=[[default_catalog, default_database, MyTable2]])
+            +- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[$4])
+               +- LogicalProject(a=[$0], b=[$1], c=[$2], proctime=[PROCTIME()], rowtime=[$3])
+                  +- LogicalTableScan(table=[[default_catalog, default_database, wmTable2]])
 ]]>
     </Resource>
     <Resource name="planAfter">
@@ -171,15 +248,17 @@ Calc(select=[b, w0$o0 AS $1])
             :     +- GroupWindowAggregate(groupBy=[b], window=[TumblingGroupWindow('w$, rowtime, 5000)], properties=[w$start, w$end, w$rowtime, w$proctime], select=[b, COUNT(a) AS a, start('w$) AS w$start, end('w$) AS w$end, rowtime('w$) AS w$rowtime, proctime('w$) AS w$proctime])
             :        +- Exchange(distribution=[hash[b]])
             :           +- Calc(select=[b, rowtime, a])
-            :              +- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime, 0:INTERVAL MILLISECOND)])
-            :                 +- DataStreamScan(table=[[default_catalog, default_database, MyTable1]], fields=[a, b, c, proctime, rowtime])
+            :              +- WatermarkAssigner(rowtime=[rowtime], watermark=[rowtime])
+            :                 +- Calc(select=[a, b, c, PROCTIME() AS proctime, rowtime])
+            :                    +- TableSourceScan(table=[[default_catalog, default_database, wmTable1]], fields=[a, b, c, rowtime])
             +- Exchange(distribution=[hash[a]])
                +- Calc(select=[b, a, w$rowtime AS rt])
                   +- GroupWindowAggregate(groupBy=[b], window=[SlidingGroupWindow('w$, rowtime, 6000, 5000)], properties=[w$start, w$end, w$rowtime, w$proctime], select=[b, COUNT(a) AS a, start('w$) AS w$start, end('w$) AS w$end, rowtime('w$) AS w$rowtime, proctime('w$) AS w$proctime])
                      +- Exchange(distribution=[hash[b]])
                         +- Calc(select=[b, rowtime, a])
-                           +- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime, 0:INTERVAL MILLISECOND)])
-                              +- DataStreamScan(table=[[default_catalog, default_database, MyTable2]], fields=[a, b, c, proctime, rowtime])
+                           +- WatermarkAssigner(rowtime=[rowtime], watermark=[rowtime])
+                              +- Calc(select=[a, b, c, PROCTIME() AS proctime, rowtime])
+                                 +- TableSourceScan(table=[[default_catalog, default_database, wmTable2]], fields=[a, b, c, rowtime])
 ]]>
     </Resource>
   </TestCase>
@@ -195,7 +274,7 @@ Calc(select=[b, w0$o0 AS $1])
     FROM (
       SELECT b, COUNT(a) as a,
          HOP_ROWTIME(rowtime, INTERVAL '5' SECOND, INTERVAL '6' SECOND) as rt
-      FROM RightT
+      FROM wmTable1
       GROUP BY b, HOP(rowtime, INTERVAL '5' SECOND, INTERVAL '6' SECOND)
     )
     GROUP BY b
@@ -215,8 +294,9 @@ LogicalProject(a=[$0], b=[$1])
       +- LogicalProject(b=[$0], a=[$2])
          +- LogicalAggregate(group=[{0, 1}], a=[COUNT($2)])
             +- LogicalProject(b=[$1], $f1=[HOP($4, 5000:INTERVAL SECOND, 6000:INTERVAL SECOND)], a=[$0])
-               +- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($4, 0:INTERVAL MILLISECOND)])
-                  +- LogicalTableScan(table=[[default_catalog, default_database, MyTable2]])
+               +- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[$4])
+                  +- LogicalProject(a=[$0], b=[$1], c=[$2], proctime=[PROCTIME()], rowtime=[$3])
+                     +- LogicalTableScan(table=[[default_catalog, default_database, wmTable1]])
 ]]>
     </Resource>
     <Resource name="planAfter">
@@ -228,9 +308,8 @@ Calc(select=[a, b])
    :     +- GlobalGroupAggregate(groupBy=[a], select=[a, COUNT(count$0) AS b])
    :        +- Exchange(distribution=[hash[a]])
    :           +- LocalGroupAggregate(groupBy=[a], select=[a, COUNT(b) AS count$0])
-   :              +- Calc(select=[a, b])
-   :                 +- MiniBatchAssigner(interval=[6000ms], mode=[ProcTime])
-   :                    +- DataStreamScan(table=[[default_catalog, default_database, MyTable1]], fields=[a, b, c, proctime, rowtime])
+   :              +- MiniBatchAssigner(interval=[6000ms], mode=[ProcTime])
+   :                 +- TableSourceScan(table=[[default_catalog, default_database, MyTable1, project=[a, b]]], fields=[a, b])
    +- Exchange(distribution=[hash[a]])
       +- GlobalGroupAggregate(groupBy=[b], select=[b, COUNT(count1$0) AS a])
          +- Exchange(distribution=[hash[b]])
@@ -238,8 +317,9 @@ Calc(select=[a, b])
                +- GroupWindowAggregate(groupBy=[b], window=[SlidingGroupWindow('w$, rowtime, 6000, 5000)], select=[b, COUNT(a) AS a])
                   +- Exchange(distribution=[hash[b]])
                      +- Calc(select=[b, rowtime, a])
-                        +- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime, 0:INTERVAL MILLISECOND)])
-                           +- DataStreamScan(table=[[default_catalog, default_database, MyTable2]], fields=[a, b, c, proctime, rowtime])
+                        +- WatermarkAssigner(rowtime=[rowtime], watermark=[rowtime])
+                           +- Calc(select=[a, b, c, PROCTIME() AS proctime, rowtime])
+                              +- TableSourceScan(table=[[default_catalog, default_database, wmTable1]], fields=[a, b, c, rowtime])
 ]]>
     </Resource>
   </TestCase>
@@ -254,9 +334,9 @@ LogicalLegacySink(name=[`default_catalog`.`default_database`.`appendSink1`], fie
             +- LogicalProject($f0=[$TUMBLE($1, 6000:INTERVAL SECOND)], id1=[$0], text=[$2], $f3=[_UTF-16LE'#'])
                +- LogicalFilter(condition=[AND(=($0, $3), >($1, -($4, 300000:INTERVAL MINUTE)), <($1, +($4, 180000:INTERVAL MINUTE)))])
                   +- LogicalJoin(condition=[true], joinType=[inner])
-                     :- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($1, 0:INTERVAL MILLISECOND)])
+                     :- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[$1])
                      :  +- LogicalTableScan(table=[[default_catalog, default_database, T1]])
-                     +- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($1, 0:INTERVAL MILLISECOND)])
+                     +- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[$1])
                         +- LogicalTableScan(table=[[default_catalog, default_database, T2]])
 
 LogicalLegacySink(name=[`default_catalog`.`default_database`.`appendSink2`], fields=[a, b])
@@ -265,9 +345,9 @@ LogicalLegacySink(name=[`default_catalog`.`default_database`.`appendSink2`], fie
       +- LogicalProject($f0=[$TUMBLE($1, 9000:INTERVAL SECOND)], id1=[$0], text=[$2], $f3=[_UTF-16LE'-'])
          +- LogicalFilter(condition=[AND(=($0, $3), >($1, -($4, 300000:INTERVAL MINUTE)), <($1, +($4, 180000:INTERVAL MINUTE)))])
             +- LogicalJoin(condition=[true], joinType=[inner])
-               :- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($1, 0:INTERVAL MILLISECOND)])
+               :- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[$1])
                :  +- LogicalTableScan(table=[[default_catalog, default_database, T1]])
-               +- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($1, 0:INTERVAL MILLISECOND)])
+               +- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[$1])
                   +- LogicalTableScan(table=[[default_catalog, default_database, T2]])
 
 LogicalLegacySink(name=[`default_catalog`.`default_database`.`appendSink3`], fields=[a, b])
@@ -277,19 +357,19 @@ LogicalLegacySink(name=[`default_catalog`.`default_database`.`appendSink3`], fie
          +- LogicalProject($f0=[$TUMBLE($1, 6000:INTERVAL SECOND)], id1=[$0], text=[$2], $f3=[_UTF-16LE'#'])
             +- LogicalFilter(condition=[AND(=($0, $3), >($1, -($4, 300000:INTERVAL MINUTE)), <($1, +($4, 180000:INTERVAL MINUTE)))])
                +- LogicalJoin(condition=[true], joinType=[inner])
-                  :- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($1, 0:INTERVAL MILLISECOND)])
+                  :- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[$1])
                   :  +- LogicalTableScan(table=[[default_catalog, default_database, T1]])
-                  +- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($1, 0:INTERVAL MILLISECOND)])
+                  +- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[$1])
                      +- LogicalTableScan(table=[[default_catalog, default_database, T2]])
 
 == Optimized Logical Plan ==
 IntervalJoin(joinType=[InnerJoin], windowBounds=[isRowTime=true, leftLowerBound=-299999, leftUpperBound=179999, leftTimeIndex=1, rightTimeIndex=1], where=[AND(=(id1, id2), >(CAST(rowtime), -(CAST(rowtime0), 300000:INTERVAL MINUTE)), <(CAST(rowtime), +(CAST(rowtime0), 180000:INTERVAL MINUTE)))], select=[id1, rowtime, text, id2, rowtime0, cnt, name, goods], reuse_id=[1])
 :- Exchange(distribution=[hash[id1]])
-:  +- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime, 0:INTERVAL MILLISECOND)])
-:     +- DataStreamScan(table=[[default_catalog, default_database, T1]], fields=[id1, rowtime, text])
+:  +- WatermarkAssigner(rowtime=[rowtime], watermark=[rowtime])
+:     +- TableSourceScan(table=[[default_catalog, default_database, T1]], fields=[id1, rowtime, text])
 +- Exchange(distribution=[hash[id2]])
-   +- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime, 0:INTERVAL MILLISECOND)])
-      +- DataStreamScan(table=[[default_catalog, default_database, T2]], fields=[id2, rowtime, cnt, name, goods])
+   +- WatermarkAssigner(rowtime=[rowtime], watermark=[rowtime])
+      +- TableSourceScan(table=[[default_catalog, default_database, T2]], fields=[id2, rowtime, cnt, name, goods])
 
 Exchange(distribution=[hash[id1]], reuse_id=[2])
 +- Calc(select=[rowtime, id1, text, _UTF-16LE'#' AS $f3])
@@ -317,45 +397,49 @@ LegacySink(name=[`default_catalog`.`default_database`.`appendSink3`], fields=[a,
 
 == Physical Execution Plan ==
  : Data Source
-	content : Source: Collection Source
+	content : Source: TableSourceScan(table=[[default_catalog, default_database, T1]], fields=[id1, rowtime, text])
+
+	 : Operator
+		content : WatermarkAssigner(rowtime=[rowtime], watermark=[rowtime])
+		ship_strategy : FORWARD
 
  : Data Source
-	content : Source: Collection Source
+	content : Source: TableSourceScan(table=[[default_catalog, default_database, T2]], fields=[id2, rowtime, cnt, name, goods])
 
 	 : Operator
-		content : SourceConversion(table=[default_catalog.default_database.T1], fields=[id1, rowtime, text])
+		content : WatermarkAssigner(rowtime=[rowtime], watermark=[rowtime])
 		ship_strategy : FORWARD
 
 		 : Operator
-			content : WatermarkAssigner(rowtime=[rowtime], watermark=[(rowtime - 0:INTERVAL MILLISECOND)])
-			ship_strategy : FORWARD
+			content : IntervalJoin(joinType=[InnerJoin], windowBounds=[isRowTime=true, leftLowerBound=-299999, leftUpperBound=179999, leftTimeIndex=1, rightTimeIndex=1], where=[((id1 = id2) AND (CAST(rowtime) > (CAST(rowtime0) - 300000:INTERVAL MINUTE)) AND (CAST(rowtime) < (CAST(rowtime0) + 180000:INTERVAL MINUTE)))], select=[id1, rowtime, text, id2, rowtime0, cnt, name, goods])
+			ship_strategy : HASH
 
 			 : Operator
-				content : SourceConversion(table=[default_catalog.default_database.T2], fields=[id2, rowtime, cnt, name, goods])
+				content : Calc(select=[rowtime, id1, text, _UTF-16LE'#' AS $f3])
 				ship_strategy : FORWARD
 
 				 : Operator
-					content : WatermarkAssigner(rowtime=[rowtime], watermark=[(rowtime - 0:INTERVAL MILLISECOND)])
-					ship_strategy : FORWARD
+					content : GroupWindowAggregate(groupBy=[id1], window=[TumblingGroupWindow('w$, rowtime, 6000)], properties=[w$start, w$end, w$rowtime, w$proctime], select=[id1, LISTAGG(text, $f3) AS text, start('w$) AS w$start, end('w$) AS w$end, rowtime('w$) AS w$rowtime, proctime('w$) AS w$proctime])
+					ship_strategy : HASH
 
 					 : Operator
-						content : IntervalJoin(joinType=[InnerJoin], windowBounds=[isRowTime=true, leftLowerBound=-299999, leftUpperBound=179999, leftTimeIndex=1, rightTimeIndex=1], where=[((id1 = id2) AND (CAST(rowtime) > (CAST(rowtime0) - 300000:INTERVAL MINUTE)) AND (CAST(rowtime) < (CAST(rowtime0) + 180000:INTERVAL MINUTE)))], select=[id1, rowtime, text, id2, rowtime0, cnt, name, goods])
-						ship_strategy : HASH
+						content : Calc(select=[w$rowtime AS $f0, id1, text, _UTF-16LE'*' AS $f3])
+						ship_strategy : FORWARD
 
 						 : Operator
-							content : Calc(select=[rowtime, id1, text, _UTF-16LE'#' AS $f3])
-							ship_strategy : FORWARD
+							content : GroupWindowAggregate(groupBy=[id1], window=[SlidingGroupWindow('w$, $f0, 4000, 12000)], select=[id1, LISTAGG(text, $f3) AS EXPR$1])
+							ship_strategy : HASH
 
 							 : Operator
-								content : GroupWindowAggregate(groupBy=[id1], window=[TumblingGroupWindow('w$, rowtime, 6000)], properties=[w$start, w$end, w$rowtime, w$proctime], select=[id1, LISTAGG(text, $f3) AS text, start('w$) AS w$start, end('w$) AS w$end, rowtime('w$) AS w$rowtime, proctime('w$) AS w$proctime])
-								ship_strategy : HASH
+								content : SinkConversionToRow
+								ship_strategy : FORWARD
 
 								 : Operator
-									content : Calc(select=[w$rowtime AS $f0, id1, text, _UTF-16LE'*' AS $f3])
+									content : Calc(select=[rowtime, id1, text, _UTF-16LE'-' AS $f3])
 									ship_strategy : FORWARD
 
 									 : Operator
-										content : GroupWindowAggregate(groupBy=[id1], window=[SlidingGroupWindow('w$, $f0, 4000, 12000)], select=[id1, LISTAGG(text, $f3) AS EXPR$1])
+										content : GroupWindowAggregate(groupBy=[id1], window=[TumblingGroupWindow('w$, rowtime, 9000)], select=[id1, LISTAGG(text, $f3) AS EXPR$1])
 										ship_strategy : HASH
 
 										 : Operator
@@ -363,49 +447,37 @@ LegacySink(name=[`default_catalog`.`default_database`.`appendSink3`], fields=[a,
 											ship_strategy : FORWARD
 
 											 : Operator
-												content : Calc(select=[rowtime, id1, text, _UTF-16LE'-' AS $f3])
-												ship_strategy : FORWARD
+												content : GroupWindowAggregate(groupBy=[id1], window=[TumblingGroupWindow('w$, rowtime, 6000)], select=[id1, LISTAGG(text, $f3) AS text])
+												ship_strategy : HASH
 
 												 : Operator
-													content : GroupWindowAggregate(groupBy=[id1], window=[TumblingGroupWindow('w$, rowtime, 9000)], select=[id1, LISTAGG(text, $f3) AS EXPR$1])
-													ship_strategy : HASH
+													content : LocalGroupAggregate(groupBy=[id1], select=[id1, COUNT(text) AS count$0])
+													ship_strategy : FORWARD
 
 													 : Operator
-														content : SinkConversionToRow
-														ship_strategy : FORWARD
+														content : GlobalGroupAggregate(groupBy=[id1], select=[id1, COUNT(count$0) AS EXPR$1])
+														ship_strategy : HASH
 
 														 : Operator
-															content : GroupWindowAggregate(groupBy=[id1], window=[TumblingGroupWindow('w$, rowtime, 6000)], select=[id1, LISTAGG(text, $f3) AS text])
-															ship_strategy : HASH
+															content : SinkConversionToTuple2
+															ship_strategy : FORWARD
 
 															 : Operator
-																content : LocalGroupAggregate(groupBy=[id1], select=[id1, COUNT(text) AS count$0])
+																content : Map
 																ship_strategy : FORWARD
 
-																 : Operator
-																	content : GlobalGroupAggregate(groupBy=[id1], select=[id1, COUNT(count$0) AS EXPR$1])
-																	ship_strategy : HASH
+																 : Data Sink
+																	content : Sink: TestingAppendTableSink
+																	ship_strategy : FORWARD
 
-																	 : Operator
-																		content : SinkConversionToTuple2
+																	 : Data Sink
+																		content : Sink: TestingAppendTableSink
 																		ship_strategy : FORWARD
 
-																		 : Operator
-																			content : Map
+																		 : Data Sink
+																			content : Sink: TestingRetractTableSink
 																			ship_strategy : FORWARD
 
-																			 : Data Sink
-																				content : Sink: TestingAppendTableSink
-																				ship_strategy : FORWARD
-
-																				 : Data Sink
-																					content : Sink: TestingAppendTableSink
-																					ship_strategy : FORWARD
-
-																					 : Data Sink
-																						content : Sink: TestingRetractTableSink
-																						ship_strategy : FORWARD
-
 ]]>
     </Resource>
   </TestCase>
@@ -442,30 +514,6 @@ GlobalGroupAggregate(groupBy=[cnt], select=[cnt, COUNT(count$0) AS EXPR$1])
 ]]>
     </Resource>
   </TestCase>
-  <TestCase name="testRedundantWatermarkDefinition">
-    <Resource name="sql">
-      <![CDATA[SELECT b, COUNT(DISTINCT a), MAX(b), SUM(c) FROM MyTable3 GROUP BY b]]>
-    </Resource>
-    <Resource name="planBefore">
-      <![CDATA[
-LogicalAggregate(group=[{0}], EXPR$1=[COUNT(DISTINCT $1)], EXPR$2=[MAX($0)], EXPR$3=[SUM($2)])
-+- LogicalProject(b=[$1], a=[$0], c=[$2])
-   +- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($4, 0:INTERVAL MILLISECOND)])
-      +- LogicalTableScan(table=[[default_catalog, default_database, MyTable1]])
-]]>
-    </Resource>
-    <Resource name="planAfter">
-      <![CDATA[
-GlobalGroupAggregate(groupBy=[b], select=[b, COUNT(distinct$0 count$0) AS EXPR$1, MAX(max$1) AS EXPR$2, SUM(sum$2) AS EXPR$3])
-+- Exchange(distribution=[hash[b]])
-   +- LocalGroupAggregate(groupBy=[b], select=[b, COUNT(distinct$0 a) AS count$0, MAX(b) AS max$1, SUM(c) AS sum$2, DISTINCT(a) AS distinct$0])
-      +- Calc(select=[b, a, c])
-         +- MiniBatchAssigner(interval=[1000ms], mode=[ProcTime])
-            +- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime, 0:INTERVAL MILLISECOND)])
-               +- DataStreamScan(table=[[default_catalog, default_database, MyTable1]], fields=[a, b, c, proctime, rowtime])
-]]>
-    </Resource>
-  </TestCase>
   <TestCase name="testRowtimeRowsOverWithMiniBatch">
     <Resource name="sql">
       <![CDATA[
@@ -473,7 +521,7 @@ GlobalGroupAggregate(groupBy=[b], select=[b, COUNT(distinct$0 count$0) AS EXPR$1
  FROM (
    SELECT c, COUNT(a)
    OVER (PARTITION BY c ORDER BY rowtime ROWS BETWEEN 5 preceding AND CURRENT ROW) as cnt
-   FROM MyTable3
+   FROM wmTable1
  )
  GROUP BY cnt
       ]]>
@@ -482,8 +530,9 @@ GlobalGroupAggregate(groupBy=[b], select=[b, COUNT(distinct$0 count$0) AS EXPR$1
       <![CDATA[
 LogicalAggregate(group=[{0}], EXPR$1=[COUNT($1)])
 +- LogicalProject(cnt=[COUNT($0) OVER (PARTITION BY $2 ORDER BY $4 NULLS FIRST ROWS BETWEEN 5 PRECEDING AND CURRENT ROW)], c=[$2])
-   +- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($4, 0:INTERVAL MILLISECOND)])
-      +- LogicalTableScan(table=[[default_catalog, default_database, MyTable1]])
+   +- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[$4])
+      +- LogicalProject(a=[$0], b=[$1], c=[$2], proctime=[PROCTIME()], rowtime=[$3])
+         +- LogicalTableScan(table=[[default_catalog, default_database, wmTable1]])
 ]]>
     </Resource>
     <Resource name="planAfter">
@@ -496,8 +545,35 @@ GlobalGroupAggregate(groupBy=[cnt], select=[cnt, COUNT(count$0) AS EXPR$1])
             +- Exchange(distribution=[hash[c]])
                +- Calc(select=[a, c, rowtime])
                   +- MiniBatchAssigner(interval=[1000ms], mode=[RowTime])
-                     +- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime, 0:INTERVAL MILLISECOND)])
-                        +- DataStreamScan(table=[[default_catalog, default_database, MyTable1]], fields=[a, b, c, proctime, rowtime])
+                     +- WatermarkAssigner(rowtime=[rowtime], watermark=[rowtime])
+                        +- Calc(select=[a, b, c, PROCTIME() AS proctime, rowtime])
+                           +- TableSourceScan(table=[[default_catalog, default_database, wmTable1]], fields=[a, b, c, rowtime])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testRedundantWatermarkDefinition">
+    <Resource name="sql">
+      <![CDATA[SELECT b, COUNT(DISTINCT a), MAX(b), SUM(c) FROM wmTable1 GROUP BY b]]>
+    </Resource>
+    <Resource name="planBefore">
+      <![CDATA[
+LogicalAggregate(group=[{0}], EXPR$1=[COUNT(DISTINCT $1)], EXPR$2=[MAX($0)], EXPR$3=[SUM($2)])
++- LogicalProject(b=[$1], a=[$0], c=[$2])
+   +- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[$4])
+      +- LogicalProject(a=[$0], b=[$1], c=[$2], proctime=[PROCTIME()], rowtime=[$3])
+         +- LogicalTableScan(table=[[default_catalog, default_database, wmTable1]])
+]]>
+    </Resource>
+    <Resource name="planAfter">
+      <![CDATA[
+GlobalGroupAggregate(groupBy=[b], select=[b, COUNT(distinct$0 count$0) AS EXPR$1, MAX(max$1) AS EXPR$2, SUM(sum$2) AS EXPR$3])
++- Exchange(distribution=[hash[b]])
+   +- LocalGroupAggregate(groupBy=[b], select=[b, COUNT(distinct$0 a) AS count$0, MAX(b) AS max$1, SUM(c) AS sum$2, DISTINCT(a) AS distinct$0])
+      +- Calc(select=[b, a, c])
+         +- MiniBatchAssigner(interval=[1000ms], mode=[ProcTime])
+            +- WatermarkAssigner(rowtime=[rowtime], watermark=[rowtime])
+               +- Calc(select=[a, b, c, PROCTIME() AS proctime, rowtime])
+                  +- TableSourceScan(table=[[default_catalog, default_database, wmTable1]], fields=[a, b, c, rowtime])
 ]]>
     </Resource>
   </TestCase>
@@ -521,7 +597,7 @@ LogicalAggregate(group=[{0}], EXPR$1=[COUNT($1)])
    +- LogicalFilter(condition=[=($1, $6)])
       +- LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{4}])
          :- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($4, 0:INTERVAL MILLISECOND)])
-         :  +- LogicalTableScan(table=[[default_catalog, default_database, MyTable1]])
+         :  +- LogicalTableScan(table=[[default_catalog, default_database, MyDataStream1]])
          +- LogicalTableFunctionScan(invocation=[Rates($cor0.rowtime)], rowType=[RecordType(INTEGER a, VARCHAR(2147483647) b, BIGINT c, TIME ATTRIBUTE(PROCTIME) proctime, TIME ATTRIBUTE(ROWTIME) rowtime)], elementType=[class [Ljava.lang.Object;])
 ]]>
     </Resource>
@@ -536,58 +612,12 @@ GlobalGroupAggregate(groupBy=[r_a], select=[r_a, COUNT(count$0) AS EXPR$1])
             :  +- Calc(select=[a, b, rowtime])
             :     +- MiniBatchAssigner(interval=[1000ms], mode=[RowTime])
             :        +- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime, 0:INTERVAL MILLISECOND)])
-            :           +- DataStreamScan(table=[[default_catalog, default_database, MyTable1]], fields=[a, b, c, proctime, rowtime])
+            :           +- DataStreamScan(table=[[default_catalog, default_database, MyDataStream1]], fields=[a, b, c, proctime, rowtime])
             +- Exchange(distribution=[hash[b]])
                +- Calc(select=[a, b, rowtime])
                   +- MiniBatchAssigner(interval=[1000ms], mode=[RowTime])
                      +- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime, 0:INTERVAL MILLISECOND)])
-                        +- DataStreamScan(table=[[default_catalog, default_database, MyTable2]], fields=[a, b, c, proctime, rowtime])
-]]>
-    </Resource>
-  </TestCase>
-  <TestCase name="testIntervalJoinWithMiniBatch">
-    <Resource name="sql">
-      <![CDATA[
- SELECT b, COUNT(a)
- FROM (
-   SELECT t1.a as a, t1.b as b
-   FROM
-     LeftT as t1 JOIN RightT as t2
-   ON
-     t1.a = t2.a AND t1.rowtime BETWEEN t2.rowtime - INTERVAL '5' SECOND AND
-     t2.rowtime + INTERVAL '10' SECOND
- )
- GROUP BY b
-      ]]>
-    </Resource>
-    <Resource name="planBefore">
-      <![CDATA[
-LogicalAggregate(group=[{0}], EXPR$1=[COUNT($1)])
-+- LogicalProject(b=[$1], a=[$0])
-   +- LogicalJoin(condition=[AND(=($0, $5), >=($4, -($9, 5000:INTERVAL SECOND)), <=($4, +($9, 10000:INTERVAL SECOND)))], joinType=[inner])
-      :- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($4, 0:INTERVAL MILLISECOND)])
-      :  +- LogicalTableScan(table=[[default_catalog, default_database, MyTable1]])
-      +- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($4, 0:INTERVAL MILLISECOND)])
-         +- LogicalTableScan(table=[[default_catalog, default_database, MyTable2]])
-]]>
-    </Resource>
-    <Resource name="planAfter">
-      <![CDATA[
-GlobalGroupAggregate(groupBy=[b], select=[b, COUNT(count$0) AS EXPR$1])
-+- Exchange(distribution=[hash[b]])
-   +- LocalGroupAggregate(groupBy=[b], select=[b, COUNT(a) AS count$0])
-      +- Calc(select=[b, a])
-         +- IntervalJoin(joinType=[InnerJoin], windowBounds=[isRowTime=true, leftLowerBound=-5000, leftUpperBound=10000, leftTimeIndex=2, rightTimeIndex=1], where=[AND(=(a, a0), >=(rowtime, -(rowtime0, 5000:INTERVAL SECOND)), <=(rowtime, +(rowtime0, 10000:INTERVAL SECOND)))], select=[a, b, rowtime, a0, rowtime0])
-            :- Exchange(distribution=[hash[a]])
-            :  +- Calc(select=[a, b, rowtime])
-            :     +- MiniBatchAssigner(interval=[1000ms], mode=[RowTime])
-            :        +- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime, 0:INTERVAL MILLISECOND)])
-            :           +- DataStreamScan(table=[[default_catalog, default_database, MyTable1]], fields=[a, b, c, proctime, rowtime])
-            +- Exchange(distribution=[hash[a]])
-               +- Calc(select=[a, rowtime])
-                  +- MiniBatchAssigner(interval=[1000ms], mode=[RowTime])
-                     +- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime, 0:INTERVAL MILLISECOND)])
-                        +- DataStreamScan(table=[[default_catalog, default_database, MyTable2]], fields=[a, b, c, proctime, rowtime])
+                        +- DataStreamScan(table=[[default_catalog, default_database, MyDataStream2]], fields=[a, b, c, proctime, rowtime])
 ]]>
     </Resource>
   </TestCase>
@@ -600,7 +630,7 @@ GlobalGroupAggregate(groupBy=[b], select=[b, COUNT(count$0) AS EXPR$1])
    SELECT b,
      COUNT(a) as cnt,
      TUMBLE_ROWTIME(rowtime, INTERVAL '10' SECOND) as rt
-   FROM MyTable3
+   FROM wmTable1
    GROUP BY b, TUMBLE(rowtime, INTERVAL '10' SECOND)
  )
  GROUP BY b, TUMBLE(rt, INTERVAL '5' SECOND)
@@ -613,8 +643,9 @@ LogicalProject(b=[$0], EXPR$1=[$2])
    +- LogicalProject(b=[$0], $f1=[$TUMBLE(TUMBLE_ROWTIME($1), 5000:INTERVAL SECOND)], cnt=[$2])
       +- LogicalAggregate(group=[{0, 1}], cnt=[COUNT($2)])
          +- LogicalProject(b=[$1], $f1=[$TUMBLE($4, 10000:INTERVAL SECOND)], a=[$0])
-            +- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($4, 0:INTERVAL MILLISECOND)])
-               +- LogicalTableScan(table=[[default_catalog, default_database, MyTable1]])
+            +- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[$4])
+               +- LogicalProject(a=[$0], b=[$1], c=[$2], proctime=[PROCTIME()], rowtime=[$3])
+                  +- LogicalTableScan(table=[[default_catalog, default_database, wmTable1]])
 ]]>
     </Resource>
     <Resource name="planAfter">
@@ -625,8 +656,9 @@ GroupWindowAggregate(groupBy=[b], window=[TumblingGroupWindow('w$, $f1, 5000)],
       +- GroupWindowAggregate(groupBy=[b], window=[TumblingGroupWindow('w$, rowtime, 10000)], properties=[w$start, w$end, w$rowtime, w$proctime], select=[b, COUNT(a) AS cnt, start('w$) AS w$start, end('w$) AS w$end, rowtime('w$) AS w$rowtime, proctime('w$) AS w$proctime])
          +- Exchange(distribution=[hash[b]])
             +- Calc(select=[b, rowtime, a])
-               +- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime, 0:INTERVAL MILLISECOND)])
-                  +- DataStreamScan(table=[[default_catalog, default_database, MyTable1]], fields=[a, b, c, proctime, rowtime])
+               +- WatermarkAssigner(rowtime=[rowtime], watermark=[rowtime])
+                  +- Calc(select=[a, b, c, PROCTIME() AS proctime, rowtime])
+                     +- TableSourceScan(table=[[default_catalog, default_database, wmTable1]], fields=[a, b, c, rowtime])
 ]]>
     </Resource>
   </TestCase>
@@ -639,7 +671,7 @@ GroupWindowAggregate(groupBy=[b], window=[TumblingGroupWindow('w$, $f1, 5000)],
      COUNT(a) as cnt,
      HOP_START(rowtime, INTERVAL '5' SECOND, INTERVAL '6' SECOND) as w_start,
      HOP_END(rowtime, INTERVAL '5' SECOND, INTERVAL '6' SECOND) as w_end
-   FROM MyTable3
+   FROM wmTable1
    GROUP BY b, HOP(rowtime, INTERVAL '5' SECOND, INTERVAL '6' SECOND)
  )
  GROUP BY b
@@ -651,8 +683,9 @@ LogicalAggregate(group=[{0}], EXPR$1=[SUM($1)])
 +- LogicalProject(b=[$0], cnt=[$2])
    +- LogicalAggregate(group=[{0, 1}], cnt=[COUNT($2)])
       +- LogicalProject(b=[$1], $f1=[HOP($4, 5000:INTERVAL SECOND, 6000:INTERVAL SECOND)], a=[$0])
-         +- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($4, 0:INTERVAL MILLISECOND)])
-            +- LogicalTableScan(table=[[default_catalog, default_database, MyTable1]])
+         +- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[$4])
+            +- LogicalProject(a=[$0], b=[$1], c=[$2], proctime=[PROCTIME()], rowtime=[$3])
+               +- LogicalTableScan(table=[[default_catalog, default_database, wmTable1]])
 ]]>
     </Resource>
     <Resource name="planAfter">
@@ -663,8 +696,9 @@ GlobalGroupAggregate(groupBy=[b], select=[b, $SUM0_RETRACT(sum$0) AS EXPR$1])
       +- GroupWindowAggregate(groupBy=[b], window=[SlidingGroupWindow('w$, rowtime, 6000, 5000)], select=[b, COUNT(a) AS cnt], emit=[early delay 500 millisecond])
          +- Exchange(distribution=[hash[b]])
             +- Calc(select=[b, rowtime, a])
-               +- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime, 0:INTERVAL MILLISECOND)])
-                  +- DataStreamScan(table=[[default_catalog, default_database, MyTable1]], fields=[a, b, c, proctime, rowtime])
+               +- WatermarkAssigner(rowtime=[rowtime], watermark=[rowtime])
+                  +- Calc(select=[a, b, c, PROCTIME() AS proctime, rowtime])
+                     +- TableSourceScan(table=[[default_catalog, default_database, wmTable1]], fields=[a, b, c, rowtime])
 ]]>
     </Resource>
   </TestCase>
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/MiniBatchIntervalInferTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/MiniBatchIntervalInferTest.scala
index 26fabc1..fe3d08a 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/MiniBatchIntervalInferTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/MiniBatchIntervalInferTest.scala
@@ -39,9 +39,37 @@ class MiniBatchIntervalInferTest extends TableTestBase {
   @Before
   def setup(): Unit = {
     util.addDataStream[(Int, String, Long)](
-      "MyTable1", 'a, 'b, 'c, 'proctime.proctime, 'rowtime.rowtime)
+      "MyDataStream1", 'a, 'b, 'c, 'proctime.proctime, 'rowtime.rowtime)
     util.addDataStream[(Int, String, Long)](
-      "MyTable2", 'a, 'b, 'c, 'proctime.proctime, 'rowtime.rowtime)
+      "MyDataStream2", 'a, 'b, 'c, 'proctime.proctime, 'rowtime.rowtime)
+
+    // register tables using DDL
+    util.addTable(
+      s"""
+         |CREATE TABLE MyTable1 (
+         |  `a` INT,
+         |  `b` STRING,
+         |  `c` BIGINT,
+         |  proctime AS PROCTIME(),
+         |  rowtime TIMESTAMP(3)
+         |) WITH (
+         |  'connector' = 'values'
+         |)
+         |""".stripMargin)
+    util.addTable(
+      s"""
+         |CREATE TABLE wmTable1 (
+         |  WATERMARK FOR rowtime AS rowtime
+         |) LIKE MyTable1 (INCLUDING ALL)
+         |""".stripMargin)
+    util.addTable(
+      s"""
+         |CREATE TABLE wmTable2 (
+         |  WATERMARK FOR rowtime AS rowtime
+         |) LIKE MyTable1 (INCLUDING ALL)
+         |""".stripMargin)
+
+    // enable mini-batch
     util.tableEnv.getConfig.getConfiguration.setBoolean(
       ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ENABLED, true)
   }
@@ -49,17 +77,24 @@ class MiniBatchIntervalInferTest extends TableTestBase {
   @Test
   def testMiniBatchOnly(): Unit = {
     util.tableEnv.getConfig.getConfiguration
-        .setString(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ALLOW_LATENCY, "1 s")
+      .setString(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ALLOW_LATENCY, "1 s")
     val sql = "SELECT b, COUNT(DISTINCT a), MAX(b), SUM(c) FROM MyTable1 GROUP BY b"
     util.verifyPlan(sql)
   }
 
   @Test
+  def testMiniBatchOnlyForDataStream(): Unit = {
+    util.tableEnv.getConfig.getConfiguration
+        .setString(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ALLOW_LATENCY, "1 s")
+    val sql = "SELECT b, COUNT(DISTINCT a), MAX(b), SUM(c) FROM MyDataStream1 GROUP BY b"
+    util.verifyPlan(sql)
+  }
+
+  @Test
   def testRedundantWatermarkDefinition(): Unit = {
     util.tableEnv.getConfig.getConfiguration
         .setString(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ALLOW_LATENCY, "1 s")
-    util.addTableWithWatermark("MyTable3", util.tableEnv.from("MyTable1"), "rowtime", 0)
-    val sql = "SELECT b, COUNT(DISTINCT a), MAX(b), SUM(c) FROM MyTable3 GROUP BY b"
+    val sql = "SELECT b, COUNT(DISTINCT a), MAX(b), SUM(c) FROM wmTable1 GROUP BY b"
     util.verifyPlan(sql)
   }
 
@@ -69,7 +104,6 @@ class MiniBatchIntervalInferTest extends TableTestBase {
     tableConfig.getConfiguration
         .setString(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ALLOW_LATENCY, "1 s")
     withEarlyFireDelay(tableConfig, Time.milliseconds(500))
-    util.addTableWithWatermark("MyTable3", util.tableEnv.from("MyTable1"), "rowtime", 0)
     val sql =
       """
         | SELECT b, SUM(cnt)
@@ -78,7 +112,7 @@ class MiniBatchIntervalInferTest extends TableTestBase {
         |     COUNT(a) as cnt,
         |     HOP_START(rowtime, INTERVAL '5' SECOND, INTERVAL '6' SECOND) as w_start,
         |     HOP_END(rowtime, INTERVAL '5' SECOND, INTERVAL '6' SECOND) as w_end
-        |   FROM MyTable3
+        |   FROM wmTable1
         |   GROUP BY b, HOP(rowtime, INTERVAL '5' SECOND, INTERVAL '6' SECOND)
         | )
         | GROUP BY b
@@ -90,7 +124,6 @@ class MiniBatchIntervalInferTest extends TableTestBase {
   def testWindowCascade(): Unit = {
     util.tableEnv.getConfig.getConfiguration
         .setString(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ALLOW_LATENCY, "3 s")
-    util.addTableWithWatermark("MyTable3", util.tableEnv.from("MyTable1"), "rowtime", 0)
     val sql =
       """
         | SELECT b,
@@ -99,7 +132,7 @@ class MiniBatchIntervalInferTest extends TableTestBase {
         |   SELECT b,
         |     COUNT(a) as cnt,
         |     TUMBLE_ROWTIME(rowtime, INTERVAL '10' SECOND) as rt
-        |   FROM MyTable3
+        |   FROM wmTable1
         |   GROUP BY b, TUMBLE(rowtime, INTERVAL '10' SECOND)
         | )
         | GROUP BY b, TUMBLE(rt, INTERVAL '5' SECOND)
@@ -109,8 +142,6 @@ class MiniBatchIntervalInferTest extends TableTestBase {
 
   @Test
   def testIntervalJoinWithMiniBatch(): Unit = {
-    util.addTableWithWatermark("LeftT", util.tableEnv.from("MyTable1"), "rowtime", 0)
-    util.addTableWithWatermark("RightT", util.tableEnv.from("MyTable2"), "rowtime", 0)
     util.tableEnv.getConfig.getConfiguration.setString(
       ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ALLOW_LATENCY, "1 s")
 
@@ -120,7 +151,7 @@ class MiniBatchIntervalInferTest extends TableTestBase {
         | FROM (
         |   SELECT t1.a as a, t1.b as b
         |   FROM
-        |     LeftT as t1 JOIN RightT as t2
+        |     wmTable1 as t1 JOIN wmTable2 as t2
         |   ON
         |     t1.a = t2.a AND t1.rowtime BETWEEN t2.rowtime - INTERVAL '5' SECOND AND
         |     t2.rowtime + INTERVAL '10' SECOND
@@ -132,7 +163,6 @@ class MiniBatchIntervalInferTest extends TableTestBase {
 
   @Test
   def testRowtimeRowsOverWithMiniBatch(): Unit = {
-    util.addTableWithWatermark("MyTable3", util.tableEnv.from("MyTable1"), "rowtime", 0)
     util.tableEnv.getConfig.getConfiguration.setString(
       ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ALLOW_LATENCY, "1 s")
 
@@ -142,7 +172,7 @@ class MiniBatchIntervalInferTest extends TableTestBase {
         | FROM (
         |   SELECT c, COUNT(a)
         |   OVER (PARTITION BY c ORDER BY rowtime ROWS BETWEEN 5 preceding AND CURRENT ROW) as cnt
-        |   FROM MyTable3
+        |   FROM wmTable1
         | )
         | GROUP BY cnt
       """.stripMargin
@@ -152,8 +182,8 @@ class MiniBatchIntervalInferTest extends TableTestBase {
 
   @Test
   def testTemporalTableFunctionJoinWithMiniBatch(): Unit = {
-    util.addTableWithWatermark("Orders", util.tableEnv.from("MyTable1"), "rowtime", 0)
-    util.addTableWithWatermark("RatesHistory", util.tableEnv.from("MyTable2"), "rowtime", 0)
+    util.addTableWithWatermark("Orders", util.tableEnv.from("MyDataStream1"), "rowtime", 0)
+    util.addTableWithWatermark("RatesHistory", util.tableEnv.from("MyDataStream2"), "rowtime", 0)
 
     util.tableEnv.getConfig.getConfiguration.setString(
       ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ALLOW_LATENCY, "1 s")
@@ -180,8 +210,6 @@ class MiniBatchIntervalInferTest extends TableTestBase {
   @Test
   def testMultiOperatorNeedsWatermark1(): Unit = {
     // infer result: miniBatchInterval=[Rowtime, 0ms]
-    util.addTableWithWatermark("LeftT", util.tableEnv.from("MyTable1"), "rowtime", 0)
-    util.addTableWithWatermark("RightT", util.tableEnv.from("MyTable2"), "rowtime", 0)
     util.tableEnv.getConfig.getConfiguration.setString(
       ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ALLOW_LATENCY, "1 s")
 
@@ -194,7 +222,7 @@ class MiniBatchIntervalInferTest extends TableTestBase {
         | FROM (
         |   SELECT t1.a as a, t1.b as b, t1.rowtime as rt
         |   FROM
-        |     LeftT as t1 JOIN RightT as t2
+        |     wmTable1 as t1 JOIN wmTable2 as t2
         |   ON
         |     t1.a = t2.a AND t1.rowtime BETWEEN t2.rowtime - INTERVAL '5' SECOND AND
         |     t2.rowtime + INTERVAL '10' SECOND
@@ -206,8 +234,6 @@ class MiniBatchIntervalInferTest extends TableTestBase {
 
   @Test
   def testMultiOperatorNeedsWatermark2(): Unit = {
-    util.addTableWithWatermark("LeftT", util.tableEnv.from("MyTable1"), "rowtime", 0)
-    util.addTableWithWatermark("RightT", util.tableEnv.from("MyTable2"), "rowtime", 0)
     util.tableEnv.getConfig.getConfiguration.setString(
       ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ALLOW_LATENCY, "6 s")
 
@@ -222,7 +248,7 @@ class MiniBatchIntervalInferTest extends TableTestBase {
         |    SELECT b,
         |     COUNT(a) as a,
         |     TUMBLE_ROWTIME(rowtime, INTERVAL '5' SECOND) as rt
-        |    FROM LeftT
+        |    FROM wmTable1
         |    GROUP BY b, TUMBLE(rowtime, INTERVAL '5' SECOND)
         |  ) as t1
         |  JOIN
@@ -230,7 +256,7 @@ class MiniBatchIntervalInferTest extends TableTestBase {
         |    SELECT b,
         |     COUNT(a) as a,
         |     HOP_ROWTIME(rowtime, INTERVAL '5' SECOND, INTERVAL '6' SECOND) as rt
-        |    FROM RightT
+        |    FROM wmTable2
         |    GROUP BY b, HOP(rowtime, INTERVAL '5' SECOND, INTERVAL '6' SECOND)
         |  ) as t2
         |  ON
@@ -243,7 +269,6 @@ class MiniBatchIntervalInferTest extends TableTestBase {
 
   @Test
   def testMultiOperatorNeedsWatermark3(): Unit = {
-    util.addTableWithWatermark("RightT", util.tableEnv.from("MyTable2"), "rowtime", 0)
     util.tableEnv.getConfig.getConfiguration.setString(
       ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ALLOW_LATENCY, "6 s")
 
@@ -258,7 +283,7 @@ class MiniBatchIntervalInferTest extends TableTestBase {
         |    FROM (
         |      SELECT b, COUNT(a) as a,
         |         HOP_ROWTIME(rowtime, INTERVAL '5' SECOND, INTERVAL '6' SECOND) as rt
-        |      FROM RightT
+        |      FROM wmTable1
         |      GROUP BY b, HOP(rowtime, INTERVAL '5' SECOND, INTERVAL '6' SECOND)
         |    )
         |    GROUP BY b
@@ -274,12 +299,30 @@ class MiniBatchIntervalInferTest extends TableTestBase {
   @Test
   def testMultipleWindowAggregates(): Unit = {
     val stmtSet = util.tableEnv.createStatementSet()
-    util.addDataStream[(Int, Long, String)]("T1", 'id1, 'rowtime.rowtime, 'text)
-    util.addDataStream[(Int, Long, Int, String, String)](
-      "T2",
-      'id2, 'rowtime.rowtime, 'cnt, 'name, 'goods)
-    util.addTableWithWatermark("T3", util.tableEnv.from("T1"), "rowtime", 0)
-    util.addTableWithWatermark("T4", util.tableEnv.from("T2"), "rowtime", 0)
+    util.addTable(
+      s"""
+         |CREATE TABLE T1 (
+         | id1 INT,
+         | rowtime TIMESTAMP(3),
+         | `text` STRING,
+         | WATERMARK FOR rowtime AS rowtime
+         |) WITH (
+         | 'connector' = 'values'
+         |)
+         |""".stripMargin)
+    util.addTable(
+      s"""
+         |CREATE TABLE T2 (
+         | id2 INT,
+         | rowtime TIMESTAMP(3),
+         | cnt INT,
+         | name STRING,
+         | goods STRING,
+         | WATERMARK FOR rowtime AS rowtime
+         |) WITH (
+         | 'connector' = 'values'
+         |)
+         |""".stripMargin)
 
     util.tableEnv.getConfig.getConfiguration.setString(
       ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ALLOW_LATENCY, "500 ms")
@@ -288,13 +331,13 @@ class MiniBatchIntervalInferTest extends TableTestBase {
 
     val table1 = util.tableEnv.sqlQuery(
       """
-        |SELECT id1, T3.rowtime AS ts, text
-        |  FROM T3, T4
+        |SELECT id1, T1.rowtime AS ts, text
+        |  FROM T1, T2
         |WHERE id1 = id2
-        |      AND T3.rowtime > T4.rowtime - INTERVAL '5' MINUTE
-        |      AND T3.rowtime < T4.rowtime + INTERVAL '3' MINUTE
+        |      AND T1.rowtime > T2.rowtime - INTERVAL '5' MINUTE
+        |      AND T1.rowtime < T2.rowtime + INTERVAL '3' MINUTE
       """.stripMargin)
-    util.tableEnv.registerTable("TempTable1", table1)
+    util.tableEnv.createTemporaryView("TempTable1", table1)
 
     val table2 = util.tableEnv.sqlQuery(
       """
@@ -304,7 +347,7 @@ class MiniBatchIntervalInferTest extends TableTestBase {
         |FROM TempTable1
         |GROUP BY TUMBLE(ts, INTERVAL '6' SECOND), id1
       """.stripMargin)
-    util.tableEnv.registerTable("TempTable2", table2)
+    util.tableEnv.createTemporaryView("TempTable2", table2)
 
   val table3 = util.tableEnv.sqlQuery(
       """