You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by dw...@apache.org on 2019/05/14 14:27:10 UTC

[flink] branch master updated: [hotfix][table-planner] Extracted creation & configuration of FrameworkConfig & RelBuilder to separate class

This is an automated email from the ASF dual-hosted git repository.

dwysakowicz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new e682395  [hotfix][table-planner] Extracted creation & configuration of FrameworkConfig & RelBuilder to separate class
e682395 is described below

commit e682395ae4e13caba0e2fdd98868f69ede9f3b3e
Author: Dawid Wysakowicz <dw...@apache.org>
AuthorDate: Fri Apr 26 11:46:01 2019 +0200

    [hotfix][table-planner] Extracted creation & configuration of
    FrameworkConfig & RelBuilder to separate class
    
    Both of those classes should be configured for a single planning session.
    They are constructed from static properties that do not change during the
    lifecycle of a TableEnvironment (e.g. TypeSystem) and dynamic properties
    (e.g. default path). The newly introduced PlanningConfigurationBuilder
    class helps to separate those two sets of properties.
---
 .../planner/PlanningConfigurationBuilder.java      | 202 +++++++++++++++++++++
 .../org/apache/flink/table/api/TableEnvImpl.scala  | 115 ++----------
 .../flink/table/calcite/FlinkRelBuilder.scala      |  24 ---
 3 files changed, 219 insertions(+), 122 deletions(-)

diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/PlanningConfigurationBuilder.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/PlanningConfigurationBuilder.java
new file mode 100644
index 0000000..a838c06
--- /dev/null
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/PlanningConfigurationBuilder.java
@@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.api.TableConfig;
+import org.apache.flink.table.calcite.CalciteConfig;
+import org.apache.flink.table.calcite.FlinkRelBuilder;
+import org.apache.flink.table.calcite.FlinkRelBuilderFactory;
+import org.apache.flink.table.calcite.FlinkRelOptClusterFactory;
+import org.apache.flink.table.calcite.FlinkTypeFactory;
+import org.apache.flink.table.calcite.FlinkTypeSystem;
+import org.apache.flink.table.codegen.ExpressionReducer;
+import org.apache.flink.table.expressions.ExpressionBridge;
+import org.apache.flink.table.expressions.PlannerExpression;
+import org.apache.flink.table.plan.TableOperationConverter;
+import org.apache.flink.table.plan.cost.DataSetCostFactory;
+import org.apache.flink.table.util.JavaScalaConversionUtil;
+import org.apache.flink.table.validate.FunctionCatalog;
+
+import org.apache.calcite.config.Lex;
+import org.apache.calcite.jdbc.CalciteSchema;
+import org.apache.calcite.plan.Context;
+import org.apache.calcite.plan.Contexts;
+import org.apache.calcite.plan.ConventionTraitDef;
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelOptCostFactory;
+import org.apache.calcite.plan.RelOptPlanner;
+import org.apache.calcite.plan.RelOptSchema;
+import org.apache.calcite.plan.volcano.VolcanoPlanner;
+import org.apache.calcite.prepare.CalciteCatalogReader;
+import org.apache.calcite.rel.type.RelDataTypeSystem;
+import org.apache.calcite.rex.RexBuilder;
+import org.apache.calcite.schema.SchemaPlus;
+import org.apache.calcite.sql.SqlOperatorTable;
+import org.apache.calcite.sql.parser.SqlParser;
+import org.apache.calcite.sql.util.ChainedSqlOperatorTable;
+import org.apache.calcite.sql2rel.SqlToRelConverter;
+import org.apache.calcite.tools.FrameworkConfig;
+import org.apache.calcite.tools.Frameworks;
+
+import java.util.List;
+
+/**
+ * Utility class to create {@link org.apache.calcite.tools.RelBuilder} or {@link FrameworkConfig} used to create
+ * a corresponding {@link org.apache.calcite.tools.Planner}. It tries to separate static elements in a
+ * {@link org.apache.flink.table.api.TableEnvironment}, such as root schema, cost factory and type system,
+ * from dynamic properties, e.g. the default path to look for objects in the schema.
+ */
+@Internal
+public class PlanningConfigurationBuilder {
+	private static final RelOptCostFactory COST_FACTORY = new DataSetCostFactory();
+	private static final RelDataTypeSystem TYPE_SYSTEM = new FlinkTypeSystem();
+	private static final FlinkTypeFactory TYPE_FACTORY = new FlinkTypeFactory(TYPE_SYSTEM);
+	private final RelOptPlanner planner;
+	private final ExpressionBridge<PlannerExpression> expressionBridge;
+	private final Context context;
+	private final TableConfig tableConfig;
+	private final FunctionCatalog functionCatalog;
+	private CalciteSchema rootSchema;
+
+	public PlanningConfigurationBuilder(
+			TableConfig tableConfig,
+			FunctionCatalog functionCatalog,
+			CalciteSchema rootSchema,
+			ExpressionBridge<PlannerExpression> expressionBridge) {
+		this.tableConfig = tableConfig;
+		this.functionCatalog = functionCatalog;
+
+		// create the planner context that carries the table operation to-rel converter supplier
+		this.context = Contexts.of(
+			new TableOperationConverter.ToRelConverterSupplier(expressionBridge)
+		);
+
+		this.planner = new VolcanoPlanner(COST_FACTORY, context);
+		planner.setExecutor(new ExpressionReducer(tableConfig));
+		planner.addRelTraitDef(ConventionTraitDef.INSTANCE);
+
+		this.expressionBridge = expressionBridge;
+
+		this.rootSchema = rootSchema;
+	}
+
+	/**
+	 * Creates a configured {@link FlinkRelBuilder} for a planning session.
+	 *
+	 * @param defaultSchema the default schema to look for first during planning.
+	 * @return configured rel builder
+	 */
+	public FlinkRelBuilder createRelBuilder(List<String> defaultSchema) {
+		RelOptCluster cluster = FlinkRelOptClusterFactory.create(planner, new RexBuilder(TYPE_FACTORY));
+		RelOptSchema relOptSchema = new CalciteCatalogReader(
+			rootSchema,
+			defaultSchema,
+			TYPE_FACTORY,
+			CalciteConfig.connectionConfig(getSqlParserConfig(calciteConfig(tableConfig))));
+
+		return new FlinkRelBuilder(context, cluster, relOptSchema, expressionBridge);
+	}
+
+	/** Returns the Calcite {@link org.apache.calcite.plan.RelOptPlanner} that will be used. */
+	public RelOptPlanner getPlanner() {
+		return planner;
+	}
+
+	/** Returns the {@link FlinkTypeFactory} that will be used. */
+	public FlinkTypeFactory getTypeFactory() {
+		return TYPE_FACTORY;
+	}
+
+	public Context getContext() {
+		return context;
+	}
+
+	/**
+	 * Creates a configured {@link FrameworkConfig} for a planning session.
+	 *
+	 * @param defaultSchema the default schema to look for first during planning
+	 * @return configured framework config
+	 */
+	public FrameworkConfig createFrameworkConfig(SchemaPlus defaultSchema) {
+		return Frameworks
+			.newConfigBuilder()
+			.defaultSchema(defaultSchema)
+			.parserConfig(getSqlParserConfig(calciteConfig(tableConfig)))
+			.costFactory(COST_FACTORY)
+			.typeSystem(TYPE_SYSTEM)
+			.operatorTable(getSqlOperatorTable(calciteConfig(tableConfig), functionCatalog))
+			.sqlToRelConverterConfig(getSqlToRelConverterConfig(calciteConfig(tableConfig), expressionBridge))
+			// the converter is needed when calling temporal table functions from SQL, because
+			// they reference a history table represented with a tree of table operations
+			.context(context)
+			// set the executor to evaluate constant expressions
+			.executor(new ExpressionReducer(tableConfig))
+			.build();
+	}
+
+	private CalciteConfig calciteConfig(TableConfig tableConfig) {
+		return tableConfig.getPlannerConfig()
+			.unwrap(CalciteConfig.class)
+			.orElseGet(CalciteConfig::DEFAULT);
+	}
+
+	/**
+	 * Returns the {@link SqlToRelConverter} config.
+	 */
+	private SqlToRelConverter.Config getSqlToRelConverterConfig(
+			CalciteConfig calciteConfig,
+			ExpressionBridge<PlannerExpression> expressionBridge) {
+		return JavaScalaConversionUtil.toJava(calciteConfig.sqlToRelConverterConfig()).orElseGet(
+			() -> SqlToRelConverter.configBuilder()
+				.withTrimUnusedFields(false)
+				.withConvertTableAccess(false)
+				.withInSubQueryThreshold(Integer.MAX_VALUE)
+				.withRelBuilderFactory(new FlinkRelBuilderFactory(expressionBridge))
+				.build()
+		);
+	}
+
+	/**
+	 * Returns the operator table for this environment including a custom Calcite configuration.
+	 */
+	private SqlOperatorTable getSqlOperatorTable(CalciteConfig calciteConfig, FunctionCatalog functionCatalog) {
+		return JavaScalaConversionUtil.toJava(calciteConfig.sqlOperatorTable()).map(operatorTable -> {
+				if (calciteConfig.replacesSqlOperatorTable()) {
+					return operatorTable;
+				} else {
+					return ChainedSqlOperatorTable.of(functionCatalog.getSqlOperatorTable(), operatorTable);
+				}
+			}
+		).orElseGet(functionCatalog::getSqlOperatorTable);
+	}
+
+	/**
+	 * Returns the SQL parser config for this environment including a custom Calcite configuration.
+	 */
+	private SqlParser.Config getSqlParserConfig(CalciteConfig calciteConfig) {
+		return JavaScalaConversionUtil.toJava(calciteConfig.sqlParserConfig()).orElseGet(() ->
+			// we use Java lex because back ticks are easier than double quotes in programming
+			// and cases are preserved
+			SqlParser
+				.configBuilder()
+				.setLex(Lex.JAVA)
+				.build());
+	}
+}
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/TableEnvImpl.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/TableEnvImpl.scala
index df5936f..aed75f4 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/TableEnvImpl.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/TableEnvImpl.scala
@@ -22,7 +22,6 @@ import _root_.java.lang.reflect.Modifier
 import _root_.java.util.concurrent.atomic.AtomicInteger
 
 import com.google.common.collect.ImmutableList
-import org.apache.calcite.config.Lex
 import org.apache.calcite.jdbc.CalciteSchema
 import org.apache.calcite.plan.RelOptPlanner.CannotPlanException
 import org.apache.calcite.plan._
@@ -32,9 +31,6 @@ import org.apache.calcite.schema
 import org.apache.calcite.schema.SchemaPlus
 import org.apache.calcite.schema.impl.AbstractTable
 import org.apache.calcite.sql._
-import org.apache.calcite.sql.parser.SqlParser
-import org.apache.calcite.sql.util.ChainedSqlOperatorTable
-import org.apache.calcite.sql2rel.SqlToRelConverter
 import org.apache.calcite.tools._
 import org.apache.flink.api.common.functions.MapFunction
 import org.apache.flink.api.common.typeinfo.TypeInformation
@@ -43,16 +39,15 @@ import org.apache.flink.api.java.typeutils.{RowTypeInfo, _}
 import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo
 import org.apache.flink.table.calcite._
 import org.apache.flink.table.catalog.{ExternalCatalog, ExternalCatalogSchema}
-import org.apache.flink.table.codegen.{ExpressionReducer, FunctionCodeGenerator, GeneratedFunction}
+import org.apache.flink.table.codegen.{FunctionCodeGenerator, GeneratedFunction}
 import org.apache.flink.table.expressions._
 import org.apache.flink.table.functions.utils.UserDefinedFunctionUtils._
 import org.apache.flink.table.functions.{AggregateFunction, ScalarFunction, TableFunction, UserDefinedAggregateFunction}
 import org.apache.flink.table.operations.{CatalogTableOperation, OperationTreeBuilder, PlannerTableOperation}
-import org.apache.flink.table.plan.TableOperationConverter
-import org.apache.flink.table.plan.cost.DataSetCostFactory
 import org.apache.flink.table.plan.nodes.FlinkConventions
 import org.apache.flink.table.plan.rules.FlinkRuleSets
 import org.apache.flink.table.plan.schema.{RelTable, RowSchema, TableSourceSinkTable}
+import org.apache.flink.table.planner.PlanningConfigurationBuilder
 import org.apache.flink.table.sinks.TableSink
 import org.apache.flink.table.sources.TableSource
 import org.apache.flink.table.typeutils.TimeIndicatorTypeInfo
@@ -77,37 +72,10 @@ abstract class TableEnvImpl(val config: TableConfig) extends TableEnvironment {
   // Table API/SQL function catalog
   private[flink] val functionCatalog: FunctionCatalog = new FunctionCatalog()
 
-  // the configuration to create a Calcite planner
-  private lazy val frameworkConfig: FrameworkConfig = Frameworks
-    .newConfigBuilder
-    .defaultSchema(rootSchema)
-    .parserConfig(getSqlParserConfig)
-    .costFactory(new DataSetCostFactory)
-    .typeSystem(new FlinkTypeSystem)
-    .operatorTable(getSqlOperatorTable)
-    .sqlToRelConverterConfig(getSqlToRelConverterConfig)
-    // the converter is needed when calling temporal table functions from SQL, because
-    // they reference a history table represented with a tree of table operations
-    .context(Contexts.of(
-      new TableOperationConverter.ToRelConverterSupplier(expressionBridge)
-    ))
-    // set the executor to evaluate constant expressions
-    .executor(new ExpressionReducer(config))
-    .build
-
   // temporary bridge between API and planner
   private[flink] val expressionBridge: ExpressionBridge[PlannerExpression] =
     new ExpressionBridge[PlannerExpression](functionCatalog, PlannerExpressionConverter.INSTANCE)
 
-  // the builder for Calcite RelNodes, Calcite's representation of a relational expression tree.
-  protected lazy val relBuilder: FlinkRelBuilder = FlinkRelBuilder
-    .create(frameworkConfig, expressionBridge)
-
-  // the planner instance used to optimize queries of this TableEnvironment
-  private lazy val planner: RelOptPlanner = relBuilder.getPlanner
-
-  private lazy val typeFactory: FlinkTypeFactory = relBuilder.getTypeFactory
-
   // a counter for unique attribute names
   private[flink] val attrNameCntr: AtomicInteger = new AtomicInteger(0)
 
@@ -116,6 +84,12 @@ abstract class TableEnvImpl(val config: TableConfig) extends TableEnvironment {
 
   private[flink] val operationTreeBuilder = new OperationTreeBuilder(this)
 
+  private val planningSession: PlanningConfigurationBuilder = new PlanningConfigurationBuilder(
+    config,
+    functionCatalog,
+    internalSchema,
+    expressionBridge)
+
   protected def calciteConfig: CalciteConfig = config.getPlannerConfig
     .unwrap(classOf[CalciteConfig])
     .orElse(CalciteConfig.DEFAULT)
@@ -129,42 +103,6 @@ abstract class TableEnvImpl(val config: TableConfig) extends TableEnvironment {
   }
 
   /**
-    * Returns the SqlToRelConverter config.
-    */
-  protected def getSqlToRelConverterConfig: SqlToRelConverter.Config = {
-    calciteConfig.sqlToRelConverterConfig match {
-
-      case None =>
-        SqlToRelConverter.configBuilder()
-          .withTrimUnusedFields(false)
-          .withConvertTableAccess(false)
-          .withInSubQueryThreshold(Integer.MAX_VALUE)
-          .withRelBuilderFactory(new FlinkRelBuilderFactory(expressionBridge))
-          .build()
-
-      case Some(c) => c
-    }
-  }
-
-  /**
-    * Returns the operator table for this environment including a custom Calcite configuration.
-    */
-  protected def getSqlOperatorTable: SqlOperatorTable = {
-    calciteConfig.sqlOperatorTable match {
-
-      case None =>
-        functionCatalog.getSqlOperatorTable
-
-      case Some(table) =>
-        if (calciteConfig.replacesSqlOperatorTable) {
-          table
-        } else {
-          ChainedSqlOperatorTable.of(functionCatalog.getSqlOperatorTable, table)
-        }
-    }
-  }
-
-  /**
     * Returns the normalization rule set for this environment
     * including a custom RuleSet configuration.
     */
@@ -222,25 +160,6 @@ abstract class TableEnvImpl(val config: TableConfig) extends TableEnvironment {
   }
 
   /**
-    * Returns the SQL parser config for this environment including a custom Calcite configuration.
-    */
-  protected def getSqlParserConfig: SqlParser.Config = {
-    calciteConfig.sqlParserConfig match {
-
-      case None =>
-        // we use Java lex because back ticks are easier than double quotes in programming
-        // and cases are preserved
-        SqlParser
-          .configBuilder()
-          .setLex(Lex.JAVA)
-          .build()
-
-      case Some(sqlParserConfig) =>
-        sqlParserConfig
-    }
-  }
-
-  /**
     * Returns the built-in normalization rules that are defined by the environment.
     */
   protected def getBuiltInNormRuleSet: RuleSet
@@ -355,7 +274,7 @@ abstract class TableEnvImpl(val config: TableConfig) extends TableEnvironment {
     input: RelNode,
     targetTraits: RelTraitSet): RelNode = {
 
-    val planner = new HepPlanner(hepProgram, frameworkConfig.getContext)
+    val planner = new HepPlanner(hepProgram, planningSession.getContext)
     planner.setRoot(input)
     if (input.getTraitSet != targetTraits) {
       planner.changeTraits(input, targetTraits.simplify)
@@ -424,7 +343,7 @@ abstract class TableEnvImpl(val config: TableConfig) extends TableEnvironment {
     functionCatalog.registerScalarFunction(
       name,
       function,
-      typeFactory)
+      planningSession.getTypeFactory)
   }
 
   /**
@@ -448,7 +367,7 @@ abstract class TableEnvImpl(val config: TableConfig) extends TableEnvironment {
       name,
       function,
       typeInfo,
-      typeFactory)
+      planningSession.getTypeFactory)
   }
 
   /**
@@ -475,7 +394,7 @@ abstract class TableEnvImpl(val config: TableConfig) extends TableEnvironment {
       function,
       resultTypeInfo,
       accTypeInfo,
-      typeFactory)
+      planningSession.getTypeFactory)
   }
 
   override def registerTable(name: String, table: Table): Unit = {
@@ -554,7 +473,7 @@ abstract class TableEnvImpl(val config: TableConfig) extends TableEnvironment {
   }
 
   private def extractTableSchema(table: schema.Table): TableSchema = {
-    val relDataType = table.getRowType(typeFactory)
+    val relDataType = table.getRowType(planningSession.getTypeFactory)
     val fieldNames = relDataType.getFieldNames
     val fieldTypes = relDataType.getFieldList.asScala
       .map(field => FlinkTypeFactory.toTypeInfo(field.getType))
@@ -779,17 +698,17 @@ abstract class TableEnvImpl(val config: TableConfig) extends TableEnvironment {
 
   /** Returns the [[FlinkRelBuilder]] of this TableEnvironment. */
   private[flink] def getRelBuilder: FlinkRelBuilder = {
-    relBuilder
+    planningSession.createRelBuilder(List().asJava)
   }
 
   /** Returns the Calcite [[org.apache.calcite.plan.RelOptPlanner]] of this TableEnvironment. */
   private[flink] def getPlanner: RelOptPlanner = {
-    planner
+    planningSession.getPlanner
   }
 
   /** Returns the [[FlinkTypeFactory]] of this TableEnvironment. */
   private[flink] def getTypeFactory: FlinkTypeFactory = {
-    typeFactory
+    planningSession.getTypeFactory
   }
 
   private[flink] def getFunctionCatalog: FunctionCatalog = {
@@ -798,7 +717,7 @@ abstract class TableEnvImpl(val config: TableConfig) extends TableEnvironment {
 
   /** Returns the Calcite [[FrameworkConfig]] of this TableEnvironment. */
   private[flink] def getFrameworkConfig: FrameworkConfig = {
-    frameworkConfig
+    planningSession.createFrameworkConfig(rootSchema)
   }
 
   /**
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/calcite/FlinkRelBuilder.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/calcite/FlinkRelBuilder.scala
index 5a8678a..b6412d4 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/calcite/FlinkRelBuilder.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/calcite/FlinkRelBuilder.scala
@@ -105,30 +105,6 @@ class FlinkRelBuilder(
 
 object FlinkRelBuilder {
 
-  def create(
-      config: FrameworkConfig,
-      expressionBridge: ExpressionBridge[PlannerExpression])
-    : FlinkRelBuilder = {
-
-    // create Flink type factory
-    val typeSystem = config.getTypeSystem
-    val typeFactory = new FlinkTypeFactory(typeSystem)
-
-    // create context instances with Flink type factory
-    val planner = new VolcanoPlanner(config.getCostFactory, Contexts.empty())
-    planner.setExecutor(config.getExecutor)
-    planner.addRelTraitDef(ConventionTraitDef.INSTANCE)
-    val cluster = FlinkRelOptClusterFactory.create(planner, new RexBuilder(typeFactory))
-    val calciteSchema = CalciteSchema.from(config.getDefaultSchema)
-    val relOptSchema = new CalciteCatalogReader(
-      calciteSchema,
-      Collections.emptyList(),
-      typeFactory,
-      CalciteConfig.connectionConfig(config.getParserConfig))
-
-    new FlinkRelBuilder(config.getContext, cluster, relOptSchema, expressionBridge)
-  }
-
   /**
     * Information necessary to create a window aggregate.
     *