You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by lz...@apache.org on 2022/09/05 02:10:19 UTC
[flink] branch release-1.15 updated: [FLINK-29138][table-planner] fix project can not be pushed into lookup source
This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch release-1.15
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.15 by this push:
new e7c7df4c9b0 [FLINK-29138][table-planner] fix project can not be pushed into lookup source
e7c7df4c9b0 is described below
commit e7c7df4c9b07667344e33c23bb92cb8a07e3ac0b
Author: lincoln lee <li...@gmail.com>
AuthorDate: Mon Sep 5 10:10:10 2022 +0800
[FLINK-29138][table-planner] fix project can not be pushed into lookup source
This closes #20727
---
.../logical/ProjectSnapshotTransposeRule.java | 77 ++++++
.../planner/plan/rules/FlinkBatchRuleSets.scala | 3 +-
.../planner/plan/rules/FlinkStreamRuleSets.scala | 3 +
.../planner/factories/TestValuesTableFactory.java | 20 +-
.../logical/ProjectSnapshotTransposeRuleTest.java | 136 +++++++++++
.../planner/plan/batch/sql/join/LookupJoinTest.xml | 2 +-
...testJoinTemporalTableWithProjectionPushDown.out | 19 +-
.../logical/ProjectSnapshotTransposeRuleTest.xml | 257 +++++++++++++++++++++
.../plan/stream/sql/join/TemporalJoinTest.xml | 6 +-
9 files changed, 506 insertions(+), 17 deletions(-)
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/ProjectSnapshotTransposeRule.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/ProjectSnapshotTransposeRule.java
new file mode 100644
index 00000000000..27c9dc7e45c
--- /dev/null
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/ProjectSnapshotTransposeRule.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.rules.logical;
+
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.plan.RelRule;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.logical.LogicalProject;
+import org.apache.calcite.rel.logical.LogicalSnapshot;
+
+/** Transpose {@link LogicalProject} past into {@link LogicalSnapshot}. */
+public class ProjectSnapshotTransposeRule extends RelRule<ProjectSnapshotTransposeRule.Config> {
+
+ public static final RelOptRule INSTANCE =
+ ProjectSnapshotTransposeRule.Config.EMPTY.as(Config.class).withOperator().toRule();
+
+ public ProjectSnapshotTransposeRule(Config config) {
+ super(config);
+ }
+
+ @Override
+ public boolean matches(RelOptRuleCall call) {
+ LogicalProject project = call.rel(0);
+ // Don't push a project which contains over into a snapshot, snapshot on window aggregate is
+ // unsupported for now.
+ return !project.containsOver();
+ }
+
+ @Override
+ public void onMatch(RelOptRuleCall call) {
+ LogicalProject project = call.rel(0);
+ LogicalSnapshot snapshot = call.rel(1);
+ RelNode newProject = project.copy(project.getTraitSet(), snapshot.getInputs());
+ RelNode newSnapshot =
+ snapshot.copy(snapshot.getTraitSet(), newProject, snapshot.getPeriod());
+ call.transformTo(newSnapshot);
+ }
+
+ /** Configuration for {@link ProjectSnapshotTransposeRule}. */
+ public interface Config extends RelRule.Config {
+
+ @Override
+ default RelOptRule toRule() {
+ return new ProjectSnapshotTransposeRule(this);
+ }
+
+ default ProjectSnapshotTransposeRule.Config withOperator() {
+ final RelRule.OperandTransform snapshotTransform =
+ operandBuilder -> operandBuilder.operand(LogicalSnapshot.class).noInputs();
+
+ final RelRule.OperandTransform projectTransform =
+ operandBuilder ->
+ operandBuilder
+ .operand(LogicalProject.class)
+ .oneInput(snapshotTransform);
+
+ return withOperandSupplier(projectTransform).as(Config.class);
+ }
+ }
+}
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkBatchRuleSets.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkBatchRuleSets.scala
index 22b30f63e4c..6968d3527de 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkBatchRuleSets.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkBatchRuleSets.scala
@@ -235,7 +235,8 @@ object FlinkBatchRuleSets {
PushProjectIntoLegacyTableSourceScanRule.INSTANCE,
PushFilterIntoTableSourceScanRule.INSTANCE,
PushFilterIntoLegacyTableSourceScanRule.INSTANCE,
-
+ // transpose project and snapshot for scan optimization
+ ProjectSnapshotTransposeRule.INSTANCE,
// reorder sort and projection
CoreRules.SORT_PROJECT_TRANSPOSE,
// remove unnecessary sort rule
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkStreamRuleSets.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkStreamRuleSets.scala
index d7ba45ebf71..f0b37821ca5 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkStreamRuleSets.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkStreamRuleSets.scala
@@ -234,6 +234,9 @@ object FlinkStreamRuleSets {
PushFilterIntoLegacyTableSourceScanRule.INSTANCE,
PushLimitIntoTableSourceScanRule.INSTANCE,
+ // transpose project and snapshot for scan optimization
+ ProjectSnapshotTransposeRule.INSTANCE,
+
// reorder the project and watermark assigner
ProjectWatermarkAssignerTransposeRule.INSTANCE,
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestValuesTableFactory.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestValuesTableFactory.java
index 1c93fe10e02..01759b22c1d 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestValuesTableFactory.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestValuesTableFactory.java
@@ -472,6 +472,7 @@ public final class TestValuesTableFactory
}
} else {
return new TestValuesScanLookupTableSource(
+ context.getCatalogTable().getResolvedSchema().toPhysicalRowDataType(),
producedDataType,
changelogMode,
isBounded,
@@ -1047,7 +1048,7 @@ public final class TestValuesTableFactory
return result;
}
- private Row projectRow(Row row) {
+ protected Row projectRow(Row row) {
if (projectedPhysicalFields == null) {
return row;
}
@@ -1370,7 +1371,10 @@ public final class TestValuesTableFactory
private final @Nullable String lookupFunctionClass;
private final boolean isAsync;
+ private final DataType originType;
+
private TestValuesScanLookupTableSource(
+ DataType originType,
DataType producedDataType,
ChangelogMode changelogMode,
boolean bounded,
@@ -1404,6 +1408,7 @@ public final class TestValuesTableFactory
allPartitions,
readableMetadata,
projectedMetadataFields);
+ this.originType = originType;
this.lookupFunctionClass = lookupFunctionClass;
this.isAsync = isAsync;
}
@@ -1446,20 +1451,24 @@ public final class TestValuesTableFactory
data = data.subList(numElementToSkip, data.size());
}
}
-
+ if (nestedProjectionSupported) {
+ throw new UnsupportedOperationException(
+ "nestedProjectionSupported is unsupported for lookup source currently.");
+ }
data.forEach(
record -> {
+ Row projected = projectRow(record);
Row key =
Row.of(
Arrays.stream(lookupIndices)
- .mapToObj(record::getField)
+ .mapToObj(projected::getField)
.toArray());
List<Row> list = mapping.get(key);
if (list != null) {
- list.add(record);
+ list.add(projected);
} else {
list = new ArrayList<>();
- list.add(record);
+ list.add(projected);
mapping.put(key, list);
}
});
@@ -1473,6 +1482,7 @@ public final class TestValuesTableFactory
@Override
public DynamicTableSource copy() {
return new TestValuesScanLookupTableSource(
+ originType,
producedDataType,
changelogMode,
bounded,
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/rules/logical/ProjectSnapshotTransposeRuleTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/rules/logical/ProjectSnapshotTransposeRuleTest.java
new file mode 100644
index 00000000000..7850bda83bd
--- /dev/null
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/rules/logical/ProjectSnapshotTransposeRuleTest.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.rules.logical;
+
+import org.apache.flink.table.api.TableConfig;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.planner.plan.optimize.program.FlinkBatchProgram;
+import org.apache.flink.table.planner.plan.optimize.program.FlinkStreamProgram;
+import org.apache.flink.table.planner.utils.BatchTableTestUtil;
+import org.apache.flink.table.planner.utils.StreamTableTestUtil;
+import org.apache.flink.table.planner.utils.TableTestBase;
+import org.apache.flink.table.planner.utils.TableTestUtil;
+
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.util.Arrays;
+import java.util.Collection;
+
+/** Test rule {@link ProjectSnapshotTransposeRule}. */
+@RunWith(Parameterized.class)
+public class ProjectSnapshotTransposeRuleTest extends TableTestBase {
+
+ private static final String STREAM = "stream";
+ private static final String BATCH = "batch";
+
+ @Parameterized.Parameter public String mode;
+
+ @Parameterized.Parameters(name = "mode = {0}")
+ public static Collection<String> parameters() {
+ return Arrays.asList(STREAM, BATCH);
+ }
+
+ private TableTestUtil util;
+
+ @Before
+ public void setup() {
+ boolean isStreaming = STREAM.equals(mode);
+ if (isStreaming) {
+ util = streamTestUtil(TableConfig.getDefault());
+ ((StreamTableTestUtil) util).buildStreamProgram(FlinkStreamProgram.LOGICAL_REWRITE());
+ } else {
+ util = batchTestUtil(TableConfig.getDefault());
+ ((BatchTableTestUtil) util).buildBatchProgram(FlinkBatchProgram.LOGICAL_REWRITE());
+ }
+
+ TableEnvironment tEnv = util.getTableEnv();
+ String src =
+ String.format(
+ "CREATE TABLE MyTable (\n"
+ + " a int,\n"
+ + " b varchar,\n"
+ + " c bigint,\n"
+ + " proctime as PROCTIME(),\n"
+ + " rowtime as TO_TIMESTAMP(FROM_UNIXTIME(c)),\n"
+ + " watermark for rowtime as rowtime - INTERVAL '1' second \n"
+ + ") with (\n"
+ + " 'connector' = 'values',\n"
+ + " 'bounded' = '%s')",
+ !isStreaming);
+ String lookup =
+ String.format(
+ "CREATE TABLE LookupTable (\n"
+ + " id int,\n"
+ + " name varchar,\n"
+ + " age int \n"
+ + ") with (\n"
+ + " 'connector' = 'values',\n"
+ + " 'bounded' = '%s')",
+ !isStreaming);
+ tEnv.executeSql(src);
+ tEnv.executeSql(lookup);
+ }
+
+ @Test
+ public void testJoinTemporalTableWithProjectionPushDown() {
+ String sql =
+ "SELECT T.*, D.id\n"
+ + "FROM MyTable AS T\n"
+ + "JOIN LookupTable FOR SYSTEM_TIME AS OF T.proctime AS D\n"
+ + "ON T.a = D.id";
+
+ util.verifyRelPlan(sql);
+ }
+
+ @Test
+ public void testJoinTemporalTableNotProjectable() {
+ String sql =
+ "SELECT T.*, D.*\n"
+ + "FROM MyTable AS T\n"
+ + "JOIN LookupTable FOR SYSTEM_TIME AS OF T.proctime AS D\n"
+ + "ON T.a = D.id";
+
+ util.verifyRelPlan(sql);
+ }
+
+ @Test
+ public void testJoinTemporalTableWithReorderedProject() {
+ String sql =
+ "SELECT T.*, D.age, D.name, D.id\n"
+ + "FROM MyTable AS T\n"
+ + "JOIN LookupTable FOR SYSTEM_TIME AS OF T.proctime AS D\n"
+ + "ON T.a = D.id";
+
+ util.verifyRelPlan(sql);
+ }
+
+ @Test
+ public void testJoinTemporalTableWithProjectAndFilter() {
+ String sql =
+ "SELECT T.*, D.id\n"
+ + "FROM MyTable AS T\n"
+ + "JOIN LookupTable FOR SYSTEM_TIME AS OF T.proctime AS D\n"
+ + "ON T.a = D.id WHERE D.age > 20";
+
+ util.verifyRelPlan(sql);
+ }
+}
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/join/LookupJoinTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/join/LookupJoinTest.xml
index aa6765410e3..3235613b0cd 100644
--- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/join/LookupJoinTest.xml
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/join/LookupJoinTest.xml
@@ -475,7 +475,7 @@ FlinkLogicalAggregate(group=[{0}], EXPR$1=[COUNT($1)], EXPR$2=[SUM($2)], EXPR$3=
: +- FlinkLogicalDataStreamTableScan(table=[[default_catalog, default_database, T1]], fields=[a, b, c, d])
+- FlinkLogicalSnapshot(period=[$cor0.proctime])
+- FlinkLogicalCalc(select=[id], where=[>(age, 10)])
- +- FlinkLogicalTableSourceScan(table=[[default_catalog, default_database, LookupTable]], fields=[id, name, age])
+ +- FlinkLogicalTableSourceScan(table=[[default_catalog, default_database, LookupTable, project=[id, age], metadata=[]]], fields=[id, age])
]]>
</Resource>
</TestCase>
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/LookupJoinJsonPlanTest_jsonplan/testJoinTemporalTableWithProjectionPushDown.out b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/LookupJoinJsonPlanTest_jsonplan/testJoinTemporalTableWithProjectionPushDown.out
index c3012c79471..cd66b5d1773 100644
--- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/LookupJoinJsonPlanTest_jsonplan/testJoinTemporalTableWithProjectionPushDown.out
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/LookupJoinJsonPlanTest_jsonplan/testJoinTemporalTableWithProjectionPushDown.out
@@ -246,9 +246,18 @@
"bounded" : "false"
}
}
- }
+ },
+ "abilities" : [ {
+ "type" : "ProjectPushDown",
+ "projectedFields" : [ [ 0 ] ],
+ "producedType" : "ROW<`id` INT> NOT NULL"
+ }, {
+ "type" : "ReadingMetadata",
+ "metadataKeys" : [ ],
+ "producedType" : "ROW<`id` INT> NOT NULL"
+ } ]
},
- "outputType" : "ROW<`id` INT, `name` VARCHAR(2147483647), `age` INT> NOT NULL"
+ "outputType" : "ROW<`id` INT> NOT NULL"
},
"lookupKeys" : {
"0" : {
@@ -256,11 +265,7 @@
"index" : 0
}
},
- "projectionOnTemporalTable" : [ {
- "kind" : "INPUT_REF",
- "inputIndex" : 0,
- "type" : "INT"
- } ],
+ "projectionOnTemporalTable" : null,
"filterOnTemporalTable" : null,
"inputProperties" : [ {
"requiredDistribution" : {
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/ProjectSnapshotTransposeRuleTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/ProjectSnapshotTransposeRuleTest.xml
new file mode 100644
index 00000000000..58d680fb561
--- /dev/null
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/ProjectSnapshotTransposeRuleTest.xml
@@ -0,0 +1,257 @@
+<?xml version="1.0" ?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements. See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to you under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License. You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+<Root>
+ <TestCase name="testJoinTemporalTableNotProjectable[mode = batch]">
+ <Resource name="sql">
+ <![CDATA[SELECT T.*, D.*
+FROM MyTable AS T
+JOIN LookupTable FOR SYSTEM_TIME AS OF T.proctime AS D
+ON T.a = D.id]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2], proctime=[$3], rowtime=[$4], id=[$5], name=[$6], age=[$7])
++- LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{0, 3}])
+ :- LogicalProject(a=[$0], b=[$1], c=[$2], proctime=[PROCTIME()], rowtime=[TO_TIMESTAMP(FROM_UNIXTIME($2))])
+ : +- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
+ +- LogicalFilter(condition=[=($cor0.a, $0)])
+ +- LogicalSnapshot(period=[$cor0.proctime])
+ +- LogicalTableScan(table=[[default_catalog, default_database, LookupTable]])
+]]>
+ </Resource>
+ <Resource name="optimized rel plan">
+ <![CDATA[
+FlinkLogicalJoin(condition=[=($0, $5)], joinType=[inner])
+:- FlinkLogicalCalc(select=[a, b, c, PROCTIME() AS proctime, TO_TIMESTAMP(FROM_UNIXTIME(c)) AS rowtime])
+: +- FlinkLogicalTableSourceScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c])
++- FlinkLogicalSnapshot(period=[$cor0.proctime])
+ +- FlinkLogicalTableSourceScan(table=[[default_catalog, default_database, LookupTable]], fields=[id, name, age])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testJoinTemporalTableWithReorderedProject[mode = stream]">
+ <Resource name="sql">
+ <![CDATA[SELECT T.*, D.age, D.name, D.id
+FROM MyTable AS T
+JOIN LookupTable FOR SYSTEM_TIME AS OF T.proctime AS D
+ON T.a = D.id]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2], proctime=[$3], rowtime=[$4], age=[$7], name=[$6], id=[$5])
++- LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{0, 3}])
+ :- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($4, 1000:INTERVAL SECOND)])
+ : +- LogicalProject(a=[$0], b=[$1], c=[$2], proctime=[PROCTIME()], rowtime=[TO_TIMESTAMP(FROM_UNIXTIME($2))])
+ : +- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
+ +- LogicalFilter(condition=[=($cor0.a, $0)])
+ +- LogicalSnapshot(period=[$cor0.proctime])
+ +- LogicalTableScan(table=[[default_catalog, default_database, LookupTable]])
+]]>
+ </Resource>
+ <Resource name="optimized rel plan">
+ <![CDATA[
+FlinkLogicalCalc(select=[a, b, c, proctime, rowtime, age, name, id])
++- FlinkLogicalJoin(condition=[=($0, $5)], joinType=[inner])
+ :- FlinkLogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($4, 1000:INTERVAL SECOND)])
+ : +- FlinkLogicalCalc(select=[a, b, c, PROCTIME() AS proctime, TO_TIMESTAMP(FROM_UNIXTIME(c)) AS rowtime])
+ : +- FlinkLogicalTableSourceScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c])
+ +- FlinkLogicalSnapshot(period=[$cor0.proctime])
+ +- FlinkLogicalTableSourceScan(table=[[default_catalog, default_database, LookupTable]], fields=[id, name, age])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testJoinTemporalTableNotProjectable[mode = stream]">
+ <Resource name="sql">
+ <![CDATA[SELECT T.*, D.*
+FROM MyTable AS T
+JOIN LookupTable FOR SYSTEM_TIME AS OF T.proctime AS D
+ON T.a = D.id]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2], proctime=[$3], rowtime=[$4], id=[$5], name=[$6], age=[$7])
++- LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{0, 3}])
+ :- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($4, 1000:INTERVAL SECOND)])
+ : +- LogicalProject(a=[$0], b=[$1], c=[$2], proctime=[PROCTIME()], rowtime=[TO_TIMESTAMP(FROM_UNIXTIME($2))])
+ : +- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
+ +- LogicalFilter(condition=[=($cor0.a, $0)])
+ +- LogicalSnapshot(period=[$cor0.proctime])
+ +- LogicalTableScan(table=[[default_catalog, default_database, LookupTable]])
+]]>
+ </Resource>
+ <Resource name="optimized rel plan">
+ <![CDATA[
+FlinkLogicalJoin(condition=[=($0, $5)], joinType=[inner])
+:- FlinkLogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($4, 1000:INTERVAL SECOND)])
+: +- FlinkLogicalCalc(select=[a, b, c, PROCTIME() AS proctime, TO_TIMESTAMP(FROM_UNIXTIME(c)) AS rowtime])
+: +- FlinkLogicalTableSourceScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c])
++- FlinkLogicalSnapshot(period=[$cor0.proctime])
+ +- FlinkLogicalTableSourceScan(table=[[default_catalog, default_database, LookupTable]], fields=[id, name, age])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testJoinTemporalTableWithProjectAndFilter[mode = batch]">
+ <Resource name="sql">
+ <![CDATA[SELECT T.*, D.id
+FROM MyTable AS T
+JOIN LookupTable FOR SYSTEM_TIME AS OF T.proctime AS D
+ON T.a = D.id WHERE D.age > 20]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2], proctime=[$3], rowtime=[$4], id=[$5])
++- LogicalFilter(condition=[>($7, 20)])
+ +- LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{0, 3}])
+ :- LogicalProject(a=[$0], b=[$1], c=[$2], proctime=[PROCTIME()], rowtime=[TO_TIMESTAMP(FROM_UNIXTIME($2))])
+ : +- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
+ +- LogicalFilter(condition=[=($cor0.a, $0)])
+ +- LogicalSnapshot(period=[$cor0.proctime])
+ +- LogicalTableScan(table=[[default_catalog, default_database, LookupTable]])
+]]>
+ </Resource>
+ <Resource name="optimized rel plan">
+ <![CDATA[
+FlinkLogicalJoin(condition=[=($0, $5)], joinType=[inner])
+:- FlinkLogicalCalc(select=[a, b, c, PROCTIME() AS proctime, TO_TIMESTAMP(FROM_UNIXTIME(c)) AS rowtime])
+: +- FlinkLogicalTableSourceScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c])
++- FlinkLogicalCalc(select=[id], where=[>(age, 20)])
+ +- FlinkLogicalSnapshot(period=[$cor0.proctime])
+ +- FlinkLogicalTableSourceScan(table=[[default_catalog, default_database, LookupTable, project=[id, age], metadata=[]]], fields=[id, age])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testJoinTemporalTableWithProjectAndFilter[mode = stream]">
+ <Resource name="sql">
+ <![CDATA[SELECT T.*, D.id
+FROM MyTable AS T
+JOIN LookupTable FOR SYSTEM_TIME AS OF T.proctime AS D
+ON T.a = D.id WHERE D.age > 20]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2], proctime=[$3], rowtime=[$4], id=[$5])
++- LogicalFilter(condition=[>($7, 20)])
+ +- LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{0, 3}])
+ :- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($4, 1000:INTERVAL SECOND)])
+ : +- LogicalProject(a=[$0], b=[$1], c=[$2], proctime=[PROCTIME()], rowtime=[TO_TIMESTAMP(FROM_UNIXTIME($2))])
+ : +- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
+ +- LogicalFilter(condition=[=($cor0.a, $0)])
+ +- LogicalSnapshot(period=[$cor0.proctime])
+ +- LogicalTableScan(table=[[default_catalog, default_database, LookupTable]])
+]]>
+ </Resource>
+ <Resource name="optimized rel plan">
+ <![CDATA[
+FlinkLogicalJoin(condition=[=($0, $5)], joinType=[inner])
+:- FlinkLogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($4, 1000:INTERVAL SECOND)])
+: +- FlinkLogicalCalc(select=[a, b, c, PROCTIME() AS proctime, TO_TIMESTAMP(FROM_UNIXTIME(c)) AS rowtime])
+: +- FlinkLogicalTableSourceScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c])
++- FlinkLogicalCalc(select=[id], where=[>(age, 20)])
+ +- FlinkLogicalSnapshot(period=[$cor0.proctime])
+ +- FlinkLogicalTableSourceScan(table=[[default_catalog, default_database, LookupTable, project=[id, age], metadata=[]]], fields=[id, age])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testJoinTemporalTableWithProjectionPushDown[mode = batch]">
+ <Resource name="sql">
+ <![CDATA[SELECT T.*, D.id
+FROM MyTable AS T
+JOIN LookupTable FOR SYSTEM_TIME AS OF T.proctime AS D
+ON T.a = D.id]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2], proctime=[$3], rowtime=[$4], id=[$5])
++- LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{0, 3}])
+ :- LogicalProject(a=[$0], b=[$1], c=[$2], proctime=[PROCTIME()], rowtime=[TO_TIMESTAMP(FROM_UNIXTIME($2))])
+ : +- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
+ +- LogicalFilter(condition=[=($cor0.a, $0)])
+ +- LogicalSnapshot(period=[$cor0.proctime])
+ +- LogicalTableScan(table=[[default_catalog, default_database, LookupTable]])
+]]>
+ </Resource>
+ <Resource name="optimized rel plan">
+ <![CDATA[
+FlinkLogicalJoin(condition=[=($0, $5)], joinType=[inner])
+:- FlinkLogicalCalc(select=[a, b, c, PROCTIME() AS proctime, TO_TIMESTAMP(FROM_UNIXTIME(c)) AS rowtime])
+: +- FlinkLogicalTableSourceScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c])
++- FlinkLogicalSnapshot(period=[$cor0.proctime])
+ +- FlinkLogicalTableSourceScan(table=[[default_catalog, default_database, LookupTable, project=[id], metadata=[]]], fields=[id])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testJoinTemporalTableWithProjectionPushDown[mode = stream]">
+ <Resource name="sql">
+ <![CDATA[SELECT T.*, D.id
+FROM MyTable AS T
+JOIN LookupTable FOR SYSTEM_TIME AS OF T.proctime AS D
+ON T.a = D.id]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2], proctime=[$3], rowtime=[$4], id=[$5])
++- LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{0, 3}])
+ :- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($4, 1000:INTERVAL SECOND)])
+ : +- LogicalProject(a=[$0], b=[$1], c=[$2], proctime=[PROCTIME()], rowtime=[TO_TIMESTAMP(FROM_UNIXTIME($2))])
+ : +- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
+ +- LogicalFilter(condition=[=($cor0.a, $0)])
+ +- LogicalSnapshot(period=[$cor0.proctime])
+ +- LogicalTableScan(table=[[default_catalog, default_database, LookupTable]])
+]]>
+ </Resource>
+ <Resource name="optimized rel plan">
+ <![CDATA[
+FlinkLogicalJoin(condition=[=($0, $5)], joinType=[inner])
+:- FlinkLogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($4, 1000:INTERVAL SECOND)])
+: +- FlinkLogicalCalc(select=[a, b, c, PROCTIME() AS proctime, TO_TIMESTAMP(FROM_UNIXTIME(c)) AS rowtime])
+: +- FlinkLogicalTableSourceScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c])
++- FlinkLogicalSnapshot(period=[$cor0.proctime])
+ +- FlinkLogicalTableSourceScan(table=[[default_catalog, default_database, LookupTable, project=[id], metadata=[]]], fields=[id])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testJoinTemporalTableWithReorderedProject[mode = batch]">
+ <Resource name="sql">
+ <![CDATA[SELECT T.*, D.age, D.name, D.id
+FROM MyTable AS T
+JOIN LookupTable FOR SYSTEM_TIME AS OF T.proctime AS D
+ON T.a = D.id]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2], proctime=[$3], rowtime=[$4], age=[$7], name=[$6], id=[$5])
++- LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{0, 3}])
+ :- LogicalProject(a=[$0], b=[$1], c=[$2], proctime=[PROCTIME()], rowtime=[TO_TIMESTAMP(FROM_UNIXTIME($2))])
+ : +- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
+ +- LogicalFilter(condition=[=($cor0.a, $0)])
+ +- LogicalSnapshot(period=[$cor0.proctime])
+ +- LogicalTableScan(table=[[default_catalog, default_database, LookupTable]])
+]]>
+ </Resource>
+ <Resource name="optimized rel plan">
+ <![CDATA[
+FlinkLogicalCalc(select=[a, b, c, proctime, rowtime, age, name, id])
++- FlinkLogicalJoin(condition=[=($0, $5)], joinType=[inner])
+ :- FlinkLogicalCalc(select=[a, b, c, PROCTIME() AS proctime, TO_TIMESTAMP(FROM_UNIXTIME(c)) AS rowtime])
+ : +- FlinkLogicalTableSourceScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c])
+ +- FlinkLogicalSnapshot(period=[$cor0.proctime])
+ +- FlinkLogicalTableSourceScan(table=[[default_catalog, default_database, LookupTable]], fields=[id, name, age])
+]]>
+ </Resource>
+ </TestCase>
+</Root>
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/join/TemporalJoinTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/join/TemporalJoinTest.xml
index 62a3747210c..472a6399c60 100644
--- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/join/TemporalJoinTest.xml
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/join/TemporalJoinTest.xml
@@ -216,7 +216,7 @@ Calc(select=[currency, currency0, rate1])
+- Exchange(distribution=[hash[currency]])
+- Calc(select=[currency, rate1, rowtime], where=[(rate < 100)])
+- WatermarkAssigner(rowtime=[rowtime], watermark=[rowtime])
- +- Calc(select=[currency, rate, (rate + 1) AS rate1, PROCTIME() AS proctime, rowtime])
+ +- Calc(select=[currency, rate, (rate + 1) AS rate1, rowtime])
+- TableSourceScan(table=[[default_catalog, default_database, RatesBinlogWithComputedColumn]], fields=[currency, rate, rowtime])
]]>
</Resource>
@@ -501,7 +501,7 @@ Calc(select=[currency, currency0, rate1])
: +- TableSourceScan(table=[[default_catalog, default_database, Orders]], fields=[amount, currency, rowtime])
+- Exchange(distribution=[hash[currency]])
+- Calc(select=[currency, (rate + 1) AS rate1], where=[(rate < 100)])
- +- TableSourceScan(table=[[default_catalog, default_database, RatesBinlogWithoutWatermark, filter=[]]], fields=[currency, rate, rowtime])
+ +- TableSourceScan(table=[[default_catalog, default_database, RatesBinlogWithoutWatermark, project=[currency, rate], metadata=[], filter=[]]], fields=[currency, rate])
]]>
</Resource>
</TestCase>
@@ -535,7 +535,7 @@ Calc(select=[currency, currency0, rate1])
+- Exchange(distribution=[hash[currency]])
+- Calc(select=[currency, rate1], where=[(rate < 100)])
+- WatermarkAssigner(rowtime=[rowtime], watermark=[rowtime])
- +- Calc(select=[currency, rate, (rate + 1) AS rate1, PROCTIME() AS proctime, rowtime])
+ +- Calc(select=[currency, rate, (rate + 1) AS rate1, rowtime])
+- TableSourceScan(table=[[default_catalog, default_database, RatesBinlogWithComputedColumn]], fields=[currency, rate, rowtime])
]]>
</Resource>