You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by go...@apache.org on 2022/07/07 06:50:46 UTC
[flink] branch release-1.15 updated: [FLINK-28334][table-planner] Fix PushProjectIntoTableSourceScanRule: covers the case when table source SupportsReadingMetadata and not SupportsProjectionPushDown
This is an automated email from the ASF dual-hosted git repository.
godfrey pushed a commit to branch release-1.15
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.15 by this push:
new 1dc4c5ba71e [FLINK-28334][table-planner] Fix PushProjectIntoTableSourceScanRule: covers the case when table source SupportsReadingMetadata and not SupportsProjectionPushDown
1dc4c5ba71e is described below
commit 1dc4c5ba71e2c920ee94bf4274a850db6bc870d9
Author: lincoln.lil <li...@alibaba-inc.com>
AuthorDate: Thu Jun 30 23:14:16 2022 +0800
[FLINK-28334][table-planner] Fix PushProjectIntoTableSourceScanRule: covers the case when table source SupportsReadingMetadata and not SupportsProjectionPushDown
This closes #20118
(cherry picked from commit fb9843af5ffeb6d7561876704d463dea1fcdc153)
---
.../planner/connectors/DynamicSourceUtils.java | 2 +-
.../PushProjectIntoTableSourceScanRule.java | 70 ++++++++-
.../planner/plan/stream/sql/TableSourceTest.xml | 162 ++++++++++++++++++---
.../planner/plan/stream/sql/TableSourceTest.scala | 74 +++++++++-
4 files changed, 283 insertions(+), 25 deletions(-)
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/connectors/DynamicSourceUtils.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/connectors/DynamicSourceUtils.java
index 27f93fa6ac4..95857e8d7ba 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/connectors/DynamicSourceUtils.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/connectors/DynamicSourceUtils.java
@@ -370,7 +370,7 @@ public final class DynamicSourceUtils {
return Collections.emptyMap();
}
- private static List<MetadataColumn> extractMetadataColumns(ResolvedSchema schema) {
+ public static List<MetadataColumn> extractMetadataColumns(ResolvedSchema schema) {
return schema.getColumns().stream()
.filter(MetadataColumn.class::isInstance)
.map(MetadataColumn.class::cast)
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushProjectIntoTableSourceScanRule.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushProjectIntoTableSourceScanRule.java
index 17e87a4bd45..0c48a1824cf 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushProjectIntoTableSourceScanRule.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushProjectIntoTableSourceScanRule.java
@@ -50,12 +50,14 @@ import org.apache.calcite.rel.rules.ProjectRemoveRule;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rex.RexInputRef;
import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.rex.RexShuttle;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
+import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;
@@ -337,11 +339,71 @@ public class PushProjectIntoTableSourceScanRule
private List<RexNode> rewriteProjections(
RelOptRuleCall call, TableSourceTable source, NestedSchema projectedSchema) {
final LogicalProject project = call.rel(0);
+ List<RexNode> newProjects = project.getProjects(); // result: project expressions remapped onto the new scan's row type (unchanged if no branch below applies)
+
if (supportsProjectionPushDown(source.tableSource())) {
- return NestedProjectionUtil.rewrite(
- project.getProjects(), projectedSchema, call.builder().getRexBuilder());
- } else {
- return project.getProjects();
+ // If the source supports projection push down, every input ref (including metadata
+ // columns) is rewritten against the projected schema.
+ newProjects =
+ NestedProjectionUtil.rewrite(
+ newProjects, projectedSchema, call.builder().getRexBuilder());
+ } else if (supportsMetadata(source.tableSource())) {
+ // The source supports reading metadata but not projection push down.
+ // Note: NestedProjectionUtil is not reused to rewrite metadata references because
+ // it only works for sources that support projection push down.
+ List<Column.MetadataColumn> metadataColumns =
+ DynamicSourceUtils.extractMetadataColumns(
+ source.contextResolvedTable().getResolvedSchema());
+ if (metadataColumns.size() > 0) { // nothing to remap when the schema declares no metadata columns
+ Set<String> metaCols =
+ metadataColumns.stream().map(Column::getName).collect(Collectors.toSet());
+
+ MetadataOnlyProjectionRewriter rewriter = // remaps metadata refs by name from the project input's row type to source.getRowType()
+ new MetadataOnlyProjectionRewriter(
+ project.getInput().getRowType(), source.getRowType(), metaCols);
+
+ newProjects =
+ newProjects.stream()
+ .map(p -> p.accept(rewriter))
+ .collect(Collectors.toList());
+ }
+ }
+
+ return newProjects;
+ }
+
+ private static class MetadataOnlyProjectionRewriter extends RexShuttle { // Remaps refs to metadata columns, by name, from the old input row type to the new one.
+
+ private final RelDataType oldInputRowType; // row type the incoming input refs index into
+
+ private final RelDataType newInputRowType; // row type that metadata refs are remapped into
+
+ private final Set<String> metaCols; // names of the metadata columns whose refs are remapped
+
+ public MetadataOnlyProjectionRewriter(
+ RelDataType oldInputRowType, RelDataType newInputRowType, Set<String> metaCols) {
+ this.oldInputRowType = oldInputRowType;
+ this.newInputRowType = newInputRowType;
+ this.metaCols = metaCols;
+ }
+
+ @Override
+ public RexNode visitInputRef(RexInputRef inputRef) { // throws TableException for an out-of-range ref or a metadata column missing from the new row type
+ int refIndex = inputRef.getIndex();
+ if (refIndex > oldInputRowType.getFieldCount() - 1) { // reject refs beyond the old input row type
+ throw new TableException(
+ "Illegal field ref:" + refIndex + " over input row:" + oldInputRowType);
+ }
+ String refName = oldInputRowType.getFieldNames().get(refIndex);
+ if (metaCols.contains(refName)) { // only metadata refs are remapped; NOTE(review): other refs keep their index, which presumes non-metadata fields do not move in the new row type — confirm
+ int newIndex = newInputRowType.getFieldNames().indexOf(refName);
+ if (newIndex == -1) { // the metadata column must also be present in the new row type
+ throw new TableException(
+ "Illegal meta field:" + refName + " over input row:" + newInputRowType);
+ }
+ return new RexInputRef(newIndex, inputRef.getType()); // keep the original ref's type, change only the index
+ }
+ return inputRef;
}
}
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/TableSourceTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/TableSourceTest.xml
index f8009ff2197..50aa7631e60 100644
--- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/TableSourceTest.xml
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/TableSourceTest.xml
@@ -81,6 +81,45 @@ LogicalProject(id=[$0], nested1=[$1.nested1], results=[+(+($1.nested1.value, $1.
<![CDATA[
Calc(select=[id, deepNested_nested1 AS nested1, ((deepNested_nested1.value + deepNested_nested2_num) + metadata_1) AS results])
+- TableSourceScan(table=[[default_catalog, default_database, T, project=[id, deepNested_nested1, deepNested_nested2_num], metadata=[metadata_1]]], fields=[id, deepNested_nested1, deepNested_nested2_num, metadata_1])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testNoNestedProjectWithMetadata">
+ <Resource name="sql">
+ <![CDATA[
+SELECT id,
+ deepNested.nested1 AS nested1,
+ deepNested.nested1.`value` + deepNested.nested2.num + metadata_1 as results
+FROM T
+]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalProject(id=[$0], nested1=[$1.nested1], results=[+(+($1.nested1.value, $1.nested2.num), $2)])
++- LogicalTableScan(table=[[default_catalog, default_database, T]])
+]]>
+ </Resource>
+ <Resource name="optimized exec plan">
+ <![CDATA[
+Calc(select=[id, deepNested.nested1 AS nested1, ((deepNested.nested1.value + deepNested.nested2.num) + metadata_1) AS results])
++- TableSourceScan(table=[[default_catalog, default_database, T, project=[id, deepNested], metadata=[metadata_1]]], fields=[id, deepNested, metadata_1])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testProjectionPushDownOnly">
+ <Resource name="sql">
+ <![CDATA[SELECT id, ts1, tags FROM src]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalProject(id=[$0], ts1=[+($4, 10000:INTERVAL SECOND)], tags=[$2])
++- LogicalTableScan(table=[[default_catalog, default_database, src]])
+]]>
+ </Resource>
+ <Resource name="optimized exec plan">
+ <![CDATA[
+Calc(select=[id, (ts + 10000:INTERVAL SECOND) AS ts1, tags])
++- TableSourceScan(table=[[default_catalog, default_database, src, project=[id, ts, tags], metadata=[]]], fields=[id, ts, tags])
]]>
</Resource>
</TestCase>
@@ -119,6 +158,81 @@ LogicalAggregate(group=[{}], EXPR$0=[COUNT()])
GroupAggregate(select=[COUNT(*) AS EXPR$0])
+- Exchange(distribution=[single])
+- TableSourceScan(table=[[default_catalog, default_database, T, project=[], metadata=[]]], fields=[])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testProjectWithoutProctime">
+ <Resource name="sql">
+ <![CDATA[select name, val, rtime, id from T]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalProject(name=[$3], val=[$2], rtime=[$1], id=[$0])
++- LogicalWatermarkAssigner(rowtime=[rtime], watermark=[$1])
+ +- LogicalProject(id=[$0], rtime=[$1], val=[$2], name=[$3], ptime=[PROCTIME()])
+ +- LogicalTableScan(table=[[default_catalog, default_database, T]])
+]]>
+ </Resource>
+ <Resource name="optimized exec plan">
+ <![CDATA[
+WatermarkAssigner(rowtime=[rtime], watermark=[rtime])
++- Calc(select=[name, val, rtime, id])
+ +- TableSourceScan(table=[[default_catalog, default_database, T]], fields=[id, rtime, val, name])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testProjectWithoutRowtime">
+ <Resource name="sql">
+ <![CDATA[SELECT ptime, name, val, id FROM T]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalProject(ptime=[$4], name=[$3], val=[$2], id=[$0])
++- LogicalWatermarkAssigner(rowtime=[rtime], watermark=[$1])
+ +- LogicalProject(id=[$0], rtime=[$1], val=[$2], name=[$3], ptime=[PROCTIME()])
+ +- LogicalTableScan(table=[[default_catalog, default_database, T]])
+]]>
+ </Resource>
+ <Resource name="optimized exec plan">
+ <![CDATA[
+Calc(select=[PROCTIME_MATERIALIZE(ptime) AS ptime, name, val, id])
++- WatermarkAssigner(rowtime=[rtime], watermark=[rtime])
+ +- Calc(select=[id, rtime, val, name, PROCTIME() AS ptime])
+ +- TableSourceScan(table=[[default_catalog, default_database, T]], fields=[id, rtime, val, name])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testReadsComputedColumnWithoutProjectionPushDown">
+ <Resource name="sql">
+ <![CDATA[SELECT id, ts1, op FROM src]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalProject(id=[$0], ts1=[+($4, 10000:INTERVAL SECOND)], op=[CAST($2):VARCHAR(2147483647) CHARACTER SET "UTF-16LE"])
++- LogicalTableScan(table=[[default_catalog, default_database, src]])
+]]>
+ </Resource>
+ <Resource name="optimized exec plan">
+ <![CDATA[
+Calc(select=[id, (ts + 10000:INTERVAL SECOND) AS ts1, CAST(op AS VARCHAR(2147483647)) AS op])
++- TableSourceScan(table=[[default_catalog, default_database, src, metadata=[op, ts]]], fields=[id, name, op, ts])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testReadsComputedColumnWithProjectionPushDown">
+ <Resource name="sql">
+ <![CDATA[SELECT id, ts1, op FROM src]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalProject(id=[$0], ts1=[+($4, 10000:INTERVAL SECOND)], op=[CAST($2):VARCHAR(2147483647) CHARACTER SET "UTF-16LE"])
++- LogicalTableScan(table=[[default_catalog, default_database, src]])
+]]>
+ </Resource>
+ <Resource name="optimized exec plan">
+ <![CDATA[
+Calc(select=[id, (ts + 10000:INTERVAL SECOND) AS ts1, CAST(op AS VARCHAR(2147483647)) AS op])
++- TableSourceScan(table=[[default_catalog, default_database, src, project=[id], metadata=[op, ts]]], fields=[id, op, ts])
]]>
</Resource>
</TestCase>
@@ -153,23 +267,37 @@ Calc(select=[name, w$end AS EXPR$1, EXPR$2])
]]>
</Resource>
</TestCase>
- <TestCase name="testProjectWithoutProctime">
+ <TestCase name="testReadsMetaDataWithDifferentOrder">
<Resource name="sql">
- <![CDATA[select name, val, rtime, id from T]]>
+ <![CDATA[SELECT ts, id, name, tags, op FROM src]]>
</Resource>
<Resource name="ast">
<![CDATA[
-LogicalProject(name=[$3], val=[$2], rtime=[$1], id=[$0])
-+- LogicalWatermarkAssigner(rowtime=[rtime], watermark=[$1])
- +- LogicalProject(id=[$0], rtime=[$1], val=[$2], name=[$3], ptime=[PROCTIME()])
- +- LogicalTableScan(table=[[default_catalog, default_database, T]])
+LogicalProject(ts=[$4], id=[$0], name=[$1], tags=[CAST($3):VARCHAR(2147483647) CHARACTER SET "UTF-16LE"], op=[CAST($2):VARCHAR(2147483647) CHARACTER SET "UTF-16LE"])
++- LogicalTableScan(table=[[default_catalog, default_database, src]])
]]>
</Resource>
<Resource name="optimized exec plan">
<![CDATA[
-WatermarkAssigner(rowtime=[rtime], watermark=[rtime])
-+- Calc(select=[name, val, rtime, id])
- +- TableSourceScan(table=[[default_catalog, default_database, T]], fields=[id, rtime, val, name])
+Calc(select=[ts, id, name, CAST(tags AS VARCHAR(2147483647)) AS tags, CAST(op AS VARCHAR(2147483647)) AS op])
++- TableSourceScan(table=[[default_catalog, default_database, src]], fields=[id, name, op, tags, ts])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testReadsMetaDataWithoutProjectionPushDown">
+ <Resource name="sql">
+ <![CDATA[SELECT id, ts, tags FROM src]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalProject(id=[$0], ts=[$4], tags=[CAST($3):VARCHAR(2147483647) CHARACTER SET "UTF-16LE"])
++- LogicalTableScan(table=[[default_catalog, default_database, src]])
+]]>
+ </Resource>
+ <Resource name="optimized exec plan">
+ <![CDATA[
+Calc(select=[id, ts, CAST(tags AS VARCHAR(2147483647)) AS tags])
++- TableSourceScan(table=[[default_catalog, default_database, src, metadata=[tags, ts]]], fields=[id, name, tags, ts])
]]>
</Resource>
</TestCase>
@@ -192,24 +320,20 @@ Calc(select=[rowtime, id, name, val])
]]>
</Resource>
</TestCase>
- <TestCase name="testProjectWithoutRowtime">
+ <TestCase name="testReadsMetaDataWithProjectionPushDown">
<Resource name="sql">
- <![CDATA[SELECT ptime, name, val, id FROM T]]>
+ <![CDATA[SELECT id, ts, tags FROM src]]>
</Resource>
<Resource name="ast">
<![CDATA[
-LogicalProject(ptime=[$4], name=[$3], val=[$2], id=[$0])
-+- LogicalWatermarkAssigner(rowtime=[rtime], watermark=[$1])
- +- LogicalProject(id=[$0], rtime=[$1], val=[$2], name=[$3], ptime=[PROCTIME()])
- +- LogicalTableScan(table=[[default_catalog, default_database, T]])
+LogicalProject(id=[$0], ts=[$4], tags=[CAST($3):VARCHAR(2147483647) CHARACTER SET "UTF-16LE"])
++- LogicalTableScan(table=[[default_catalog, default_database, src]])
]]>
</Resource>
<Resource name="optimized exec plan">
<![CDATA[
-Calc(select=[PROCTIME_MATERIALIZE(ptime) AS ptime, name, val, id])
-+- WatermarkAssigner(rowtime=[rtime], watermark=[rtime])
- +- Calc(select=[id, rtime, val, name, PROCTIME() AS ptime])
- +- TableSourceScan(table=[[default_catalog, default_database, T]], fields=[id, rtime, val, name])
+Calc(select=[id, ts, CAST(tags AS VARCHAR(2147483647)) AS tags])
++- TableSourceScan(table=[[default_catalog, default_database, src, project=[id], metadata=[tags, ts]]], fields=[id, tags, ts])
]]>
</Resource>
</TestCase>
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/TableSourceTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/TableSourceTest.scala
index 164ded36c5c..c637b0ed553 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/TableSourceTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/TableSourceTest.scala
@@ -233,6 +233,15 @@ class TableSourceTest extends TableTestBase {
@Test
def testNestedProjectWithMetadata(): Unit = {
+ testNestedProjectWithMetadataBase(true) // source declares nested projection push down support
+ }
+
+ @Test
+ def testNoNestedProjectWithMetadata(): Unit = { // same query, but the source does not support nested projection push down
+ testNestedProjectWithMetadataBase(false)
+ }
+
+ private def testNestedProjectWithMetadataBase(supportsNestedProjectionPushDown: Boolean): Unit = {
val ddl =
s"""
|CREATE TABLE T (
@@ -243,7 +252,7 @@ class TableSourceTest extends TableTestBase {
| metadata_2 string metadata
|) WITH (
| 'connector' = 'values',
- | 'nested-projection-supported' = 'true',
+ | 'nested-projection-supported' = '$supportsNestedProjectionPushDown',
| 'bounded' = 'true',
| 'readable-metadata' = 'metadata_1:INT, metadata_2:STRING, metadata_3:BIGINT'
|)
@@ -289,4 +298,67 @@ class TableSourceTest extends TableTestBase {
|""".stripMargin
)
}
+
+ private def prepareDdlWithPushProjectAndMetaData( // creates table `src` on the 'values' connector with computed column ts1
+ projectionPushDown: Boolean, // value written to 'enable-projection-push-down'
+ readsMeta: Boolean): Unit = { // if true, tags/op/ts are METADATA VIRTUAL columns and 'readable-metadata' is set
+ val ddl =
+ s"""
+ |CREATE TABLE src (
+ | id int,
+ | name varchar,
+ | tags varchar ${if (readsMeta) "METADATA VIRTUAL" else ""},
+ | op varchar ${if (readsMeta) "METADATA VIRTUAL" else ""},
+ | ts timestamp(3) ${if (readsMeta) "METADATA VIRTUAL" else ""},
+ | ts1 as ts + interval '10' second
+ |) WITH (
+ | 'connector' = 'values',
+ | ${if (readsMeta) "'readable-metadata'='tags:varchar,op:varchar,ts:timestamp(3)'," else ""}
+ | 'enable-projection-push-down' = '$projectionPushDown'
+ |)""".stripMargin
+
+ util.tableEnv.executeSql(ddl) // registers the table in the test's table environment
+ }
+
+ @Test
+ def testReadsMetaDataWithDifferentOrder(): Unit = { // selects metadata columns in a different order than declared; no projection push down
+ prepareDdlWithPushProjectAndMetaData(false, true)
+
+ util.verifyExecPlan("SELECT ts, id, name, tags, op FROM src")
+ }
+
+ @Test
+ def testReadsMetaDataWithoutProjectionPushDown(): Unit = { // reads a subset of metadata columns with projection push down disabled
+ prepareDdlWithPushProjectAndMetaData(false, true)
+
+ util.verifyExecPlan("SELECT id, ts, tags FROM src")
+ }
+
+ @Test
+ def testReadsComputedColumnWithoutProjectionPushDown(): Unit = { // computed column ts1 (derived from metadata ts) with projection push down disabled
+ prepareDdlWithPushProjectAndMetaData(false, true)
+
+ util.verifyExecPlan("SELECT id, ts1, op FROM src")
+ }
+
+ @Test
+ def testReadsComputedColumnWithProjectionPushDown(): Unit = { // computed column ts1 (derived from metadata ts) with projection push down enabled
+ prepareDdlWithPushProjectAndMetaData(true, true)
+
+ util.verifyExecPlan("SELECT id, ts1, op FROM src")
+ }
+
+ @Test
+ def testReadsMetaDataWithProjectionPushDown(): Unit = { // reads a subset of metadata columns with projection push down enabled
+ prepareDdlWithPushProjectAndMetaData(true, true)
+
+ util.verifyExecPlan("SELECT id, ts, tags FROM src")
+ }
+
+ @Test
+ def testProjectionPushDownOnly(): Unit = { // projection push down enabled; tags/op/ts are plain (non-metadata) columns
+ prepareDdlWithPushProjectAndMetaData(true, false)
+
+ util.verifyExecPlan("SELECT id, ts1, tags FROM src")
+ }
}