You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by dw...@apache.org on 2019/05/14 14:27:10 UTC
[flink] branch master updated: [hotfix][table-planner] Extracted
creation & configuration of FrameworkConfig & RelBuilder to separate class
This is an automated email from the ASF dual-hosted git repository.
dwysakowicz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new e682395 [hotfix][table-planner] Extracted creation & configuration of FrameworkConfig & RelBuilder to separate class
e682395 is described below
commit e682395ae4e13caba0e2fdd98868f69ede9f3b3e
Author: Dawid Wysakowicz <dw...@apache.org>
AuthorDate: Fri Apr 26 11:46:01 2019 +0200
[hotfix][table-planner] Extracted creation & configuration of
FrameworkConfig & RelBuilder to separate class
Both of these classes should be configured per planning session.
They are built from static properties that do not change during the
lifecycle of a TableEnvironment (e.g. TypeSystem) and dynamic ones (e.g. the
default path). The newly introduced PlanningConfigurationBuilder class helps
to separate those two sets of properties.
---
.../planner/PlanningConfigurationBuilder.java | 202 +++++++++++++++++++++
.../org/apache/flink/table/api/TableEnvImpl.scala | 115 ++----------
.../flink/table/calcite/FlinkRelBuilder.scala | 24 ---
3 files changed, 219 insertions(+), 122 deletions(-)
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/PlanningConfigurationBuilder.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/PlanningConfigurationBuilder.java
new file mode 100644
index 0000000..a838c06
--- /dev/null
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/PlanningConfigurationBuilder.java
@@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.api.TableConfig;
+import org.apache.flink.table.calcite.CalciteConfig;
+import org.apache.flink.table.calcite.FlinkRelBuilder;
+import org.apache.flink.table.calcite.FlinkRelBuilderFactory;
+import org.apache.flink.table.calcite.FlinkRelOptClusterFactory;
+import org.apache.flink.table.calcite.FlinkTypeFactory;
+import org.apache.flink.table.calcite.FlinkTypeSystem;
+import org.apache.flink.table.codegen.ExpressionReducer;
+import org.apache.flink.table.expressions.ExpressionBridge;
+import org.apache.flink.table.expressions.PlannerExpression;
+import org.apache.flink.table.plan.TableOperationConverter;
+import org.apache.flink.table.plan.cost.DataSetCostFactory;
+import org.apache.flink.table.util.JavaScalaConversionUtil;
+import org.apache.flink.table.validate.FunctionCatalog;
+
+import org.apache.calcite.config.Lex;
+import org.apache.calcite.jdbc.CalciteSchema;
+import org.apache.calcite.plan.Context;
+import org.apache.calcite.plan.Contexts;
+import org.apache.calcite.plan.ConventionTraitDef;
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelOptCostFactory;
+import org.apache.calcite.plan.RelOptPlanner;
+import org.apache.calcite.plan.RelOptSchema;
+import org.apache.calcite.plan.volcano.VolcanoPlanner;
+import org.apache.calcite.prepare.CalciteCatalogReader;
+import org.apache.calcite.rel.type.RelDataTypeSystem;
+import org.apache.calcite.rex.RexBuilder;
+import org.apache.calcite.schema.SchemaPlus;
+import org.apache.calcite.sql.SqlOperatorTable;
+import org.apache.calcite.sql.parser.SqlParser;
+import org.apache.calcite.sql.util.ChainedSqlOperatorTable;
+import org.apache.calcite.sql2rel.SqlToRelConverter;
+import org.apache.calcite.tools.FrameworkConfig;
+import org.apache.calcite.tools.Frameworks;
+
+import java.util.List;
+
+/**
+ * Utility class to create {@link org.apache.calcite.tools.RelBuilder} or {@link FrameworkConfig} used to create
+ * a corresponding {@link org.apache.calcite.tools.Planner}. It separates the static elements of a
+ * {@link org.apache.flink.table.api.TableEnvironment} (e.g. root schema, cost factory, type system)
+ * from dynamic properties such as the default path used to look up objects in the schema.
+ */
+@Internal
+public class PlanningConfigurationBuilder {
+ private static final RelOptCostFactory COST_FACTORY = new DataSetCostFactory();
+ private static final RelDataTypeSystem TYPE_SYSTEM = new FlinkTypeSystem();
+ private static final FlinkTypeFactory TYPE_FACTORY = new FlinkTypeFactory(TYPE_SYSTEM);
+ private final RelOptPlanner planner;
+ private final ExpressionBridge<PlannerExpression> expressionBridge;
+ private final Context context;
+ private final TableConfig tableConfig;
+ private final FunctionCatalog functionCatalog;
+ private CalciteSchema rootSchema;
+
+ public PlanningConfigurationBuilder(
+ TableConfig tableConfig,
+ FunctionCatalog functionCatalog,
+ CalciteSchema rootSchema,
+ ExpressionBridge<PlannerExpression> expressionBridge) {
+ this.tableConfig = tableConfig;
+ this.functionCatalog = functionCatalog;
+
+ // context wrapping the ToRelConverterSupplier; reused by the planner, rel builders and framework configs
+ this.context = Contexts.of(
+ new TableOperationConverter.ToRelConverterSupplier(expressionBridge)
+ );
+
+ this.planner = new VolcanoPlanner(COST_FACTORY, context);
+ planner.setExecutor(new ExpressionReducer(tableConfig));
+ planner.addRelTraitDef(ConventionTraitDef.INSTANCE);
+
+ this.expressionBridge = expressionBridge;
+
+ this.rootSchema = rootSchema;
+ }
+
+ /**
+ * Creates a new, configured {@link FlinkRelBuilder} for a planning session.
+ *
+ * @param defaultSchema the default schema path to look in first during planning.
+ * @return configured rel builder
+ */
+ public FlinkRelBuilder createRelBuilder(List<String> defaultSchema) {
+ RelOptCluster cluster = FlinkRelOptClusterFactory.create(planner, new RexBuilder(TYPE_FACTORY));
+ RelOptSchema relOptSchema = new CalciteCatalogReader(
+ rootSchema,
+ defaultSchema,
+ TYPE_FACTORY,
+ CalciteConfig.connectionConfig(getSqlParserConfig(calciteConfig(tableConfig))));
+
+ return new FlinkRelBuilder(context, cluster, relOptSchema, expressionBridge);
+ }
+
+ /** Returns the Calcite {@link org.apache.calcite.plan.RelOptPlanner} created in the constructor. */
+ public RelOptPlanner getPlanner() {
+ return planner;
+ }
+
+ /** Returns the statically created {@link FlinkTypeFactory} that will be used. */
+ public FlinkTypeFactory getTypeFactory() {
+ return TYPE_FACTORY;
+ }
+
+ public Context getContext() {
+ return context;
+ }
+
+ /**
+ * Creates a new, configured {@link FrameworkConfig} for a planning session.
+ *
+ * @param defaultSchema the default schema to look in first during planning
+ * @return configured framework config
+ */
+ public FrameworkConfig createFrameworkConfig(SchemaPlus defaultSchema) {
+ return Frameworks
+ .newConfigBuilder()
+ .defaultSchema(defaultSchema)
+ .parserConfig(getSqlParserConfig(calciteConfig(tableConfig)))
+ .costFactory(COST_FACTORY)
+ .typeSystem(TYPE_SYSTEM)
+ .operatorTable(getSqlOperatorTable(calciteConfig(tableConfig), functionCatalog))
+ .sqlToRelConverterConfig(getSqlToRelConverterConfig(calciteConfig(tableConfig), expressionBridge))
+ // the converter is needed when calling temporal table functions from SQL, because
+ // they reference a history table represented with a tree of table operations
+ .context(context)
+ // set the executor to evaluate constant expressions
+ .executor(new ExpressionReducer(tableConfig))
+ .build();
+ }
+
+ private CalciteConfig calciteConfig(TableConfig tableConfig) {
+ return tableConfig.getPlannerConfig()
+ .unwrap(CalciteConfig.class)
+ .orElseGet(CalciteConfig::DEFAULT);
+ }
+
+ /**
+ * Returns the configured {@link SqlToRelConverter} config, or the default one built here if none is set.
+ */
+ private SqlToRelConverter.Config getSqlToRelConverterConfig(
+ CalciteConfig calciteConfig,
+ ExpressionBridge<PlannerExpression> expressionBridge) {
+ return JavaScalaConversionUtil.toJava(calciteConfig.sqlToRelConverterConfig()).orElseGet(
+ () -> SqlToRelConverter.configBuilder()
+ .withTrimUnusedFields(false)
+ .withConvertTableAccess(false)
+ .withInSubQueryThreshold(Integer.MAX_VALUE)
+ .withRelBuilderFactory(new FlinkRelBuilderFactory(expressionBridge))
+ .build()
+ );
+ }
+
+ /**
+ * Returns the operator table for this environment including a custom Calcite configuration.
+ */
+ private SqlOperatorTable getSqlOperatorTable(CalciteConfig calciteConfig, FunctionCatalog functionCatalog) {
+ return JavaScalaConversionUtil.toJava(calciteConfig.sqlOperatorTable()).map(operatorTable -> {
+ if (calciteConfig.replacesSqlOperatorTable()) {
+ return operatorTable;
+ } else {
+ return ChainedSqlOperatorTable.of(functionCatalog.getSqlOperatorTable(), operatorTable);
+ }
+ }
+ ).orElseGet(functionCatalog::getSqlOperatorTable);
+ }
+
+ /**
+ * Returns the SQL parser config for this environment including a custom Calcite configuration.
+ */
+ private SqlParser.Config getSqlParserConfig(CalciteConfig calciteConfig) {
+ return JavaScalaConversionUtil.toJava(calciteConfig.sqlParserConfig()).orElseGet(() ->
+ // we use Java lex because back ticks are easier than double quotes in programming
+ // and cases are preserved
+ SqlParser
+ .configBuilder()
+ .setLex(Lex.JAVA)
+ .build());
+ }
+}
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/TableEnvImpl.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/TableEnvImpl.scala
index df5936f..aed75f4 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/TableEnvImpl.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/TableEnvImpl.scala
@@ -22,7 +22,6 @@ import _root_.java.lang.reflect.Modifier
import _root_.java.util.concurrent.atomic.AtomicInteger
import com.google.common.collect.ImmutableList
-import org.apache.calcite.config.Lex
import org.apache.calcite.jdbc.CalciteSchema
import org.apache.calcite.plan.RelOptPlanner.CannotPlanException
import org.apache.calcite.plan._
@@ -32,9 +31,6 @@ import org.apache.calcite.schema
import org.apache.calcite.schema.SchemaPlus
import org.apache.calcite.schema.impl.AbstractTable
import org.apache.calcite.sql._
-import org.apache.calcite.sql.parser.SqlParser
-import org.apache.calcite.sql.util.ChainedSqlOperatorTable
-import org.apache.calcite.sql2rel.SqlToRelConverter
import org.apache.calcite.tools._
import org.apache.flink.api.common.functions.MapFunction
import org.apache.flink.api.common.typeinfo.TypeInformation
@@ -43,16 +39,15 @@ import org.apache.flink.api.java.typeutils.{RowTypeInfo, _}
import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo
import org.apache.flink.table.calcite._
import org.apache.flink.table.catalog.{ExternalCatalog, ExternalCatalogSchema}
-import org.apache.flink.table.codegen.{ExpressionReducer, FunctionCodeGenerator, GeneratedFunction}
+import org.apache.flink.table.codegen.{FunctionCodeGenerator, GeneratedFunction}
import org.apache.flink.table.expressions._
import org.apache.flink.table.functions.utils.UserDefinedFunctionUtils._
import org.apache.flink.table.functions.{AggregateFunction, ScalarFunction, TableFunction, UserDefinedAggregateFunction}
import org.apache.flink.table.operations.{CatalogTableOperation, OperationTreeBuilder, PlannerTableOperation}
-import org.apache.flink.table.plan.TableOperationConverter
-import org.apache.flink.table.plan.cost.DataSetCostFactory
import org.apache.flink.table.plan.nodes.FlinkConventions
import org.apache.flink.table.plan.rules.FlinkRuleSets
import org.apache.flink.table.plan.schema.{RelTable, RowSchema, TableSourceSinkTable}
+import org.apache.flink.table.planner.PlanningConfigurationBuilder
import org.apache.flink.table.sinks.TableSink
import org.apache.flink.table.sources.TableSource
import org.apache.flink.table.typeutils.TimeIndicatorTypeInfo
@@ -77,37 +72,10 @@ abstract class TableEnvImpl(val config: TableConfig) extends TableEnvironment {
// Table API/SQL function catalog
private[flink] val functionCatalog: FunctionCatalog = new FunctionCatalog()
- // the configuration to create a Calcite planner
- private lazy val frameworkConfig: FrameworkConfig = Frameworks
- .newConfigBuilder
- .defaultSchema(rootSchema)
- .parserConfig(getSqlParserConfig)
- .costFactory(new DataSetCostFactory)
- .typeSystem(new FlinkTypeSystem)
- .operatorTable(getSqlOperatorTable)
- .sqlToRelConverterConfig(getSqlToRelConverterConfig)
- // the converter is needed when calling temporal table functions from SQL, because
- // they reference a history table represented with a tree of table operations
- .context(Contexts.of(
- new TableOperationConverter.ToRelConverterSupplier(expressionBridge)
- ))
- // set the executor to evaluate constant expressions
- .executor(new ExpressionReducer(config))
- .build
-
// temporary bridge between API and planner
private[flink] val expressionBridge: ExpressionBridge[PlannerExpression] =
new ExpressionBridge[PlannerExpression](functionCatalog, PlannerExpressionConverter.INSTANCE)
- // the builder for Calcite RelNodes, Calcite's representation of a relational expression tree.
- protected lazy val relBuilder: FlinkRelBuilder = FlinkRelBuilder
- .create(frameworkConfig, expressionBridge)
-
- // the planner instance used to optimize queries of this TableEnvironment
- private lazy val planner: RelOptPlanner = relBuilder.getPlanner
-
- private lazy val typeFactory: FlinkTypeFactory = relBuilder.getTypeFactory
-
// a counter for unique attribute names
private[flink] val attrNameCntr: AtomicInteger = new AtomicInteger(0)
@@ -116,6 +84,12 @@ abstract class TableEnvImpl(val config: TableConfig) extends TableEnvironment {
private[flink] val operationTreeBuilder = new OperationTreeBuilder(this)
+ private val planningSession: PlanningConfigurationBuilder = new PlanningConfigurationBuilder(
+ config,
+ functionCatalog,
+ internalSchema,
+ expressionBridge)
+
protected def calciteConfig: CalciteConfig = config.getPlannerConfig
.unwrap(classOf[CalciteConfig])
.orElse(CalciteConfig.DEFAULT)
@@ -129,42 +103,6 @@ abstract class TableEnvImpl(val config: TableConfig) extends TableEnvironment {
}
/**
- * Returns the SqlToRelConverter config.
- */
- protected def getSqlToRelConverterConfig: SqlToRelConverter.Config = {
- calciteConfig.sqlToRelConverterConfig match {
-
- case None =>
- SqlToRelConverter.configBuilder()
- .withTrimUnusedFields(false)
- .withConvertTableAccess(false)
- .withInSubQueryThreshold(Integer.MAX_VALUE)
- .withRelBuilderFactory(new FlinkRelBuilderFactory(expressionBridge))
- .build()
-
- case Some(c) => c
- }
- }
-
- /**
- * Returns the operator table for this environment including a custom Calcite configuration.
- */
- protected def getSqlOperatorTable: SqlOperatorTable = {
- calciteConfig.sqlOperatorTable match {
-
- case None =>
- functionCatalog.getSqlOperatorTable
-
- case Some(table) =>
- if (calciteConfig.replacesSqlOperatorTable) {
- table
- } else {
- ChainedSqlOperatorTable.of(functionCatalog.getSqlOperatorTable, table)
- }
- }
- }
-
- /**
* Returns the normalization rule set for this environment
* including a custom RuleSet configuration.
*/
@@ -222,25 +160,6 @@ abstract class TableEnvImpl(val config: TableConfig) extends TableEnvironment {
}
/**
- * Returns the SQL parser config for this environment including a custom Calcite configuration.
- */
- protected def getSqlParserConfig: SqlParser.Config = {
- calciteConfig.sqlParserConfig match {
-
- case None =>
- // we use Java lex because back ticks are easier than double quotes in programming
- // and cases are preserved
- SqlParser
- .configBuilder()
- .setLex(Lex.JAVA)
- .build()
-
- case Some(sqlParserConfig) =>
- sqlParserConfig
- }
- }
-
- /**
* Returns the built-in normalization rules that are defined by the environment.
*/
protected def getBuiltInNormRuleSet: RuleSet
@@ -355,7 +274,7 @@ abstract class TableEnvImpl(val config: TableConfig) extends TableEnvironment {
input: RelNode,
targetTraits: RelTraitSet): RelNode = {
- val planner = new HepPlanner(hepProgram, frameworkConfig.getContext)
+ val planner = new HepPlanner(hepProgram, planningSession.getContext)
planner.setRoot(input)
if (input.getTraitSet != targetTraits) {
planner.changeTraits(input, targetTraits.simplify)
@@ -424,7 +343,7 @@ abstract class TableEnvImpl(val config: TableConfig) extends TableEnvironment {
functionCatalog.registerScalarFunction(
name,
function,
- typeFactory)
+ planningSession.getTypeFactory)
}
/**
@@ -448,7 +367,7 @@ abstract class TableEnvImpl(val config: TableConfig) extends TableEnvironment {
name,
function,
typeInfo,
- typeFactory)
+ planningSession.getTypeFactory)
}
/**
@@ -475,7 +394,7 @@ abstract class TableEnvImpl(val config: TableConfig) extends TableEnvironment {
function,
resultTypeInfo,
accTypeInfo,
- typeFactory)
+ planningSession.getTypeFactory)
}
override def registerTable(name: String, table: Table): Unit = {
@@ -554,7 +473,7 @@ abstract class TableEnvImpl(val config: TableConfig) extends TableEnvironment {
}
private def extractTableSchema(table: schema.Table): TableSchema = {
- val relDataType = table.getRowType(typeFactory)
+ val relDataType = table.getRowType(planningSession.getTypeFactory)
val fieldNames = relDataType.getFieldNames
val fieldTypes = relDataType.getFieldList.asScala
.map(field => FlinkTypeFactory.toTypeInfo(field.getType))
@@ -779,17 +698,17 @@ abstract class TableEnvImpl(val config: TableConfig) extends TableEnvironment {
/** Returns the [[FlinkRelBuilder]] of this TableEnvironment. */
private[flink] def getRelBuilder: FlinkRelBuilder = {
- relBuilder
+ planningSession.createRelBuilder(List().asJava)
}
/** Returns the Calcite [[org.apache.calcite.plan.RelOptPlanner]] of this TableEnvironment. */
private[flink] def getPlanner: RelOptPlanner = {
- planner
+ planningSession.getPlanner
}
/** Returns the [[FlinkTypeFactory]] of this TableEnvironment. */
private[flink] def getTypeFactory: FlinkTypeFactory = {
- typeFactory
+ planningSession.getTypeFactory
}
private[flink] def getFunctionCatalog: FunctionCatalog = {
@@ -798,7 +717,7 @@ abstract class TableEnvImpl(val config: TableConfig) extends TableEnvironment {
/** Returns the Calcite [[FrameworkConfig]] of this TableEnvironment. */
private[flink] def getFrameworkConfig: FrameworkConfig = {
- frameworkConfig
+ planningSession.createFrameworkConfig(rootSchema)
}
/**
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/calcite/FlinkRelBuilder.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/calcite/FlinkRelBuilder.scala
index 5a8678a..b6412d4 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/calcite/FlinkRelBuilder.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/calcite/FlinkRelBuilder.scala
@@ -105,30 +105,6 @@ class FlinkRelBuilder(
object FlinkRelBuilder {
- def create(
- config: FrameworkConfig,
- expressionBridge: ExpressionBridge[PlannerExpression])
- : FlinkRelBuilder = {
-
- // create Flink type factory
- val typeSystem = config.getTypeSystem
- val typeFactory = new FlinkTypeFactory(typeSystem)
-
- // create context instances with Flink type factory
- val planner = new VolcanoPlanner(config.getCostFactory, Contexts.empty())
- planner.setExecutor(config.getExecutor)
- planner.addRelTraitDef(ConventionTraitDef.INSTANCE)
- val cluster = FlinkRelOptClusterFactory.create(planner, new RexBuilder(typeFactory))
- val calciteSchema = CalciteSchema.from(config.getDefaultSchema)
- val relOptSchema = new CalciteCatalogReader(
- calciteSchema,
- Collections.emptyList(),
- typeFactory,
- CalciteConfig.connectionConfig(config.getParserConfig))
-
- new FlinkRelBuilder(config.getContext, cluster, relOptSchema, expressionBridge)
- }
-
/**
* Information necessary to create a window aggregate.
*