You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ja...@apache.org on 2020/03/31 02:15:18 UTC
[flink] 04/13: [FLINK-14338][table-planner][table-planner-blink]
Remove usage of TableScanRule and use new TableScanFactory extension
This is an automated email from the ASF dual-hosted git repository.
jark pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit ac972cd78da4ef1b972dbd2b9afc39e774a2f6fe
Author: yuzhao.cyz <yu...@gmail.com>
AuthorDate: Tue Mar 17 19:27:40 2020 +0800
[FLINK-14338][table-planner][table-planner-blink] Remove usage of TableScanRule and use new TableScanFactory extension
* This change is enabled by the TableScanFactory extension introduced in CALCITE-3769
---
.../catalog/QueryOperationCatalogViewTable.java | 7 ++-
.../table/planner/delegation/PlannerContext.java | 40 +++--------------
.../planner/plan/rules/FlinkBatchRuleSets.scala | 4 +-
.../planner/plan/rules/FlinkStreamRuleSets.scala | 4 +-
.../planner/plan/schema/CatalogSourceTable.scala | 14 +-----
.../table/planner/plan/batch/sql/TableScanTest.xml | 50 ++++++++--------------
.../flink/table/plan/rules/FlinkRuleSets.scala | 11 +++--
7 files changed, 43 insertions(+), 87 deletions(-)
diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/catalog/QueryOperationCatalogViewTable.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/catalog/QueryOperationCatalogViewTable.java
index 37bacb3..f184be0 100644
--- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/catalog/QueryOperationCatalogViewTable.java
+++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/catalog/QueryOperationCatalogViewTable.java
@@ -25,6 +25,7 @@ import org.apache.flink.table.planner.calcite.FlinkRelBuilder;
import org.apache.flink.table.planner.plan.schema.ExpandingPreparingTable;
import org.apache.flink.table.planner.plan.stats.FlinkStatistic;
+import org.apache.calcite.plan.Contexts;
import org.apache.calcite.plan.RelOptSchema;
import org.apache.calcite.plan.RelOptTable;
import org.apache.calcite.rel.RelNode;
@@ -68,7 +69,11 @@ public class QueryOperationCatalogViewTable extends ExpandingPreparingTable {
@Override
public RelNode convertToRel(RelOptTable.ToRelContext context) {
- FlinkRelBuilder relBuilder = FlinkRelBuilder.of(context.getCluster(), this.getRelOptSchema());
+ FlinkRelBuilder relBuilder = new FlinkRelBuilder(
+ // Sets up the view expander.
+ Contexts.of(context, context.getCluster().getPlanner().getContext()),
+ context.getCluster(),
+ this.getRelOptSchema());
return relBuilder.queryOperation(catalogView.getQueryOperation()).build();
}
diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/delegation/PlannerContext.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/delegation/PlannerContext.java
index 20823ea..23372bf 100644
--- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/delegation/PlannerContext.java
+++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/delegation/PlannerContext.java
@@ -44,7 +44,6 @@ import org.apache.flink.table.planner.codegen.ExpressionReducer;
import org.apache.flink.table.planner.functions.sql.FlinkSqlOperatorTable;
import org.apache.flink.table.planner.plan.FlinkCalciteCatalogReader;
import org.apache.flink.table.planner.plan.cost.FlinkCostFactory;
-import org.apache.flink.table.planner.plan.schema.ExpandingPreparingTable;
import org.apache.flink.table.planner.utils.JavaScalaConversionUtil;
import org.apache.flink.table.planner.utils.TableConfigUtils;
@@ -54,11 +53,8 @@ import org.apache.calcite.plan.Context;
import org.apache.calcite.plan.Contexts;
import org.apache.calcite.plan.RelOptCluster;
import org.apache.calcite.plan.RelOptPlanner;
-import org.apache.calcite.plan.RelOptTable;
import org.apache.calcite.plan.RelTraitDef;
-import org.apache.calcite.plan.ViewExpanders;
import org.apache.calcite.plan.volcano.VolcanoPlanner;
-import org.apache.calcite.rel.core.RelFactories;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rel.type.RelDataTypeSystem;
import org.apache.calcite.rex.RexBuilder;
@@ -159,41 +155,20 @@ public class PlannerContext {
* @return configured rel builder
*/
public FlinkRelBuilder createRelBuilder(String currentCatalog, String currentDatabase) {
- FlinkCalciteCatalogReader relOptSchema = createCatalogReader(false, currentCatalog, currentDatabase);
+ FlinkCalciteCatalogReader relOptSchema = createCatalogReader(
+ false,
+ currentCatalog,
+ currentDatabase);
- Context chain = Contexts.chain(
+ Context chain = Contexts.of(
context,
- // We need to overwrite the default scan factory, which does not
- // expand views. The expandingScanFactory uses the FlinkPlanner to translate a view
- // into a rel tree, before applying any subsequent rules.
- Contexts.of(expandingScanFactory(
- createFlinkPlanner(currentCatalog, currentDatabase).createToRelContext()))
+ // Sets up the ViewExpander explicitly for FlinkRelBuilder.
+ createFlinkPlanner(currentCatalog, currentDatabase).createToRelContext()
);
return new FlinkRelBuilder(chain, cluster, relOptSchema);
}
/**
- * Creates a {@link RelFactories.TableScanFactory} that uses a
- * {@link org.apache.calcite.plan.RelOptTable.ViewExpander} to handle
- * {@link ExpandingPreparingTable} instances, and falls back to a default
- * factory for other tables.
- *
- * @param viewExpander View expander
- * @return Table scan factory
- */
- private static RelFactories.TableScanFactory expandingScanFactory(
- RelOptTable.ViewExpander viewExpander) {
- return (cluster, table) -> {
- if (table instanceof ExpandingPreparingTable) {
- final RelOptTable.ToRelContext toRelContext =
- ViewExpanders.toRelContext(viewExpander, cluster);
- return table.toRel(toRelContext);
- }
- return RelFactories.DEFAULT_TABLE_SCAN_FACTORY.createScan(cluster, table);
- };
- }
-
- /**
* Creates a configured {@link FlinkPlannerImpl} for a planning session.
*
* @param currentCatalog the current default catalog to look for first during planning.
@@ -293,7 +268,6 @@ public class PlannerContext {
return JavaScalaConversionUtil.toJava(calciteConfig.getSqlToRelConverterConfig()).orElseGet(
() -> SqlToRelConverter.configBuilder()
.withTrimUnusedFields(false)
- .withConvertTableAccess(true)
.withInSubQueryThreshold(Integer.MAX_VALUE)
.withExpand(false)
.withRelBuilderFactory(FlinkRelFactories.FLINK_REL_BUILDER())
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkBatchRuleSets.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkBatchRuleSets.scala
index 104c8f3..77b5fc1 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkBatchRuleSets.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkBatchRuleSets.scala
@@ -55,8 +55,7 @@ object FlinkBatchRuleSets {
*/
val EXPAND_PLAN_RULES: RuleSet = RuleSets.ofList(
LogicalCorrelateToJoinFromTemporalTableRule.WITH_FILTER,
- LogicalCorrelateToJoinFromTemporalTableRule.WITHOUT_FILTER,
- TableScanRule.INSTANCE)
+ LogicalCorrelateToJoinFromTemporalTableRule.WITHOUT_FILTER)
val POST_EXPAND_CLEAN_UP_RULES: RuleSet = RuleSets.ofList(
EnumerableToLogicalTableScan.INSTANCE)
@@ -65,7 +64,6 @@ object FlinkBatchRuleSets {
* Convert table references before query decorrelation.
*/
val TABLE_REF_RULES: RuleSet = RuleSets.ofList(
- TableScanRule.INSTANCE,
EnumerableToLogicalTableScan.INSTANCE
)
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkStreamRuleSets.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkStreamRuleSets.scala
index 03e294e..83d6a41 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkStreamRuleSets.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkStreamRuleSets.scala
@@ -56,8 +56,7 @@ object FlinkStreamRuleSets {
val EXPAND_PLAN_RULES: RuleSet = RuleSets.ofList(
LogicalCorrelateToJoinFromTemporalTableRule.WITH_FILTER,
LogicalCorrelateToJoinFromTemporalTableRule.WITHOUT_FILTER,
- LogicalCorrelateToJoinFromTemporalTableFunctionRule.INSTANCE,
- TableScanRule.INSTANCE)
+ LogicalCorrelateToJoinFromTemporalTableFunctionRule.INSTANCE)
val POST_EXPAND_CLEAN_UP_RULES: RuleSet = RuleSets.ofList(
EnumerableToLogicalTableScan.INSTANCE)
@@ -66,7 +65,6 @@ object FlinkStreamRuleSets {
* Convert table references before query decorrelation.
*/
val TABLE_REF_RULES: RuleSet = RuleSets.ofList(
- TableScanRule.INSTANCE,
EnumerableToLogicalTableScan.INSTANCE
)
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/schema/CatalogSourceTable.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/schema/CatalogSourceTable.scala
index 72764fe..094de36 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/schema/CatalogSourceTable.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/schema/CatalogSourceTable.scala
@@ -25,14 +25,13 @@ import org.apache.flink.table.factories.{TableFactoryUtil, TableSourceFactory, T
import org.apache.flink.table.planner.calcite.{FlinkContext, FlinkRelBuilder, FlinkTypeFactory}
import org.apache.flink.table.planner.catalog.CatalogSchemaTable
import org.apache.flink.table.sources.{StreamTableSource, TableSource, TableSourceValidation}
-import org.apache.flink.table.utils.TableConnectorUtils.generateRuntimeName
+
import org.apache.calcite.plan.{RelOptSchema, RelOptTable}
import org.apache.calcite.rel.RelNode
import org.apache.calcite.rel.`type`.RelDataType
import org.apache.calcite.rel.logical.LogicalTableScan
import org.apache.flink.table.types.logical.{TimestampKind, TimestampType}
-import java.util
import java.util.{List => JList}
import scala.collection.JavaConversions._
@@ -64,15 +63,6 @@ class CatalogSourceTable[T](
.toMap
}
- override def getQualifiedName: JList[String] = {
- // Do not explain source, we already have full names, table source should be created in toRel.
- val ret = new util.ArrayList[String](names)
- // Add class name to distinguish TableSourceTable.
- val name = generateRuntimeName(getClass, catalogTable.getSchema.getFieldNames)
- ret.add(s"catalog_source: [$name]")
- ret
- }
-
override def toRel(context: RelOptTable.ToRelContext): RelNode = {
val cluster = context.getCluster
val flinkContext = cluster
@@ -104,7 +94,7 @@ class CatalogSourceTable[T](
.toArray
// Copy this table with physical scan row type.
val newRelTable = tableSourceTable.copy(tableSource, physicalFields)
- val scan = LogicalTableScan.create(cluster, newRelTable)
+ val scan = LogicalTableScan.create(cluster, newRelTable, context.getTableHints)
val relBuilder = FlinkRelBuilder.of(cluster, getRelOptSchema)
relBuilder.push(scan)
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/TableScanTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/TableScanTest.xml
index 63bea00..1c75dff 100644
--- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/TableScanTest.xml
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/TableScanTest.xml
@@ -19,7 +19,6 @@ limitations under the License.
<TestCase name="testDDLTableScan">
<Resource name="sql">
<![CDATA[SELECT * FROM src WHERE a > 1]]>
-
</Resource>
<Resource name="planBefore">
<![CDATA[
@@ -27,80 +26,66 @@ LogicalProject(ts=[$0], a=[$1], b=[$2])
+- LogicalFilter(condition=[>($1, 1)])
+- LogicalTableScan(table=[[default_catalog, default_database, src, source: [CollectionTableSource(ts, a, b)]]])
]]>
-
</Resource>
<Resource name="planAfter">
<![CDATA[
Calc(select=[ts, a, b], where=[>(a, 1)])
+- TableSourceScan(table=[[default_catalog, default_database, src, source: [CollectionTableSource(ts, a, b)]]], fields=[ts, a, b])
]]>
-
</Resource>
</TestCase>
- <TestCase name="testTableSourceScan">
+ <TestCase name="testDDLWithComputedColumn">
<Resource name="sql">
- <![CDATA[SELECT * FROM MyTable]]>
-
+ <![CDATA[SELECT * FROM computed_column_t]]>
</Resource>
<Resource name="planBefore">
<![CDATA[
-LogicalProject(a=[$0], b=[$1], c=[$2])
-+- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]])
+LogicalProject(a=[$0], b=[$1], c=[+($0, 1)], d=[TO_TIMESTAMP($1)], e=[my_udf($0)])
++- LogicalTableScan(table=[[default_catalog, default_database, computed_column_t, source: [CollectionTableSource(a, b)]]])
]]>
-
</Resource>
<Resource name="planAfter">
<![CDATA[
-TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+Calc(select=[a, b, +(a, 1) AS c, TO_TIMESTAMP(b) AS d, my_udf(a) AS e])
++- TableSourceScan(table=[[default_catalog, default_database, computed_column_t, source: [CollectionTableSource(a, b)]]], fields=[a, b])
]]>
-
</Resource>
</TestCase>
-
<TestCase name="testDDLWithWatermarkComputedColumn">
<Resource name="sql">
<![CDATA[SELECT * FROM c_watermark_t]]>
-
</Resource>
<Resource name="planBefore">
<![CDATA[
LogicalProject(a=[$0], b=[$1], c=[+($0, 1)], d=[TO_TIMESTAMP($1)], e=[my_udf($0)])
+- LogicalTableScan(table=[[default_catalog, default_database, c_watermark_t, source: [CollectionTableSource(a, b)]]])
]]>
-
</Resource>
<Resource name="planAfter">
<![CDATA[
Calc(select=[a, b, +(a, 1) AS c, TO_TIMESTAMP(b) AS d, my_udf(a) AS e])
+- TableSourceScan(table=[[default_catalog, default_database, c_watermark_t, source: [CollectionTableSource(a, b)]]], fields=[a, b])
]]>
-
</Resource>
</TestCase>
- <TestCase name="testDDLWithComputedColumn">
- <Resource name="sql">
- <![CDATA[SELECT * FROM computed_column_t]]>
-
- </Resource>
+ <TestCase name="testTableApiScanWithComputedColumn">
<Resource name="planBefore">
<![CDATA[
LogicalProject(a=[$0], b=[$1], c=[+($0, 1)], d=[TO_TIMESTAMP($1)], e=[my_udf($0)])
+- LogicalTableScan(table=[[default_catalog, default_database, computed_column_t, source: [CollectionTableSource(a, b)]]])
]]>
-
</Resource>
<Resource name="planAfter">
<![CDATA[
Calc(select=[a, b, +(a, 1) AS c, TO_TIMESTAMP(b) AS d, my_udf(a) AS e])
+- TableSourceScan(table=[[default_catalog, default_database, computed_column_t, source: [CollectionTableSource(a, b)]]], fields=[a, b])
]]>
-
</Resource>
</TestCase>
<TestCase name="testTableApiScanWithDDL">
<Resource name="planBefore">
<![CDATA[
-LogicalTableScan(table=[[default_catalog, default_database, t1, catalog_source: [CatalogSourceTable(a, b)]]])
+LogicalTableScan(table=[[default_catalog, default_database, t1, source: [CollectionTableSource(a, b)]]])
]]>
</Resource>
<Resource name="planAfter">
@@ -109,11 +94,10 @@ TableSourceScan(table=[[default_catalog, default_database, t1, source: [Collecti
]]>
</Resource>
</TestCase>
-
<TestCase name="testTableApiScanWithTemporaryTable">
<Resource name="planBefore">
<![CDATA[
-LogicalTableScan(table=[[default_catalog, default_database, t1, catalog_source: [CatalogSourceTable(word)]]])
+LogicalTableScan(table=[[default_catalog, default_database, t1, source: [CsvTableSource(read fields: word)]]])
]]>
</Resource>
<Resource name="planAfter">
@@ -122,11 +106,11 @@ TableSourceScan(table=[[default_catalog, default_database, t1, source: [CsvTable
]]>
</Resource>
</TestCase>
-
<TestCase name="testTableApiScanWithWatermark">
<Resource name="planBefore">
<![CDATA[
-LogicalTableScan(table=[[default_catalog, default_database, c_watermark_t, catalog_source: [CatalogSourceTable(a, b, c, d, e)]]])
+LogicalProject(a=[$0], b=[$1], c=[+($0, 1)], d=[TO_TIMESTAMP($1)], e=[my_udf($0)])
++- LogicalTableScan(table=[[default_catalog, default_database, c_watermark_t, source: [CollectionTableSource(a, b)]]])
]]>
</Resource>
<Resource name="planAfter">
@@ -136,17 +120,19 @@ Calc(select=[a, b, +(a, 1) AS c, TO_TIMESTAMP(b) AS d, my_udf(a) AS e])
]]>
</Resource>
</TestCase>
-
- <TestCase name="testTableApiScanWithComputedColumn">
+ <TestCase name="testTableSourceScan">
+ <Resource name="sql">
+ <![CDATA[SELECT * FROM MyTable]]>
+ </Resource>
<Resource name="planBefore">
<![CDATA[
-LogicalTableScan(table=[[default_catalog, default_database, computed_column_t, catalog_source: [CatalogSourceTable(a, b, c, d, e)]]])
+LogicalProject(a=[$0], b=[$1], c=[$2])
++- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]])
]]>
</Resource>
<Resource name="planAfter">
<![CDATA[
-Calc(select=[a, b, +(a, 1) AS c, TO_TIMESTAMP(b) AS d, my_udf(a) AS e])
-+- TableSourceScan(table=[[default_catalog, default_database, computed_column_t, source: [CollectionTableSource(a, b)]]], fields=[a, b])
+TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
]]>
</Resource>
</TestCase>
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala
index 534abe4..df61b48 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala
@@ -27,6 +27,8 @@ import org.apache.flink.table.plan.rules.dataSet._
import org.apache.flink.table.plan.rules.datastream._
import org.apache.flink.table.plan.rules.logical.{ExtendedAggregateExtractProjectRule, _}
+import org.apache.calcite.rel.logical.{LogicalJoin, LogicalProject}
+
object FlinkRuleSets {
/**
@@ -42,8 +44,7 @@ object FlinkRuleSets {
* can create new plan nodes.
*/
val EXPAND_PLAN_RULES: RuleSet = RuleSets.ofList(
- LogicalCorrelateToTemporalTableJoinRule.INSTANCE,
- TableScanRule.INSTANCE)
+ LogicalCorrelateToTemporalTableJoinRule.INSTANCE)
val POST_EXPAND_CLEAN_UP_RULES: RuleSet = RuleSets.ofList(
EnumerableToLogicalTableScan.INSTANCE)
@@ -69,7 +70,11 @@ object FlinkRuleSets {
FilterProjectTransposeRule.INSTANCE,
// push a projection to the children of a join
// push all expressions to handle the time indicator correctly
- new ProjectJoinTransposeRule(PushProjector.ExprCondition.FALSE, RelFactories.LOGICAL_BUILDER),
+ new ProjectJoinTransposeRule(
+ classOf[LogicalProject],
+ classOf[LogicalJoin],
+ PushProjector.ExprCondition.FALSE,
+ RelFactories.LOGICAL_BUILDER),
// merge projections
ProjectMergeRule.INSTANCE,
// remove identity project