You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by dw...@apache.org on 2019/05/31 11:45:06 UTC
[flink] 02/02: [hotfix][table-planner] Removed TableOperationConverterSupplier.
This is an automated email from the ASF dual-hosted git repository.
dwysakowicz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 3934a7f3e7abce4f2ef25391bf62a5754fcfdbcf
Author: Dawid Wysakowicz <dw...@apache.org>
AuthorDate: Thu May 23 11:06:49 2019 +0200
[hotfix][table-planner] Removed TableOperationConverterSupplier.
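Rather than passing a TableOperationConverterSupplier (the nested class
TableOperationConverter.ToRelConverterSupplier) through the planner context,
we now put the ExpressionBridge into the context and just create a
FlinkRelBuilder whenever we need to convert from a TableOperation to a
RelNode.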
---
.../flink/table/plan/TableOperationConverter.java | 26 ++++------------------
.../planner/PlanningConfigurationBuilder.java | 11 +++------
.../LogicalCorrelateToTemporalTableJoinRule.scala | 16 ++++++++-----
3 files changed, 17 insertions(+), 36 deletions(-)
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/plan/TableOperationConverter.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/plan/TableOperationConverter.java
index 2a32fe3..565bca7 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/plan/TableOperationConverter.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/plan/TableOperationConverter.java
@@ -64,7 +64,6 @@ import org.apache.calcite.rel.core.CorrelationId;
import org.apache.calcite.rel.core.JoinRelType;
import org.apache.calcite.rel.logical.LogicalTableFunctionScan;
import org.apache.calcite.rex.RexNode;
-import org.apache.calcite.tools.RelBuilder;
import org.apache.calcite.tools.RelBuilder.AggCall;
import org.apache.calcite.tools.RelBuilder.GroupKey;
@@ -89,23 +88,7 @@ import static org.apache.flink.table.expressions.FunctionDefinition.Type.AGGREGA
@Internal
public class TableOperationConverter extends TableOperationDefaultVisitor<RelNode> {
- /**
- * Supplier for {@link TableOperationConverter} that can wrap given {@link RelBuilder}.
- */
- @Internal
- public static class ToRelConverterSupplier {
- private final ExpressionBridge<PlannerExpression> expressionBridge;
-
- public ToRelConverterSupplier(ExpressionBridge<PlannerExpression> expressionBridge) {
- this.expressionBridge = expressionBridge;
- }
-
- public TableOperationConverter get(RelBuilder relBuilder) {
- return new TableOperationConverter(relBuilder, expressionBridge);
- }
- }
-
- private final RelBuilder relBuilder;
+ private final FlinkRelBuilder relBuilder;
private final SingleRelVisitor singleRelVisitor = new SingleRelVisitor();
private final ExpressionBridge<PlannerExpression> expressionBridge;
private final AggregateVisitor aggregateVisitor = new AggregateVisitor();
@@ -113,7 +96,7 @@ public class TableOperationConverter extends TableOperationDefaultVisitor<RelNod
private final JoinExpressionVisitor joinExpressionVisitor = new JoinExpressionVisitor();
public TableOperationConverter(
- RelBuilder relBuilder,
+ FlinkRelBuilder relBuilder,
ExpressionBridge<PlannerExpression> expressionBridge) {
this.relBuilder = relBuilder;
this.expressionBridge = expressionBridge;
@@ -148,7 +131,6 @@ public class TableOperationConverter extends TableOperationDefaultVisitor<RelNod
@Override
public RelNode visitWindowAggregate(WindowAggregateTableOperation windowAggregate) {
- FlinkRelBuilder flinkRelBuilder = (FlinkRelBuilder) relBuilder;
List<AggCall> aggregations = windowAggregate.getAggregateExpressions()
.stream()
.map(this::getAggCall)
@@ -161,7 +143,7 @@ public class TableOperationConverter extends TableOperationDefaultVisitor<RelNod
.collect(toList());
GroupKey groupKey = relBuilder.groupKey(groupings);
LogicalWindow logicalWindow = toLogicalWindow(windowAggregate.getGroupWindow());
- return flinkRelBuilder.windowAggregate(logicalWindow, groupKey, windowProperties, aggregations).build();
+ return relBuilder.windowAggregate(logicalWindow, groupKey, windowProperties, aggregations).build();
}
/**
@@ -237,7 +219,7 @@ public class TableOperationConverter extends TableOperationDefaultVisitor<RelNod
fieldNames);
TableFunction<?> tableFunction = calculatedTable.getTableFunction();
- FlinkTypeFactory typeFactory = (FlinkTypeFactory) relBuilder.getTypeFactory();
+ FlinkTypeFactory typeFactory = relBuilder.getTypeFactory();
TableSqlFunction sqlFunction = new TableSqlFunction(
tableFunction.functionIdentifier(),
tableFunction.toString(),
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/PlanningConfigurationBuilder.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/PlanningConfigurationBuilder.java
index 7c2d79a..09a55f5 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/PlanningConfigurationBuilder.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/PlanningConfigurationBuilder.java
@@ -31,7 +31,6 @@ import org.apache.flink.table.catalog.CatalogReader;
import org.apache.flink.table.codegen.ExpressionReducer;
import org.apache.flink.table.expressions.ExpressionBridge;
import org.apache.flink.table.expressions.PlannerExpression;
-import org.apache.flink.table.plan.TableOperationConverter;
import org.apache.flink.table.plan.cost.DataSetCostFactory;
import org.apache.flink.table.util.JavaScalaConversionUtil;
import org.apache.flink.table.validate.FunctionCatalog;
@@ -84,10 +83,9 @@ public class PlanningConfigurationBuilder {
this.tableConfig = tableConfig;
this.functionCatalog = functionCatalog;
- // create context instances with Flink type factory
- this.context = Contexts.of(
- new TableOperationConverter.ToRelConverterSupplier(expressionBridge)
- );
+ // the converter is needed when calling temporal table functions from SQL, because
+ // they reference a history table represented with a tree of table operations
+ this.context = Contexts.of(expressionBridge);
this.planner = new VolcanoPlanner(costFactory, context);
planner.setExecutor(new ExpressionReducer(tableConfig));
@@ -193,9 +191,6 @@ public class PlanningConfigurationBuilder {
getSqlToRelConverterConfig(
calciteConfig(tableConfig),
expressionBridge))
- // the converter is needed when calling temporal table functions from SQL, because
- // they reference a history table represented with a tree of table operations
- .context(context)
// set the executor to evaluate constant expressions
.executor(new ExpressionReducer(tableConfig))
.build();
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/rules/logical/LogicalCorrelateToTemporalTableJoinRule.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/rules/logical/LogicalCorrelateToTemporalTableJoinRule.scala
index f4a6699..0b07f47 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/rules/logical/LogicalCorrelateToTemporalTableJoinRule.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/rules/logical/LogicalCorrelateToTemporalTableJoinRule.scala
@@ -34,6 +34,7 @@ import org.apache.flink.table.plan.TableOperationConverter
import org.apache.flink.table.plan.logical.rel.LogicalTemporalTableJoin
import org.apache.flink.table.plan.util.RexDefaultVisitor
import org.apache.flink.util.Preconditions.checkState
+import org.apache.flink.table.calcite.FlinkRelBuilder
class LogicalCorrelateToTemporalTableJoinRule
extends RelOptRule(
@@ -82,14 +83,17 @@ class LogicalCorrelateToTemporalTableJoinRule
// If TemporalTableFunction was found, rewrite LogicalCorrelate to TemporalJoin
val underlyingHistoryTable: TableOperation = rightTemporalTableFunction
.getUnderlyingHistoryTable
- val relBuilder = this.relBuilderFactory.create(
- cluster,
- leftNode.getTable.getRelOptSchema)
val rexBuilder = cluster.getRexBuilder
- val converter = call.getPlanner.getContext
- .unwrap(classOf[TableOperationConverter.ToRelConverterSupplier]).get(relBuilder)
- val rightNode: RelNode = underlyingHistoryTable.accept(converter)
+ val expressionBridge = call.getPlanner.getContext
+ .unwrap(classOf[ExpressionBridge[PlannerExpression]])
+
+ val relBuilder = new FlinkRelBuilder(call.getPlanner.getContext,
+ cluster,
+ leftNode.getTable.getRelOptSchema,
+ expressionBridge)
+
+ val rightNode: RelNode = relBuilder.tableOperation(underlyingHistoryTable).build()
val rightTimeIndicatorExpression = createRightExpression(
rexBuilder,