Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2020/10/07 08:32:51 UTC

[GitHub] [flink] godfreyhe commented on a change in pull request #13449: [FLINK-19282][table sql/planner]Supports watermark push down with Wat…

godfreyhe commented on a change in pull request #13449:
URL: https://github.com/apache/flink/pull/13449#discussion_r497224466



##########
File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/ExpressionReducer.scala
##########
@@ -249,13 +249,15 @@ class ExpressionReducer(
 /**
   * Constant expression code generator context.
   */
-class ConstantCodeGeneratorContext(tableConfig: TableConfig)
+class ConstantCodeGeneratorContext(
+  tableConfig: TableConfig,
+  contextTerm: String = "parameters")

Review comment:
       nit: fix the indentation of the constructor parameters

##########
File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/WatermarkGeneratorCodeGenerator.scala
##########
@@ -18,14 +18,16 @@
 
 package org.apache.flink.table.planner.codegen
 
+import org.apache.calcite.rex.RexNode

Review comment:
       nit: reorder imports

##########
File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/WatermarkGeneratorCodeGenerator.scala
##########
@@ -44,11 +47,33 @@ object WatermarkGeneratorCodeGenerator {
           " but is " + watermarkOutputType)
     }
     val funcName = newName("WatermarkGenerator")
-    val ctx = CodeGeneratorContext(config)
+    val ctx = if (contextTerm != null) {
+      new ConstantCodeGeneratorContext(config, contextTerm)

Review comment:
       Why do we use `ConstantCodeGeneratorContext` here? `ConstantCodeGeneratorContext` is meant for the constant reducer; we should create a new, dedicated `CodeGeneratorContext` for the watermark generator.

##########
File path: flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushWatermarkIntoTableSourceScanRule.java
##########
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.rules.logical;
+
+import org.apache.flink.api.common.eventtime.Watermark;
+import org.apache.flink.api.common.eventtime.WatermarkGenerator;
+import org.apache.flink.api.common.eventtime.WatermarkGeneratorSupplier;
+import org.apache.flink.api.common.eventtime.WatermarkOutput;
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.api.TableConfig;
+import org.apache.flink.table.api.config.ExecutionConfigOptions;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.connector.source.abilities.SupportsWatermarkPushDown;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.planner.calcite.FlinkContext;
+import org.apache.flink.table.planner.calcite.FlinkTypeFactory;
+import org.apache.flink.table.planner.codegen.WatermarkGeneratorCodeGenerator;
+import org.apache.flink.table.planner.plan.nodes.calcite.LogicalWatermarkAssigner;
+import org.apache.flink.table.planner.plan.schema.TableSourceTable;
+import org.apache.flink.table.runtime.generated.GeneratedWatermarkGenerator;
+
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.rel.logical.LogicalTableScan;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * Rule for PushWatermarkIntoTableSourceScan.

Review comment:
       Please add a comment that explains the purpose of this rule.

##########
File path: flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushWatermarkIntoTableSourceScanRule.java
##########
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.rules.logical;
+
+import org.apache.flink.api.common.eventtime.Watermark;
+import org.apache.flink.api.common.eventtime.WatermarkGenerator;
+import org.apache.flink.api.common.eventtime.WatermarkGeneratorSupplier;
+import org.apache.flink.api.common.eventtime.WatermarkOutput;
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.api.TableConfig;
+import org.apache.flink.table.api.config.ExecutionConfigOptions;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.connector.source.abilities.SupportsWatermarkPushDown;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.planner.calcite.FlinkContext;
+import org.apache.flink.table.planner.calcite.FlinkTypeFactory;
+import org.apache.flink.table.planner.codegen.WatermarkGeneratorCodeGenerator;
+import org.apache.flink.table.planner.plan.nodes.calcite.LogicalWatermarkAssigner;
+import org.apache.flink.table.planner.plan.schema.TableSourceTable;
+import org.apache.flink.table.runtime.generated.GeneratedWatermarkGenerator;
+
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.rel.logical.LogicalTableScan;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * Rule for PushWatermarkIntoTableSourceScan.
+ * */
+public class PushWatermarkIntoTableSourceScanRule extends RelOptRule {
+	public static final PushWatermarkIntoTableSourceScanRule INSTANCE = new PushWatermarkIntoTableSourceScanRule();
+
+	public PushWatermarkIntoTableSourceScanRule() {
+		super(operand(LogicalWatermarkAssigner.class,
+				operand(LogicalTableScan.class, none())),
+				"PushWatermarkIntoTableSourceScanRule");
+	}
+
+	@Override
+	public boolean matches(RelOptRuleCall call) {
+		LogicalTableScan scan = call.rel(1);
+		TableSourceTable tableSourceTable = scan.getTable().unwrap(TableSourceTable.class);
+		return tableSourceTable != null && tableSourceTable.tableSource() instanceof SupportsWatermarkPushDown;
+	}
+
+	@Override
+	public void onMatch(RelOptRuleCall call) {
+		LogicalWatermarkAssigner watermarkAssigner = call.rel(0);
+		LogicalTableScan scan = call.rel(1);
+		FlinkContext context = (FlinkContext) call.getPlanner().getContext();
+		TableConfig config = context.getTableConfig();
+
+		// generate an inner watermark generator class that allows us to pass FunctionContext and ClassLoader
+		GeneratedWatermarkGenerator generatedWatermarkGenerator =
+				WatermarkGeneratorCodeGenerator.generateWatermarkGenerator(
+						config,
+						FlinkTypeFactory.toLogicalRowType(scan.getRowType()),
+						watermarkAssigner.watermarkExpr(),
+						"context");
+		Configuration configuration = context.getTableConfig().getConfiguration();
+
+		WatermarkGeneratorSupplier<RowData> supplier = new DefaultWatermarkGeneratorSupplier(configuration, generatedWatermarkGenerator);
+		String digest = String.format("watermark=[%s]", watermarkAssigner.watermarkExpr());
+
+		WatermarkStrategy<RowData> watermarkStrategy = WatermarkStrategy.forGenerator(supplier);
+		Duration idleTimeout = configuration.get(ExecutionConfigOptions.TABLE_EXEC_SOURCE_IDLE_TIMEOUT);
+		if (!idleTimeout.isZero() && !idleTimeout.isNegative()) {
+			watermarkStrategy.withIdleness(idleTimeout);
+			digest = String.format("%s idletimeout=[%s]", digest, idleTimeout.toMillis());
+		}
+
+		TableSourceTable tableSourceTable = scan.getTable().unwrap(TableSourceTable.class);
+		DynamicTableSource newDynamicTableSource = tableSourceTable.tableSource().copy();
+
+		((SupportsWatermarkPushDown) newDynamicTableSource).applyWatermark(watermarkStrategy);
+
+		TableSourceTable newTableSourceTable = tableSourceTable.copy(
+				newDynamicTableSource,
+				watermarkAssigner.getRowType(),
+				new String[]{digest});
+		LogicalTableScan newScan = new LogicalTableScan(
+				scan.getCluster(), scan.getTraitSet(), scan.getHints(), newTableSourceTable);
+
+		call.transformTo(newScan);
+	}
+
+	private static class DefaultWatermarkGeneratorSupplier implements WatermarkGeneratorSupplier<RowData> {

Review comment:
       Add a `serialVersionUID` field (`WatermarkGeneratorSupplier` is `Serializable`).
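
For illustration, a minimal, self-contained sketch of what the requested field looks like and how it surfaces in Java serialization. `GeneratorSupplier` and `DefaultGeneratorSupplier` are hypothetical stand-ins, not the Flink classes from this PR; only the JDK is used.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.ObjectStreamClass;
import java.io.Serializable;

public class SerialVersionUidSketch {

    /** Hypothetical stand-in for a supplier interface that extends Serializable. */
    interface GeneratorSupplier extends Serializable {
        String describe();
    }

    /** Implementation with an explicit, stable serialVersionUID. */
    static class DefaultGeneratorSupplier implements GeneratorSupplier {
        private static final long serialVersionUID = 1L;

        private final String generatedClassName;

        DefaultGeneratorSupplier(String generatedClassName) {
            this.generatedClassName = generatedClassName;
        }

        @Override
        public String describe() {
            return "supplier for " + generatedClassName;
        }
    }

    /** Serializes and deserializes a value with plain Java serialization. */
    static <T extends Serializable> T roundTrip(T value) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(value);
            }
            try (ObjectInputStream in =
                    new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                @SuppressWarnings("unchecked")
                T copy = (T) in.readObject();
                return copy;
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("Round trip failed", e);
        }
    }

    /** Returns the UID that Java serialization associates with the class. */
    static long uidOf(Class<?> clazz) {
        return ObjectStreamClass.lookup(clazz).getSerialVersionUID();
    }

    public static void main(String[] args) {
        DefaultGeneratorSupplier copy = roundTrip(new DefaultGeneratorSupplier("WatermarkGenerator$0"));
        System.out.println(copy.describe());
        System.out.println(uidOf(DefaultGeneratorSupplier.class));
    }
}
```

Without the explicit field, the JVM derives a UID from the class structure, so an otherwise harmless change to the class can make previously serialized instances unreadable.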

##########
File path: flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushWatermarkIntoTableSourceScanRule.java
##########
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.rules.logical;
+
+import org.apache.flink.api.common.eventtime.Watermark;
+import org.apache.flink.api.common.eventtime.WatermarkGenerator;
+import org.apache.flink.api.common.eventtime.WatermarkGeneratorSupplier;
+import org.apache.flink.api.common.eventtime.WatermarkOutput;
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.api.TableConfig;
+import org.apache.flink.table.api.config.ExecutionConfigOptions;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.connector.source.abilities.SupportsWatermarkPushDown;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.planner.calcite.FlinkContext;
+import org.apache.flink.table.planner.calcite.FlinkTypeFactory;
+import org.apache.flink.table.planner.codegen.WatermarkGeneratorCodeGenerator;
+import org.apache.flink.table.planner.plan.nodes.calcite.LogicalWatermarkAssigner;
+import org.apache.flink.table.planner.plan.schema.TableSourceTable;
+import org.apache.flink.table.runtime.generated.GeneratedWatermarkGenerator;
+
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.rel.logical.LogicalTableScan;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * Rule for PushWatermarkIntoTableSourceScan.
+ * */
+public class PushWatermarkIntoTableSourceScanRule extends RelOptRule {
+	public static final PushWatermarkIntoTableSourceScanRule INSTANCE = new PushWatermarkIntoTableSourceScanRule();
+
+	public PushWatermarkIntoTableSourceScanRule() {
+		super(operand(LogicalWatermarkAssigner.class,
+				operand(LogicalTableScan.class, none())),
+				"PushWatermarkIntoTableSourceScanRule");
+	}
+
+	@Override
+	public boolean matches(RelOptRuleCall call) {
+		LogicalTableScan scan = call.rel(1);
+		TableSourceTable tableSourceTable = scan.getTable().unwrap(TableSourceTable.class);
+		return tableSourceTable != null && tableSourceTable.tableSource() instanceof SupportsWatermarkPushDown;
+	}
+
+	@Override
+	public void onMatch(RelOptRuleCall call) {
+		LogicalWatermarkAssigner watermarkAssigner = call.rel(0);
+		LogicalTableScan scan = call.rel(1);
+		FlinkContext context = (FlinkContext) call.getPlanner().getContext();
+		TableConfig config = context.getTableConfig();
+
+		// generate an inner watermark generator class that allows us to pass FunctionContext and ClassLoader
+		GeneratedWatermarkGenerator generatedWatermarkGenerator =
+				WatermarkGeneratorCodeGenerator.generateWatermarkGenerator(
+						config,
+						FlinkTypeFactory.toLogicalRowType(scan.getRowType()),
+						watermarkAssigner.watermarkExpr(),
+						"context");
+		Configuration configuration = context.getTableConfig().getConfiguration();
+
+		WatermarkGeneratorSupplier<RowData> supplier = new DefaultWatermarkGeneratorSupplier(configuration, generatedWatermarkGenerator);
+		String digest = String.format("watermark=[%s]", watermarkAssigner.watermarkExpr());
+
+		WatermarkStrategy<RowData> watermarkStrategy = WatermarkStrategy.forGenerator(supplier);
+		Duration idleTimeout = configuration.get(ExecutionConfigOptions.TABLE_EXEC_SOURCE_IDLE_TIMEOUT);
+		if (!idleTimeout.isZero() && !idleTimeout.isNegative()) {
+			watermarkStrategy.withIdleness(idleTimeout);
+			digest = String.format("%s idletimeout=[%s]", digest, idleTimeout.toMillis());
+		}
+
+		TableSourceTable tableSourceTable = scan.getTable().unwrap(TableSourceTable.class);
+		DynamicTableSource newDynamicTableSource = tableSourceTable.tableSource().copy();
+
+		((SupportsWatermarkPushDown) newDynamicTableSource).applyWatermark(watermarkStrategy);
+
+		TableSourceTable newTableSourceTable = tableSourceTable.copy(
+				newDynamicTableSource,
+				watermarkAssigner.getRowType(),
+				new String[]{digest});
+		LogicalTableScan newScan = new LogicalTableScan(
+				scan.getCluster(), scan.getTraitSet(), scan.getHints(), newTableSourceTable);
+
+		call.transformTo(newScan);
+	}
+
+	private static class DefaultWatermarkGeneratorSupplier implements WatermarkGeneratorSupplier<RowData> {
+		private final Configuration configuration;
+		private final GeneratedWatermarkGenerator generatedWatermarkGenerator;
+
+		public DefaultWatermarkGeneratorSupplier(Configuration configuration, GeneratedWatermarkGenerator generatedWatermarkGenerator) {
+			this.configuration = configuration;
+			this.generatedWatermarkGenerator = generatedWatermarkGenerator;
+		}
+
+		@Override
+		public WatermarkGenerator<RowData> createWatermarkGenerator(Context context) {
+
+			List<Object> references = new ArrayList<>(Arrays.asList(generatedWatermarkGenerator.getReferences()));
+			references.add(context);
+
+			org.apache.flink.table.runtime.generated.WatermarkGenerator innerWatermarkGenerator =
+					new GeneratedWatermarkGenerator(
+						generatedWatermarkGenerator.getClassName(),
+						generatedWatermarkGenerator.getCode(),
+						references.toArray())
+							.newInstance(Thread.currentThread().getContextClassLoader());
+
+			try {
+				innerWatermarkGenerator.open(configuration);
+			} catch (Exception e) {
+				throw new RuntimeException("Fail to instantiate generated watermark generator.", e);
+			}
+			return new DefaultWatermarkGenerator(innerWatermarkGenerator);
+		}
+
+		private class DefaultWatermarkGenerator implements WatermarkGenerator<RowData> {

Review comment:
       Add the `static` modifier and a `serialVersionUID` field.
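
One likely motivation for `static` (an assumption; the comment does not spell it out) is that a non-static inner class can keep a hidden reference to its enclosing instance. The sketch below uses hypothetical classes (`OuterSupplier`, `InnerGenerator`, `NestedGenerator`) and only the JDK to show the effect on serialization when the enclosing class is not serializable.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class StaticNestedSketch {

    /** Hypothetical enclosing class that is deliberately NOT Serializable. */
    static class OuterSupplier {
        private final String name;

        OuterSupplier(String name) {
            this.name = name;
        }

        /** Inner class: reads an outer field, so it holds a reference to OuterSupplier. */
        class InnerGenerator implements Serializable {
            private static final long serialVersionUID = 1L;

            String describe() {
                return "inner of " + name;
            }
        }

        /** Static nested class: carries only its own fields. */
        static class NestedGenerator implements Serializable {
            private static final long serialVersionUID = 1L;

            private final String owner;

            NestedGenerator(String owner) {
                this.owner = owner;
            }

            String describe() {
                return "nested of " + owner;
            }
        }

        InnerGenerator newInner() {
            return new InnerGenerator();
        }

        NestedGenerator newNested() {
            return new NestedGenerator(name);
        }
    }

    /** Returns true if the value can be written with plain Java serialization. */
    static boolean isSerializable(Object value) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(value);
            return true;
        } catch (NotSerializableException e) {
            // Raised when some reachable object, here the enclosing instance, is not Serializable.
            return false;
        } catch (IOException e) {
            throw new IllegalStateException("Unexpected I/O failure", e);
        }
    }

    public static void main(String[] args) {
        OuterSupplier outer = new OuterSupplier("demo");
        System.out.println("inner serializable: " + isSerializable(outer.newInner()));
        System.out.println("nested serializable: " + isSerializable(outer.newNested()));
    }
}
```

The static nested variant receives everything it needs through its constructor, which also makes the dependency on the enclosing class explicit.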

##########
File path: flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushWatermarkIntoTableSourceScanRule.java
##########
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.rules.logical;
+
+import org.apache.flink.api.common.eventtime.Watermark;
+import org.apache.flink.api.common.eventtime.WatermarkGenerator;
+import org.apache.flink.api.common.eventtime.WatermarkGeneratorSupplier;
+import org.apache.flink.api.common.eventtime.WatermarkOutput;
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.api.TableConfig;
+import org.apache.flink.table.api.config.ExecutionConfigOptions;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.connector.source.abilities.SupportsWatermarkPushDown;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.planner.calcite.FlinkContext;
+import org.apache.flink.table.planner.calcite.FlinkTypeFactory;
+import org.apache.flink.table.planner.codegen.WatermarkGeneratorCodeGenerator;
+import org.apache.flink.table.planner.plan.nodes.calcite.LogicalWatermarkAssigner;
+import org.apache.flink.table.planner.plan.schema.TableSourceTable;
+import org.apache.flink.table.runtime.generated.GeneratedWatermarkGenerator;
+
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.rel.logical.LogicalTableScan;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * Rule for PushWatermarkIntoTableSourceScan.
+ * */
+public class PushWatermarkIntoTableSourceScanRule extends RelOptRule {
+	public static final PushWatermarkIntoTableSourceScanRule INSTANCE = new PushWatermarkIntoTableSourceScanRule();
+
+	public PushWatermarkIntoTableSourceScanRule() {
+		super(operand(LogicalWatermarkAssigner.class,
+				operand(LogicalTableScan.class, none())),
+				"PushWatermarkIntoTableSourceScanRule");
+	}
+
+	@Override
+	public boolean matches(RelOptRuleCall call) {
+		LogicalTableScan scan = call.rel(1);
+		TableSourceTable tableSourceTable = scan.getTable().unwrap(TableSourceTable.class);
+		return tableSourceTable != null && tableSourceTable.tableSource() instanceof SupportsWatermarkPushDown;
+	}
+
+	@Override
+	public void onMatch(RelOptRuleCall call) {
+		LogicalWatermarkAssigner watermarkAssigner = call.rel(0);
+		LogicalTableScan scan = call.rel(1);
+		FlinkContext context = (FlinkContext) call.getPlanner().getContext();
+		TableConfig config = context.getTableConfig();
+
+		// generate an inner watermark generator class that allows us to pass FunctionContext and ClassLoader
+		GeneratedWatermarkGenerator generatedWatermarkGenerator =
+				WatermarkGeneratorCodeGenerator.generateWatermarkGenerator(
+						config,
+						FlinkTypeFactory.toLogicalRowType(scan.getRowType()),
+						watermarkAssigner.watermarkExpr(),
+						"context");
+		Configuration configuration = context.getTableConfig().getConfiguration();
+
+		WatermarkGeneratorSupplier<RowData> supplier = new DefaultWatermarkGeneratorSupplier(configuration, generatedWatermarkGenerator);
+		String digest = String.format("watermark=[%s]", watermarkAssigner.watermarkExpr());
+
+		WatermarkStrategy<RowData> watermarkStrategy = WatermarkStrategy.forGenerator(supplier);
+		Duration idleTimeout = configuration.get(ExecutionConfigOptions.TABLE_EXEC_SOURCE_IDLE_TIMEOUT);
+		if (!idleTimeout.isZero() && !idleTimeout.isNegative()) {
+			watermarkStrategy.withIdleness(idleTimeout);
+			digest = String.format("%s idletimeout=[%s]", digest, idleTimeout.toMillis());
+		}
+
+		TableSourceTable tableSourceTable = scan.getTable().unwrap(TableSourceTable.class);
+		DynamicTableSource newDynamicTableSource = tableSourceTable.tableSource().copy();
+
+		((SupportsWatermarkPushDown) newDynamicTableSource).applyWatermark(watermarkStrategy);
+
+		TableSourceTable newTableSourceTable = tableSourceTable.copy(
+				newDynamicTableSource,
+				watermarkAssigner.getRowType(),
+				new String[]{digest});
+		LogicalTableScan newScan = new LogicalTableScan(

Review comment:
       nit: use `LogicalTableScan.create` instead

##########
File path: flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/plan/rules/logical/PushWatermarkIntoTableSourceScanRuleTest.java
##########
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.rules.logical;
+
+import org.apache.flink.table.api.TableConfig;
+import org.apache.flink.table.functions.ScalarFunction;
+import org.apache.flink.table.planner.calcite.CalciteConfig;
+import org.apache.flink.table.planner.plan.optimize.program.FlinkHepRuleSetProgramBuilder;
+import org.apache.flink.table.planner.plan.optimize.program.FlinkStreamProgram;
+import org.apache.flink.table.planner.plan.optimize.program.HEP_RULES_EXECUTION_TYPE;
+import org.apache.flink.table.planner.plan.optimize.program.StreamOptimizeContext;
+import org.apache.flink.table.planner.utils.StreamTableTestUtil;
+import org.apache.flink.table.planner.utils.TableConfigUtils;
+import org.apache.flink.table.planner.utils.TableTestBase;
+
+import org.apache.calcite.plan.hep.HepMatchOrder;
+import org.apache.calcite.tools.RuleSets;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.time.LocalDateTime;
+import java.time.ZoneOffset;
+
+/**
+ * Test rule PushWatermarkIntoTableSourceScanRule.
+ * */
+public class PushWatermarkIntoTableSourceScanRuleTest extends TableTestBase {
+	private StreamTableTestUtil util = streamTestUtil(new TableConfig());
+
+	@Before
+	public void setup() {
+		util.buildStreamProgram(FlinkStreamProgram.DEFAULT_REWRITE());
+		CalciteConfig calciteConfig = TableConfigUtils.getCalciteConfig(util.tableEnv().getConfig());
+		calciteConfig.getStreamProgram().get().addLast(
+				"PushWatermarkIntoTableSourceScanRule",
+				FlinkHepRuleSetProgramBuilder.<StreamOptimizeContext>newBuilder()
+						.setHepRulesExecutionType(HEP_RULES_EXECUTION_TYPE.RULE_SEQUENCE())
+						.setHepMatchOrder(HepMatchOrder.BOTTOM_UP)
+						.add(RuleSets.ofList(
+								WatermarkAssignerProjectTransposeRule.INSTANCE,
+								PushWatermarkIntoTableSourceScanRule.INSTANCE))
+						.build()
+		);
+	}
+
+	@Test
+	public void testSimpleWatermark() {
+		String ddl = "create table MyTable(" +
+				"  a int,\n" +
+				"  b bigint,\n" +
+				"  c timestamp(3),\n" +
+				"  watermark for c as c - interval '5' second\n" +
+				") WITH (\n" +
+				" 'connector' = 'values',\n" +
+				" 'table-source-class' = 'WATERMARK_PUSH_DOWN',\n" +
+				" 'bounded' = 'false'\n" +
+				")";
+		util.tableEnv().executeSql(ddl);
+		util.verifyPlan("select * from MyTable");
+	}
+
+	@Test
+	public void testSimpleTranspose() {
+		String ddl = "create table MyTable(" +
+				"  a int,\n" +
+				"  b bigint,\n" +
+				"  c timestamp(3),\n" +
+				"  d as c + interval '5' second,\n" +
+				"  watermark for d as d - interval '5' second\n" +
+				") WITH (\n" +
+				" 'connector' = 'values',\n" +
+				" 'table-source-class' = 'WATERMARK_PUSH_DOWN',\n" +
+				" 'bounded' = 'false'\n" +
+				")";
+		util.tableEnv().executeSql(ddl);
+		util.verifyPlan("select * from MyTable");
+	}
+
+	@Test
+	public void testSimpleTransposeNotNull() {
+		String ddl = "create table MyTable(" +
+				"  a int,\n" +
+				"  b bigint,\n" +
+				"  c timestamp(3) not null,\n" +
+				"  d as c + interval '5' second,\n" +
+				"  watermark for d as d - interval '5' second\n" +
+				") WITH (\n" +
+				" 'connector' = 'values',\n" +
+				" 'table-source-class' = 'WATERMARK_PUSH_DOWN',\n" +
+				" 'bounded' = 'false'\n" +
+				")";
+		util.tableEnv().executeSql(ddl);
+		util.verifyPlan("select * from MyTable");
+	}
+
+	@Test
+	public void testComputedColumnWithMultipleInputs() {
+		String ddl = "create table MyTable(" +
+				"  a string,\n" +
+				"  b string,\n" +
+				"  c as to_timestamp(a, b),\n" +
+				"  watermark for c as c - interval '5' second\n" +
+				") WITH (\n" +
+				" 'connector' = 'values',\n" +
+				" 'table-source-class' = 'WATERMARK_PUSH_DOWN',\n" +
+				" 'bounded' = 'false'\n" +
+				")";
+		util.tableEnv().executeSql(ddl);
+		util.verifyPlan("select * from MyTable");
+	}
+
+	@Test
+	public void testTransposeWithRow() {
+		String ddl = "create table MyTable(" +
+				"  a int,\n" +
+				"  b bigint,\n" +
+				"  c row<name string, d timestamp(3)>," +
+				"  e as c.d," +
+				"  watermark for e as e - interval '5' second\n" +
+				") WITH (\n" +
+				" 'connector' = 'values',\n" +
+				" 'table-source-class' = 'WATERMARK_PUSH_DOWN',\n" +
+				" 'bounded' = 'false'\n" +
+				")";
+		util.tableEnv().executeSql(ddl);
+		util.verifyPlan("select * from MyTable");
+	}
+
+	@Test
+	public void testTransposeWithNestedRow() {
+		String ddl = "create table MyTable(" +
+				"  a int,\n" +
+				"  b bigint,\n" +
+				"  c row<name string, d row<e string, f timestamp(3)>>," +
+				"  g as c.d.f," +
+				"  watermark for g as g - interval '5' second\n" +
+				") WITH (\n" +
+				" 'connector' = 'values',\n" +
+				" 'table-source-class' = 'WATERMARK_PUSH_DOWN',\n" +
+				" 'bounded' = 'false'\n" +
+				")";
+		util.tableEnv().executeSql(ddl);
+		util.verifyPlan("select * from MyTable");
+	}
+
+	@Test
+	public void testTransposeWithUdf() {
+		util.addFunction("func1", new InnerUdf());
+		String ddl = "create table MyTable(" +
+				"  a int,\n" +
+				"  b bigint,\n" +
+				"  c timestamp(3)," +
+				"  d as func1(c)," +
+				"  watermark for d as d - interval '5' second\n" +
+				") WITH (\n" +
+				" 'connector' = 'values',\n" +
+				" 'table-source-class' = 'WATERMARK_PUSH_DOWN',\n" +
+				" 'bounded' = 'false'\n" +
+				")";
+		util.tableEnv().executeSql(ddl);
+		util.verifyPlan("select * from MyTable");

Review comment:
       Please add tests that cover:
   1. a projection/filter in the `select`,
   2. a watermark expression that contains two field references

##########
File path: flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/rules/logical/WatermarkAssignerProjectTransposeRule.java
##########
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.rules.logical;
+
+import org.apache.flink.table.connector.source.abilities.SupportsWatermarkPushDown;
+import org.apache.flink.table.planner.calcite.FlinkTypeFactory;
+import org.apache.flink.table.planner.plan.nodes.calcite.LogicalWatermarkAssigner;
+import org.apache.flink.table.planner.plan.schema.TableSourceTable;
+
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.rel.logical.LogicalProject;
+import org.apache.calcite.rel.logical.LogicalTableScan;
+import org.apache.calcite.rex.RexBuilder;
+import org.apache.calcite.rex.RexInputRef;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.rex.RexShuttle;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * WatermarkAssignerProjectTransposeRule.
+ * */
+public class WatermarkAssignerProjectTransposeRule extends RelOptRule {
+	public static final WatermarkAssignerProjectTransposeRule INSTANCE = new WatermarkAssignerProjectTransposeRule();
+
+	public WatermarkAssignerProjectTransposeRule() {
+		super(operand(LogicalWatermarkAssigner.class,
+				operand(LogicalProject.class,
+						operand(LogicalTableScan.class, none()))),
+				"WatermarkAssignerProjectTransposeRule");
+	}
+
+	@Override
+	public boolean matches(RelOptRuleCall call) {
+		LogicalTableScan scan = call.rel(2);
+		TableSourceTable tableSourceTable = scan.getTable().unwrap(TableSourceTable.class);
+		return tableSourceTable != null && tableSourceTable.tableSource() instanceof SupportsWatermarkPushDown;
+	}
+
+	@Override
+	public void onMatch(RelOptRuleCall call) {
+		LogicalWatermarkAssigner watermarkAssigner = call.rel(0);
+		LogicalProject project = call.rel(1);
+
+		RexNode computedColumn = project.getProjects().get(watermarkAssigner.rowtimeFieldIndex());
+
+		RexNode newWatermarkExpr = watermarkAssigner.watermarkExpr().accept(new RexShuttle() {
+			@Override
+			public RexNode visitInputRef(RexInputRef inputRef) {
+				return computedColumn;
+			}
+		});
+
+		// use -1 to indicate rowtime column is not in scan and watermark generator has to calculate it.
+		LogicalWatermarkAssigner newWatermarkAssigner =
+				(LogicalWatermarkAssigner) watermarkAssigner.copy(watermarkAssigner.getTraitSet(),
+				project.getInput(),
+				-1,

Review comment:
       Could this rule push the watermark into the scan directly? Then we would not need to change the semantics of `LogicalWatermarkAssigner#rowtimeFieldIndex`; it is a little strange for `rowtimeFieldIndex` to be -1. (We could extract a common base class for `PushWatermarkIntoTableSourceScanRule` and `WatermarkAssignerProjectTransposeRule`.)

##########
File path: flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/factories/TestValuesTableSourceBase.java
##########
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.factories;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.io.CollectionInputFormat;
+import org.apache.flink.streaming.api.functions.source.FromElementsFunction;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.table.connector.RuntimeConverter;
+import org.apache.flink.table.connector.source.AsyncTableFunctionProvider;
+import org.apache.flink.table.connector.source.InputFormatProvider;
+import org.apache.flink.table.connector.source.LookupTableSource;
+import org.apache.flink.table.connector.source.ScanTableSource;
+import org.apache.flink.table.connector.source.SourceFunctionProvider;
+import org.apache.flink.table.connector.source.TableFunctionProvider;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.functions.AsyncTableFunction;
+import org.apache.flink.table.functions.TableFunction;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.InstantiationUtil;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Base class.
+ * */
+public abstract class TestValuesTableSourceBase implements ScanTableSource, LookupTableSource {
+
+	protected TableSchema physicalSchema;
+	protected final ChangelogMode changelogMode;
+	protected final boolean bounded;
+	protected final String runtimeSource;
+	/* If source table is not partitionable, we will put all data into a emptyMap. */
+	protected Map<Map<String, String>, Collection<Row>> data;
+	protected final boolean isAsync;
+	protected final @Nullable String lookupFunctionClass;
+
+	protected TestValuesTableSourceBase(
+			TableSchema physicalSchema,
+			ChangelogMode changelogMode,
+			boolean bounded,
+			String runtimeSource,
+			Map<Map<String, String>, Collection<Row>> data,
+			boolean isAsync,
+			@Nullable String lookupFunctionClass) {
+		this.physicalSchema = physicalSchema;
+		this.changelogMode = changelogMode;
+		this.bounded = bounded;
+		this.runtimeSource = runtimeSource;
+		this.data = data;
+		this.isAsync = isAsync;
+		this.lookupFunctionClass = lookupFunctionClass;
+	}
+
+	@Override
+	public ChangelogMode getChangelogMode() {
+		return changelogMode;
+	}
+
+	@SuppressWarnings("unchecked")
+	@Override
+	public ScanRuntimeProvider getScanRuntimeProvider(ScanContext runtimeProviderContext) {
+		TypeSerializer<RowData> serializer = (TypeSerializer<RowData>) runtimeProviderContext
+				.createTypeInformation(physicalSchema.toRowDataType())
+				.createSerializer(new ExecutionConfig());
+		DataStructureConverter converter = runtimeProviderContext.createDataStructureConverter(physicalSchema.toRowDataType());
+		converter.open(RuntimeConverter.Context.create(TestValuesTableFactory.class.getClassLoader()));
+		Collection<RowData> values = convertToRowData(converter);
+
+		if (runtimeSource.equals("SourceFunction")) {
+			try {
+				return SourceFunctionProvider.of(
+						new FromElementsFunction<>(serializer, values),
+						bounded);
+			} catch (IOException e) {
+				throw new TableException("Fail to init source function", e);
+			}
+		} else if (runtimeSource.equals("InputFormat")) {
+			return InputFormatProvider.of(new CollectionInputFormat<>(values, serializer));
+		} else {
+			throw new IllegalArgumentException("Unsupported runtime source class: " + runtimeSource);
+		}
+	}
+
+	@SuppressWarnings({"unchecked", "rawtypes"})
+	@Override
+	public LookupRuntimeProvider getLookupRuntimeProvider(LookupContext context) {
+		if (lookupFunctionClass != null) {
+			// use the specified lookup function
+			try {
+				Class<?> clazz = Class.forName(lookupFunctionClass);
+				Object udtf = InstantiationUtil.instantiate(clazz);
+				if (udtf instanceof TableFunction) {
+					return TableFunctionProvider.of((TableFunction) udtf);
+				} else {
+					return AsyncTableFunctionProvider.of((AsyncTableFunction) udtf);
+				}
+			} catch (ClassNotFoundException e) {
+				throw new IllegalArgumentException("Could not instantiate class: " + lookupFunctionClass);
+			}
+		}
+
+		int[] lookupIndices = Arrays.stream(context.getKeys())
+				.mapToInt(k -> k[0])
+				.toArray();
+		Map<Row, List<Row>> mapping = new HashMap<>();
+
+		data.get(Collections.emptyMap()).forEach(record -> {
+			Row key = Row.of(Arrays.stream(lookupIndices)
+					.mapToObj(record::getField)
+					.toArray());
+			List<Row> list = mapping.get(key);
+			if (list != null) {
+				list.add(record);
+			} else {
+				list = new ArrayList<>();
+				list.add(record);
+				mapping.put(key, list);
+			}
+		});
+		if (isAsync) {
+			return AsyncTableFunctionProvider.of(new TestValuesRuntimeFunctions.AsyncTestValueLookupFunction(mapping));
+		} else {
+			return TableFunctionProvider.of(new TestValuesRuntimeFunctions.TestValuesLookupFunction(mapping));
+		}
+	}
+
+	protected Collection<RowData> convertToRowData(DataStructureConverter converter) {
+		List<RowData> result = new ArrayList<>();
+		for (Row value : handle()) {
+			RowData rowData = (RowData) converter.toInternal(value);
+			if (rowData != null) {
+				rowData.setRowKind(value.getKind());
+				result.add(rowData);
+			}
+		}
+		return result;
+	}
+
+	/*
+	* Used by apply method to deal with.
+	* */

Review comment:
       ```
   /**
    * 
    */
   ```

##########
File path: flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/SourceWatermarkITCase.scala
##########
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.runtime.stream.sql
+
+import java.sql.Timestamp
+import java.time.LocalDateTime
+
+import org.apache.flink.api.common.eventtime.Watermark
+import org.apache.flink.api.scala._
+import org.apache.flink.table.api.bridge.scala._
+import org.apache.flink.table.data.TimestampData
+import org.apache.flink.table.planner.factories.TestValuesTableFactory
+import org.apache.flink.table.planner.plan.rules.logical.PushWatermarkIntoTableSourceScanRuleTest
+import org.apache.flink.table.planner.runtime.utils.BatchTestBase.row
+import org.apache.flink.table.planner.runtime.utils.{StreamingTestBase, TestingAppendSink}
+import org.apache.flink.types.Row
+import org.junit.Assert.assertEquals
+import org.junit.{Before, Test}
+
+import scala.collection.JavaConverters._
+
+class SourceWatermarkITCase extends StreamingTestBase{
+  @Before
+  override def before(): Unit = {
+    super.before()
+    env.setParallelism(1)

Review comment:
       Why is the parallelism set to 1?

##########
File path: flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/codegen/WatermarkGeneratorCodeGenTest.scala
##########
@@ -32,21 +32,27 @@ import org.apache.flink.table.planner.runtime.utils.JavaUserDefinedScalarFunctio
 import org.apache.flink.table.runtime.generated.WatermarkGenerator
 import org.apache.flink.table.types.logical.{IntType, TimestampType}
 import org.apache.flink.table.utils.CatalogManagerMocks
-
 import org.apache.calcite.jdbc.CalciteSchemaBuilder.asRootSchema
 import org.apache.calcite.plan.ConventionTraitDef
 import org.apache.calcite.rel.`type`.RelDataType
 import org.junit.Assert.{assertEquals, assertTrue}
 import org.junit.Test
-
 import java.lang.{Integer => JInt, Long => JLong}
+import java.util
 import java.util.Collections
 import java.util.function.{Function => JFunction, Supplier => JSupplier}
 
+import org.apache.flink.api.common.eventtime.WatermarkGeneratorSupplier

Review comment:
       reorder imports

##########
File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/WatermarkGeneratorCodeGenerator.scala
##########
@@ -35,7 +37,8 @@ object WatermarkGeneratorCodeGenerator {
   def generateWatermarkGenerator(
       config: TableConfig,
       inputType: RowType,
-      watermarkExpr: RexNode): GeneratedWatermarkGenerator = {
+      watermarkExpr: RexNode,
+      contextTerm: String = null): GeneratedWatermarkGenerator = {

Review comment:
       Please use an `Option` defaulting to `None` instead of `null` in Scala.

##########
File path: flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/connector/source/abilities/PeriodicWatermarkAssignerProvider.java
##########
@@ -28,6 +28,7 @@
  * generating watermarks in {@link ScanTableSource}.
  */
 @PublicEvolving
+@Deprecated

Review comment:
       It would be better to add a comment explaining the `@Deprecated` annotation.
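
A minimal sketch of what such a note could look like, using hypothetical interface names (`LegacyWatermarkProvider` and `NewWatermarkProvider` are placeholders for illustration, not the actual Flink interfaces). The idea is to pair the `@Deprecated` annotation with a Javadoc `@deprecated` tag that says why and what to use instead:

```java
// Hypothetical names for illustration only; these are not the Flink interfaces.

/** Stand-in for whatever abstraction replaces the deprecated one. */
interface NewWatermarkProvider {
    long currentWatermark();
}

/**
 * Stand-in for the interface being deprecated.
 *
 * @deprecated State the reason here and name the replacement,
 *     for example "use {@link NewWatermarkProvider} instead".
 */
@Deprecated
interface LegacyWatermarkProvider {
    long currentWatermark();
}

final class DeprecationNoteSketch {
    public static void main(String[] args) {
        // @Deprecated is retained at runtime, so tools can detect it reflectively;
        // the Javadoc tag is what tells human readers what to do instead.
        boolean deprecated =
                LegacyWatermarkProvider.class.isAnnotationPresent(Deprecated.class);
        System.out.println("deprecated = " + deprecated);
    }
}
```

The annotation alone only produces a compiler warning; the Javadoc tag carries the migration hint.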

##########
File path: flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/codegen/WatermarkGeneratorCodeGenTest.scala
##########
@@ -154,7 +199,9 @@ class WatermarkGeneratorCodeGenTest {
     assertTrue(JavaFunc5.closeCalled)
   }
 
-  private def generateWatermarkGenerator(expr: String): WatermarkGenerator = {
+
+  private def generateWatermarkGenerator(expr: String,
+      useDefinedConstructor: Boolean = true): WatermarkGenerator = {

Review comment:
       nit: remove the default value

##########
File path: flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/rules/logical/WatermarkAssignerProjectTransposeRule.java
##########
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.rules.logical;
+
+import org.apache.flink.table.connector.source.abilities.SupportsWatermarkPushDown;
+import org.apache.flink.table.planner.calcite.FlinkTypeFactory;
+import org.apache.flink.table.planner.plan.nodes.calcite.LogicalWatermarkAssigner;
+import org.apache.flink.table.planner.plan.schema.TableSourceTable;
+
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.rel.logical.LogicalProject;
+import org.apache.calcite.rel.logical.LogicalTableScan;
+import org.apache.calcite.rex.RexBuilder;
+import org.apache.calcite.rex.RexInputRef;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.rex.RexShuttle;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * WatermarkAssignerProjectTransposeRule.
+ * */
+public class WatermarkAssignerProjectTransposeRule extends RelOptRule {
+	public static final WatermarkAssignerProjectTransposeRule INSTANCE = new WatermarkAssignerProjectTransposeRule();
+
+	public WatermarkAssignerProjectTransposeRule() {
+		super(operand(LogicalWatermarkAssigner.class,
+				operand(LogicalProject.class,
+						operand(LogicalTableScan.class, none()))),
+				"WatermarkAssignerProjectTransposeRule");
+	}
+
+	@Override
+	public boolean matches(RelOptRuleCall call) {
+		LogicalTableScan scan = call.rel(2);
+		TableSourceTable tableSourceTable = scan.getTable().unwrap(TableSourceTable.class);
+		return tableSourceTable != null && tableSourceTable.tableSource() instanceof SupportsWatermarkPushDown;
+	}
+
+	@Override
+	public void onMatch(RelOptRuleCall call) {
+		LogicalWatermarkAssigner watermarkAssigner = call.rel(0);
+		LogicalProject project = call.rel(1);
+
+		RexNode computedColumn = project.getProjects().get(watermarkAssigner.rowtimeFieldIndex());
+
+		RexNode newWatermarkExpr = watermarkAssigner.watermarkExpr().accept(new RexShuttle() {
+			@Override
+			public RexNode visitInputRef(RexInputRef inputRef) {
+				return computedColumn;

Review comment:
       Do all input refs in the watermark expression come from the rowtime field?
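
To illustrate the concern, here is a minimal sketch with stand-in types (`Expr`, `InputRef` and `Call` are hypothetical and only mimic the roles of Calcite's `RexNode`, `RexInputRef` and `RexCall`). If the watermark expression can reference fields other than the rowtime field, one option would be to replace each input ref with the project expression at that ref's own index, rather than replacing every ref with the computed rowtime column:

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// Stand-in expression tree; these are NOT Calcite classes.
interface Expr {}

// Reference to an input field by index (plays the role of RexInputRef).
final class InputRef implements Expr {
    final int index;
    InputRef(int index) { this.index = index; }
    @Override public String toString() { return "$" + index; }
}

// Function call over operands (plays the role of RexCall).
final class Call implements Expr {
    final String op;
    final List<Expr> operands;
    Call(String op, List<Expr> operands) { this.op = op; this.operands = operands; }
    @Override public String toString() {
        return op + operands.stream()
                .map(Object::toString)
                .collect(Collectors.joining(", ", "(", ")"));
    }
}

final class InputRefSubstitutionSketch {
    // Rewrites an expression defined on top of a projection so that it refers
    // to the projection's input: each ref is replaced by the project
    // expression at the same index, not by one fixed column.
    static Expr substitute(Expr expr, List<Expr> projects) {
        if (expr instanceof InputRef) {
            return projects.get(((InputRef) expr).index);
        }
        Call call = (Call) expr;
        List<Expr> rewritten = call.operands.stream()
                .map(operand -> substitute(operand, projects))
                .collect(Collectors.toList());
        return new Call(call.op, rewritten);
    }

    public static void main(String[] args) {
        // Projection: field 0 passes scan field 0 through,
        // field 1 is a computed rowtime column TO_TIMESTAMP(scan field 1).
        List<Expr> projects = Arrays.<Expr>asList(
                new InputRef(0),
                new Call("TO_TIMESTAMP", Arrays.<Expr>asList(new InputRef(1))));
        // Watermark expression that references both the rowtime and another field.
        Expr watermark = new Call("MINUS",
                Arrays.<Expr>asList(new InputRef(1), new InputRef(0)));
        System.out.println(substitute(watermark, projects));
    }
}
```

With the shuttle as written in the diff, both refs in this example would be replaced by the rowtime expression, which changes the meaning of the watermark expression.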

##########
File path: flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/rules/logical/WatermarkAssignerProjectTransposeRule.java
##########
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.rules.logical;
+
+import org.apache.flink.table.connector.source.abilities.SupportsWatermarkPushDown;
+import org.apache.flink.table.planner.calcite.FlinkTypeFactory;
+import org.apache.flink.table.planner.plan.nodes.calcite.LogicalWatermarkAssigner;
+import org.apache.flink.table.planner.plan.schema.TableSourceTable;
+
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.rel.logical.LogicalProject;
+import org.apache.calcite.rel.logical.LogicalTableScan;
+import org.apache.calcite.rex.RexBuilder;
+import org.apache.calcite.rex.RexInputRef;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.rex.RexShuttle;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * WatermarkAssignerProjectTransposeRule.
+ * */
+public class WatermarkAssignerProjectTransposeRule extends RelOptRule {

Review comment:
       please add some comments to explain the purpose of this rule

##########
File path: flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/factories/TestValuesTableSourceBase.java
##########
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.factories;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.io.CollectionInputFormat;
+import org.apache.flink.streaming.api.functions.source.FromElementsFunction;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.table.connector.RuntimeConverter;
+import org.apache.flink.table.connector.source.AsyncTableFunctionProvider;
+import org.apache.flink.table.connector.source.InputFormatProvider;
+import org.apache.flink.table.connector.source.LookupTableSource;
+import org.apache.flink.table.connector.source.ScanTableSource;
+import org.apache.flink.table.connector.source.SourceFunctionProvider;
+import org.apache.flink.table.connector.source.TableFunctionProvider;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.functions.AsyncTableFunction;
+import org.apache.flink.table.functions.TableFunction;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.InstantiationUtil;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Base class.

Review comment:
       Please give a more meaningful comment.

##########
File path: flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/plan/rules/logical/PushWatermarkIntoTableSourceScanRuleTest.java
##########
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.rules.logical;
+
+import org.apache.flink.table.api.TableConfig;
+import org.apache.flink.table.functions.ScalarFunction;
+import org.apache.flink.table.planner.calcite.CalciteConfig;
+import org.apache.flink.table.planner.plan.optimize.program.FlinkHepRuleSetProgramBuilder;
+import org.apache.flink.table.planner.plan.optimize.program.FlinkStreamProgram;
+import org.apache.flink.table.planner.plan.optimize.program.HEP_RULES_EXECUTION_TYPE;
+import org.apache.flink.table.planner.plan.optimize.program.StreamOptimizeContext;
+import org.apache.flink.table.planner.utils.StreamTableTestUtil;
+import org.apache.flink.table.planner.utils.TableConfigUtils;
+import org.apache.flink.table.planner.utils.TableTestBase;
+
+import org.apache.calcite.plan.hep.HepMatchOrder;
+import org.apache.calcite.tools.RuleSets;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.time.LocalDateTime;
+import java.time.ZoneOffset;
+
+/**
+ * Test rule PushWatermarkIntoTableSourceScanRule.
+ * */
+public class PushWatermarkIntoTableSourceScanRuleTest extends TableTestBase {
+	private StreamTableTestUtil util = streamTestUtil(new TableConfig());
+
+	@Before
+	public void setup() {
+		util.buildStreamProgram(FlinkStreamProgram.DEFAULT_REWRITE());
+		CalciteConfig calciteConfig = TableConfigUtils.getCalciteConfig(util.tableEnv().getConfig());
+		calciteConfig.getStreamProgram().get().addLast(
+				"PushWatermarkIntoTableSourceScanRule",
+				FlinkHepRuleSetProgramBuilder.<StreamOptimizeContext>newBuilder()
+						.setHepRulesExecutionType(HEP_RULES_EXECUTION_TYPE.RULE_SEQUENCE())
+						.setHepMatchOrder(HepMatchOrder.BOTTOM_UP)
+						.add(RuleSets.ofList(
+								WatermarkAssignerProjectTransposeRule.INSTANCE,
+								PushWatermarkIntoTableSourceScanRule.INSTANCE))
+						.build()
+		);

Review comment:
       It would be better to build a specific program that contains only the rules needed for the current test; this avoids interference from other rules.

##########
File path: flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/plan/rules/logical/PushWatermarkIntoTableSourceScanRuleTest.java
##########
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.rules.logical;
+
+import org.apache.flink.table.api.TableConfig;
+import org.apache.flink.table.functions.ScalarFunction;
+import org.apache.flink.table.planner.calcite.CalciteConfig;
+import org.apache.flink.table.planner.plan.optimize.program.FlinkHepRuleSetProgramBuilder;
+import org.apache.flink.table.planner.plan.optimize.program.FlinkStreamProgram;
+import org.apache.flink.table.planner.plan.optimize.program.HEP_RULES_EXECUTION_TYPE;
+import org.apache.flink.table.planner.plan.optimize.program.StreamOptimizeContext;
+import org.apache.flink.table.planner.utils.StreamTableTestUtil;
+import org.apache.flink.table.planner.utils.TableConfigUtils;
+import org.apache.flink.table.planner.utils.TableTestBase;
+
+import org.apache.calcite.plan.hep.HepMatchOrder;
+import org.apache.calcite.tools.RuleSets;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.time.LocalDateTime;
+import java.time.ZoneOffset;
+
+/**
+ * Test rule PushWatermarkIntoTableSourceScanRule.

Review comment:
       Test for {@link PushWatermarkIntoTableSourceScanRule} and {@link WatermarkAssignerProjectTransposeRule}.

##########
File path: flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/SourceWatermarkTest.scala
##########
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.stream.sql
+
+import org.apache.flink.table.planner.utils.TableTestBase
+import org.apache.flink.table.planner.plan.rules.logical.PushWatermarkIntoTableSourceScanRuleTest
+
+import org.junit.Test
+
+class SourceWatermarkTest extends TableTestBase {

Review comment:
       Extract a base test class for `PushWatermarkIntoTableSourceScanRuleTest` and `SourceWatermarkTest`?



