You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by dw...@apache.org on 2019/05/31 11:45:06 UTC

[flink] 02/02: [hotfix][table-planner] Removed TableOperationConverterSupplier.

This is an automated email from the ASF dual-hosted git repository.

dwysakowicz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 3934a7f3e7abce4f2ef25391bf62a5754fcfdbcf
Author: Dawid Wysakowicz <dw...@apache.org>
AuthorDate: Thu May 23 11:06:49 2019 +0200

    [hotfix][table-planner] Removed TableOperationConverterSupplier.
    
    Rather than passing TableOperationConverterSupplier, we just create
    FlinkRelBuilder whenever we need to convert from TableOperation to
    RelNode.
---
 .../flink/table/plan/TableOperationConverter.java  | 26 ++++------------------
 .../planner/PlanningConfigurationBuilder.java      | 11 +++------
 .../LogicalCorrelateToTemporalTableJoinRule.scala  | 16 ++++++++-----
 3 files changed, 17 insertions(+), 36 deletions(-)

diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/plan/TableOperationConverter.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/plan/TableOperationConverter.java
index 2a32fe3..565bca7 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/plan/TableOperationConverter.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/plan/TableOperationConverter.java
@@ -64,7 +64,6 @@ import org.apache.calcite.rel.core.CorrelationId;
 import org.apache.calcite.rel.core.JoinRelType;
 import org.apache.calcite.rel.logical.LogicalTableFunctionScan;
 import org.apache.calcite.rex.RexNode;
-import org.apache.calcite.tools.RelBuilder;
 import org.apache.calcite.tools.RelBuilder.AggCall;
 import org.apache.calcite.tools.RelBuilder.GroupKey;
 
@@ -89,23 +88,7 @@ import static org.apache.flink.table.expressions.FunctionDefinition.Type.AGGREGA
 @Internal
 public class TableOperationConverter extends TableOperationDefaultVisitor<RelNode> {
 
-	/**
-	 * Supplier for {@link TableOperationConverter} that can wrap given {@link RelBuilder}.
-	 */
-	@Internal
-	public static class ToRelConverterSupplier {
-		private final ExpressionBridge<PlannerExpression> expressionBridge;
-
-		public ToRelConverterSupplier(ExpressionBridge<PlannerExpression> expressionBridge) {
-			this.expressionBridge = expressionBridge;
-		}
-
-		public TableOperationConverter get(RelBuilder relBuilder) {
-			return new TableOperationConverter(relBuilder, expressionBridge);
-		}
-	}
-
-	private final RelBuilder relBuilder;
+	private final FlinkRelBuilder relBuilder;
 	private final SingleRelVisitor singleRelVisitor = new SingleRelVisitor();
 	private final ExpressionBridge<PlannerExpression> expressionBridge;
 	private final AggregateVisitor aggregateVisitor = new AggregateVisitor();
@@ -113,7 +96,7 @@ public class TableOperationConverter extends TableOperationDefaultVisitor<RelNod
 	private final JoinExpressionVisitor joinExpressionVisitor = new JoinExpressionVisitor();
 
 	public TableOperationConverter(
-			RelBuilder relBuilder,
+			FlinkRelBuilder relBuilder,
 			ExpressionBridge<PlannerExpression> expressionBridge) {
 		this.relBuilder = relBuilder;
 		this.expressionBridge = expressionBridge;
@@ -148,7 +131,6 @@ public class TableOperationConverter extends TableOperationDefaultVisitor<RelNod
 
 		@Override
 		public RelNode visitWindowAggregate(WindowAggregateTableOperation windowAggregate) {
-			FlinkRelBuilder flinkRelBuilder = (FlinkRelBuilder) relBuilder;
 			List<AggCall> aggregations = windowAggregate.getAggregateExpressions()
 				.stream()
 				.map(this::getAggCall)
@@ -161,7 +143,7 @@ public class TableOperationConverter extends TableOperationDefaultVisitor<RelNod
 				.collect(toList());
 			GroupKey groupKey = relBuilder.groupKey(groupings);
 			LogicalWindow logicalWindow = toLogicalWindow(windowAggregate.getGroupWindow());
-			return flinkRelBuilder.windowAggregate(logicalWindow, groupKey, windowProperties, aggregations).build();
+			return relBuilder.windowAggregate(logicalWindow, groupKey, windowProperties, aggregations).build();
 		}
 
 		/**
@@ -237,7 +219,7 @@ public class TableOperationConverter extends TableOperationDefaultVisitor<RelNod
 				fieldNames);
 			TableFunction<?> tableFunction = calculatedTable.getTableFunction();
 
-			FlinkTypeFactory typeFactory = (FlinkTypeFactory) relBuilder.getTypeFactory();
+			FlinkTypeFactory typeFactory = relBuilder.getTypeFactory();
 			TableSqlFunction sqlFunction = new TableSqlFunction(
 				tableFunction.functionIdentifier(),
 				tableFunction.toString(),
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/PlanningConfigurationBuilder.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/PlanningConfigurationBuilder.java
index 7c2d79a..09a55f5 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/PlanningConfigurationBuilder.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/PlanningConfigurationBuilder.java
@@ -31,7 +31,6 @@ import org.apache.flink.table.catalog.CatalogReader;
 import org.apache.flink.table.codegen.ExpressionReducer;
 import org.apache.flink.table.expressions.ExpressionBridge;
 import org.apache.flink.table.expressions.PlannerExpression;
-import org.apache.flink.table.plan.TableOperationConverter;
 import org.apache.flink.table.plan.cost.DataSetCostFactory;
 import org.apache.flink.table.util.JavaScalaConversionUtil;
 import org.apache.flink.table.validate.FunctionCatalog;
@@ -84,10 +83,9 @@ public class PlanningConfigurationBuilder {
 		this.tableConfig = tableConfig;
 		this.functionCatalog = functionCatalog;
 
-		// create context instances with Flink type factory
-		this.context = Contexts.of(
-			new TableOperationConverter.ToRelConverterSupplier(expressionBridge)
-		);
+		// the converter is needed when calling temporal table functions from SQL, because
+		// they reference a history table represented with a tree of table operations
+		this.context = Contexts.of(expressionBridge);
 
 		this.planner = new VolcanoPlanner(costFactory, context);
 		planner.setExecutor(new ExpressionReducer(tableConfig));
@@ -193,9 +191,6 @@ public class PlanningConfigurationBuilder {
 				getSqlToRelConverterConfig(
 					calciteConfig(tableConfig),
 					expressionBridge))
-			// the converter is needed when calling temporal table functions from SQL, because
-			// they reference a history table represented with a tree of table operations
-			.context(context)
 			// set the executor to evaluate constant expressions
 			.executor(new ExpressionReducer(tableConfig))
 			.build();
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/rules/logical/LogicalCorrelateToTemporalTableJoinRule.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/rules/logical/LogicalCorrelateToTemporalTableJoinRule.scala
index f4a6699..0b07f47 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/rules/logical/LogicalCorrelateToTemporalTableJoinRule.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/rules/logical/LogicalCorrelateToTemporalTableJoinRule.scala
@@ -34,6 +34,7 @@ import org.apache.flink.table.plan.TableOperationConverter
 import org.apache.flink.table.plan.logical.rel.LogicalTemporalTableJoin
 import org.apache.flink.table.plan.util.RexDefaultVisitor
 import org.apache.flink.util.Preconditions.checkState
+import org.apache.flink.table.calcite.FlinkRelBuilder
 
 class LogicalCorrelateToTemporalTableJoinRule
   extends RelOptRule(
@@ -82,14 +83,17 @@ class LogicalCorrelateToTemporalTableJoinRule
         // If TemporalTableFunction was found, rewrite LogicalCorrelate to TemporalJoin
         val underlyingHistoryTable: TableOperation = rightTemporalTableFunction
           .getUnderlyingHistoryTable
-        val relBuilder = this.relBuilderFactory.create(
-          cluster,
-          leftNode.getTable.getRelOptSchema)
         val rexBuilder = cluster.getRexBuilder
 
-        val converter = call.getPlanner.getContext
-          .unwrap(classOf[TableOperationConverter.ToRelConverterSupplier]).get(relBuilder)
-        val rightNode: RelNode = underlyingHistoryTable.accept(converter)
+        val expressionBridge = call.getPlanner.getContext
+          .unwrap(classOf[ExpressionBridge[PlannerExpression]])
+
+        val relBuilder = new FlinkRelBuilder(call.getPlanner.getContext,
+          cluster,
+          leftNode.getTable.getRelOptSchema,
+          expressionBridge)
+
+        val rightNode: RelNode = relBuilder.tableOperation(underlyingHistoryTable).build()
 
         val rightTimeIndicatorExpression = createRightExpression(
           rexBuilder,