You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by go...@apache.org on 2022/07/07 06:50:46 UTC

[flink] branch release-1.15 updated: [FLINK-28334][table-planner] Fix PushProjectIntoTableSourceScanRule: covers the case when table source SupportsReadingMetadata and not SupportsProjectionPushDown

This is an automated email from the ASF dual-hosted git repository.

godfrey pushed a commit to branch release-1.15
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.15 by this push:
     new 1dc4c5ba71e [FLINK-28334][table-planner] Fix PushProjectIntoTableSourceScanRule: covers the case when table source SupportsReadingMetadata and not SupportsProjectionPushDown
1dc4c5ba71e is described below

commit 1dc4c5ba71e2c920ee94bf4274a850db6bc870d9
Author: lincoln.lil <li...@alibaba-inc.com>
AuthorDate: Thu Jun 30 23:14:16 2022 +0800

    [FLINK-28334][table-planner] Fix PushProjectIntoTableSourceScanRule: covers the case when table source SupportsReadingMetadata and not SupportsProjectionPushDown
    
    This closes #20118
    
    (cherry picked from commit fb9843af5ffeb6d7561876704d463dea1fcdc153)
---
 .../planner/connectors/DynamicSourceUtils.java     |   2 +-
 .../PushProjectIntoTableSourceScanRule.java        |  70 ++++++++-
 .../planner/plan/stream/sql/TableSourceTest.xml    | 162 ++++++++++++++++++---
 .../planner/plan/stream/sql/TableSourceTest.scala  |  74 +++++++++-
 4 files changed, 283 insertions(+), 25 deletions(-)

diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/connectors/DynamicSourceUtils.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/connectors/DynamicSourceUtils.java
index 27f93fa6ac4..95857e8d7ba 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/connectors/DynamicSourceUtils.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/connectors/DynamicSourceUtils.java
@@ -370,7 +370,7 @@ public final class DynamicSourceUtils {
         return Collections.emptyMap();
     }
 
-    private static List<MetadataColumn> extractMetadataColumns(ResolvedSchema schema) {
+    public static List<MetadataColumn> extractMetadataColumns(ResolvedSchema schema) {
         return schema.getColumns().stream()
                 .filter(MetadataColumn.class::isInstance)
                 .map(MetadataColumn.class::cast)
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushProjectIntoTableSourceScanRule.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushProjectIntoTableSourceScanRule.java
index 17e87a4bd45..0c48a1824cf 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushProjectIntoTableSourceScanRule.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushProjectIntoTableSourceScanRule.java
@@ -50,12 +50,14 @@ import org.apache.calcite.rel.rules.ProjectRemoveRule;
 import org.apache.calcite.rel.type.RelDataType;
 import org.apache.calcite.rex.RexInputRef;
 import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.rex.RexShuttle;
 
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 import java.util.Objects;
+import java.util.Set;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 import java.util.stream.Stream;
@@ -337,11 +339,71 @@ public class PushProjectIntoTableSourceScanRule
     private List<RexNode> rewriteProjections(
             RelOptRuleCall call, TableSourceTable source, NestedSchema projectedSchema) {
         final LogicalProject project = call.rel(0);
+        List<RexNode> newProjects = project.getProjects();
+
         if (supportsProjectionPushDown(source.tableSource())) {
-            return NestedProjectionUtil.rewrite(
-                    project.getProjects(), projectedSchema, call.builder().getRexBuilder());
-        } else {
-            return project.getProjects();
+            // If projection push down is supported, then all input refs will be rewritten,
+            // including metadata columns.
+            newProjects =
+                    NestedProjectionUtil.rewrite(
+                            newProjects, projectedSchema, call.builder().getRexBuilder());
+        } else if (supportsMetadata(source.tableSource())) {
+            // The source supports reading metadata only (no projection push down).
+            // Note: we cannot reuse NestedProjectionUtil to rewrite the metadata projection
+            // because it only works for sources that support projection push down.
+            List<Column.MetadataColumn> metadataColumns =
+                    DynamicSourceUtils.extractMetadataColumns(
+                            source.contextResolvedTable().getResolvedSchema());
+            if (metadataColumns.size() > 0) {
+                Set<String> metaCols =
+                        metadataColumns.stream().map(Column::getName).collect(Collectors.toSet());
+
+                MetadataOnlyProjectionRewriter rewriter =
+                        new MetadataOnlyProjectionRewriter(
+                                project.getInput().getRowType(), source.getRowType(), metaCols);
+
+                newProjects =
+                        newProjects.stream()
+                                .map(p -> p.accept(rewriter))
+                                .collect(Collectors.toList());
+            }
+        }
+
+        return newProjects;
+    }
+
+    private static class MetadataOnlyProjectionRewriter extends RexShuttle {
+
+        private final RelDataType oldInputRowType;
+
+        private final RelDataType newInputRowType;
+
+        private final Set<String> metaCols;
+
+        public MetadataOnlyProjectionRewriter(
+                RelDataType oldInputRowType, RelDataType newInputRowType, Set<String> metaCols) {
+            this.oldInputRowType = oldInputRowType;
+            this.newInputRowType = newInputRowType;
+            this.metaCols = metaCols;
+        }
+
+        @Override
+        public RexNode visitInputRef(RexInputRef inputRef) {
+            int refIndex = inputRef.getIndex();
+            if (refIndex > oldInputRowType.getFieldCount() - 1) {
+                throw new TableException(
+                        "Illegal field ref:" + refIndex + " over input row:" + oldInputRowType);
+            }
+            String refName = oldInputRowType.getFieldNames().get(refIndex);
+            if (metaCols.contains(refName)) {
+                int newIndex = newInputRowType.getFieldNames().indexOf(refName);
+                if (newIndex == -1) {
+                    throw new TableException(
+                            "Illegal meta field:" + refName + " over input row:" + newInputRowType);
+                }
+                return new RexInputRef(newIndex, inputRef.getType());
+            }
+            return inputRef;
         }
     }
 
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/TableSourceTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/TableSourceTest.xml
index f8009ff2197..50aa7631e60 100644
--- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/TableSourceTest.xml
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/TableSourceTest.xml
@@ -81,6 +81,45 @@ LogicalProject(id=[$0], nested1=[$1.nested1], results=[+(+($1.nested1.value, $1.
       <![CDATA[
 Calc(select=[id, deepNested_nested1 AS nested1, ((deepNested_nested1.value + deepNested_nested2_num) + metadata_1) AS results])
 +- TableSourceScan(table=[[default_catalog, default_database, T, project=[id, deepNested_nested1, deepNested_nested2_num], metadata=[metadata_1]]], fields=[id, deepNested_nested1, deepNested_nested2_num, metadata_1])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testNoNestedProjectWithMetadata">
+    <Resource name="sql">
+      <![CDATA[
+SELECT id,
+       deepNested.nested1 AS nested1,
+       deepNested.nested1.`value` + deepNested.nested2.num + metadata_1 as results
+FROM T
+]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(id=[$0], nested1=[$1.nested1], results=[+(+($1.nested1.value, $1.nested2.num), $2)])
++- LogicalTableScan(table=[[default_catalog, default_database, T]])
+]]>
+    </Resource>
+    <Resource name="optimized exec plan">
+      <![CDATA[
+Calc(select=[id, deepNested.nested1 AS nested1, ((deepNested.nested1.value + deepNested.nested2.num) + metadata_1) AS results])
++- TableSourceScan(table=[[default_catalog, default_database, T, project=[id, deepNested], metadata=[metadata_1]]], fields=[id, deepNested, metadata_1])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testProjectionPushDownOnly">
+    <Resource name="sql">
+      <![CDATA[SELECT id, ts1, tags FROM src]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(id=[$0], ts1=[+($4, 10000:INTERVAL SECOND)], tags=[$2])
++- LogicalTableScan(table=[[default_catalog, default_database, src]])
+]]>
+    </Resource>
+    <Resource name="optimized exec plan">
+      <![CDATA[
+Calc(select=[id, (ts + 10000:INTERVAL SECOND) AS ts1, tags])
++- TableSourceScan(table=[[default_catalog, default_database, src, project=[id, ts, tags], metadata=[]]], fields=[id, ts, tags])
 ]]>
     </Resource>
   </TestCase>
@@ -119,6 +158,81 @@ LogicalAggregate(group=[{}], EXPR$0=[COUNT()])
 GroupAggregate(select=[COUNT(*) AS EXPR$0])
 +- Exchange(distribution=[single])
    +- TableSourceScan(table=[[default_catalog, default_database, T, project=[], metadata=[]]], fields=[])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testProjectWithoutProctime">
+    <Resource name="sql">
+      <![CDATA[select name, val, rtime, id from T]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(name=[$3], val=[$2], rtime=[$1], id=[$0])
++- LogicalWatermarkAssigner(rowtime=[rtime], watermark=[$1])
+   +- LogicalProject(id=[$0], rtime=[$1], val=[$2], name=[$3], ptime=[PROCTIME()])
+      +- LogicalTableScan(table=[[default_catalog, default_database, T]])
+]]>
+    </Resource>
+    <Resource name="optimized exec plan">
+      <![CDATA[
+WatermarkAssigner(rowtime=[rtime], watermark=[rtime])
++- Calc(select=[name, val, rtime, id])
+   +- TableSourceScan(table=[[default_catalog, default_database, T]], fields=[id, rtime, val, name])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testProjectWithoutRowtime">
+    <Resource name="sql">
+      <![CDATA[SELECT ptime, name, val, id FROM T]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(ptime=[$4], name=[$3], val=[$2], id=[$0])
++- LogicalWatermarkAssigner(rowtime=[rtime], watermark=[$1])
+   +- LogicalProject(id=[$0], rtime=[$1], val=[$2], name=[$3], ptime=[PROCTIME()])
+      +- LogicalTableScan(table=[[default_catalog, default_database, T]])
+]]>
+    </Resource>
+    <Resource name="optimized exec plan">
+      <![CDATA[
+Calc(select=[PROCTIME_MATERIALIZE(ptime) AS ptime, name, val, id])
++- WatermarkAssigner(rowtime=[rtime], watermark=[rtime])
+   +- Calc(select=[id, rtime, val, name, PROCTIME() AS ptime])
+      +- TableSourceScan(table=[[default_catalog, default_database, T]], fields=[id, rtime, val, name])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testReadsComputedColumnWithoutProjectionPushDown">
+    <Resource name="sql">
+      <![CDATA[SELECT id, ts1, op FROM src]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(id=[$0], ts1=[+($4, 10000:INTERVAL SECOND)], op=[CAST($2):VARCHAR(2147483647) CHARACTER SET "UTF-16LE"])
++- LogicalTableScan(table=[[default_catalog, default_database, src]])
+]]>
+    </Resource>
+    <Resource name="optimized exec plan">
+      <![CDATA[
+Calc(select=[id, (ts + 10000:INTERVAL SECOND) AS ts1, CAST(op AS VARCHAR(2147483647)) AS op])
++- TableSourceScan(table=[[default_catalog, default_database, src, metadata=[op, ts]]], fields=[id, name, op, ts])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testReadsComputedColumnWithProjectionPushDown">
+    <Resource name="sql">
+      <![CDATA[SELECT id, ts1, op FROM src]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(id=[$0], ts1=[+($4, 10000:INTERVAL SECOND)], op=[CAST($2):VARCHAR(2147483647) CHARACTER SET "UTF-16LE"])
++- LogicalTableScan(table=[[default_catalog, default_database, src]])
+]]>
+    </Resource>
+    <Resource name="optimized exec plan">
+      <![CDATA[
+Calc(select=[id, (ts + 10000:INTERVAL SECOND) AS ts1, CAST(op AS VARCHAR(2147483647)) AS op])
++- TableSourceScan(table=[[default_catalog, default_database, src, project=[id], metadata=[op, ts]]], fields=[id, op, ts])
 ]]>
     </Resource>
   </TestCase>
@@ -153,23 +267,37 @@ Calc(select=[name, w$end AS EXPR$1, EXPR$2])
 ]]>
     </Resource>
   </TestCase>
-  <TestCase name="testProjectWithoutProctime">
+  <TestCase name="testReadsMetaDataWithDifferentOrder">
     <Resource name="sql">
-      <![CDATA[select name, val, rtime, id from T]]>
+      <![CDATA[SELECT ts, id, name, tags, op FROM src]]>
     </Resource>
     <Resource name="ast">
       <![CDATA[
-LogicalProject(name=[$3], val=[$2], rtime=[$1], id=[$0])
-+- LogicalWatermarkAssigner(rowtime=[rtime], watermark=[$1])
-   +- LogicalProject(id=[$0], rtime=[$1], val=[$2], name=[$3], ptime=[PROCTIME()])
-      +- LogicalTableScan(table=[[default_catalog, default_database, T]])
+LogicalProject(ts=[$4], id=[$0], name=[$1], tags=[CAST($3):VARCHAR(2147483647) CHARACTER SET "UTF-16LE"], op=[CAST($2):VARCHAR(2147483647) CHARACTER SET "UTF-16LE"])
++- LogicalTableScan(table=[[default_catalog, default_database, src]])
 ]]>
     </Resource>
     <Resource name="optimized exec plan">
       <![CDATA[
-WatermarkAssigner(rowtime=[rtime], watermark=[rtime])
-+- Calc(select=[name, val, rtime, id])
-   +- TableSourceScan(table=[[default_catalog, default_database, T]], fields=[id, rtime, val, name])
+Calc(select=[ts, id, name, CAST(tags AS VARCHAR(2147483647)) AS tags, CAST(op AS VARCHAR(2147483647)) AS op])
++- TableSourceScan(table=[[default_catalog, default_database, src]], fields=[id, name, op, tags, ts])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testReadsMetaDataWithoutProjectionPushDown">
+    <Resource name="sql">
+      <![CDATA[SELECT id, ts, tags FROM src]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(id=[$0], ts=[$4], tags=[CAST($3):VARCHAR(2147483647) CHARACTER SET "UTF-16LE"])
++- LogicalTableScan(table=[[default_catalog, default_database, src]])
+]]>
+    </Resource>
+    <Resource name="optimized exec plan">
+      <![CDATA[
+Calc(select=[id, ts, CAST(tags AS VARCHAR(2147483647)) AS tags])
++- TableSourceScan(table=[[default_catalog, default_database, src, metadata=[tags, ts]]], fields=[id, name, tags, ts])
 ]]>
     </Resource>
   </TestCase>
@@ -192,24 +320,20 @@ Calc(select=[rowtime, id, name, val])
 ]]>
     </Resource>
   </TestCase>
-  <TestCase name="testProjectWithoutRowtime">
+  <TestCase name="testReadsMetaDataWithProjectionPushDown">
     <Resource name="sql">
-      <![CDATA[SELECT ptime, name, val, id FROM T]]>
+      <![CDATA[SELECT id, ts, tags FROM src]]>
     </Resource>
     <Resource name="ast">
       <![CDATA[
-LogicalProject(ptime=[$4], name=[$3], val=[$2], id=[$0])
-+- LogicalWatermarkAssigner(rowtime=[rtime], watermark=[$1])
-   +- LogicalProject(id=[$0], rtime=[$1], val=[$2], name=[$3], ptime=[PROCTIME()])
-      +- LogicalTableScan(table=[[default_catalog, default_database, T]])
+LogicalProject(id=[$0], ts=[$4], tags=[CAST($3):VARCHAR(2147483647) CHARACTER SET "UTF-16LE"])
++- LogicalTableScan(table=[[default_catalog, default_database, src]])
 ]]>
     </Resource>
     <Resource name="optimized exec plan">
       <![CDATA[
-Calc(select=[PROCTIME_MATERIALIZE(ptime) AS ptime, name, val, id])
-+- WatermarkAssigner(rowtime=[rtime], watermark=[rtime])
-   +- Calc(select=[id, rtime, val, name, PROCTIME() AS ptime])
-      +- TableSourceScan(table=[[default_catalog, default_database, T]], fields=[id, rtime, val, name])
+Calc(select=[id, ts, CAST(tags AS VARCHAR(2147483647)) AS tags])
++- TableSourceScan(table=[[default_catalog, default_database, src, project=[id], metadata=[tags, ts]]], fields=[id, tags, ts])
 ]]>
     </Resource>
   </TestCase>
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/TableSourceTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/TableSourceTest.scala
index 164ded36c5c..c637b0ed553 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/TableSourceTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/TableSourceTest.scala
@@ -233,6 +233,15 @@ class TableSourceTest extends TableTestBase {
 
   @Test
   def testNestedProjectWithMetadata(): Unit = {
+    testNestedProjectWithMetadataBase(true)
+  }
+
+  @Test
+  def testNoNestedProjectWithMetadata(): Unit = {
+    testNestedProjectWithMetadataBase(false)
+  }
+
+  private def testNestedProjectWithMetadataBase(supportsNestedProjectionPushDown: Boolean): Unit = {
     val ddl =
       s"""
          |CREATE TABLE T (
@@ -243,7 +252,7 @@ class TableSourceTest extends TableTestBase {
          |  metadata_2 string metadata
          |) WITH (
          |  'connector' = 'values',
-         |  'nested-projection-supported' = 'true',
+         |  'nested-projection-supported' = '$supportsNestedProjectionPushDown',
          |  'bounded' = 'true',
          |  'readable-metadata' = 'metadata_1:INT, metadata_2:STRING, metadata_3:BIGINT'
          |)
@@ -289,4 +298,67 @@ class TableSourceTest extends TableTestBase {
          |""".stripMargin
     )
   }
+
+  private def prepareDdlWithPushProjectAndMetaData(
+      projectionPushDown: Boolean,
+      readsMeta: Boolean): Unit = {
+    val ddl =
+      s"""
+         |CREATE TABLE src (
+         |  id int,
+         |  name varchar,
+         |  tags varchar ${if (readsMeta) "METADATA VIRTUAL" else ""},
+         |  op varchar ${if (readsMeta) "METADATA VIRTUAL" else ""},
+         |  ts timestamp(3) ${if (readsMeta) "METADATA VIRTUAL" else ""},
+         |  ts1 as ts + interval '10' second
+         |) WITH (
+         |  'connector' = 'values',
+         |  ${if (readsMeta) "'readable-metadata'='tags:varchar,op:varchar,ts:timestamp(3)'," else ""}
+         |  'enable-projection-push-down' = '$projectionPushDown'
+         |)""".stripMargin
+
+    util.tableEnv.executeSql(ddl)
+  }
+
+  @Test
+  def testReadsMetaDataWithDifferentOrder(): Unit = {
+    prepareDdlWithPushProjectAndMetaData(false, true)
+
+    util.verifyExecPlan("SELECT ts, id, name, tags, op FROM src")
+  }
+
+  @Test
+  def testReadsMetaDataWithoutProjectionPushDown(): Unit = {
+    prepareDdlWithPushProjectAndMetaData(false, true)
+
+    util.verifyExecPlan("SELECT id, ts, tags FROM src")
+  }
+
+  @Test
+  def testReadsComputedColumnWithoutProjectionPushDown(): Unit = {
+    prepareDdlWithPushProjectAndMetaData(false, true)
+
+    util.verifyExecPlan("SELECT id, ts1, op FROM src")
+  }
+
+  @Test
+  def testReadsComputedColumnWithProjectionPushDown(): Unit = {
+    prepareDdlWithPushProjectAndMetaData(true, true)
+
+    util.verifyExecPlan("SELECT id, ts1, op FROM src")
+  }
+
+  @Test
+  def testReadsMetaDataWithProjectionPushDown(): Unit = {
+    prepareDdlWithPushProjectAndMetaData(true, true)
+
+    util.verifyExecPlan("SELECT id, ts, tags FROM src")
+  }
+
+  @Test
+  def testProjectionPushDownOnly(): Unit = {
+    prepareDdlWithPushProjectAndMetaData(true, false)
+
+    util.verifyExecPlan("SELECT id, ts1, tags FROM src")
+  }
 }