You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by lz...@apache.org on 2022/09/05 02:10:19 UTC

[flink] branch release-1.15 updated: [FLINK-29138][table-planner] fix project can not be pushed into lookup source

This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch release-1.15
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.15 by this push:
     new e7c7df4c9b0 [FLINK-29138][table-planner] fix project can not be pushed into lookup source
e7c7df4c9b0 is described below

commit e7c7df4c9b07667344e33c23bb92cb8a07e3ac0b
Author: lincoln lee <li...@gmail.com>
AuthorDate: Mon Sep 5 10:10:10 2022 +0800

    [FLINK-29138][table-planner] fix project can not be pushed into lookup source
    
    This closes #20727
---
 .../logical/ProjectSnapshotTransposeRule.java      |  77 ++++++
 .../planner/plan/rules/FlinkBatchRuleSets.scala    |   3 +-
 .../planner/plan/rules/FlinkStreamRuleSets.scala   |   3 +
 .../planner/factories/TestValuesTableFactory.java  |  20 +-
 .../logical/ProjectSnapshotTransposeRuleTest.java  | 136 +++++++++++
 .../planner/plan/batch/sql/join/LookupJoinTest.xml |   2 +-
 ...testJoinTemporalTableWithProjectionPushDown.out |  19 +-
 .../logical/ProjectSnapshotTransposeRuleTest.xml   | 257 +++++++++++++++++++++
 .../plan/stream/sql/join/TemporalJoinTest.xml      |   6 +-
 9 files changed, 506 insertions(+), 17 deletions(-)

diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/ProjectSnapshotTransposeRule.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/ProjectSnapshotTransposeRule.java
new file mode 100644
index 00000000000..27c9dc7e45c
--- /dev/null
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/ProjectSnapshotTransposeRule.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.rules.logical;
+
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.plan.RelRule;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.logical.LogicalProject;
+import org.apache.calcite.rel.logical.LogicalSnapshot;
+
+/** Transposes {@link LogicalProject} and {@link LogicalSnapshot}, moving the project below. */
+public class ProjectSnapshotTransposeRule extends RelRule<ProjectSnapshotTransposeRule.Config> {
+
+    public static final RelOptRule INSTANCE =
+            ProjectSnapshotTransposeRule.Config.EMPTY.as(Config.class).withOperator().toRule();
+
+    public ProjectSnapshotTransposeRule(Config config) {
+        super(config);
+    }
+
+    @Override
+    public boolean matches(RelOptRuleCall call) {
+        LogicalProject project = call.rel(0);
+        // Don't push a project that contains an OVER call below a snapshot: a snapshot on a
+        // window aggregate is unsupported for now.
+        return !project.containsOver();
+    }
+
+    @Override
+    public void onMatch(RelOptRuleCall call) {
+        LogicalProject project = call.rel(0);
+        LogicalSnapshot snapshot = call.rel(1);
+        RelNode newProject = project.copy(project.getTraitSet(), snapshot.getInputs());
+        RelNode newSnapshot =
+                snapshot.copy(snapshot.getTraitSet(), newProject, snapshot.getPeriod());
+        call.transformTo(newSnapshot);
+    }
+
+    /** Configuration for {@link ProjectSnapshotTransposeRule}. */
+    public interface Config extends RelRule.Config {
+
+        @Override
+        default RelOptRule toRule() {
+            return new ProjectSnapshotTransposeRule(this);
+        }
+
+        default ProjectSnapshotTransposeRule.Config withOperator() {
+            final RelRule.OperandTransform snapshotTransform =
+                    operandBuilder -> operandBuilder.operand(LogicalSnapshot.class).noInputs();
+
+            final RelRule.OperandTransform projectTransform =
+                    operandBuilder ->
+                            operandBuilder
+                                    .operand(LogicalProject.class)
+                                    .oneInput(snapshotTransform);
+
+            return withOperandSupplier(projectTransform).as(Config.class);
+        }
+    }
+}
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkBatchRuleSets.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkBatchRuleSets.scala
index 22b30f63e4c..6968d3527de 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkBatchRuleSets.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkBatchRuleSets.scala
@@ -235,7 +235,8 @@ object FlinkBatchRuleSets {
     PushProjectIntoLegacyTableSourceScanRule.INSTANCE,
     PushFilterIntoTableSourceScanRule.INSTANCE,
     PushFilterIntoLegacyTableSourceScanRule.INSTANCE,
-
+    // transpose project and snapshot for scan optimization
+    ProjectSnapshotTransposeRule.INSTANCE,
     // reorder sort and projection
     CoreRules.SORT_PROJECT_TRANSPOSE,
     // remove unnecessary sort rule
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkStreamRuleSets.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkStreamRuleSets.scala
index d7ba45ebf71..f0b37821ca5 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkStreamRuleSets.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkStreamRuleSets.scala
@@ -234,6 +234,9 @@ object FlinkStreamRuleSets {
     PushFilterIntoLegacyTableSourceScanRule.INSTANCE,
     PushLimitIntoTableSourceScanRule.INSTANCE,
 
+    // transpose project and snapshot for scan optimization
+    ProjectSnapshotTransposeRule.INSTANCE,
+
     // reorder the project and watermark assigner
     ProjectWatermarkAssignerTransposeRule.INSTANCE,
 
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestValuesTableFactory.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestValuesTableFactory.java
index 1c93fe10e02..01759b22c1d 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestValuesTableFactory.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestValuesTableFactory.java
@@ -472,6 +472,7 @@ public final class TestValuesTableFactory
                 }
             } else {
                 return new TestValuesScanLookupTableSource(
+                        context.getCatalogTable().getResolvedSchema().toPhysicalRowDataType(),
                         producedDataType,
                         changelogMode,
                         isBounded,
@@ -1047,7 +1048,7 @@ public final class TestValuesTableFactory
             return result;
         }
 
-        private Row projectRow(Row row) {
+        protected Row projectRow(Row row) {
             if (projectedPhysicalFields == null) {
                 return row;
             }
@@ -1370,7 +1371,10 @@ public final class TestValuesTableFactory
         private final @Nullable String lookupFunctionClass;
         private final boolean isAsync;
 
+        private final DataType originType;
+
         private TestValuesScanLookupTableSource(
+                DataType originType,
                 DataType producedDataType,
                 ChangelogMode changelogMode,
                 boolean bounded,
@@ -1404,6 +1408,7 @@ public final class TestValuesTableFactory
                     allPartitions,
                     readableMetadata,
                     projectedMetadataFields);
+            this.originType = originType;
             this.lookupFunctionClass = lookupFunctionClass;
             this.isAsync = isAsync;
         }
@@ -1446,20 +1451,24 @@ public final class TestValuesTableFactory
                     data = data.subList(numElementToSkip, data.size());
                 }
             }
-
+            if (nestedProjectionSupported) {
+                throw new UnsupportedOperationException(
+                        "nestedProjectionSupported is unsupported for lookup source currently.");
+            }
             data.forEach(
                     record -> {
+                        Row projected = projectRow(record);
                         Row key =
                                 Row.of(
                                         Arrays.stream(lookupIndices)
-                                                .mapToObj(record::getField)
+                                                .mapToObj(projected::getField)
                                                 .toArray());
                         List<Row> list = mapping.get(key);
                         if (list != null) {
-                            list.add(record);
+                            list.add(projected);
                         } else {
                             list = new ArrayList<>();
-                            list.add(record);
+                            list.add(projected);
                             mapping.put(key, list);
                         }
                     });
@@ -1473,6 +1482,7 @@ public final class TestValuesTableFactory
         @Override
         public DynamicTableSource copy() {
             return new TestValuesScanLookupTableSource(
+                    originType,
                     producedDataType,
                     changelogMode,
                     bounded,
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/rules/logical/ProjectSnapshotTransposeRuleTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/rules/logical/ProjectSnapshotTransposeRuleTest.java
new file mode 100644
index 00000000000..7850bda83bd
--- /dev/null
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/rules/logical/ProjectSnapshotTransposeRuleTest.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.rules.logical;
+
+import org.apache.flink.table.api.TableConfig;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.planner.plan.optimize.program.FlinkBatchProgram;
+import org.apache.flink.table.planner.plan.optimize.program.FlinkStreamProgram;
+import org.apache.flink.table.planner.utils.BatchTableTestUtil;
+import org.apache.flink.table.planner.utils.StreamTableTestUtil;
+import org.apache.flink.table.planner.utils.TableTestBase;
+import org.apache.flink.table.planner.utils.TableTestUtil;
+
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.util.Arrays;
+import java.util.Collection;
+
+/** Tests for {@link ProjectSnapshotTransposeRule}. */
+@RunWith(Parameterized.class)
+public class ProjectSnapshotTransposeRuleTest extends TableTestBase {
+
+    private static final String STREAM = "stream";
+    private static final String BATCH = "batch";
+
+    @Parameterized.Parameter public String mode;
+
+    @Parameterized.Parameters(name = "mode = {0}")
+    public static Collection<String> parameters() {
+        return Arrays.asList(STREAM, BATCH);
+    }
+
+    private TableTestUtil util;
+
+    @Before
+    public void setup() {
+        boolean isStreaming = STREAM.equals(mode);
+        if (isStreaming) {
+            util = streamTestUtil(TableConfig.getDefault());
+            ((StreamTableTestUtil) util).buildStreamProgram(FlinkStreamProgram.LOGICAL_REWRITE());
+        } else {
+            util = batchTestUtil(TableConfig.getDefault());
+            ((BatchTableTestUtil) util).buildBatchProgram(FlinkBatchProgram.LOGICAL_REWRITE());
+        }
+
+        TableEnvironment tEnv = util.getTableEnv();
+        String src =
+                String.format(
+                        "CREATE TABLE MyTable (\n"
+                                + "  a int,\n"
+                                + "  b varchar,\n"
+                                + "  c bigint,\n"
+                                + "  proctime as PROCTIME(),\n"
+                                + "  rowtime as TO_TIMESTAMP(FROM_UNIXTIME(c)),\n"
+                                + "  watermark for rowtime as rowtime - INTERVAL '1' second \n"
+                                + ") with (\n"
+                                + "  'connector' = 'values',\n"
+                                + "  'bounded' = '%s')",
+                        !isStreaming);
+        String lookup =
+                String.format(
+                        "CREATE TABLE LookupTable (\n"
+                                + "  id int,\n"
+                                + "  name varchar,\n"
+                                + "  age int \n"
+                                + ") with (\n"
+                                + "  'connector' = 'values',\n"
+                                + "  'bounded' = '%s')",
+                        !isStreaming);
+        tEnv.executeSql(src);
+        tEnv.executeSql(lookup);
+    }
+
+    @Test
+    public void testJoinTemporalTableWithProjectionPushDown() {
+        String sql =
+                "SELECT T.*, D.id\n"
+                        + "FROM MyTable AS T\n"
+                        + "JOIN LookupTable FOR SYSTEM_TIME AS OF T.proctime AS D\n"
+                        + "ON T.a = D.id";
+
+        util.verifyRelPlan(sql);
+    }
+
+    @Test
+    public void testJoinTemporalTableNotProjectable() {
+        String sql =
+                "SELECT T.*, D.*\n"
+                        + "FROM MyTable AS T\n"
+                        + "JOIN LookupTable FOR SYSTEM_TIME AS OF T.proctime AS D\n"
+                        + "ON T.a = D.id";
+
+        util.verifyRelPlan(sql);
+    }
+
+    @Test
+    public void testJoinTemporalTableWithReorderedProject() {
+        String sql =
+                "SELECT T.*, D.age, D.name, D.id\n"
+                        + "FROM MyTable AS T\n"
+                        + "JOIN LookupTable FOR SYSTEM_TIME AS OF T.proctime AS D\n"
+                        + "ON T.a = D.id";
+
+        util.verifyRelPlan(sql);
+    }
+
+    @Test
+    public void testJoinTemporalTableWithProjectAndFilter() {
+        String sql =
+                "SELECT T.*, D.id\n"
+                        + "FROM MyTable AS T\n"
+                        + "JOIN LookupTable FOR SYSTEM_TIME AS OF T.proctime AS D\n"
+                        + "ON T.a = D.id WHERE D.age > 20";
+
+        util.verifyRelPlan(sql);
+    }
+}
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/join/LookupJoinTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/join/LookupJoinTest.xml
index aa6765410e3..3235613b0cd 100644
--- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/join/LookupJoinTest.xml
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/join/LookupJoinTest.xml
@@ -475,7 +475,7 @@ FlinkLogicalAggregate(group=[{0}], EXPR$1=[COUNT($1)], EXPR$2=[SUM($2)], EXPR$3=
       :     +- FlinkLogicalDataStreamTableScan(table=[[default_catalog, default_database, T1]], fields=[a, b, c, d])
       +- FlinkLogicalSnapshot(period=[$cor0.proctime])
          +- FlinkLogicalCalc(select=[id], where=[>(age, 10)])
-            +- FlinkLogicalTableSourceScan(table=[[default_catalog, default_database, LookupTable]], fields=[id, name, age])
+            +- FlinkLogicalTableSourceScan(table=[[default_catalog, default_database, LookupTable, project=[id, age], metadata=[]]], fields=[id, age])
 ]]>
     </Resource>
   </TestCase>
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/LookupJoinJsonPlanTest_jsonplan/testJoinTemporalTableWithProjectionPushDown.out b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/LookupJoinJsonPlanTest_jsonplan/testJoinTemporalTableWithProjectionPushDown.out
index c3012c79471..cd66b5d1773 100644
--- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/LookupJoinJsonPlanTest_jsonplan/testJoinTemporalTableWithProjectionPushDown.out
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/LookupJoinJsonPlanTest_jsonplan/testJoinTemporalTableWithProjectionPushDown.out
@@ -246,9 +246,18 @@
               "bounded" : "false"
             }
           }
-        }
+        },
+        "abilities" : [ {
+          "type" : "ProjectPushDown",
+          "projectedFields" : [ [ 0 ] ],
+          "producedType" : "ROW<`id` INT> NOT NULL"
+        }, {
+          "type" : "ReadingMetadata",
+          "metadataKeys" : [ ],
+          "producedType" : "ROW<`id` INT> NOT NULL"
+        } ]
       },
-      "outputType" : "ROW<`id` INT, `name` VARCHAR(2147483647), `age` INT> NOT NULL"
+      "outputType" : "ROW<`id` INT> NOT NULL"
     },
     "lookupKeys" : {
       "0" : {
@@ -256,11 +265,7 @@
         "index" : 0
       }
     },
-    "projectionOnTemporalTable" : [ {
-      "kind" : "INPUT_REF",
-      "inputIndex" : 0,
-      "type" : "INT"
-    } ],
+    "projectionOnTemporalTable" : null,
     "filterOnTemporalTable" : null,
     "inputProperties" : [ {
       "requiredDistribution" : {
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/ProjectSnapshotTransposeRuleTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/ProjectSnapshotTransposeRuleTest.xml
new file mode 100644
index 00000000000..58d680fb561
--- /dev/null
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/ProjectSnapshotTransposeRuleTest.xml
@@ -0,0 +1,257 @@
+<?xml version="1.0" ?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to you under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+<Root>
+  <TestCase name="testJoinTemporalTableNotProjectable[mode = batch]">
+    <Resource name="sql">
+      <![CDATA[SELECT T.*, D.*
+FROM MyTable AS T
+JOIN LookupTable FOR SYSTEM_TIME AS OF T.proctime AS D
+ON T.a = D.id]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2], proctime=[$3], rowtime=[$4], id=[$5], name=[$6], age=[$7])
++- LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{0, 3}])
+   :- LogicalProject(a=[$0], b=[$1], c=[$2], proctime=[PROCTIME()], rowtime=[TO_TIMESTAMP(FROM_UNIXTIME($2))])
+   :  +- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
+   +- LogicalFilter(condition=[=($cor0.a, $0)])
+      +- LogicalSnapshot(period=[$cor0.proctime])
+         +- LogicalTableScan(table=[[default_catalog, default_database, LookupTable]])
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+FlinkLogicalJoin(condition=[=($0, $5)], joinType=[inner])
+:- FlinkLogicalCalc(select=[a, b, c, PROCTIME() AS proctime, TO_TIMESTAMP(FROM_UNIXTIME(c)) AS rowtime])
+:  +- FlinkLogicalTableSourceScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c])
++- FlinkLogicalSnapshot(period=[$cor0.proctime])
+   +- FlinkLogicalTableSourceScan(table=[[default_catalog, default_database, LookupTable]], fields=[id, name, age])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testJoinTemporalTableWithReorderedProject[mode = stream]">
+    <Resource name="sql">
+      <![CDATA[SELECT T.*, D.age, D.name, D.id
+FROM MyTable AS T
+JOIN LookupTable FOR SYSTEM_TIME AS OF T.proctime AS D
+ON T.a = D.id]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2], proctime=[$3], rowtime=[$4], age=[$7], name=[$6], id=[$5])
++- LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{0, 3}])
+   :- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($4, 1000:INTERVAL SECOND)])
+   :  +- LogicalProject(a=[$0], b=[$1], c=[$2], proctime=[PROCTIME()], rowtime=[TO_TIMESTAMP(FROM_UNIXTIME($2))])
+   :     +- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
+   +- LogicalFilter(condition=[=($cor0.a, $0)])
+      +- LogicalSnapshot(period=[$cor0.proctime])
+         +- LogicalTableScan(table=[[default_catalog, default_database, LookupTable]])
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+FlinkLogicalCalc(select=[a, b, c, proctime, rowtime, age, name, id])
++- FlinkLogicalJoin(condition=[=($0, $5)], joinType=[inner])
+   :- FlinkLogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($4, 1000:INTERVAL SECOND)])
+   :  +- FlinkLogicalCalc(select=[a, b, c, PROCTIME() AS proctime, TO_TIMESTAMP(FROM_UNIXTIME(c)) AS rowtime])
+   :     +- FlinkLogicalTableSourceScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c])
+   +- FlinkLogicalSnapshot(period=[$cor0.proctime])
+      +- FlinkLogicalTableSourceScan(table=[[default_catalog, default_database, LookupTable]], fields=[id, name, age])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testJoinTemporalTableNotProjectable[mode = stream]">
+    <Resource name="sql">
+      <![CDATA[SELECT T.*, D.*
+FROM MyTable AS T
+JOIN LookupTable FOR SYSTEM_TIME AS OF T.proctime AS D
+ON T.a = D.id]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2], proctime=[$3], rowtime=[$4], id=[$5], name=[$6], age=[$7])
++- LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{0, 3}])
+   :- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($4, 1000:INTERVAL SECOND)])
+   :  +- LogicalProject(a=[$0], b=[$1], c=[$2], proctime=[PROCTIME()], rowtime=[TO_TIMESTAMP(FROM_UNIXTIME($2))])
+   :     +- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
+   +- LogicalFilter(condition=[=($cor0.a, $0)])
+      +- LogicalSnapshot(period=[$cor0.proctime])
+         +- LogicalTableScan(table=[[default_catalog, default_database, LookupTable]])
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+FlinkLogicalJoin(condition=[=($0, $5)], joinType=[inner])
+:- FlinkLogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($4, 1000:INTERVAL SECOND)])
+:  +- FlinkLogicalCalc(select=[a, b, c, PROCTIME() AS proctime, TO_TIMESTAMP(FROM_UNIXTIME(c)) AS rowtime])
+:     +- FlinkLogicalTableSourceScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c])
++- FlinkLogicalSnapshot(period=[$cor0.proctime])
+   +- FlinkLogicalTableSourceScan(table=[[default_catalog, default_database, LookupTable]], fields=[id, name, age])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testJoinTemporalTableWithProjectAndFilter[mode = batch]">
+    <Resource name="sql">
+      <![CDATA[SELECT T.*, D.id
+FROM MyTable AS T
+JOIN LookupTable FOR SYSTEM_TIME AS OF T.proctime AS D
+ON T.a = D.id WHERE D.age > 20]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2], proctime=[$3], rowtime=[$4], id=[$5])
++- LogicalFilter(condition=[>($7, 20)])
+   +- LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{0, 3}])
+      :- LogicalProject(a=[$0], b=[$1], c=[$2], proctime=[PROCTIME()], rowtime=[TO_TIMESTAMP(FROM_UNIXTIME($2))])
+      :  +- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
+      +- LogicalFilter(condition=[=($cor0.a, $0)])
+         +- LogicalSnapshot(period=[$cor0.proctime])
+            +- LogicalTableScan(table=[[default_catalog, default_database, LookupTable]])
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+FlinkLogicalJoin(condition=[=($0, $5)], joinType=[inner])
+:- FlinkLogicalCalc(select=[a, b, c, PROCTIME() AS proctime, TO_TIMESTAMP(FROM_UNIXTIME(c)) AS rowtime])
+:  +- FlinkLogicalTableSourceScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c])
++- FlinkLogicalCalc(select=[id], where=[>(age, 20)])
+   +- FlinkLogicalSnapshot(period=[$cor0.proctime])
+      +- FlinkLogicalTableSourceScan(table=[[default_catalog, default_database, LookupTable, project=[id, age], metadata=[]]], fields=[id, age])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testJoinTemporalTableWithProjectAndFilter[mode = stream]">
+    <Resource name="sql">
+      <![CDATA[SELECT T.*, D.id
+FROM MyTable AS T
+JOIN LookupTable FOR SYSTEM_TIME AS OF T.proctime AS D
+ON T.a = D.id WHERE D.age > 20]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2], proctime=[$3], rowtime=[$4], id=[$5])
++- LogicalFilter(condition=[>($7, 20)])
+   +- LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{0, 3}])
+      :- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($4, 1000:INTERVAL SECOND)])
+      :  +- LogicalProject(a=[$0], b=[$1], c=[$2], proctime=[PROCTIME()], rowtime=[TO_TIMESTAMP(FROM_UNIXTIME($2))])
+      :     +- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
+      +- LogicalFilter(condition=[=($cor0.a, $0)])
+         +- LogicalSnapshot(period=[$cor0.proctime])
+            +- LogicalTableScan(table=[[default_catalog, default_database, LookupTable]])
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+FlinkLogicalJoin(condition=[=($0, $5)], joinType=[inner])
+:- FlinkLogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($4, 1000:INTERVAL SECOND)])
+:  +- FlinkLogicalCalc(select=[a, b, c, PROCTIME() AS proctime, TO_TIMESTAMP(FROM_UNIXTIME(c)) AS rowtime])
+:     +- FlinkLogicalTableSourceScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c])
++- FlinkLogicalCalc(select=[id], where=[>(age, 20)])
+   +- FlinkLogicalSnapshot(period=[$cor0.proctime])
+      +- FlinkLogicalTableSourceScan(table=[[default_catalog, default_database, LookupTable, project=[id, age], metadata=[]]], fields=[id, age])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testJoinTemporalTableWithProjectionPushDown[mode = batch]">
+    <Resource name="sql">
+      <![CDATA[SELECT T.*, D.id
+FROM MyTable AS T
+JOIN LookupTable FOR SYSTEM_TIME AS OF T.proctime AS D
+ON T.a = D.id]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2], proctime=[$3], rowtime=[$4], id=[$5])
++- LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{0, 3}])
+   :- LogicalProject(a=[$0], b=[$1], c=[$2], proctime=[PROCTIME()], rowtime=[TO_TIMESTAMP(FROM_UNIXTIME($2))])
+   :  +- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
+   +- LogicalFilter(condition=[=($cor0.a, $0)])
+      +- LogicalSnapshot(period=[$cor0.proctime])
+         +- LogicalTableScan(table=[[default_catalog, default_database, LookupTable]])
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+FlinkLogicalJoin(condition=[=($0, $5)], joinType=[inner])
+:- FlinkLogicalCalc(select=[a, b, c, PROCTIME() AS proctime, TO_TIMESTAMP(FROM_UNIXTIME(c)) AS rowtime])
+:  +- FlinkLogicalTableSourceScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c])
++- FlinkLogicalSnapshot(period=[$cor0.proctime])
+   +- FlinkLogicalTableSourceScan(table=[[default_catalog, default_database, LookupTable, project=[id], metadata=[]]], fields=[id])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testJoinTemporalTableWithProjectionPushDown[mode = stream]">
+    <Resource name="sql">
+      <![CDATA[SELECT T.*, D.id
+FROM MyTable AS T
+JOIN LookupTable FOR SYSTEM_TIME AS OF T.proctime AS D
+ON T.a = D.id]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2], proctime=[$3], rowtime=[$4], id=[$5])
++- LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{0, 3}])
+   :- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($4, 1000:INTERVAL SECOND)])
+   :  +- LogicalProject(a=[$0], b=[$1], c=[$2], proctime=[PROCTIME()], rowtime=[TO_TIMESTAMP(FROM_UNIXTIME($2))])
+   :     +- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
+   +- LogicalFilter(condition=[=($cor0.a, $0)])
+      +- LogicalSnapshot(period=[$cor0.proctime])
+         +- LogicalTableScan(table=[[default_catalog, default_database, LookupTable]])
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+FlinkLogicalJoin(condition=[=($0, $5)], joinType=[inner])
+:- FlinkLogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($4, 1000:INTERVAL SECOND)])
+:  +- FlinkLogicalCalc(select=[a, b, c, PROCTIME() AS proctime, TO_TIMESTAMP(FROM_UNIXTIME(c)) AS rowtime])
+:     +- FlinkLogicalTableSourceScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c])
++- FlinkLogicalSnapshot(period=[$cor0.proctime])
+   +- FlinkLogicalTableSourceScan(table=[[default_catalog, default_database, LookupTable, project=[id], metadata=[]]], fields=[id])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testJoinTemporalTableWithReorderedProject[mode = batch]">
+    <Resource name="sql">
+      <![CDATA[SELECT T.*, D.age, D.name, D.id
+FROM MyTable AS T
+JOIN LookupTable FOR SYSTEM_TIME AS OF T.proctime AS D
+ON T.a = D.id]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2], proctime=[$3], rowtime=[$4], age=[$7], name=[$6], id=[$5])
++- LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{0, 3}])
+   :- LogicalProject(a=[$0], b=[$1], c=[$2], proctime=[PROCTIME()], rowtime=[TO_TIMESTAMP(FROM_UNIXTIME($2))])
+   :  +- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
+   +- LogicalFilter(condition=[=($cor0.a, $0)])
+      +- LogicalSnapshot(period=[$cor0.proctime])
+         +- LogicalTableScan(table=[[default_catalog, default_database, LookupTable]])
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+FlinkLogicalCalc(select=[a, b, c, proctime, rowtime, age, name, id])
++- FlinkLogicalJoin(condition=[=($0, $5)], joinType=[inner])
+   :- FlinkLogicalCalc(select=[a, b, c, PROCTIME() AS proctime, TO_TIMESTAMP(FROM_UNIXTIME(c)) AS rowtime])
+   :  +- FlinkLogicalTableSourceScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c])
+   +- FlinkLogicalSnapshot(period=[$cor0.proctime])
+      +- FlinkLogicalTableSourceScan(table=[[default_catalog, default_database, LookupTable]], fields=[id, name, age])
+]]>
+    </Resource>
+  </TestCase>
+</Root>
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/join/TemporalJoinTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/join/TemporalJoinTest.xml
index 62a3747210c..472a6399c60 100644
--- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/join/TemporalJoinTest.xml
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/join/TemporalJoinTest.xml
@@ -216,7 +216,7 @@ Calc(select=[currency, currency0, rate1])
    +- Exchange(distribution=[hash[currency]])
       +- Calc(select=[currency, rate1, rowtime], where=[(rate < 100)])
          +- WatermarkAssigner(rowtime=[rowtime], watermark=[rowtime])
-            +- Calc(select=[currency, rate, (rate + 1) AS rate1, PROCTIME() AS proctime, rowtime])
+            +- Calc(select=[currency, rate, (rate + 1) AS rate1, rowtime])
                +- TableSourceScan(table=[[default_catalog, default_database, RatesBinlogWithComputedColumn]], fields=[currency, rate, rowtime])
 ]]>
     </Resource>
@@ -501,7 +501,7 @@ Calc(select=[currency, currency0, rate1])
    :           +- TableSourceScan(table=[[default_catalog, default_database, Orders]], fields=[amount, currency, rowtime])
    +- Exchange(distribution=[hash[currency]])
       +- Calc(select=[currency, (rate + 1) AS rate1], where=[(rate < 100)])
-         +- TableSourceScan(table=[[default_catalog, default_database, RatesBinlogWithoutWatermark, filter=[]]], fields=[currency, rate, rowtime])
+         +- TableSourceScan(table=[[default_catalog, default_database, RatesBinlogWithoutWatermark, project=[currency, rate], metadata=[], filter=[]]], fields=[currency, rate])
 ]]>
     </Resource>
   </TestCase>
@@ -535,7 +535,7 @@ Calc(select=[currency, currency0, rate1])
    +- Exchange(distribution=[hash[currency]])
       +- Calc(select=[currency, rate1], where=[(rate < 100)])
          +- WatermarkAssigner(rowtime=[rowtime], watermark=[rowtime])
-            +- Calc(select=[currency, rate, (rate + 1) AS rate1, PROCTIME() AS proctime, rowtime])
+            +- Calc(select=[currency, rate, (rate + 1) AS rate1, rowtime])
                +- TableSourceScan(table=[[default_catalog, default_database, RatesBinlogWithComputedColumn]], fields=[currency, rate, rowtime])
 ]]>
     </Resource>