You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tw...@apache.org on 2019/08/02 10:32:45 UTC

[flink] branch release-1.9 updated: [FLINK-13338][table] Make SQL dialect configurable

This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.9 by this push:
     new fab290c  [FLINK-13338][table] Make SQL dialect configurable
fab290c is described below

commit fab290c4890ab8f18f00bd638989bc1cec9c5f24
Author: yuzhao.cyz <yu...@alibaba-inc.com>
AuthorDate: Wed Jul 24 12:27:19 2019 +0800

    [FLINK-13338][table] Make SQL dialect configurable
    
    This closes #9212.
---
 flink-python/pyflink/common/__init__.py            |   4 +-
 flink-python/pyflink/common/sql_dialect.py         |  69 +++++++++
 flink-python/pyflink/table/table_config.py         |  19 ++-
 .../org/apache/flink/table/api/SqlDialect.java     |  47 ++++++
 .../org/apache/flink/table/api/TableConfig.java    |  20 +++
 .../table/planner/delegation/PlannerContext.java   |  61 +++++---
 .../table/planner/delegation/PlannerBase.scala     |   6 +-
 .../table/sqlexec/SqlToOperationConverterTest.java | 170 +++++++++++++++++++++
 .../expressions/utils/ExpressionTestBase.scala     |   2 +-
 .../planner/match/PatternTranslatorTestBase.scala  |   2 +-
 .../batch/sql/PartitionableSinkITCase.scala        |  23 +--
 .../planner/runtime/utils/BatchTestBase.scala      |  10 +-
 .../planner/PlanningConfigurationBuilder.java      |  16 +-
 .../table/sqlexec/SqlToOperationConverterTest.java |  91 +++++++++--
 .../batch/sql/PartitionableSinkITCase.scala        |  21 +--
 15 files changed, 478 insertions(+), 83 deletions(-)

diff --git a/flink-python/pyflink/common/__init__.py b/flink-python/pyflink/common/__init__.py
index ca27df7..5cc9853 100644
--- a/flink-python/pyflink/common/__init__.py
+++ b/flink-python/pyflink/common/__init__.py
@@ -25,6 +25,7 @@ Important classes used by both Flink Streaming and Batch API:
 from pyflink.common.configuration import Configuration
 from pyflink.common.execution_config import ExecutionConfig
 from pyflink.common.execution_mode import ExecutionMode
+from pyflink.common.sql_dialect import SqlDialect
 from pyflink.common.input_dependency_constraint import InputDependencyConstraint
 from pyflink.common.restart_strategy import RestartStrategies, RestartStrategyConfiguration
 
@@ -34,5 +35,6 @@ __all__ = [
     'ExecutionMode',
     'InputDependencyConstraint',
     'RestartStrategies',
-    'RestartStrategyConfiguration'
+    'RestartStrategyConfiguration',
+    'SqlDialect'
 ]
diff --git a/flink-python/pyflink/common/sql_dialect.py b/flink-python/pyflink/common/sql_dialect.py
new file mode 100644
index 0000000..b78e1e5
--- /dev/null
+++ b/flink-python/pyflink/common/sql_dialect.py
@@ -0,0 +1,69 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+from pyflink.java_gateway import get_gateway
+
+__all__ = ['SqlDialect']
+
+
+class SqlDialect(object):
+    """
+    Enumeration of valid SQL compatibility modes.
+
+    In most of the cases, the built-in compatibility mode should be sufficient. For some features,
+    i.e. the "INSERT INTO T PARTITION(a='xxx') ..." grammar, you may need to switch to the
+    Hive dialect if required.
+
+    We may introduce other SQL dialects in the future.
+
+    :data:`DEFAULT`:
+
+    Flink's default SQL behavior.
+
+    :data:`HIVE`:
+
+    SQL dialect that allows some Apache Hive specific grammar.
+
+    Note: We might never support all of the Hive grammar. See the documentation for
+    supported features.
+    """
+
+    DEFAULT = 0
+    HIVE = 1
+
+    @staticmethod
+    def _from_j_sql_dialect(j_sql_dialect):
+        gateway = get_gateway()
+        JSqlDialect = gateway.jvm.org.apache.flink.table.api.SqlDialect
+        if j_sql_dialect == JSqlDialect.DEFAULT:
+            return SqlDialect.DEFAULT
+        elif j_sql_dialect == JSqlDialect.HIVE:
+            return SqlDialect.HIVE
+        else:
+            raise Exception("Unsupported Java SQL dialect: %s" % j_sql_dialect)
+
+    @staticmethod
+    def _to_j_sql_dialect(sql_dialect):
+        gateway = get_gateway()
+        JSqlDialect = gateway.jvm.org.apache.flink.table.api.SqlDialect
+        if sql_dialect == SqlDialect.DEFAULT:
+            return JSqlDialect.DEFAULT
+        elif sql_dialect == SqlDialect.HIVE:
+            return JSqlDialect.HIVE
+        else:
+            raise TypeError("Unsupported SQL dialect: %s, supported SQL dialects are: "
+                            "SqlDialect.DEFAULT, SqlDialect.HIVE." % sql_dialect)
diff --git a/flink-python/pyflink/table/table_config.py b/flink-python/pyflink/table/table_config.py
index 3a84fca..42026d2 100644
--- a/flink-python/pyflink/table/table_config.py
+++ b/flink-python/pyflink/table/table_config.py
@@ -19,7 +19,7 @@ import sys
 
 from py4j.compat import long
 
-from pyflink.common import Configuration
+from pyflink.common import Configuration, SqlDialect
 from pyflink.java_gateway import get_gateway
 
 __all__ = ['TableConfig']
@@ -247,6 +247,23 @@ class TableConfig(object):
         """
         self._j_table_config.addConfiguration(configuration._j_configuration)
 
+    def get_sql_dialect(self):
+        """
+        Returns the current SQL dialect.
+
+        :rtype: SqlDialect
+        """
+        return SqlDialect._from_j_sql_dialect(self._j_table_config.getSqlDialect())
+
+    def set_sql_dialect(self, sql_dialect):
+        """
+        Sets the current SQL dialect to parse a SQL query. Flink's SQL behavior by default.
+
+        :param sql_dialect: The given SQL dialect.
+        :type sql_dialect: SqlDialect
+        """
+        self._j_table_config.setSqlDialect(SqlDialect._to_j_sql_dialect(sql_dialect))
+
     @staticmethod
     def get_default():
         """
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/SqlDialect.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/SqlDialect.java
new file mode 100644
index 0000000..c6e5eb2
--- /dev/null
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/SqlDialect.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+/**
+ * Enumeration of valid SQL compatibility modes.
+ *
+ * <p>In most of the cases, the built-in compatibility mode should be sufficient. For some features,
+ * i.e. the "INSERT INTO T PARTITION(a='xxx') ..." grammar, you may need to switch to the Hive dialect
+ * if required.
+ *
+ * <p>We may introduce other SQL dialects in the future.
+ */
+@PublicEvolving
+public enum SqlDialect {
+
+	/**
+	 * Flink's default SQL behavior.
+	 */
+	DEFAULT,
+
+	/**
+	 * SQL dialect that allows some Apache Hive specific grammar.
+	 *
+	 * <p>Note: We might never support all of the Hive grammar. See the documentation for supported
+	 * features.
+	 */
+	HIVE
+}
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableConfig.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableConfig.java
index be7a3f6..7dfd46f 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableConfig.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableConfig.java
@@ -77,6 +77,12 @@ public class TableConfig {
 	private Configuration configuration = new Configuration();
 
 	/**
+	 * The SQL dialect defines how to parse a SQL query. A different SQL dialect may support different
+	 * SQL grammar.
+	 */
+	private SqlDialect sqlDialect = SqlDialect.DEFAULT;
+
+	/**
 	 * Returns all key/value configuration.
 	 */
 	public Configuration getConfiguration() {
@@ -94,6 +100,20 @@ public class TableConfig {
 	}
 
 	/**
+	 * Returns the current SQL dialect.
+	 */
+	public SqlDialect getSqlDialect() {
+		return this.sqlDialect;
+	}
+
+	/**
+	 * Sets the current SQL dialect to parse a SQL query. Flink's SQL behavior by default.
+	 */
+	public void setSqlDialect(SqlDialect sqlDialect) {
+		this.sqlDialect = sqlDialect;
+	}
+
+	/**
 	 * Returns the zone id for timestamp with local time zone.
 	 */
 	public ZoneId getLocalTimeZone() {
diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/delegation/PlannerContext.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/delegation/PlannerContext.java
index df085ed..df093e1 100644
--- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/delegation/PlannerContext.java
+++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/delegation/PlannerContext.java
@@ -21,7 +21,9 @@ package org.apache.flink.table.planner.delegation;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.sql.parser.impl.FlinkSqlParserImpl;
 import org.apache.flink.sql.parser.validate.FlinkSqlConformance;
+import org.apache.flink.table.api.SqlDialect;
 import org.apache.flink.table.api.TableConfig;
+import org.apache.flink.table.api.TableException;
 import org.apache.flink.table.catalog.FunctionCatalog;
 import org.apache.flink.table.planner.calcite.CalciteConfig;
 import org.apache.flink.table.planner.calcite.CalciteConfig$;
@@ -41,6 +43,7 @@ import org.apache.flink.table.planner.utils.TableConfigUtils;
 
 import org.apache.calcite.config.Lex;
 import org.apache.calcite.jdbc.CalciteSchema;
+import org.apache.calcite.plan.Context;
 import org.apache.calcite.plan.RelOptCluster;
 import org.apache.calcite.plan.RelOptPlanner;
 import org.apache.calcite.plan.RelTraitDef;
@@ -72,8 +75,10 @@ public class PlannerContext {
 	private final FlinkTypeFactory typeFactory = new FlinkTypeFactory(typeSystem);
 	private final TableConfig tableConfig;
 	private final FunctionCatalog functionCatalog;
-	private final FrameworkConfig frameworkConfig;
 	private final RelOptCluster cluster;
+	private final Context context;
+	private final CalciteSchema rootSchema;
+	private final List<RelTraitDef> traitDefs;
 
 	public PlannerContext(
 			TableConfig tableConfig,
@@ -82,7 +87,13 @@ public class PlannerContext {
 			List<RelTraitDef> traitDefs) {
 		this.tableConfig = tableConfig;
 		this.functionCatalog = functionCatalog;
-		this.frameworkConfig = createFrameworkConfig(rootSchema, traitDefs);
+		this.context = new FlinkContextImpl(tableConfig, functionCatalog);
+		this.rootSchema = rootSchema;
+		this.traitDefs = traitDefs;
+		// Make a framework config to initialize the RelOptCluster instance,
+		// caution that we can only use the attributes that can not be overwrite/configured
+		// by user.
+		final FrameworkConfig frameworkConfig = createFrameworkConfig();
 
 		RelOptPlanner planner = new VolcanoPlanner(frameworkConfig.getCostFactory(), frameworkConfig.getContext());
 		planner.setExecutor(frameworkConfig.getExecutor());
@@ -92,19 +103,19 @@ public class PlannerContext {
 		this.cluster = FlinkRelOptClusterFactory.create(planner, new RexBuilder(typeFactory));
 	}
 
-	private FrameworkConfig createFrameworkConfig(CalciteSchema rootSchema, List<RelTraitDef> traitDefs) {
+	private FrameworkConfig createFrameworkConfig() {
 		return Frameworks.newConfigBuilder()
-				.defaultSchema(rootSchema.plus())
-				.parserConfig(getSqlParserConfig())
-				.costFactory(new FlinkCostFactory())
-				.typeSystem(typeSystem)
-				.sqlToRelConverterConfig(getSqlToRelConverterConfig(getCalciteConfig(tableConfig)))
-				.operatorTable(getSqlOperatorTable(getCalciteConfig(tableConfig), functionCatalog))
-				// set the executor to evaluate constant expressions
-				.executor(new ExpressionReducer(tableConfig, false))
-				.context(new FlinkContextImpl(tableConfig, functionCatalog))
-				.traitDefs(traitDefs)
-				.build();
+			.defaultSchema(rootSchema.plus())
+			.parserConfig(getSqlParserConfig())
+			.costFactory(new FlinkCostFactory())
+			.typeSystem(typeSystem)
+			.sqlToRelConverterConfig(getSqlToRelConverterConfig(getCalciteConfig(tableConfig)))
+			.operatorTable(getSqlOperatorTable(getCalciteConfig(tableConfig), functionCatalog))
+			// set the executor to evaluate constant expressions
+			.executor(new ExpressionReducer(tableConfig, false))
+			.context(context)
+			.traitDefs(traitDefs)
+			.build();
 	}
 
 	/** Returns the {@link FlinkTypeFactory} that will be used. */
@@ -121,7 +132,7 @@ public class PlannerContext {
 	 */
 	public FlinkRelBuilder createRelBuilder(String currentCatalog, String currentDatabase) {
 		FlinkCalciteCatalogReader relOptSchema = createCatalogReader(false, currentCatalog, currentDatabase);
-		return new FlinkRelBuilder(frameworkConfig.getContext(), cluster, relOptSchema);
+		return new FlinkRelBuilder(this.context, cluster, relOptSchema);
 	}
 
 	/**
@@ -133,7 +144,7 @@ public class PlannerContext {
 	 */
 	public FlinkPlannerImpl createFlinkPlanner(String currentCatalog, String currentDatabase) {
 		return new FlinkPlannerImpl(
-				frameworkConfig,
+				createFrameworkConfig(),
 				isLenient -> createCatalogReader(isLenient, currentCatalog, currentDatabase),
 				typeFactory,
 				cluster);
@@ -143,7 +154,7 @@ public class PlannerContext {
 			boolean lenientCaseSensitivity,
 			String currentCatalog,
 			String currentDatabase) {
-		SqlParser.Config sqlParserConfig = frameworkConfig.getParserConfig();
+		SqlParser.Config sqlParserConfig = getSqlParserConfig();
 		final boolean caseSensitive;
 		if (lenientCaseSensitivity) {
 			caseSensitive = false;
@@ -155,7 +166,7 @@ public class PlannerContext {
 				.setCaseSensitive(caseSensitive)
 				.build();
 
-		SchemaPlus rootSchema = getRootSchema(frameworkConfig.getDefaultSchema());
+		SchemaPlus rootSchema = getRootSchema(this.rootSchema.plus());
 		return new FlinkCalciteCatalogReader(
 				CalciteSchema.from(rootSchema),
 				asList(
@@ -188,12 +199,24 @@ public class PlannerContext {
 				() -> SqlParser
 						.configBuilder()
 						.setParserFactory(FlinkSqlParserImpl.FACTORY)
-						.setConformance(FlinkSqlConformance.DEFAULT)
+						.setConformance(getSqlConformance())
 						.setLex(Lex.JAVA)
 						.setIdentifierMaxLength(256)
 						.build());
 	}
 
+	private FlinkSqlConformance getSqlConformance() {
+		SqlDialect sqlDialect = tableConfig.getSqlDialect();
+		switch (sqlDialect) {
+			case HIVE:
+				return FlinkSqlConformance.HIVE;
+			case DEFAULT:
+				return FlinkSqlConformance.DEFAULT;
+			default:
+				throw new TableException("Unsupported SQL dialect: " + sqlDialect);
+		}
+	}
+
 	/**
 	 * Returns the {@link SqlToRelConverter} config.
 	 *
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/delegation/PlannerBase.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/delegation/PlannerBase.scala
index 18922e8..9168d44 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/delegation/PlannerBase.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/delegation/PlannerBase.scala
@@ -100,7 +100,7 @@ abstract class PlannerBase(
 
   /** Returns the Calcite [[FrameworkConfig]] of this TableEnvironment. */
   @VisibleForTesting
-  private[flink] def getFlinkPlanner: FlinkPlannerImpl = {
+  private[flink] def createFlinkPlanner: FlinkPlannerImpl = {
     val currentCatalogName = catalogManager.getCurrentCatalog
     val currentDatabase = catalogManager.getCurrentDatabase
     plannerContext.createFlinkPlanner(currentCatalogName, currentDatabase)
@@ -122,7 +122,7 @@ abstract class PlannerBase(
   }
 
   override def parse(stmt: String): util.List[Operation] = {
-    val planner = getFlinkPlanner
+    val planner = createFlinkPlanner
     // parse the sql query
     val parsed = planner.parse(stmt)
     parsed match {
@@ -159,7 +159,7 @@ abstract class PlannerBase(
   }
 
   override def getCompletionHints(statement: String, position: Int): Array[String] = {
-    val planner = getFlinkPlanner
+    val planner = createFlinkPlanner
     planner.getCompletionHints(statement, position)
   }
 
diff --git a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/sqlexec/SqlToOperationConverterTest.java b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/sqlexec/SqlToOperationConverterTest.java
new file mode 100644
index 0000000..9368d00
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/sqlexec/SqlToOperationConverterTest.java
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.sqlexec;
+
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.SqlDialect;
+import org.apache.flink.table.api.TableConfig;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.catalog.Catalog;
+import org.apache.flink.table.catalog.CatalogManager;
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.CatalogTableImpl;
+import org.apache.flink.table.catalog.FunctionCatalog;
+import org.apache.flink.table.catalog.GenericInMemoryCatalog;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
+import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException;
+import org.apache.flink.table.catalog.exceptions.TableNotExistException;
+import org.apache.flink.table.operations.CatalogSinkModifyOperation;
+import org.apache.flink.table.operations.Operation;
+import org.apache.flink.table.operations.ddl.CreateTableOperation;
+import org.apache.flink.table.planner.calcite.FlinkPlannerImpl;
+import org.apache.flink.table.planner.catalog.CatalogManagerCalciteSchema;
+import org.apache.flink.table.planner.delegation.PlannerContext;
+import org.apache.flink.table.planner.operations.SqlConversionException;
+import org.apache.flink.table.planner.operations.SqlToOperationConverter;
+import org.apache.flink.table.types.DataType;
+
+import org.apache.calcite.sql.SqlNode;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.calcite.jdbc.CalciteSchemaBuilder.asRootSchema;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Test cases for {@link org.apache.flink.table.planner.operations.SqlToOperationConverter}.
+ */
+public class SqlToOperationConverterTest {
+	private final TableConfig tableConfig = new TableConfig();
+	private final Catalog catalog = new GenericInMemoryCatalog("MockCatalog",
+		"default");
+	private final CatalogManager catalogManager =
+		new CatalogManager("builtin", catalog);
+	private final FunctionCatalog functionCatalog = new FunctionCatalog(catalogManager);
+	private final PlannerContext plannerContext =
+		new PlannerContext(tableConfig,
+			functionCatalog,
+			asRootSchema(new CatalogManagerCalciteSchema(catalogManager, false)),
+			new ArrayList<>());
+
+	@Before
+	public void before() throws TableAlreadyExistException, DatabaseNotExistException {
+		final ObjectPath path1 = new ObjectPath(catalogManager.getCurrentDatabase(), "t1");
+		final ObjectPath path2 = new ObjectPath(catalogManager.getCurrentDatabase(), "t2");
+		final TableSchema tableSchema = TableSchema.builder()
+			.field("a", DataTypes.BIGINT())
+			.field("b", DataTypes.VARCHAR(Integer.MAX_VALUE))
+			.field("c", DataTypes.INT())
+			.field("d", DataTypes.VARCHAR(Integer.MAX_VALUE))
+			.build();
+		Map<String, String> properties = new HashMap<>();
+		properties.put("connector", "COLLECTION");
+		final CatalogTable catalogTable =  new CatalogTableImpl(tableSchema, properties, "");
+		catalog.createTable(path1, catalogTable, true);
+		catalog.createTable(path2, catalogTable, true);
+	}
+
+	@After
+	public void after() throws TableNotExistException {
+		final ObjectPath path1 = new ObjectPath(catalogManager.getCurrentDatabase(), "t1");
+		final ObjectPath path2 = new ObjectPath(catalogManager.getCurrentDatabase(), "t2");
+		catalog.dropTable(path1, true);
+		catalog.dropTable(path2, true);
+	}
+
+	@Test
+	public void testCreateTable() {
+		final String sql = "CREATE TABLE tbl1 (\n" +
+			"  a bigint,\n" +
+			"  b varchar, \n" +
+			"  c int, \n" +
+			"  d varchar" +
+			")\n" +
+			"  PARTITIONED BY (a, d)\n" +
+			"  with (\n" +
+			"    connector = 'kafka', \n" +
+			"    kafka.topic = 'log.test'\n" +
+			")\n";
+		FlinkPlannerImpl planner = getPlannerBySqlDialect(SqlDialect.DEFAULT);
+		Operation operation = parse(sql, planner);
+		assert operation instanceof CreateTableOperation;
+		CreateTableOperation op = (CreateTableOperation) operation;
+		CatalogTable catalogTable = op.getCatalogTable();
+		assertEquals(Arrays.asList("a", "d"), catalogTable.getPartitionKeys());
+		assertArrayEquals(catalogTable.getSchema().getFieldNames(),
+			new String[] {"a", "b", "c", "d"});
+		assertArrayEquals(catalogTable.getSchema().getFieldDataTypes(),
+			new DataType[]{
+				DataTypes.BIGINT(),
+				DataTypes.VARCHAR(Integer.MAX_VALUE),
+				DataTypes.INT(),
+				DataTypes.VARCHAR(Integer.MAX_VALUE)});
+	}
+
+	@Test(expected = SqlConversionException.class)
+	public void testCreateTableWithPkUniqueKeys() {
+		FlinkPlannerImpl planner = getPlannerBySqlDialect(SqlDialect.DEFAULT);
+		final String sql = "CREATE TABLE tbl1 (\n" +
+			"  a bigint,\n" +
+			"  b varchar, \n" +
+			"  c int, \n" +
+			"  d varchar, \n" +
+			"  primary key(a), \n" +
+			"  unique(a, b) \n" +
+			")\n" +
+			"  PARTITIONED BY (a, d)\n" +
+			"  with (\n" +
+			"    connector = 'kafka', \n" +
+			"    kafka.topic = 'log.test'\n" +
+			")\n";
+		parse(sql, planner);
+	}
+
+	@Test
+	public void testSqlInsertWithStaticPartition() {
+		final String sql = "insert into t1 partition(a=1) select b, c, d from t2";
+		FlinkPlannerImpl planner = getPlannerBySqlDialect(SqlDialect.HIVE);
+		Operation operation = parse(sql, planner);
+		assert operation instanceof CatalogSinkModifyOperation;
+		CatalogSinkModifyOperation sinkModifyOperation = (CatalogSinkModifyOperation) operation;
+		final Map<String, String> expectedStaticPartitions = new HashMap<>();
+		expectedStaticPartitions.put("a", "1");
+		assertEquals(expectedStaticPartitions, sinkModifyOperation.getStaticPartitions());
+	}
+
+	private Operation parse(String sql, FlinkPlannerImpl planner) {
+		SqlNode node = planner.parse(sql);
+		return SqlToOperationConverter.convert(planner, node);
+	}
+
+	private FlinkPlannerImpl getPlannerBySqlDialect(SqlDialect sqlDialect) {
+		tableConfig.setSqlDialect(sqlDialect);
+		return plannerContext.createFlinkPlanner(catalogManager.getCurrentCatalog(),
+			catalogManager.getCurrentDatabase());
+	}
+}
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/expressions/utils/ExpressionTestBase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/expressions/utils/ExpressionTestBase.scala
index eb0d928..9b7bd4a 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/expressions/utils/ExpressionTestBase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/expressions/utils/ExpressionTestBase.scala
@@ -64,7 +64,7 @@ abstract class ExpressionTestBase {
   private val tEnv = StreamTableEnvironmentImpl.create(env, setting, config)
   private val planner = tEnv.asInstanceOf[TableEnvironmentImpl].getPlanner.asInstanceOf[PlannerBase]
   private val relBuilder = planner.getRelBuilder
-  private val calcitePlanner = planner.getFlinkPlanner
+  private val calcitePlanner = planner.createFlinkPlanner
 
   // setup test utils
   private val tableName = "testTable"
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/match/PatternTranslatorTestBase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/match/PatternTranslatorTestBase.scala
index 254e60f..bd5f47b 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/match/PatternTranslatorTestBase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/match/PatternTranslatorTestBase.scala
@@ -54,7 +54,7 @@ abstract class PatternTranslatorTestBase extends TestLogger {
   private val testTableRowType = RowType.of(new IntType)
   private val tableName = "testTable"
   private val context = prepareContext(testTableTypeInfo)
-  private val calcitePlanner: FlinkPlannerImpl = context._2.getFlinkPlanner
+  private val calcitePlanner: FlinkPlannerImpl = context._2.createFlinkPlanner
 
   private def prepareContext(typeInfo: TypeInformation[Row])
   : (RelBuilder, PlannerBase, StreamExecutionEnvironment) = {
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/PartitionableSinkITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/PartitionableSinkITCase.scala
index 60dc55b..80b2b4e 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/PartitionableSinkITCase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/PartitionableSinkITCase.scala
@@ -22,13 +22,10 @@ import org.apache.flink.api.common.typeinfo.BasicTypeInfo.INT_TYPE_INFO
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.java.typeutils.RowTypeInfo
 import org.apache.flink.configuration.Configuration
-import org.apache.flink.sql.parser.impl.FlinkSqlParserImpl
-import org.apache.flink.sql.parser.validate.FlinkSqlConformance
 import org.apache.flink.streaming.api.datastream.{DataStream, DataStreamSink}
 import org.apache.flink.streaming.api.functions.sink.RichSinkFunction
 import org.apache.flink.table.api.config.ExecutionConfigOptions
-import org.apache.flink.table.api.{TableConfig, TableSchema, ValidationException}
-import org.apache.flink.table.planner.calcite.CalciteConfig
+import org.apache.flink.table.api.{SqlDialect, TableSchema, ValidationException}
 import org.apache.flink.table.planner.runtime.batch.sql.PartitionableSinkITCase._
 import org.apache.flink.table.planner.runtime.utils.BatchTestBase
 import org.apache.flink.table.planner.runtime.utils.BatchTestBase.row
@@ -37,8 +34,6 @@ import org.apache.flink.table.sinks.{PartitionableTableSink, StreamTableSink, Ta
 import org.apache.flink.table.types.logical.{BigIntType, IntType, VarCharType}
 import org.apache.flink.types.Row
 
-import org.apache.calcite.config.Lex
-import org.apache.calcite.sql.parser.SqlParser
 import org.junit.Assert._
 import org.junit.rules.ExpectedException
 import org.junit.{Before, Rule, Test}
@@ -66,24 +61,12 @@ class PartitionableSinkITCase extends BatchTestBase {
     tEnv.getConfig
       .getConfiguration
       .setInteger(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 3)
+    tEnv.getConfig.setSqlDialect(SqlDialect.HIVE)
     registerCollection("nonSortTable", testData, type3, "a, b, c", dataNullables)
     registerCollection("sortTable", testData1, type3, "a, b, c", dataNullables)
     PartitionableSinkITCase.init()
   }
 
-  override def getTableConfig: TableConfig = {
-    val parserConfig = SqlParser.configBuilder
-      .setParserFactory(FlinkSqlParserImpl.FACTORY)
-      .setConformance(FlinkSqlConformance.HIVE) // set up hive dialect
-      .setLex(Lex.JAVA)
-      .setIdentifierMaxLength(256).build
-    val plannerConfig = CalciteConfig.createBuilder(CalciteConfig.DEFAULT)
-      .replaceSqlParserConfig(parserConfig)
-    val tableConfig = new TableConfig
-    tableConfig.setPlannerConfig(plannerConfig.build())
-    tableConfig
-  }
-
   @Test
   def testInsertWithOutPartitionGrouping(): Unit = {
     registerTableSink()
@@ -181,7 +164,7 @@ class PartitionableSinkITCase extends BatchTestBase {
     expectedEx.expect(classOf[ValidationException])
     expectedEx.expectMessage("Static partition column b "
       + "should appear before dynamic partition a")
-    registerTableSink(grouping = true, partitionColumns = Array("a", "b"))
+    registerTableSink(partitionColumns = Array("a", "b"))
     tEnv.sqlUpdate("insert into sinkTable partition(b=1) select a, c from sortTable")
     tEnv.execute("testJob")
   }
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/utils/BatchTestBase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/utils/BatchTestBase.scala
index 844eada..76c2171 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/utils/BatchTestBase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/utils/BatchTestBase.scala
@@ -56,7 +56,7 @@ class BatchTestBase extends BatchAbstractTestBase {
 
   private val settings = EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build()
   private val testingTableEnv: TestingTableEnvironment = TestingTableEnvironment
-    .create(settings, catalogManager = None, getTableConfig)
+    .create(settings, catalogManager = None, new TableConfig)
   val tEnv: TableEnvironment = testingTableEnv
   private val planner = tEnv.asInstanceOf[TableEnvironmentImpl].getPlanner.asInstanceOf[PlannerBase]
   val env: StreamExecutionEnvironment = planner.getExecEnv
@@ -67,14 +67,6 @@ class BatchTestBase extends BatchAbstractTestBase {
   val LINE_COL_TWICE_PATTERN: Pattern = Pattern.compile("(?s)From line ([0-9]+),"
     + " column ([0-9]+) to line ([0-9]+), column ([0-9]+): (.*)")
 
-  // TODO: [FLINK-13338] will expose dialect option to TableConfig to
-  //  avoid override CalciteConfig by users
-  /**
-    * Subclass should overwrite this method if we want to overwrite configuration during
-    * sql parse to sql to rel conversion phrase.
-    */
-  protected def getTableConfig: TableConfig = new TableConfig
-
   @Before
   def before(): Unit = {
     conf.getConfiguration.setInteger(TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, DEFAULT_PARALLELISM)
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/PlanningConfigurationBuilder.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/PlanningConfigurationBuilder.java
index 91e0b64..1c137d1 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/PlanningConfigurationBuilder.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/PlanningConfigurationBuilder.java
@@ -21,7 +21,9 @@ package org.apache.flink.table.planner;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.sql.parser.impl.FlinkSqlParserImpl;
 import org.apache.flink.sql.parser.validate.FlinkSqlConformance;
+import org.apache.flink.table.api.SqlDialect;
 import org.apache.flink.table.api.TableConfig;
+import org.apache.flink.table.api.TableException;
 import org.apache.flink.table.calcite.CalciteConfig;
 import org.apache.flink.table.calcite.FlinkPlannerImpl;
 import org.apache.flink.table.calcite.FlinkRelBuilder;
@@ -155,11 +157,23 @@ public class PlanningConfigurationBuilder {
 			SqlParser
 				.configBuilder()
 				.setParserFactory(FlinkSqlParserImpl.FACTORY)
-				.setConformance(FlinkSqlConformance.DEFAULT)
+				.setConformance(getSqlConformance())
 				.setLex(Lex.JAVA)
 				.build());
 	}
 
+	private FlinkSqlConformance getSqlConformance() {
+		SqlDialect sqlDialect = tableConfig.getSqlDialect();
+		switch (sqlDialect) {
+			case HIVE:
+				return FlinkSqlConformance.HIVE;
+			case DEFAULT:
+				return FlinkSqlConformance.DEFAULT;
+			default:
+				throw new TableException("Unsupported SQL dialect: " + sqlDialect);
+		}
+	}
+
 	private CatalogReader createCatalogReader(
 			boolean lenientCaseSensitivity,
 			String currentCatalog,
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/sqlexec/SqlToOperationConverterTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/sqlexec/SqlToOperationConverterTest.java
index cc78be7..fe3a405 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/sqlexec/SqlToOperationConverterTest.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/sqlexec/SqlToOperationConverterTest.java
@@ -18,33 +18,84 @@
 
 package org.apache.flink.table.sqlexec;
 
-import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.sql.parser.ddl.SqlCreateTable;
+import org.apache.flink.sql.parser.dml.RichSqlInsert;
 import org.apache.flink.table.api.DataTypes;
-import org.apache.flink.table.api.internal.BatchTableEnvImpl;
-import org.apache.flink.table.api.java.BatchTableEnvironment;
+import org.apache.flink.table.api.SqlDialect;
+import org.apache.flink.table.api.TableConfig;
+import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.calcite.FlinkPlannerImpl;
+import org.apache.flink.table.catalog.Catalog;
+import org.apache.flink.table.catalog.CatalogManager;
+import org.apache.flink.table.catalog.CatalogManagerCalciteSchema;
 import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.CatalogTableImpl;
+import org.apache.flink.table.catalog.FunctionCatalog;
+import org.apache.flink.table.catalog.GenericInMemoryCatalog;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
+import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException;
+import org.apache.flink.table.catalog.exceptions.TableNotExistException;
+import org.apache.flink.table.expressions.ExpressionBridge;
+import org.apache.flink.table.expressions.PlannerExpressionConverter;
+import org.apache.flink.table.operations.CatalogSinkModifyOperation;
 import org.apache.flink.table.operations.Operation;
 import org.apache.flink.table.operations.ddl.CreateTableOperation;
+import org.apache.flink.table.planner.PlanningConfigurationBuilder;
 import org.apache.flink.table.types.DataType;
 
 import org.apache.calcite.sql.SqlNode;
+import org.junit.After;
+import org.junit.Before;
 import org.junit.Test;
 
 import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
 
+import static org.apache.calcite.jdbc.CalciteSchemaBuilder.asRootSchema;
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
 
-/** Test cases for SqlExecutableStatement. **/
+/** Test cases for {@link SqlToOperationConverter}. **/
 public class SqlToOperationConverterTest {
-	private static final ExecutionEnvironment streamExec =
-		ExecutionEnvironment.getExecutionEnvironment();
-	private static final BatchTableEnvImpl batchEnv =
-		(BatchTableEnvImpl) BatchTableEnvironment.create(streamExec);
+	private final TableConfig tableConfig = new TableConfig();
+	private final Catalog catalog = new GenericInMemoryCatalog("MockCatalog",
+		"default");
+	private final CatalogManager catalogManager =
+		new CatalogManager("builtin", catalog);
+	private final FunctionCatalog functionCatalog = new FunctionCatalog(catalogManager);
+	private final PlanningConfigurationBuilder planningConfigurationBuilder =
+		new PlanningConfigurationBuilder(tableConfig,
+			functionCatalog,
+			asRootSchema(new CatalogManagerCalciteSchema(catalogManager, false)),
+			new ExpressionBridge<>(functionCatalog,
+				PlannerExpressionConverter.INSTANCE()));
 
-	private static final FlinkPlannerImpl planner = batchEnv.getFlinkPlanner();
+	@Before
+	public void before() throws TableAlreadyExistException, DatabaseNotExistException {
+		final ObjectPath path1 = new ObjectPath(catalogManager.getCurrentDatabase(), "t1");
+		final ObjectPath path2 = new ObjectPath(catalogManager.getCurrentDatabase(), "t2");
+		final TableSchema tableSchema = TableSchema.builder()
+			.field("a", DataTypes.BIGINT())
+			.field("b", DataTypes.VARCHAR(Integer.MAX_VALUE))
+			.field("c", DataTypes.INT())
+			.field("d", DataTypes.VARCHAR(Integer.MAX_VALUE))
+			.build();
+		Map<String, String> properties = new HashMap<>();
+		properties.put("connector", "COLLECTION");
+		final CatalogTable catalogTable =  new CatalogTableImpl(tableSchema, properties, "");
+		catalog.createTable(path1, catalogTable, true);
+		catalog.createTable(path2, catalogTable, true);
+	}
+
+	@After
+	public void after() throws TableNotExistException {
+		final ObjectPath path1 = new ObjectPath(catalogManager.getCurrentDatabase(), "t1");
+		final ObjectPath path2 = new ObjectPath(catalogManager.getCurrentDatabase(), "t2");
+		catalog.dropTable(path1, true);
+		catalog.dropTable(path2, true);
+	}
 
 	@Test
 	public void testCreateTable() {
@@ -59,6 +110,7 @@ public class SqlToOperationConverterTest {
 			"    connector = 'kafka', \n" +
 			"    kafka.topic = 'log.test'\n" +
 			")\n";
+		final FlinkPlannerImpl planner = getPlannerBySqlDialect(SqlDialect.DEFAULT);
 		SqlNode node = planner.parse(sql);
 		assert node instanceof SqlCreateTable;
 		Operation operation = SqlToOperationConverter.convert(planner, node);
@@ -91,8 +143,29 @@ public class SqlToOperationConverterTest {
 			"    connector = 'kafka', \n" +
 			"    kafka.topic = 'log.test'\n" +
 			")\n";
+		final FlinkPlannerImpl planner = getPlannerBySqlDialect(SqlDialect.DEFAULT);
 		SqlNode node = planner.parse(sql);
 		assert node instanceof SqlCreateTable;
 		SqlToOperationConverter.convert(planner, node);
 	}
+
+	@Test
+	public void testSqlInsertWithStaticPartition() {
+		final String sql = "insert into t1 partition(a=1) select b, c, d from t2";
+		FlinkPlannerImpl planner = getPlannerBySqlDialect(SqlDialect.HIVE);
+		SqlNode node = planner.parse(sql);
+		assert node instanceof RichSqlInsert;
+		Operation operation = SqlToOperationConverter.convert(planner, node);
+		assert operation instanceof CatalogSinkModifyOperation;
+		CatalogSinkModifyOperation sinkModifyOperation = (CatalogSinkModifyOperation) operation;
+		final Map<String, String> expectedStaticPartitions = new HashMap<>();
+		expectedStaticPartitions.put("a", "1");
+		assertEquals(expectedStaticPartitions, sinkModifyOperation.getStaticPartitions());
+	}
+
+	private FlinkPlannerImpl getPlannerBySqlDialect(SqlDialect sqlDialect) {
+		tableConfig.setSqlDialect(sqlDialect);
+		return planningConfigurationBuilder.createFlinkPlanner(catalogManager.getCurrentCatalog(),
+			catalogManager.getCurrentDatabase());
+	}
 }
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/batch/sql/PartitionableSinkITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/batch/sql/PartitionableSinkITCase.scala
index d021d3a..cb17bab 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/batch/sql/PartitionableSinkITCase.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/batch/sql/PartitionableSinkITCase.scala
@@ -27,19 +27,15 @@ import org.apache.flink.api.java.DataSet
 import org.apache.flink.api.java.typeutils.RowTypeInfo
 import org.apache.flink.api.scala.ExecutionEnvironment
 import org.apache.flink.configuration.Configuration
-import org.apache.flink.sql.parser.impl.FlinkSqlParserImpl
-import org.apache.flink.sql.parser.validate.FlinkSqlConformance
 import org.apache.flink.table.api.scala.BatchTableEnvironment
-import org.apache.flink.table.api.{DataTypes, PlannerConfig, TableSchema, ValidationException}
-import org.apache.flink.table.calcite.CalciteConfig
+import org.apache.flink.table.api.{DataTypes, SqlDialect, TableSchema, ValidationException}
 import org.apache.flink.table.factories.utils.TestCollectionTableFactory.TestCollectionInputFormat
 import org.apache.flink.table.runtime.batch.sql.PartitionableSinkITCase.{RESULT1, RESULT2, RESULT3, _}
 import org.apache.flink.table.sinks.{BatchTableSink, PartitionableTableSink, TableSink}
 import org.apache.flink.table.sources.BatchTableSource
 import org.apache.flink.table.types.logical.{BigIntType, IntType, VarCharType}
 import org.apache.flink.types.Row
-import org.apache.calcite.config.Lex
-import org.apache.calcite.sql.parser.SqlParser
+
 import org.junit.Assert.assertEquals
 import org.junit.rules.ExpectedException
 import org.junit.{Before, Rule, Test}
@@ -65,7 +61,7 @@ class PartitionableSinkITCase {
   def before(): Unit = {
     batchExec.setParallelism(3)
     tEnv = BatchTableEnvironment.create(batchExec)
-    tEnv.getConfig.setPlannerConfig(getPlannerConfig)
+    tEnv.getConfig.setSqlDialect(SqlDialect.HIVE)
     registerTableSource("nonSortTable", testData.toList)
     registerTableSource("sortTable", testData1.toList)
     PartitionableSinkITCase.init()
@@ -80,17 +76,6 @@ class PartitionableSinkITCase {
     tEnv.registerTableSource(name, new CollectionTableSource(data, 100, tableSchema))
   }
 
-  private def getPlannerConfig: PlannerConfig = {
-    val parserConfig = SqlParser.configBuilder
-      .setParserFactory(FlinkSqlParserImpl.FACTORY)
-      .setConformance(FlinkSqlConformance.HIVE) // set up hive dialect
-      .setLex(Lex.JAVA)
-      .setIdentifierMaxLength(256).build
-    CalciteConfig.createBuilder()
-      .replaceSqlParserConfig(parserConfig)
-      .build()
-  }
-
   @Test
   def testInsertWithOutPartitionGrouping(): Unit = {
     registerTableSink(grouping = false)