Posted to commits@flink.apache.org by tw...@apache.org on 2019/08/02 10:32:45 UTC
[flink] branch release-1.9 updated: [FLINK-13338][table] Make SQL dialect configurable
This is an automated email from the ASF dual-hosted git repository.
twalthr pushed a commit to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.9 by this push:
new fab290c [FLINK-13338][table] Make SQL dialect configurable
fab290c is described below
commit fab290c4890ab8f18f00bd638989bc1cec9c5f24
Author: yuzhao.cyz <yu...@alibaba-inc.com>
AuthorDate: Wed Jul 24 12:27:19 2019 +0800
[FLINK-13338][table] Make SQL dialect configurable
This closes #9212.
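
In short: switching the SQL dialect, which previously required replacing the Calcite parser configuration (see the test code removed further down), is now a single setter on TableConfig. A minimal usage sketch, assuming an existing TableEnvironment tEnv (the table names t1/t2 are illustrative, borrowed from the tests below):

    // Parse subsequent statements with Hive-compatible grammar, e.g. the
    // static partition spec in an INSERT statement.
    tEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
    tEnv.sqlUpdate("insert into t1 partition(a=1) select b, c, d from t2");

    // Restore Flink's default SQL behavior.
    tEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);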
---
flink-python/pyflink/common/__init__.py | 4 +-
flink-python/pyflink/common/sql_dialect.py | 69 +++++++++
flink-python/pyflink/table/table_config.py | 19 ++-
.../org/apache/flink/table/api/SqlDialect.java | 47 ++++++
.../org/apache/flink/table/api/TableConfig.java | 20 +++
.../table/planner/delegation/PlannerContext.java | 61 +++++---
.../table/planner/delegation/PlannerBase.scala | 6 +-
.../table/sqlexec/SqlToOperationConverterTest.java | 170 +++++++++++++++++++++
.../expressions/utils/ExpressionTestBase.scala | 2 +-
.../planner/match/PatternTranslatorTestBase.scala | 2 +-
.../batch/sql/PartitionableSinkITCase.scala | 23 +--
.../planner/runtime/utils/BatchTestBase.scala | 10 +-
.../planner/PlanningConfigurationBuilder.java | 16 +-
.../table/sqlexec/SqlToOperationConverterTest.java | 91 +++++++++--
.../batch/sql/PartitionableSinkITCase.scala | 21 +--
15 files changed, 478 insertions(+), 83 deletions(-)
diff --git a/flink-python/pyflink/common/__init__.py b/flink-python/pyflink/common/__init__.py
index ca27df7..5cc9853 100644
--- a/flink-python/pyflink/common/__init__.py
+++ b/flink-python/pyflink/common/__init__.py
@@ -25,6 +25,7 @@ Important classes used by both Flink Streaming and Batch API:
from pyflink.common.configuration import Configuration
from pyflink.common.execution_config import ExecutionConfig
from pyflink.common.execution_mode import ExecutionMode
+from pyflink.common.sql_dialect import SqlDialect
from pyflink.common.input_dependency_constraint import InputDependencyConstraint
from pyflink.common.restart_strategy import RestartStrategies, RestartStrategyConfiguration
@@ -34,5 +35,6 @@ __all__ = [
'ExecutionMode',
'InputDependencyConstraint',
'RestartStrategies',
- 'RestartStrategyConfiguration'
+ 'RestartStrategyConfiguration',
+ 'SqlDialect'
]
diff --git a/flink-python/pyflink/common/sql_dialect.py b/flink-python/pyflink/common/sql_dialect.py
new file mode 100644
index 0000000..b78e1e5
--- /dev/null
+++ b/flink-python/pyflink/common/sql_dialect.py
@@ -0,0 +1,69 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+from pyflink.java_gateway import get_gateway
+
+__all__ = ['SqlDialect']
+
+
+class SqlDialect(object):
+ """
+ Enumeration of valid SQL compatibility modes.
+
+ In most cases, the built-in compatibility mode should be sufficient. For some features,
+ e.g. the "INSERT INTO T PARTITION(a='xxx') ..." grammar, you may need to switch to the
+ Hive dialect.
+
+ We may introduce other SQL dialects in the future.
+
+ :data:`DEFAULT`:
+
+ Flink's default SQL behavior.
+
+ :data:`HIVE`:
+
+ SQL dialect that allows some Apache Hive specific grammar.
+
+ Note: We might never support all of the Hive grammar. See the documentation for
+ supported features.
+ """
+
+ DEFAULT = 0
+ HIVE = 1
+
+ @staticmethod
+ def _from_j_sql_dialect(j_sql_dialect):
+ gateway = get_gateway()
+ JSqlDialect = gateway.jvm.org.apache.flink.table.api.SqlDialect
+ if j_sql_dialect == JSqlDialect.DEFAULT:
+ return SqlDialect.DEFAULT
+ elif j_sql_dialect == JSqlDialect.HIVE:
+ return SqlDialect.HIVE
+ else:
+ raise Exception("Unsupported Java SQL dialect: %s" % j_sql_dialect)
+
+ @staticmethod
+ def _to_j_sql_dialect(sql_dialect):
+ gateway = get_gateway()
+ JSqlDialect = gateway.jvm.org.apache.flink.table.api.SqlDialect
+ if sql_dialect == SqlDialect.DEFAULT:
+ return JSqlDialect.DEFAULT
+ elif sql_dialect == SqlDialect.HIVE:
+ return JSqlDialect.HIVE
+ else:
+ raise TypeError("Unsupported SQL dialect: %s, supported SQL dialects are: "
+ "SqlDialect.DEFAULT, SqlDialect.HIVE." % sql_dialect)
diff --git a/flink-python/pyflink/table/table_config.py b/flink-python/pyflink/table/table_config.py
index 3a84fca..42026d2 100644
--- a/flink-python/pyflink/table/table_config.py
+++ b/flink-python/pyflink/table/table_config.py
@@ -19,7 +19,7 @@ import sys
from py4j.compat import long
-from pyflink.common import Configuration
+from pyflink.common import Configuration, SqlDialect
from pyflink.java_gateway import get_gateway
__all__ = ['TableConfig']
@@ -247,6 +247,23 @@ class TableConfig(object):
"""
self._j_table_config.addConfiguration(configuration._j_configuration)
+ def get_sql_dialect(self):
+ """
+ Returns the current SQL dialect.
+
+ :rtype: SqlDialect
+ """
+ return SqlDialect._from_j_sql_dialect(self._j_table_config.getSqlDialect())
+
+ def set_sql_dialect(self, sql_dialect):
+ """
+ Sets the current SQL dialect used to parse SQL queries; Flink's default SQL behavior is used if not set.
+
+ :param sql_dialect: The given SQL dialect.
+ :type sql_dialect: SqlDialect
+ """
+ self._j_table_config.setSqlDialect(SqlDialect._to_j_sql_dialect(sql_dialect))
+
@staticmethod
def get_default():
"""
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/SqlDialect.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/SqlDialect.java
new file mode 100644
index 0000000..c6e5eb2
--- /dev/null
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/SqlDialect.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+/**
+ * Enumeration of valid SQL compatibility modes.
+ *
+ * <p>In most cases, the built-in compatibility mode should be sufficient. For some features,
+ * e.g. the "INSERT INTO T PARTITION(a='xxx') ..." grammar, you may need to switch to the Hive
+ * dialect.
+ *
+ * <p>We may introduce other SQL dialects in the future.
+ */
+@PublicEvolving
+public enum SqlDialect {
+
+ /**
+ * Flink's default SQL behavior.
+ */
+ DEFAULT,
+
+ /**
+ * SQL dialect that allows some Apache Hive specific grammar.
+ *
+ * <p>Note: We might never support all of the Hive grammar. See the documentation for supported
+ * features.
+ */
+ HIVE
+}
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableConfig.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableConfig.java
index be7a3f6..7dfd46f 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableConfig.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableConfig.java
@@ -77,6 +77,12 @@ public class TableConfig {
private Configuration configuration = new Configuration();
/**
+ * The SQL dialect defines how to parse a SQL query. Different SQL dialects may support
+ * different SQL grammar.
+ */
+ private SqlDialect sqlDialect = SqlDialect.DEFAULT;
+
+ /**
* Returns all key/value configuration.
*/
public Configuration getConfiguration() {
@@ -94,6 +100,20 @@ public class TableConfig {
}
/**
+ * Returns the current SQL dialect.
+ */
+ public SqlDialect getSqlDialect() {
+ return this.sqlDialect;
+ }
+
+ /**
+ * Sets the current SQL dialect used to parse SQL queries; Flink's default SQL behavior is used if not set.
+ */
+ public void setSqlDialect(SqlDialect sqlDialect) {
+ this.sqlDialect = sqlDialect;
+ }
+
+ /**
* Returns the zone id for timestamp with local time zone.
*/
public ZoneId getLocalTimeZone() {
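
For illustration, a quick round-trip over the new accessors; a hedged sketch (run with -ea for the asserts), relying only on the documented default above:

    TableConfig config = new TableConfig();
    // The dialect starts out as SqlDialect.DEFAULT (see the field initializer).
    assert config.getSqlDialect() == SqlDialect.DEFAULT;

    // Switch to the Hive dialect and read it back.
    config.setSqlDialect(SqlDialect.HIVE);
    assert config.getSqlDialect() == SqlDialect.HIVE;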
diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/delegation/PlannerContext.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/delegation/PlannerContext.java
index df085ed..df093e1 100644
--- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/delegation/PlannerContext.java
+++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/delegation/PlannerContext.java
@@ -21,7 +21,9 @@ package org.apache.flink.table.planner.delegation;
import org.apache.flink.annotation.Internal;
import org.apache.flink.sql.parser.impl.FlinkSqlParserImpl;
import org.apache.flink.sql.parser.validate.FlinkSqlConformance;
+import org.apache.flink.table.api.SqlDialect;
import org.apache.flink.table.api.TableConfig;
+import org.apache.flink.table.api.TableException;
import org.apache.flink.table.catalog.FunctionCatalog;
import org.apache.flink.table.planner.calcite.CalciteConfig;
import org.apache.flink.table.planner.calcite.CalciteConfig$;
@@ -41,6 +43,7 @@ import org.apache.flink.table.planner.utils.TableConfigUtils;
import org.apache.calcite.config.Lex;
import org.apache.calcite.jdbc.CalciteSchema;
+import org.apache.calcite.plan.Context;
import org.apache.calcite.plan.RelOptCluster;
import org.apache.calcite.plan.RelOptPlanner;
import org.apache.calcite.plan.RelTraitDef;
@@ -72,8 +75,10 @@ public class PlannerContext {
private final FlinkTypeFactory typeFactory = new FlinkTypeFactory(typeSystem);
private final TableConfig tableConfig;
private final FunctionCatalog functionCatalog;
- private final FrameworkConfig frameworkConfig;
private final RelOptCluster cluster;
+ private final Context context;
+ private final CalciteSchema rootSchema;
+ private final List<RelTraitDef> traitDefs;
public PlannerContext(
TableConfig tableConfig,
@@ -82,7 +87,13 @@ public class PlannerContext {
List<RelTraitDef> traitDefs) {
this.tableConfig = tableConfig;
this.functionCatalog = functionCatalog;
- this.frameworkConfig = createFrameworkConfig(rootSchema, traitDefs);
+ this.context = new FlinkContextImpl(tableConfig, functionCatalog);
+ this.rootSchema = rootSchema;
+ this.traitDefs = traitDefs;
+ // Make a framework config to initialize the RelOptCluster instance;
+ // note that we can only use attributes that cannot be overwritten/configured
+ // by the user.
+ final FrameworkConfig frameworkConfig = createFrameworkConfig();
RelOptPlanner planner = new VolcanoPlanner(frameworkConfig.getCostFactory(), frameworkConfig.getContext());
planner.setExecutor(frameworkConfig.getExecutor());
@@ -92,19 +103,19 @@ public class PlannerContext {
this.cluster = FlinkRelOptClusterFactory.create(planner, new RexBuilder(typeFactory));
}
- private FrameworkConfig createFrameworkConfig(CalciteSchema rootSchema, List<RelTraitDef> traitDefs) {
+ private FrameworkConfig createFrameworkConfig() {
return Frameworks.newConfigBuilder()
- .defaultSchema(rootSchema.plus())
- .parserConfig(getSqlParserConfig())
- .costFactory(new FlinkCostFactory())
- .typeSystem(typeSystem)
- .sqlToRelConverterConfig(getSqlToRelConverterConfig(getCalciteConfig(tableConfig)))
- .operatorTable(getSqlOperatorTable(getCalciteConfig(tableConfig), functionCatalog))
- // set the executor to evaluate constant expressions
- .executor(new ExpressionReducer(tableConfig, false))
- .context(new FlinkContextImpl(tableConfig, functionCatalog))
- .traitDefs(traitDefs)
- .build();
+ .defaultSchema(rootSchema.plus())
+ .parserConfig(getSqlParserConfig())
+ .costFactory(new FlinkCostFactory())
+ .typeSystem(typeSystem)
+ .sqlToRelConverterConfig(getSqlToRelConverterConfig(getCalciteConfig(tableConfig)))
+ .operatorTable(getSqlOperatorTable(getCalciteConfig(tableConfig), functionCatalog))
+ // set the executor to evaluate constant expressions
+ .executor(new ExpressionReducer(tableConfig, false))
+ .context(context)
+ .traitDefs(traitDefs)
+ .build();
}
/** Returns the {@link FlinkTypeFactory} that will be used. */
@@ -121,7 +132,7 @@ public class PlannerContext {
*/
public FlinkRelBuilder createRelBuilder(String currentCatalog, String currentDatabase) {
FlinkCalciteCatalogReader relOptSchema = createCatalogReader(false, currentCatalog, currentDatabase);
- return new FlinkRelBuilder(frameworkConfig.getContext(), cluster, relOptSchema);
+ return new FlinkRelBuilder(this.context, cluster, relOptSchema);
}
/**
@@ -133,7 +144,7 @@ public class PlannerContext {
*/
public FlinkPlannerImpl createFlinkPlanner(String currentCatalog, String currentDatabase) {
return new FlinkPlannerImpl(
- frameworkConfig,
+ createFrameworkConfig(),
isLenient -> createCatalogReader(isLenient, currentCatalog, currentDatabase),
typeFactory,
cluster);
@@ -143,7 +154,7 @@ public class PlannerContext {
boolean lenientCaseSensitivity,
String currentCatalog,
String currentDatabase) {
- SqlParser.Config sqlParserConfig = frameworkConfig.getParserConfig();
+ SqlParser.Config sqlParserConfig = getSqlParserConfig();
final boolean caseSensitive;
if (lenientCaseSensitivity) {
caseSensitive = false;
@@ -155,7 +166,7 @@ public class PlannerContext {
.setCaseSensitive(caseSensitive)
.build();
- SchemaPlus rootSchema = getRootSchema(frameworkConfig.getDefaultSchema());
+ SchemaPlus rootSchema = getRootSchema(this.rootSchema.plus());
return new FlinkCalciteCatalogReader(
CalciteSchema.from(rootSchema),
asList(
@@ -188,12 +199,24 @@ public class PlannerContext {
() -> SqlParser
.configBuilder()
.setParserFactory(FlinkSqlParserImpl.FACTORY)
- .setConformance(FlinkSqlConformance.DEFAULT)
+ .setConformance(getSqlConformance())
.setLex(Lex.JAVA)
.setIdentifierMaxLength(256)
.build());
}
+ private FlinkSqlConformance getSqlConformance() {
+ SqlDialect sqlDialect = tableConfig.getSqlDialect();
+ switch (sqlDialect) {
+ case HIVE:
+ return FlinkSqlConformance.HIVE;
+ case DEFAULT:
+ return FlinkSqlConformance.DEFAULT;
+ default:
+ throw new TableException("Unsupported SQL dialect: " + sqlDialect);
+ }
+ }
+
/**
* Returns the {@link SqlToRelConverter} config.
*
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/delegation/PlannerBase.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/delegation/PlannerBase.scala
index 18922e8..9168d44 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/delegation/PlannerBase.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/delegation/PlannerBase.scala
@@ -100,7 +100,7 @@ abstract class PlannerBase(
/** Returns the Calcite [[FrameworkConfig]] of this TableEnvironment. */
@VisibleForTesting
- private[flink] def getFlinkPlanner: FlinkPlannerImpl = {
+ private[flink] def createFlinkPlanner: FlinkPlannerImpl = {
val currentCatalogName = catalogManager.getCurrentCatalog
val currentDatabase = catalogManager.getCurrentDatabase
plannerContext.createFlinkPlanner(currentCatalogName, currentDatabase)
@@ -122,7 +122,7 @@ abstract class PlannerBase(
}
override def parse(stmt: String): util.List[Operation] = {
- val planner = getFlinkPlanner
+ val planner = createFlinkPlanner
// parse the sql query
val parsed = planner.parse(stmt)
parsed match {
@@ -159,7 +159,7 @@ abstract class PlannerBase(
}
override def getCompletionHints(statement: String, position: Int): Array[String] = {
- val planner = getFlinkPlanner
+ val planner = createFlinkPlanner
planner.getCompletionHints(statement, position)
}
diff --git a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/sqlexec/SqlToOperationConverterTest.java b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/sqlexec/SqlToOperationConverterTest.java
new file mode 100644
index 0000000..9368d00
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/sqlexec/SqlToOperationConverterTest.java
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.sqlexec;
+
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.SqlDialect;
+import org.apache.flink.table.api.TableConfig;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.catalog.Catalog;
+import org.apache.flink.table.catalog.CatalogManager;
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.CatalogTableImpl;
+import org.apache.flink.table.catalog.FunctionCatalog;
+import org.apache.flink.table.catalog.GenericInMemoryCatalog;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
+import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException;
+import org.apache.flink.table.catalog.exceptions.TableNotExistException;
+import org.apache.flink.table.operations.CatalogSinkModifyOperation;
+import org.apache.flink.table.operations.Operation;
+import org.apache.flink.table.operations.ddl.CreateTableOperation;
+import org.apache.flink.table.planner.calcite.FlinkPlannerImpl;
+import org.apache.flink.table.planner.catalog.CatalogManagerCalciteSchema;
+import org.apache.flink.table.planner.delegation.PlannerContext;
+import org.apache.flink.table.planner.operations.SqlConversionException;
+import org.apache.flink.table.planner.operations.SqlToOperationConverter;
+import org.apache.flink.table.types.DataType;
+
+import org.apache.calcite.sql.SqlNode;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.calcite.jdbc.CalciteSchemaBuilder.asRootSchema;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Test cases for {@link org.apache.flink.table.planner.operations.SqlToOperationConverter}.
+ */
+public class SqlToOperationConverterTest {
+ private final TableConfig tableConfig = new TableConfig();
+ private final Catalog catalog = new GenericInMemoryCatalog("MockCatalog",
+ "default");
+ private final CatalogManager catalogManager =
+ new CatalogManager("builtin", catalog);
+ private final FunctionCatalog functionCatalog = new FunctionCatalog(catalogManager);
+ private final PlannerContext plannerContext =
+ new PlannerContext(tableConfig,
+ functionCatalog,
+ asRootSchema(new CatalogManagerCalciteSchema(catalogManager, false)),
+ new ArrayList<>());
+
+ @Before
+ public void before() throws TableAlreadyExistException, DatabaseNotExistException {
+ final ObjectPath path1 = new ObjectPath(catalogManager.getCurrentDatabase(), "t1");
+ final ObjectPath path2 = new ObjectPath(catalogManager.getCurrentDatabase(), "t2");
+ final TableSchema tableSchema = TableSchema.builder()
+ .field("a", DataTypes.BIGINT())
+ .field("b", DataTypes.VARCHAR(Integer.MAX_VALUE))
+ .field("c", DataTypes.INT())
+ .field("d", DataTypes.VARCHAR(Integer.MAX_VALUE))
+ .build();
+ Map<String, String> properties = new HashMap<>();
+ properties.put("connector", "COLLECTION");
+ final CatalogTable catalogTable = new CatalogTableImpl(tableSchema, properties, "");
+ catalog.createTable(path1, catalogTable, true);
+ catalog.createTable(path2, catalogTable, true);
+ }
+
+ @After
+ public void after() throws TableNotExistException {
+ final ObjectPath path1 = new ObjectPath(catalogManager.getCurrentDatabase(), "t1");
+ final ObjectPath path2 = new ObjectPath(catalogManager.getCurrentDatabase(), "t2");
+ catalog.dropTable(path1, true);
+ catalog.dropTable(path2, true);
+ }
+
+ @Test
+ public void testCreateTable() {
+ final String sql = "CREATE TABLE tbl1 (\n" +
+ " a bigint,\n" +
+ " b varchar, \n" +
+ " c int, \n" +
+ " d varchar" +
+ ")\n" +
+ " PARTITIONED BY (a, d)\n" +
+ " with (\n" +
+ " connector = 'kafka', \n" +
+ " kafka.topic = 'log.test'\n" +
+ ")\n";
+ FlinkPlannerImpl planner = getPlannerBySqlDialect(SqlDialect.DEFAULT);
+ Operation operation = parse(sql, planner);
+ assert operation instanceof CreateTableOperation;
+ CreateTableOperation op = (CreateTableOperation) operation;
+ CatalogTable catalogTable = op.getCatalogTable();
+ assertEquals(Arrays.asList("a", "d"), catalogTable.getPartitionKeys());
+ assertArrayEquals(catalogTable.getSchema().getFieldNames(),
+ new String[] {"a", "b", "c", "d"});
+ assertArrayEquals(catalogTable.getSchema().getFieldDataTypes(),
+ new DataType[]{
+ DataTypes.BIGINT(),
+ DataTypes.VARCHAR(Integer.MAX_VALUE),
+ DataTypes.INT(),
+ DataTypes.VARCHAR(Integer.MAX_VALUE)});
+ }
+
+ @Test(expected = SqlConversionException.class)
+ public void testCreateTableWithPkUniqueKeys() {
+ FlinkPlannerImpl planner = getPlannerBySqlDialect(SqlDialect.DEFAULT);
+ final String sql = "CREATE TABLE tbl1 (\n" +
+ " a bigint,\n" +
+ " b varchar, \n" +
+ " c int, \n" +
+ " d varchar, \n" +
+ " primary key(a), \n" +
+ " unique(a, b) \n" +
+ ")\n" +
+ " PARTITIONED BY (a, d)\n" +
+ " with (\n" +
+ " connector = 'kafka', \n" +
+ " kafka.topic = 'log.test'\n" +
+ ")\n";
+ parse(sql, planner);
+ }
+
+ @Test
+ public void testSqlInsertWithStaticPartition() {
+ final String sql = "insert into t1 partition(a=1) select b, c, d from t2";
+ FlinkPlannerImpl planner = getPlannerBySqlDialect(SqlDialect.HIVE);
+ Operation operation = parse(sql, planner);
+ assert operation instanceof CatalogSinkModifyOperation;
+ CatalogSinkModifyOperation sinkModifyOperation = (CatalogSinkModifyOperation) operation;
+ final Map<String, String> expectedStaticPartitions = new HashMap<>();
+ expectedStaticPartitions.put("a", "1");
+ assertEquals(expectedStaticPartitions, sinkModifyOperation.getStaticPartitions());
+ }
+
+ private Operation parse(String sql, FlinkPlannerImpl planner) {
+ SqlNode node = planner.parse(sql);
+ return SqlToOperationConverter.convert(planner, node);
+ }
+
+ private FlinkPlannerImpl getPlannerBySqlDialect(SqlDialect sqlDialect) {
+ tableConfig.setSqlDialect(sqlDialect);
+ return plannerContext.createFlinkPlanner(catalogManager.getCurrentCatalog(),
+ catalogManager.getCurrentDatabase());
+ }
+}
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/expressions/utils/ExpressionTestBase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/expressions/utils/ExpressionTestBase.scala
index eb0d928..9b7bd4a 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/expressions/utils/ExpressionTestBase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/expressions/utils/ExpressionTestBase.scala
@@ -64,7 +64,7 @@ abstract class ExpressionTestBase {
private val tEnv = StreamTableEnvironmentImpl.create(env, setting, config)
private val planner = tEnv.asInstanceOf[TableEnvironmentImpl].getPlanner.asInstanceOf[PlannerBase]
private val relBuilder = planner.getRelBuilder
- private val calcitePlanner = planner.getFlinkPlanner
+ private val calcitePlanner = planner.createFlinkPlanner
// setup test utils
private val tableName = "testTable"
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/match/PatternTranslatorTestBase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/match/PatternTranslatorTestBase.scala
index 254e60f..bd5f47b 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/match/PatternTranslatorTestBase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/match/PatternTranslatorTestBase.scala
@@ -54,7 +54,7 @@ abstract class PatternTranslatorTestBase extends TestLogger {
private val testTableRowType = RowType.of(new IntType)
private val tableName = "testTable"
private val context = prepareContext(testTableTypeInfo)
- private val calcitePlanner: FlinkPlannerImpl = context._2.getFlinkPlanner
+ private val calcitePlanner: FlinkPlannerImpl = context._2.createFlinkPlanner
private def prepareContext(typeInfo: TypeInformation[Row])
: (RelBuilder, PlannerBase, StreamExecutionEnvironment) = {
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/PartitionableSinkITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/PartitionableSinkITCase.scala
index 60dc55b..80b2b4e 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/PartitionableSinkITCase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/PartitionableSinkITCase.scala
@@ -22,13 +22,10 @@ import org.apache.flink.api.common.typeinfo.BasicTypeInfo.INT_TYPE_INFO
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java.typeutils.RowTypeInfo
import org.apache.flink.configuration.Configuration
-import org.apache.flink.sql.parser.impl.FlinkSqlParserImpl
-import org.apache.flink.sql.parser.validate.FlinkSqlConformance
import org.apache.flink.streaming.api.datastream.{DataStream, DataStreamSink}
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction
import org.apache.flink.table.api.config.ExecutionConfigOptions
-import org.apache.flink.table.api.{TableConfig, TableSchema, ValidationException}
-import org.apache.flink.table.planner.calcite.CalciteConfig
+import org.apache.flink.table.api.{SqlDialect, TableSchema, ValidationException}
import org.apache.flink.table.planner.runtime.batch.sql.PartitionableSinkITCase._
import org.apache.flink.table.planner.runtime.utils.BatchTestBase
import org.apache.flink.table.planner.runtime.utils.BatchTestBase.row
@@ -37,8 +34,6 @@ import org.apache.flink.table.sinks.{PartitionableTableSink, StreamTableSink, Ta
import org.apache.flink.table.types.logical.{BigIntType, IntType, VarCharType}
import org.apache.flink.types.Row
-import org.apache.calcite.config.Lex
-import org.apache.calcite.sql.parser.SqlParser
import org.junit.Assert._
import org.junit.rules.ExpectedException
import org.junit.{Before, Rule, Test}
@@ -66,24 +61,12 @@ class PartitionableSinkITCase extends BatchTestBase {
tEnv.getConfig
.getConfiguration
.setInteger(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 3)
+ tEnv.getConfig.setSqlDialect(SqlDialect.HIVE)
registerCollection("nonSortTable", testData, type3, "a, b, c", dataNullables)
registerCollection("sortTable", testData1, type3, "a, b, c", dataNullables)
PartitionableSinkITCase.init()
}
- override def getTableConfig: TableConfig = {
- val parserConfig = SqlParser.configBuilder
- .setParserFactory(FlinkSqlParserImpl.FACTORY)
- .setConformance(FlinkSqlConformance.HIVE) // set up hive dialect
- .setLex(Lex.JAVA)
- .setIdentifierMaxLength(256).build
- val plannerConfig = CalciteConfig.createBuilder(CalciteConfig.DEFAULT)
- .replaceSqlParserConfig(parserConfig)
- val tableConfig = new TableConfig
- tableConfig.setPlannerConfig(plannerConfig.build())
- tableConfig
- }
-
@Test
def testInsertWithOutPartitionGrouping(): Unit = {
registerTableSink()
@@ -181,7 +164,7 @@ class PartitionableSinkITCase extends BatchTestBase {
expectedEx.expect(classOf[ValidationException])
expectedEx.expectMessage("Static partition column b "
+ "should appear before dynamic partition a")
- registerTableSink(grouping = true, partitionColumns = Array("a", "b"))
+ registerTableSink(partitionColumns = Array("a", "b"))
tEnv.sqlUpdate("insert into sinkTable partition(b=1) select a, c from sortTable")
tEnv.execute("testJob")
}
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/utils/BatchTestBase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/utils/BatchTestBase.scala
index 844eada..76c2171 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/utils/BatchTestBase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/utils/BatchTestBase.scala
@@ -56,7 +56,7 @@ class BatchTestBase extends BatchAbstractTestBase {
private val settings = EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build()
private val testingTableEnv: TestingTableEnvironment = TestingTableEnvironment
- .create(settings, catalogManager = None, getTableConfig)
+ .create(settings, catalogManager = None, new TableConfig)
val tEnv: TableEnvironment = testingTableEnv
private val planner = tEnv.asInstanceOf[TableEnvironmentImpl].getPlanner.asInstanceOf[PlannerBase]
val env: StreamExecutionEnvironment = planner.getExecEnv
@@ -67,14 +67,6 @@ class BatchTestBase extends BatchAbstractTestBase {
val LINE_COL_TWICE_PATTERN: Pattern = Pattern.compile("(?s)From line ([0-9]+),"
+ " column ([0-9]+) to line ([0-9]+), column ([0-9]+): (.*)")
- // TODO: [FLINK-13338] will expose dialect option to TableConfig to
- // avoid override CalciteConfig by users
- /**
- * Subclass should overwrite this method if we want to overwrite configuration during
- * sql parse to sql to rel conversion phrase.
- */
- protected def getTableConfig: TableConfig = new TableConfig
-
@Before
def before(): Unit = {
conf.getConfiguration.setInteger(TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, DEFAULT_PARALLELISM)
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/PlanningConfigurationBuilder.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/PlanningConfigurationBuilder.java
index 91e0b64..1c137d1 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/PlanningConfigurationBuilder.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/PlanningConfigurationBuilder.java
@@ -21,7 +21,9 @@ package org.apache.flink.table.planner;
import org.apache.flink.annotation.Internal;
import org.apache.flink.sql.parser.impl.FlinkSqlParserImpl;
import org.apache.flink.sql.parser.validate.FlinkSqlConformance;
+import org.apache.flink.table.api.SqlDialect;
import org.apache.flink.table.api.TableConfig;
+import org.apache.flink.table.api.TableException;
import org.apache.flink.table.calcite.CalciteConfig;
import org.apache.flink.table.calcite.FlinkPlannerImpl;
import org.apache.flink.table.calcite.FlinkRelBuilder;
@@ -155,11 +157,23 @@ public class PlanningConfigurationBuilder {
SqlParser
.configBuilder()
.setParserFactory(FlinkSqlParserImpl.FACTORY)
- .setConformance(FlinkSqlConformance.DEFAULT)
+ .setConformance(getSqlConformance())
.setLex(Lex.JAVA)
.build());
}
+ private FlinkSqlConformance getSqlConformance() {
+ SqlDialect sqlDialect = tableConfig.getSqlDialect();
+ switch (sqlDialect) {
+ case HIVE:
+ return FlinkSqlConformance.HIVE;
+ case DEFAULT:
+ return FlinkSqlConformance.DEFAULT;
+ default:
+ throw new TableException("Unsupported SQL dialect: " + sqlDialect);
+ }
+ }
+
private CatalogReader createCatalogReader(
boolean lenientCaseSensitivity,
String currentCatalog,
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/sqlexec/SqlToOperationConverterTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/sqlexec/SqlToOperationConverterTest.java
index cc78be7..fe3a405 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/sqlexec/SqlToOperationConverterTest.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/sqlexec/SqlToOperationConverterTest.java
@@ -18,33 +18,84 @@
package org.apache.flink.table.sqlexec;
-import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.sql.parser.ddl.SqlCreateTable;
+import org.apache.flink.sql.parser.dml.RichSqlInsert;
import org.apache.flink.table.api.DataTypes;
-import org.apache.flink.table.api.internal.BatchTableEnvImpl;
-import org.apache.flink.table.api.java.BatchTableEnvironment;
+import org.apache.flink.table.api.SqlDialect;
+import org.apache.flink.table.api.TableConfig;
+import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.calcite.FlinkPlannerImpl;
+import org.apache.flink.table.catalog.Catalog;
+import org.apache.flink.table.catalog.CatalogManager;
+import org.apache.flink.table.catalog.CatalogManagerCalciteSchema;
import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.CatalogTableImpl;
+import org.apache.flink.table.catalog.FunctionCatalog;
+import org.apache.flink.table.catalog.GenericInMemoryCatalog;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
+import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException;
+import org.apache.flink.table.catalog.exceptions.TableNotExistException;
+import org.apache.flink.table.expressions.ExpressionBridge;
+import org.apache.flink.table.expressions.PlannerExpressionConverter;
+import org.apache.flink.table.operations.CatalogSinkModifyOperation;
import org.apache.flink.table.operations.Operation;
import org.apache.flink.table.operations.ddl.CreateTableOperation;
+import org.apache.flink.table.planner.PlanningConfigurationBuilder;
import org.apache.flink.table.types.DataType;
import org.apache.calcite.sql.SqlNode;
+import org.junit.After;
+import org.junit.Before;
import org.junit.Test;
import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+import static org.apache.calcite.jdbc.CalciteSchemaBuilder.asRootSchema;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
-/** Test cases for SqlExecutableStatement. **/
+/** Test cases for {@link SqlToOperationConverter}. **/
public class SqlToOperationConverterTest {
- private static final ExecutionEnvironment streamExec =
- ExecutionEnvironment.getExecutionEnvironment();
- private static final BatchTableEnvImpl batchEnv =
- (BatchTableEnvImpl) BatchTableEnvironment.create(streamExec);
+ private final TableConfig tableConfig = new TableConfig();
+ private final Catalog catalog = new GenericInMemoryCatalog("MockCatalog",
+ "default");
+ private final CatalogManager catalogManager =
+ new CatalogManager("builtin", catalog);
+ private final FunctionCatalog functionCatalog = new FunctionCatalog(catalogManager);
+ private final PlanningConfigurationBuilder planningConfigurationBuilder =
+ new PlanningConfigurationBuilder(tableConfig,
+ functionCatalog,
+ asRootSchema(new CatalogManagerCalciteSchema(catalogManager, false)),
+ new ExpressionBridge<>(functionCatalog,
+ PlannerExpressionConverter.INSTANCE()));
- private static final FlinkPlannerImpl planner = batchEnv.getFlinkPlanner();
+ @Before
+ public void before() throws TableAlreadyExistException, DatabaseNotExistException {
+ final ObjectPath path1 = new ObjectPath(catalogManager.getCurrentDatabase(), "t1");
+ final ObjectPath path2 = new ObjectPath(catalogManager.getCurrentDatabase(), "t2");
+ final TableSchema tableSchema = TableSchema.builder()
+ .field("a", DataTypes.BIGINT())
+ .field("b", DataTypes.VARCHAR(Integer.MAX_VALUE))
+ .field("c", DataTypes.INT())
+ .field("d", DataTypes.VARCHAR(Integer.MAX_VALUE))
+ .build();
+ Map<String, String> properties = new HashMap<>();
+ properties.put("connector", "COLLECTION");
+ final CatalogTable catalogTable = new CatalogTableImpl(tableSchema, properties, "");
+ catalog.createTable(path1, catalogTable, true);
+ catalog.createTable(path2, catalogTable, true);
+ }
+
+ @After
+ public void after() throws TableNotExistException {
+ final ObjectPath path1 = new ObjectPath(catalogManager.getCurrentDatabase(), "t1");
+ final ObjectPath path2 = new ObjectPath(catalogManager.getCurrentDatabase(), "t2");
+ catalog.dropTable(path1, true);
+ catalog.dropTable(path2, true);
+ }
@Test
public void testCreateTable() {
@@ -59,6 +110,7 @@ public class SqlToOperationConverterTest {
" connector = 'kafka', \n" +
" kafka.topic = 'log.test'\n" +
")\n";
+ final FlinkPlannerImpl planner = getPlannerBySqlDialect(SqlDialect.DEFAULT);
SqlNode node = planner.parse(sql);
assert node instanceof SqlCreateTable;
Operation operation = SqlToOperationConverter.convert(planner, node);
@@ -91,8 +143,29 @@ public class SqlToOperationConverterTest {
" connector = 'kafka', \n" +
" kafka.topic = 'log.test'\n" +
")\n";
+ final FlinkPlannerImpl planner = getPlannerBySqlDialect(SqlDialect.DEFAULT);
SqlNode node = planner.parse(sql);
assert node instanceof SqlCreateTable;
SqlToOperationConverter.convert(planner, node);
}
+
+ @Test
+ public void testSqlInsertWithStaticPartition() {
+ final String sql = "insert into t1 partition(a=1) select b, c, d from t2";
+ FlinkPlannerImpl planner = getPlannerBySqlDialect(SqlDialect.HIVE);
+ SqlNode node = planner.parse(sql);
+ assert node instanceof RichSqlInsert;
+ Operation operation = SqlToOperationConverter.convert(planner, node);
+ assert operation instanceof CatalogSinkModifyOperation;
+ CatalogSinkModifyOperation sinkModifyOperation = (CatalogSinkModifyOperation) operation;
+ final Map<String, String> expectedStaticPartitions = new HashMap<>();
+ expectedStaticPartitions.put("a", "1");
+ assertEquals(expectedStaticPartitions, sinkModifyOperation.getStaticPartitions());
+ }
+
+ private FlinkPlannerImpl getPlannerBySqlDialect(SqlDialect sqlDialect) {
+ tableConfig.setSqlDialect(sqlDialect);
+ return planningConfigurationBuilder.createFlinkPlanner(catalogManager.getCurrentCatalog(),
+ catalogManager.getCurrentDatabase());
+ }
}
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/batch/sql/PartitionableSinkITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/batch/sql/PartitionableSinkITCase.scala
index d021d3a..cb17bab 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/batch/sql/PartitionableSinkITCase.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/batch/sql/PartitionableSinkITCase.scala
@@ -27,19 +27,15 @@ import org.apache.flink.api.java.DataSet
import org.apache.flink.api.java.typeutils.RowTypeInfo
import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.configuration.Configuration
-import org.apache.flink.sql.parser.impl.FlinkSqlParserImpl
-import org.apache.flink.sql.parser.validate.FlinkSqlConformance
import org.apache.flink.table.api.scala.BatchTableEnvironment
-import org.apache.flink.table.api.{DataTypes, PlannerConfig, TableSchema, ValidationException}
-import org.apache.flink.table.calcite.CalciteConfig
+import org.apache.flink.table.api.{DataTypes, SqlDialect, TableSchema, ValidationException}
import org.apache.flink.table.factories.utils.TestCollectionTableFactory.TestCollectionInputFormat
import org.apache.flink.table.runtime.batch.sql.PartitionableSinkITCase.{RESULT1, RESULT2, RESULT3, _}
import org.apache.flink.table.sinks.{BatchTableSink, PartitionableTableSink, TableSink}
import org.apache.flink.table.sources.BatchTableSource
import org.apache.flink.table.types.logical.{BigIntType, IntType, VarCharType}
import org.apache.flink.types.Row
-import org.apache.calcite.config.Lex
-import org.apache.calcite.sql.parser.SqlParser
+
import org.junit.Assert.assertEquals
import org.junit.rules.ExpectedException
import org.junit.{Before, Rule, Test}
@@ -65,7 +61,7 @@ class PartitionableSinkITCase {
def before(): Unit = {
batchExec.setParallelism(3)
tEnv = BatchTableEnvironment.create(batchExec)
- tEnv.getConfig.setPlannerConfig(getPlannerConfig)
+ tEnv.getConfig.setSqlDialect(SqlDialect.HIVE)
registerTableSource("nonSortTable", testData.toList)
registerTableSource("sortTable", testData1.toList)
PartitionableSinkITCase.init()
@@ -80,17 +76,6 @@ class PartitionableSinkITCase {
tEnv.registerTableSource(name, new CollectionTableSource(data, 100, tableSchema))
}
- private def getPlannerConfig: PlannerConfig = {
- val parserConfig = SqlParser.configBuilder
- .setParserFactory(FlinkSqlParserImpl.FACTORY)
- .setConformance(FlinkSqlConformance.HIVE) // set up hive dialect
- .setLex(Lex.JAVA)
- .setIdentifierMaxLength(256).build
- CalciteConfig.createBuilder()
- .replaceSqlParserConfig(parserConfig)
- .build()
- }
-
@Test
def testInsertWithOutPartitionGrouping(): Unit = {
registerTableSink(grouping = false)