Posted to commits@flink.apache.org by ja...@apache.org on 2020/03/31 02:15:18 UTC

[flink] 04/13: [FLINK-14338][table-planner][table-planner-blink] Remove usage of TableScanRule and use new TableScanFactory extension

This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit ac972cd78da4ef1b972dbd2b9afc39e774a2f6fe
Author: yuzhao.cyz <yu...@gmail.com>
AuthorDate: Tue Mar 17 19:27:40 2020 +0800

    [FLINK-14338][table-planner][table-planner-blink] Remove usage of TableScanRule and use new TableScanFactory extension
    
    * The new TableScanFactory extension was introduced in CALCITE-3769
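    
    Since CALCITE-3769, RelFactories.TableScanFactory#createScan takes a
    RelOptTable.ToRelContext rather than a bare RelOptCluster, so the
    RelBuilder can expand view-like tables during scan() itself, making
    the custom expandingScanFactory and TableScanRule redundant. A rough
    sketch of the new contract, assuming the Calcite 1.22 signatures:
    
        // The ToRelContext carries the ViewExpander, so expanding a
        // view-like table reduces to a RelOptTable#toRel call.
        RelFactories.TableScanFactory factory =
                (toRelContext, table) -> table.toRel(toRelContext);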
---
 .../catalog/QueryOperationCatalogViewTable.java    |  7 ++-
 .../table/planner/delegation/PlannerContext.java   | 40 +++--------------
 .../planner/plan/rules/FlinkBatchRuleSets.scala    |  4 +-
 .../planner/plan/rules/FlinkStreamRuleSets.scala   |  4 +-
 .../planner/plan/schema/CatalogSourceTable.scala   | 14 +-----
 .../table/planner/plan/batch/sql/TableScanTest.xml | 50 ++++++++--------------
 .../flink/table/plan/rules/FlinkRuleSets.scala     | 11 +++--
 7 files changed, 43 insertions(+), 87 deletions(-)

diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/catalog/QueryOperationCatalogViewTable.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/catalog/QueryOperationCatalogViewTable.java
index 37bacb3..f184be0 100644
--- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/catalog/QueryOperationCatalogViewTable.java
+++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/catalog/QueryOperationCatalogViewTable.java
@@ -25,6 +25,7 @@ import org.apache.flink.table.planner.calcite.FlinkRelBuilder;
 import org.apache.flink.table.planner.plan.schema.ExpandingPreparingTable;
 import org.apache.flink.table.planner.plan.stats.FlinkStatistic;
 
+import org.apache.calcite.plan.Contexts;
 import org.apache.calcite.plan.RelOptSchema;
 import org.apache.calcite.plan.RelOptTable;
 import org.apache.calcite.rel.RelNode;
@@ -68,7 +69,11 @@ public class QueryOperationCatalogViewTable extends ExpandingPreparingTable {
 
 	@Override
 	public RelNode convertToRel(RelOptTable.ToRelContext context) {
-		FlinkRelBuilder relBuilder = FlinkRelBuilder.of(context.getCluster(), this.getRelOptSchema());
+		FlinkRelBuilder relBuilder = new FlinkRelBuilder(
+				// Sets up the view expander.
+				Contexts.of(context, context.getCluster().getPlanner().getContext()),
+				context.getCluster(),
+				this.getRelOptSchema());
 
 		return relBuilder.queryOperation(catalogView.getQueryOperation()).build();
 	}
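
The Contexts.of call above merges the ToRelContext (which is itself a
ViewExpander) with the planner's own Context, so the builder can unwrap
whichever of the two it needs. A minimal sketch of that lookup, assuming
Calcite's Context#unwrap semantics (hypothetical snippet, not part of
this commit):

    // unwrap searches the merged members in order and returns the
    // first instance of the requested class, or null if none matches.
    Context merged = Contexts.of(
            context, context.getCluster().getPlanner().getContext());
    RelOptTable.ViewExpander expander =
            merged.unwrap(RelOptTable.ViewExpander.class);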
diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/delegation/PlannerContext.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/delegation/PlannerContext.java
index 20823ea..23372bf 100644
--- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/delegation/PlannerContext.java
+++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/delegation/PlannerContext.java
@@ -44,7 +44,6 @@ import org.apache.flink.table.planner.codegen.ExpressionReducer;
 import org.apache.flink.table.planner.functions.sql.FlinkSqlOperatorTable;
 import org.apache.flink.table.planner.plan.FlinkCalciteCatalogReader;
 import org.apache.flink.table.planner.plan.cost.FlinkCostFactory;
-import org.apache.flink.table.planner.plan.schema.ExpandingPreparingTable;
 import org.apache.flink.table.planner.utils.JavaScalaConversionUtil;
 import org.apache.flink.table.planner.utils.TableConfigUtils;
 
@@ -54,11 +53,8 @@ import org.apache.calcite.plan.Context;
 import org.apache.calcite.plan.Contexts;
 import org.apache.calcite.plan.RelOptCluster;
 import org.apache.calcite.plan.RelOptPlanner;
-import org.apache.calcite.plan.RelOptTable;
 import org.apache.calcite.plan.RelTraitDef;
-import org.apache.calcite.plan.ViewExpanders;
 import org.apache.calcite.plan.volcano.VolcanoPlanner;
-import org.apache.calcite.rel.core.RelFactories;
 import org.apache.calcite.rel.type.RelDataType;
 import org.apache.calcite.rel.type.RelDataTypeSystem;
 import org.apache.calcite.rex.RexBuilder;
@@ -159,41 +155,20 @@ public class PlannerContext {
 	 * @return configured rel builder
 	 */
 	public FlinkRelBuilder createRelBuilder(String currentCatalog, String currentDatabase) {
-		FlinkCalciteCatalogReader relOptSchema = createCatalogReader(false, currentCatalog, currentDatabase);
+		FlinkCalciteCatalogReader relOptSchema = createCatalogReader(
+				false,
+				currentCatalog,
+				currentDatabase);
 
-		Context chain = Contexts.chain(
+		Context chain = Contexts.of(
 			context,
-			// We need to overwrite the default scan factory, which does not
-			// expand views. The expandingScanFactory uses the FlinkPlanner to translate a view
-			// into a rel tree, before applying any subsequent rules.
-			Contexts.of(expandingScanFactory(
-					createFlinkPlanner(currentCatalog, currentDatabase).createToRelContext()))
+			// Sets up the ViewExpander explicitly for FlinkRelBuilder.
+			createFlinkPlanner(currentCatalog, currentDatabase).createToRelContext()
 		);
 		return new FlinkRelBuilder(chain, cluster, relOptSchema);
 	}
 
 	/**
-	 * Creates a {@link RelFactories.TableScanFactory} that uses a
-	 * {@link org.apache.calcite.plan.RelOptTable.ViewExpander} to handle
-	 * {@link ExpandingPreparingTable} instances, and falls back to a default
-	 * factory for other tables.
-	 *
-	 * @param viewExpander View expander
-	 * @return Table scan factory
-	 */
-	private static RelFactories.TableScanFactory expandingScanFactory(
-			RelOptTable.ViewExpander viewExpander) {
-		return (cluster, table) -> {
-			if (table instanceof ExpandingPreparingTable) {
-				final RelOptTable.ToRelContext toRelContext =
-					ViewExpanders.toRelContext(viewExpander, cluster);
-				return table.toRel(toRelContext);
-			}
-			return RelFactories.DEFAULT_TABLE_SCAN_FACTORY.createScan(cluster, table);
-		};
-	}
-
-	/**
 	 * Creates a configured {@link FlinkPlannerImpl} for a planning session.
 	 *
 	 * @param currentCatalog the current default catalog to look for first during planning.
@@ -293,7 +268,6 @@ public class PlannerContext {
 		return JavaScalaConversionUtil.toJava(calciteConfig.getSqlToRelConverterConfig()).orElseGet(
 				() -> SqlToRelConverter.configBuilder()
 						.withTrimUnusedFields(false)
-						.withConvertTableAccess(true)
 						.withInSubQueryThreshold(Integer.MAX_VALUE)
 						.withExpand(false)
 						.withRelBuilderFactory(FlinkRelFactories.FLINK_REL_BUILDER())
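
Two PlannerContext changes work together here: the ToRelContext is now
handed straight to the builder's Context (replacing the
expandingScanFactory indirection), and the explicit
withConvertTableAccess(true) is dropped, since the scan-factory-driven
expansion from CALCITE-3769 supersedes that flag. A hypothetical usage
sketch (catalog, database, and view names invented for illustration):

    // Views backed by ExpandingPreparingTable are now expanded directly
    // when scan() builds the rel; no TableScanRule pass is needed.
    FlinkRelBuilder builder = plannerContext.createRelBuilder(
            "default_catalog", "default_database");
    RelNode rel = builder.scan("my_view").build();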
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkBatchRuleSets.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkBatchRuleSets.scala
index 104c8f3..77b5fc1 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkBatchRuleSets.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkBatchRuleSets.scala
@@ -55,8 +55,7 @@ object FlinkBatchRuleSets {
     */
   val EXPAND_PLAN_RULES: RuleSet = RuleSets.ofList(
     LogicalCorrelateToJoinFromTemporalTableRule.WITH_FILTER,
-    LogicalCorrelateToJoinFromTemporalTableRule.WITHOUT_FILTER,
-    TableScanRule.INSTANCE)
+    LogicalCorrelateToJoinFromTemporalTableRule.WITHOUT_FILTER)
 
   val POST_EXPAND_CLEAN_UP_RULES: RuleSet = RuleSets.ofList(
     EnumerableToLogicalTableScan.INSTANCE)
@@ -65,7 +64,6 @@ object FlinkBatchRuleSets {
     * Convert table references before query decorrelation.
     */
   val TABLE_REF_RULES: RuleSet = RuleSets.ofList(
-    TableScanRule.INSTANCE,
     EnumerableToLogicalTableScan.INSTANCE
   )
 
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkStreamRuleSets.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkStreamRuleSets.scala
index 03e294e..83d6a41 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkStreamRuleSets.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkStreamRuleSets.scala
@@ -56,8 +56,7 @@ object FlinkStreamRuleSets {
   val EXPAND_PLAN_RULES: RuleSet = RuleSets.ofList(
     LogicalCorrelateToJoinFromTemporalTableRule.WITH_FILTER,
     LogicalCorrelateToJoinFromTemporalTableRule.WITHOUT_FILTER,
-    LogicalCorrelateToJoinFromTemporalTableFunctionRule.INSTANCE,
-    TableScanRule.INSTANCE)
+    LogicalCorrelateToJoinFromTemporalTableFunctionRule.INSTANCE)
 
   val POST_EXPAND_CLEAN_UP_RULES: RuleSet = RuleSets.ofList(
     EnumerableToLogicalTableScan.INSTANCE)
@@ -66,7 +65,6 @@ object FlinkStreamRuleSets {
     * Convert table references before query decorrelation.
     */
   val TABLE_REF_RULES: RuleSet = RuleSets.ofList(
-    TableScanRule.INSTANCE,
     EnumerableToLogicalTableScan.INSTANCE
   )
 
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/schema/CatalogSourceTable.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/schema/CatalogSourceTable.scala
index 72764fe..094de36 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/schema/CatalogSourceTable.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/schema/CatalogSourceTable.scala
@@ -25,14 +25,13 @@ import org.apache.flink.table.factories.{TableFactoryUtil, TableSourceFactory, T
 import org.apache.flink.table.planner.calcite.{FlinkContext, FlinkRelBuilder, FlinkTypeFactory}
 import org.apache.flink.table.planner.catalog.CatalogSchemaTable
 import org.apache.flink.table.sources.{StreamTableSource, TableSource, TableSourceValidation}
-import org.apache.flink.table.utils.TableConnectorUtils.generateRuntimeName
+
 import org.apache.calcite.plan.{RelOptSchema, RelOptTable}
 import org.apache.calcite.rel.RelNode
 import org.apache.calcite.rel.`type`.RelDataType
 import org.apache.calcite.rel.logical.LogicalTableScan
 import org.apache.flink.table.types.logical.{TimestampKind, TimestampType}
 
-import java.util
 import java.util.{List => JList}
 
 import scala.collection.JavaConversions._
@@ -64,15 +63,6 @@ class CatalogSourceTable[T](
       .toMap
   }
 
-  override def getQualifiedName: JList[String] = {
-    // Do not explain source, we already have full names, table source should be created in toRel.
-    val ret = new util.ArrayList[String](names)
-    // Add class name to distinguish TableSourceTable.
-    val name = generateRuntimeName(getClass, catalogTable.getSchema.getFieldNames)
-    ret.add(s"catalog_source: [$name]")
-    ret
-  }
-
   override def toRel(context: RelOptTable.ToRelContext): RelNode = {
     val cluster = context.getCluster
     val flinkContext = cluster
@@ -104,7 +94,7 @@ class CatalogSourceTable[T](
       .toArray
     // Copy this table with physical scan row type.
     val newRelTable = tableSourceTable.copy(tableSource, physicalFields)
-    val scan = LogicalTableScan.create(cluster, newRelTable)
+    val scan = LogicalTableScan.create(cluster, newRelTable, context.getTableHints)
     val relBuilder = FlinkRelBuilder.of(cluster, getRelOptSchema)
     relBuilder.push(scan)
 
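
The extra argument to LogicalTableScan.create threads the table hints
from the caller's ToRelContext into the new scan, using the hints-aware
overload added alongside Calcite's hints support. In Java terms (a
hypothetical rendering of the Scala line above):

    // Propagate any hints the ToRelContext attached to this scan.
    LogicalTableScan scan = LogicalTableScan.create(
            cluster,                  // RelOptCluster from the context
            newRelTable,              // table with the physical row type
            context.getTableHints()); // List<RelHint>, possibly empty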
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/TableScanTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/TableScanTest.xml
index 63bea00..1c75dff 100644
--- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/TableScanTest.xml
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/TableScanTest.xml
@@ -19,7 +19,6 @@ limitations under the License.
   <TestCase name="testDDLTableScan">
     <Resource name="sql">
       <![CDATA[SELECT * FROM src WHERE a > 1]]>
-
     </Resource>
     <Resource name="planBefore">
       <![CDATA[
@@ -27,80 +26,66 @@ LogicalProject(ts=[$0], a=[$1], b=[$2])
 +- LogicalFilter(condition=[>($1, 1)])
    +- LogicalTableScan(table=[[default_catalog, default_database, src, source: [CollectionTableSource(ts, a, b)]]])
 ]]>
-
     </Resource>
     <Resource name="planAfter">
       <![CDATA[
 Calc(select=[ts, a, b], where=[>(a, 1)])
 +- TableSourceScan(table=[[default_catalog, default_database, src, source: [CollectionTableSource(ts, a, b)]]], fields=[ts, a, b])
 ]]>
-
     </Resource>
   </TestCase>
-  <TestCase name="testTableSourceScan">
+  <TestCase name="testDDLWithComputedColumn">
     <Resource name="sql">
-      <![CDATA[SELECT * FROM MyTable]]>
-
+      <![CDATA[SELECT * FROM computed_column_t]]>
     </Resource>
     <Resource name="planBefore">
       <![CDATA[
-LogicalProject(a=[$0], b=[$1], c=[$2])
-+- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]])
+LogicalProject(a=[$0], b=[$1], c=[+($0, 1)], d=[TO_TIMESTAMP($1)], e=[my_udf($0)])
++- LogicalTableScan(table=[[default_catalog, default_database, computed_column_t, source: [CollectionTableSource(a, b)]]])
 ]]>
-
     </Resource>
     <Resource name="planAfter">
       <![CDATA[
-TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+Calc(select=[a, b, +(a, 1) AS c, TO_TIMESTAMP(b) AS d, my_udf(a) AS e])
++- TableSourceScan(table=[[default_catalog, default_database, computed_column_t, source: [CollectionTableSource(a, b)]]], fields=[a, b])
 ]]>
-
     </Resource>
   </TestCase>
-
   <TestCase name="testDDLWithWatermarkComputedColumn">
     <Resource name="sql">
       <![CDATA[SELECT * FROM c_watermark_t]]>
-
     </Resource>
     <Resource name="planBefore">
       <![CDATA[
 LogicalProject(a=[$0], b=[$1], c=[+($0, 1)], d=[TO_TIMESTAMP($1)], e=[my_udf($0)])
 +- LogicalTableScan(table=[[default_catalog, default_database, c_watermark_t, source: [CollectionTableSource(a, b)]]])
 ]]>
-
     </Resource>
     <Resource name="planAfter">
       <![CDATA[
 Calc(select=[a, b, +(a, 1) AS c, TO_TIMESTAMP(b) AS d, my_udf(a) AS e])
 +- TableSourceScan(table=[[default_catalog, default_database, c_watermark_t, source: [CollectionTableSource(a, b)]]], fields=[a, b])
 ]]>
-
     </Resource>
   </TestCase>
-  <TestCase name="testDDLWithComputedColumn">
-    <Resource name="sql">
-      <![CDATA[SELECT * FROM computed_column_t]]>
-
-    </Resource>
+  <TestCase name="testTableApiScanWithComputedColumn">
     <Resource name="planBefore">
       <![CDATA[
 LogicalProject(a=[$0], b=[$1], c=[+($0, 1)], d=[TO_TIMESTAMP($1)], e=[my_udf($0)])
 +- LogicalTableScan(table=[[default_catalog, default_database, computed_column_t, source: [CollectionTableSource(a, b)]]])
 ]]>
-
     </Resource>
     <Resource name="planAfter">
       <![CDATA[
 Calc(select=[a, b, +(a, 1) AS c, TO_TIMESTAMP(b) AS d, my_udf(a) AS e])
 +- TableSourceScan(table=[[default_catalog, default_database, computed_column_t, source: [CollectionTableSource(a, b)]]], fields=[a, b])
 ]]>
-
     </Resource>
   </TestCase>
   <TestCase name="testTableApiScanWithDDL">
     <Resource name="planBefore">
       <![CDATA[
-LogicalTableScan(table=[[default_catalog, default_database, t1, catalog_source: [CatalogSourceTable(a, b)]]])
+LogicalTableScan(table=[[default_catalog, default_database, t1, source: [CollectionTableSource(a, b)]]])
 ]]>
     </Resource>
     <Resource name="planAfter">
@@ -109,11 +94,10 @@ TableSourceScan(table=[[default_catalog, default_database, t1, source: [Collecti
 ]]>
     </Resource>
   </TestCase>
-
   <TestCase name="testTableApiScanWithTemporaryTable">
     <Resource name="planBefore">
       <![CDATA[
-LogicalTableScan(table=[[default_catalog, default_database, t1, catalog_source: [CatalogSourceTable(word)]]])
+LogicalTableScan(table=[[default_catalog, default_database, t1, source: [CsvTableSource(read fields: word)]]])
 ]]>
     </Resource>
     <Resource name="planAfter">
@@ -122,11 +106,11 @@ TableSourceScan(table=[[default_catalog, default_database, t1, source: [CsvTable
 ]]>
     </Resource>
   </TestCase>
-
   <TestCase name="testTableApiScanWithWatermark">
     <Resource name="planBefore">
       <![CDATA[
-LogicalTableScan(table=[[default_catalog, default_database, c_watermark_t, catalog_source: [CatalogSourceTable(a, b, c, d, e)]]])
+LogicalProject(a=[$0], b=[$1], c=[+($0, 1)], d=[TO_TIMESTAMP($1)], e=[my_udf($0)])
++- LogicalTableScan(table=[[default_catalog, default_database, c_watermark_t, source: [CollectionTableSource(a, b)]]])
 ]]>
     </Resource>
     <Resource name="planAfter">
@@ -136,17 +120,19 @@ Calc(select=[a, b, +(a, 1) AS c, TO_TIMESTAMP(b) AS d, my_udf(a) AS e])
 ]]>
     </Resource>
   </TestCase>
-
-  <TestCase name="testTableApiScanWithComputedColumn">
+  <TestCase name="testTableSourceScan">
+    <Resource name="sql">
+      <![CDATA[SELECT * FROM MyTable]]>
+    </Resource>
     <Resource name="planBefore">
       <![CDATA[
-LogicalTableScan(table=[[default_catalog, default_database, computed_column_t, catalog_source: [CatalogSourceTable(a, b, c, d, e)]]])
+LogicalProject(a=[$0], b=[$1], c=[$2])
++- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]])
 ]]>
     </Resource>
     <Resource name="planAfter">
       <![CDATA[
-Calc(select=[a, b, +(a, 1) AS c, TO_TIMESTAMP(b) AS d, my_udf(a) AS e])
-+- TableSourceScan(table=[[default_catalog, default_database, computed_column_t, source: [CollectionTableSource(a, b)]]], fields=[a, b])
+TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
 ]]>
     </Resource>
   </TestCase>
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala
index 534abe4..df61b48 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala
@@ -27,6 +27,8 @@ import org.apache.flink.table.plan.rules.dataSet._
 import org.apache.flink.table.plan.rules.datastream._
 import org.apache.flink.table.plan.rules.logical.{ExtendedAggregateExtractProjectRule, _}
 
+import org.apache.calcite.rel.logical.{LogicalJoin, LogicalProject}
+
 object FlinkRuleSets {
 
   /**
@@ -42,8 +44,7 @@ object FlinkRuleSets {
     * can create new plan nodes.
     */
   val EXPAND_PLAN_RULES: RuleSet = RuleSets.ofList(
-    LogicalCorrelateToTemporalTableJoinRule.INSTANCE,
-    TableScanRule.INSTANCE)
+    LogicalCorrelateToTemporalTableJoinRule.INSTANCE)
 
   val POST_EXPAND_CLEAN_UP_RULES: RuleSet = RuleSets.ofList(
     EnumerableToLogicalTableScan.INSTANCE)
@@ -69,7 +70,11 @@ object FlinkRuleSets {
     FilterProjectTransposeRule.INSTANCE,
     // push a projection to the children of a join
     // push all expressions to handle the time indicator correctly
-    new ProjectJoinTransposeRule(PushProjector.ExprCondition.FALSE, RelFactories.LOGICAL_BUILDER),
+    new ProjectJoinTransposeRule(
+      classOf[LogicalProject],
+      classOf[LogicalJoin],
+      PushProjector.ExprCondition.FALSE,
+      RelFactories.LOGICAL_BUILDER),
     // merge projections
     ProjectMergeRule.INSTANCE,
     // remove identity project