You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ja...@apache.org on 2019/07/24 05:13:39 UTC

[flink] 01/03: [FLINK-13266][table] Move OptimizerConfigOptions & ExecutionConfigOptions to table-api-java module

This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 66e55d60f819c5a1f809830190461fb6ac341b0b
Author: godfreyhe <go...@163.com>
AuthorDate: Wed Jul 17 19:29:02 2019 +0800

    [FLINK-13266][table] Move OptimizerConfigOptions & ExecutionConfigOptions to table-api-java module
---
 .../flink/table/api/config}/ExecutionConfigOptions.java       | 11 ++++++-----
 .../flink/table/api/config}/OptimizerConfigOptions.java       |  4 +++-
 .../java/org/apache/flink/table/executor/BatchExecutor.java   |  2 +-
 .../flink/table/plan/nodes/resource/NodeResourceUtil.java     |  2 +-
 .../java/org/apache/flink/table/plan/util/OperatorType.java   |  2 +-
 .../org/apache/flink/table/util/AggregatePhaseStrategy.java   |  2 +-
 .../java/org/apache/flink/table/util/TableConfigUtils.java    |  4 ++--
 .../flink/table/codegen/NestedLoopJoinCodeGenerator.scala     |  2 +-
 .../flink/table/plan/nodes/common/CommonLookupJoin.scala      |  3 ++-
 .../flink/table/plan/nodes/logical/FlinkLogicalSort.scala     |  2 +-
 .../table/plan/nodes/physical/batch/BatchExecExchange.scala   |  2 +-
 .../nodes/physical/batch/BatchExecHashAggregateBase.scala     |  2 +-
 .../table/plan/nodes/physical/batch/BatchExecHashJoin.scala   |  2 +-
 .../physical/batch/BatchExecHashWindowAggregateBase.scala     |  2 +-
 .../plan/nodes/physical/batch/BatchExecNestedLoopJoin.scala   |  2 +-
 .../plan/nodes/physical/batch/BatchExecOverAggregate.scala    |  3 ++-
 .../flink/table/plan/nodes/physical/batch/BatchExecSort.scala |  2 +-
 .../plan/nodes/physical/batch/BatchExecSortMergeJoin.scala    |  2 +-
 .../physical/batch/BatchExecSortWindowAggregateBase.scala     |  2 +-
 .../plan/nodes/physical/stream/StreamExecDeduplicate.scala    |  3 ++-
 .../physical/stream/StreamExecGlobalGroupAggregate.scala      |  3 ++-
 .../plan/nodes/physical/stream/StreamExecGroupAggregate.scala |  2 +-
 .../nodes/physical/stream/StreamExecWatermarkAssigner.scala   |  3 ++-
 .../plan/optimize/StreamCommonSubGraphBasedOptimizer.scala    |  3 ++-
 .../flink/table/plan/optimize/program/FlinkBatchProgram.scala |  2 +-
 .../table/plan/optimize/program/FlinkStreamProgram.scala      |  2 +-
 .../org/apache/flink/table/plan/reuse/SubplanReuser.scala     |  3 ++-
 .../rules/logical/PushFilterIntoTableSourceScanRule.scala     |  2 +-
 .../flink/table/plan/rules/logical/SplitAggregateRule.scala   |  3 ++-
 .../plan/rules/physical/batch/BatchExecHashAggRule.scala      |  2 +-
 .../plan/rules/physical/batch/BatchExecHashJoinRule.scala     |  3 ++-
 .../plan/rules/physical/batch/BatchExecSortAggRule.scala      |  2 +-
 .../rules/physical/batch/BatchExecWindowAggregateRule.scala   |  3 ++-
 .../rules/physical/stream/MiniBatchIntervalInferRule.scala    |  2 +-
 .../physical/stream/TwoStageOptimizedAggregateRule.scala      |  2 +-
 .../org/apache/flink/table/plan/util/AggregateUtil.scala      |  5 ++++-
 .../parallelism/ShuffleStageParallelismCalculatorTest.java    |  2 +-
 .../scala/org/apache/flink/table/api/batch/ExplainTest.scala  |  2 +-
 .../scala/org/apache/flink/table/api/stream/ExplainTest.scala |  2 +-
 .../org/apache/flink/table/catalog/CatalogTableITCase.scala   |  3 ++-
 .../flink/table/plan/batch/sql/DeadlockBreakupTest.scala      |  2 +-
 .../flink/table/plan/batch/sql/RemoveCollationTest.scala      |  3 ++-
 .../apache/flink/table/plan/batch/sql/RemoveShuffleTest.scala |  3 ++-
 .../apache/flink/table/plan/batch/sql/SetOperatorsTest.scala  |  3 ++-
 .../org/apache/flink/table/plan/batch/sql/SortLimitTest.scala |  2 +-
 .../org/apache/flink/table/plan/batch/sql/SortTest.scala      |  2 +-
 .../apache/flink/table/plan/batch/sql/SubplanReuseTest.scala  |  2 +-
 .../flink/table/plan/batch/sql/agg/HashAggregateTest.scala    |  3 ++-
 .../flink/table/plan/batch/sql/agg/SortAggregateTest.scala    |  2 +-
 .../flink/table/plan/batch/sql/agg/WindowAggregateTest.scala  |  3 ++-
 .../table/plan/batch/sql/join/BroadcastHashJoinTest.scala     |  3 ++-
 .../plan/batch/sql/join/BroadcastHashSemiAntiJoinTest.scala   |  3 ++-
 .../flink/table/plan/batch/sql/join/LookupJoinTest.scala      |  1 +
 .../flink/table/plan/batch/sql/join/NestedLoopJoinTest.scala  |  2 +-
 .../plan/batch/sql/join/NestedLoopSemiAntiJoinTest.scala      |  2 +-
 .../table/plan/batch/sql/join/ShuffledHashJoinTest.scala      |  3 ++-
 .../plan/batch/sql/join/ShuffledHashSemiAntiJoinTest.scala    |  3 ++-
 .../flink/table/plan/batch/sql/join/SortMergeJoinTest.scala   |  3 ++-
 .../table/plan/batch/sql/join/SortMergeSemiAntiJoinTest.scala |  3 ++-
 .../apache/flink/table/plan/common/JoinReorderTestBase.scala  |  3 ++-
 .../table/plan/nodes/resource/ExecNodeResourceTest.scala      |  3 ++-
 .../table/plan/rules/logical/SplitAggregateRuleTest.scala     |  2 +-
 .../physical/batch/RemoveRedundantLocalHashAggRuleTest.scala  |  2 +-
 .../physical/batch/RemoveRedundantLocalSortAggRuleTest.scala  |  2 +-
 .../physical/stream/RetractionRulesWithTwoStageAggTest.scala  |  2 +-
 .../table/plan/stream/sql/MiniBatchIntervalInferTest.scala    |  3 ++-
 .../table/plan/stream/sql/ModifiedMonotonicityTest.scala      |  2 +-
 .../apache/flink/table/plan/stream/sql/SubplanReuseTest.scala |  2 +-
 .../flink/table/plan/stream/sql/agg/AggregateTest.scala       |  3 ++-
 .../table/plan/stream/sql/agg/DistinctAggregateTest.scala     |  2 +-
 .../table/plan/stream/sql/agg/TwoStageAggregateTest.scala     |  2 +-
 .../flink/table/plan/stream/table/TwoStageAggregateTest.scala |  3 ++-
 .../org/apache/flink/table/runtime/batch/sql/CalcITCase.scala |  3 ++-
 .../apache/flink/table/runtime/batch/sql/DecimalITCase.scala  |  3 ++-
 .../org/apache/flink/table/runtime/batch/sql/MiscITCase.scala |  2 +-
 .../flink/table/runtime/batch/sql/OverWindowITCase.scala      |  2 +-
 .../table/runtime/batch/sql/PartitionableSinkITCase.scala     |  4 +++-
 .../org/apache/flink/table/runtime/batch/sql/RankITCase.scala |  1 -
 .../flink/table/runtime/batch/sql/SortLimitITCase.scala       |  2 +-
 .../apache/flink/table/runtime/batch/sql/UnionITCase.scala    |  2 +-
 .../runtime/batch/sql/agg/AggregateJoinTransposeITCase.scala  |  2 +-
 .../runtime/batch/sql/agg/AggregateReduceGroupingITCase.scala |  5 +++--
 .../table/runtime/batch/sql/agg/GroupingSetsITCase.scala      |  2 +-
 .../flink/table/runtime/batch/sql/agg/HashAggITCase.scala     |  3 +--
 .../runtime/batch/sql/agg/HashDistinctAggregateITCase.scala   |  2 +-
 .../flink/table/runtime/batch/sql/agg/SortAggITCase.scala     |  2 +-
 .../runtime/batch/sql/agg/SortDistinctAggregateITCase.scala   |  2 +-
 .../flink/table/runtime/batch/sql/join/InnerJoinITCase.scala  |  2 +-
 .../flink/table/runtime/batch/sql/join/JoinITCase.scala       |  3 ++-
 .../flink/table/runtime/batch/sql/join/JoinITCaseHelper.scala |  3 ++-
 .../flink/table/runtime/batch/table/DecimalITCase.scala       |  3 ++-
 .../flink/table/runtime/stream/sql/SplitAggregateITCase.scala |  2 +-
 .../org/apache/flink/table/runtime/utils/BatchTestBase.scala  |  3 ++-
 .../flink/table/runtime/utils/StreamingWithAggTestBase.scala  |  2 +-
 .../table/runtime/utils/StreamingWithMiniBatchTestBase.scala  |  2 +-
 .../scala/org/apache/flink/table/util/TableTestBase.scala     |  1 +
 flink-table/flink-table-runtime-blink/pom.xml                 |  6 ++++++
 .../flink/table/runtime/hashtable/BaseHybridHashTable.java    |  2 +-
 .../apache/flink/table/runtime/sort/BinaryExternalSorter.java |  2 +-
 .../flink/table/runtime/sort/BufferedKVExternalSorter.java    |  2 +-
 .../flink/table/runtime/hashtable/BinaryHashTableTest.java    |  2 +-
 .../flink/table/runtime/hashtable/LongHashTableTest.java      |  2 +-
 .../flink/table/runtime/sort/BinaryExternalSorterTest.java    |  2 +-
 .../table/runtime/sort/BufferedKVExternalSorterTest.java      |  2 +-
 104 files changed, 156 insertions(+), 108 deletions(-)

diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/api/ExecutionConfigOptions.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/ExecutionConfigOptions.java
similarity index 97%
rename from flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/api/ExecutionConfigOptions.java
rename to flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/ExecutionConfigOptions.java
index 283625c..7053f3f 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/api/ExecutionConfigOptions.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/ExecutionConfigOptions.java
@@ -16,15 +16,16 @@
  * limitations under the License.
  */
 
-package org.apache.flink.table.api;
+package org.apache.flink.table.api.config;
 
 import org.apache.flink.configuration.ConfigOption;
-import org.apache.flink.table.runtime.window.grouping.HeapWindowsGrouping;
 
 import static org.apache.flink.configuration.ConfigOptions.key;
 
 /**
  * This class holds configuration constants used by Flink's table module.
+ *
+ * <p>This is only used for the Blink planner.
  */
 public class ExecutionConfigOptions {
 
@@ -48,7 +49,7 @@ public class ExecutionConfigOptions {
 					.withDescription("Default limit when user don't set a limit after order by. ");
 
 	public static final ConfigOption<Integer> SQL_EXEC_SORT_FILE_HANDLES_MAX_NUM =
-			key("sql.exec.sort.file-handles.num.max")
+			key("sql.exec.sort.max-num-file-handles")
 					.defaultValue(128)
 					.withDescription("The maximal fan-in for external merge sort. It limits the number of file handles per operator. " +
 							"If it is too small, may cause intermediate merging. But if it is too large, " +
@@ -125,10 +126,10 @@ public class ExecutionConfigOptions {
 	// ------------------------------------------------------------------------
 
 	/**
-	 * See {@link HeapWindowsGrouping}.
+	 * See {@code org.apache.flink.table.runtime.window.grouping.HeapWindowsGrouping}.
 	 */
 	public static final ConfigOption<Integer> SQL_EXEC_WINDOW_AGG_BUFFER_SIZE_LIMIT =
-			key("sql.exec.window-agg.buffer-size.limit")
+			key("sql.exec.window-agg.buffer-size-limit")
 					.defaultValue(100 * 1000)
 					.withDescription("Sets the window elements buffer size limit used in group window agg operator.");
 
diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/api/OptimizerConfigOptions.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/OptimizerConfigOptions.java
similarity index 98%
rename from flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/api/OptimizerConfigOptions.java
rename to flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/OptimizerConfigOptions.java
index a4674a7..8be671a 100644
--- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/api/OptimizerConfigOptions.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/OptimizerConfigOptions.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.flink.table.api;
+package org.apache.flink.table.api.config;
 
 import org.apache.flink.configuration.ConfigOption;
 
@@ -24,6 +24,8 @@ import static org.apache.flink.configuration.ConfigOptions.key;
 
 /**
  * This class holds configuration constants used by Flink's table planner module.
+ *
+ * <p>This is only used for the Blink planner.
  */
 public class OptimizerConfigOptions {
 
diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/executor/BatchExecutor.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/executor/BatchExecutor.java
index 0cf5169..7d6e450 100644
--- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/executor/BatchExecutor.java
+++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/executor/BatchExecutor.java
@@ -30,7 +30,7 @@ import org.apache.flink.streaming.api.TimeCharacteristic;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.graph.StreamGraph;
 import org.apache.flink.streaming.api.transformations.ShuffleMode;
-import org.apache.flink.table.api.ExecutionConfigOptions;
+import org.apache.flink.table.api.config.ExecutionConfigOptions;
 import org.apache.flink.table.delegation.Executor;
 import org.apache.flink.table.plan.nodes.resource.NodeResourceUtil;
 
diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/plan/nodes/resource/NodeResourceUtil.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/plan/nodes/resource/NodeResourceUtil.java
index 1f3fc35..acaf177 100644
--- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/plan/nodes/resource/NodeResourceUtil.java
+++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/plan/nodes/resource/NodeResourceUtil.java
@@ -20,7 +20,7 @@ package org.apache.flink.table.plan.nodes.resource;
 
 import org.apache.flink.api.common.operators.ResourceSpec;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.table.api.ExecutionConfigOptions;
+import org.apache.flink.table.api.config.ExecutionConfigOptions;
 
 /**
  * Deal with resource config for {@link org.apache.flink.table.plan.nodes.exec.ExecNode}.
diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/plan/util/OperatorType.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/plan/util/OperatorType.java
index f267981..5c972b8 100644
--- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/plan/util/OperatorType.java
+++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/plan/util/OperatorType.java
@@ -18,7 +18,7 @@
 
 package org.apache.flink.table.plan.util;
 
-import org.apache.flink.table.api.ExecutionConfigOptions;
+import org.apache.flink.table.api.config.ExecutionConfigOptions;
 
 /**
  * Some dedicated operator type which is used in
diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/util/AggregatePhaseStrategy.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/util/AggregatePhaseStrategy.java
index 315a7f3..b46ab56 100644
--- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/util/AggregatePhaseStrategy.java
+++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/util/AggregatePhaseStrategy.java
@@ -18,7 +18,7 @@
 
 package org.apache.flink.table.util;
 
-import org.apache.flink.table.api.OptimizerConfigOptions;
+import org.apache.flink.table.api.config.OptimizerConfigOptions;
 
 /**
  * Aggregate phase strategy which could be specified in {@link OptimizerConfigOptions#SQL_OPTIMIZER_AGG_PHASE_STRATEGY}.
diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/util/TableConfigUtils.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/util/TableConfigUtils.java
index 05abfe3..690eafb 100644
--- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/util/TableConfigUtils.java
+++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/util/TableConfigUtils.java
@@ -29,8 +29,8 @@ import java.util.Set;
 
 import scala.concurrent.duration.Duration;
 
-import static org.apache.flink.table.api.ExecutionConfigOptions.SQL_EXEC_DISABLED_OPERATORS;
-import static org.apache.flink.table.api.OptimizerConfigOptions.SQL_OPTIMIZER_AGG_PHASE_STRATEGY;
+import static org.apache.flink.table.api.config.ExecutionConfigOptions.SQL_EXEC_DISABLED_OPERATORS;
+import static org.apache.flink.table.api.config.OptimizerConfigOptions.SQL_OPTIMIZER_AGG_PHASE_STRATEGY;
 
 /**
  * Utility class for {@link TableConfig} related helper functions.
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/codegen/NestedLoopJoinCodeGenerator.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/codegen/NestedLoopJoinCodeGenerator.scala
index c2000f7..aa42bee 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/codegen/NestedLoopJoinCodeGenerator.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/codegen/NestedLoopJoinCodeGenerator.scala
@@ -18,7 +18,7 @@
 
 package org.apache.flink.table.codegen
 
-import org.apache.flink.table.api.ExecutionConfigOptions
+import org.apache.flink.table.api.config.ExecutionConfigOptions
 import org.apache.flink.table.codegen.CodeGenUtils.{BASE_ROW, BINARY_ROW, DEFAULT_INPUT1_TERM, DEFAULT_INPUT2_TERM, className, newName}
 import org.apache.flink.table.codegen.OperatorCodeGenerator.{INPUT_SELECTION, generateCollect}
 import org.apache.flink.table.dataformat.{BaseRow, JoinedRow}
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/common/CommonLookupJoin.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/common/CommonLookupJoin.scala
index 45a6266..bde6b9c 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/common/CommonLookupJoin.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/common/CommonLookupJoin.scala
@@ -25,7 +25,8 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
 import org.apache.flink.streaming.api.operators.ProcessOperator
 import org.apache.flink.streaming.api.operators.async.AsyncWaitOperator
 import org.apache.flink.streaming.api.transformations.OneInputTransformation
-import org.apache.flink.table.api.{ExecutionConfigOptions, TableConfig, TableException, TableSchema}
+import org.apache.flink.table.api.config.ExecutionConfigOptions
+import org.apache.flink.table.api.{TableConfig, TableException, TableSchema}
 import org.apache.flink.table.calcite.FlinkTypeFactory
 import org.apache.flink.table.codegen.LookupJoinCodeGenerator._
 import org.apache.flink.table.codegen.{CodeGeneratorContext, LookupJoinCodeGenerator}
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalSort.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalSort.scala
index 7befb58..fce4ee8 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalSort.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalSort.scala
@@ -18,7 +18,7 @@
 
 package org.apache.flink.table.plan.nodes.logical
 
-import org.apache.flink.table.api.ExecutionConfigOptions.SQL_EXEC_SORT_DEFAULT_LIMIT
+import org.apache.flink.table.api.config.ExecutionConfigOptions.SQL_EXEC_SORT_DEFAULT_LIMIT
 import org.apache.flink.table.calcite.FlinkContext
 import org.apache.flink.table.plan.nodes.FlinkConventions
 import org.apache.flink.table.plan.rules.physical.batch.BatchExecSortRule.SQL_EXEC_SORT_RANGE_ENABLED
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecExchange.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecExchange.scala
index f2170cb..75c471e 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecExchange.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecExchange.scala
@@ -23,7 +23,7 @@ import org.apache.flink.configuration.Configuration
 import org.apache.flink.runtime.operators.DamBehavior
 import org.apache.flink.streaming.api.transformations.{PartitionTransformation, ShuffleMode}
 import org.apache.flink.streaming.runtime.partitioner.{BroadcastPartitioner, GlobalPartitioner, RebalancePartitioner}
-import org.apache.flink.table.api.ExecutionConfigOptions
+import org.apache.flink.table.api.config.ExecutionConfigOptions
 import org.apache.flink.table.calcite.FlinkTypeFactory
 import org.apache.flink.table.codegen.{CodeGeneratorContext, HashCodeGenerator}
 import org.apache.flink.table.dataformat.BaseRow
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecHashAggregateBase.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecHashAggregateBase.scala
index e395f63..0fa005e 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecHashAggregateBase.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecHashAggregateBase.scala
@@ -19,7 +19,7 @@ package org.apache.flink.table.plan.nodes.physical.batch
 
 import org.apache.flink.api.dag.Transformation
 import org.apache.flink.streaming.api.transformations.OneInputTransformation
-import org.apache.flink.table.api.ExecutionConfigOptions
+import org.apache.flink.table.api.config.ExecutionConfigOptions
 import org.apache.flink.table.calcite.FlinkTypeFactory
 import org.apache.flink.table.codegen.CodeGeneratorContext
 import org.apache.flink.table.codegen.agg.batch.{AggWithoutKeysCodeGenerator, HashAggCodeGenerator}
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecHashJoin.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecHashJoin.scala
index c250c20..d6af5c6 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecHashJoin.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecHashJoin.scala
@@ -21,7 +21,7 @@ import org.apache.flink.api.dag.Transformation
 import org.apache.flink.runtime.operators.DamBehavior
 import org.apache.flink.streaming.api.operators.SimpleOperatorFactory
 import org.apache.flink.streaming.api.transformations.TwoInputTransformation
-import org.apache.flink.table.api.ExecutionConfigOptions
+import org.apache.flink.table.api.config.ExecutionConfigOptions
 import org.apache.flink.table.calcite.FlinkTypeFactory
 import org.apache.flink.table.codegen.ProjectionCodeGenerator.generateProjection
 import org.apache.flink.table.codegen.{CodeGeneratorContext, LongHashJoinGenerator}
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecHashWindowAggregateBase.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecHashWindowAggregateBase.scala
index e9d49ad..b076e46 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecHashWindowAggregateBase.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecHashWindowAggregateBase.scala
@@ -20,7 +20,7 @@ package org.apache.flink.table.plan.nodes.physical.batch
 
 import org.apache.flink.api.dag.Transformation
 import org.apache.flink.streaming.api.transformations.OneInputTransformation
-import org.apache.flink.table.api.ExecutionConfigOptions
+import org.apache.flink.table.api.config.ExecutionConfigOptions
 import org.apache.flink.table.calcite.FlinkRelBuilder.PlannerNamedWindowProperty
 import org.apache.flink.table.calcite.FlinkTypeFactory
 import org.apache.flink.table.codegen.CodeGeneratorContext
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecNestedLoopJoin.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecNestedLoopJoin.scala
index 229bebc..62132d2 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecNestedLoopJoin.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecNestedLoopJoin.scala
@@ -21,7 +21,7 @@ package org.apache.flink.table.plan.nodes.physical.batch
 import org.apache.flink.api.dag.Transformation
 import org.apache.flink.runtime.operators.DamBehavior
 import org.apache.flink.streaming.api.transformations.TwoInputTransformation
-import org.apache.flink.table.api.ExecutionConfigOptions
+import org.apache.flink.table.api.config.ExecutionConfigOptions
 import org.apache.flink.table.calcite.FlinkTypeFactory
 import org.apache.flink.table.codegen.{CodeGeneratorContext, NestedLoopJoinCodeGenerator}
 import org.apache.flink.table.dataformat.BaseRow
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecOverAggregate.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecOverAggregate.scala
index 16c0c4d..0331186 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecOverAggregate.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecOverAggregate.scala
@@ -22,7 +22,8 @@ import org.apache.flink.api.dag.Transformation
 import org.apache.flink.runtime.operators.DamBehavior
 import org.apache.flink.streaming.api.transformations.OneInputTransformation
 import org.apache.flink.table.CalcitePair
-import org.apache.flink.table.api.{ExecutionConfigOptions, TableConfig}
+import org.apache.flink.table.api.TableConfig
+import org.apache.flink.table.api.config.ExecutionConfigOptions
 import org.apache.flink.table.calcite.FlinkTypeFactory
 import org.apache.flink.table.codegen.CodeGeneratorContext
 import org.apache.flink.table.codegen.agg.AggsHandlerCodeGenerator
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecSort.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecSort.scala
index 7662afc..2babfb8 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecSort.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecSort.scala
@@ -21,7 +21,7 @@ import org.apache.flink.api.dag.Transformation
 import org.apache.flink.runtime.operators.DamBehavior
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator
 import org.apache.flink.streaming.api.transformations.OneInputTransformation
-import org.apache.flink.table.api.ExecutionConfigOptions
+import org.apache.flink.table.api.config.ExecutionConfigOptions
 import org.apache.flink.table.calcite.FlinkTypeFactory
 import org.apache.flink.table.codegen.sort.SortCodeGenerator
 import org.apache.flink.table.dataformat.BaseRow
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecSortMergeJoin.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecSortMergeJoin.scala
index bb04dd4..1808ba6 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecSortMergeJoin.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecSortMergeJoin.scala
@@ -20,7 +20,7 @@ package org.apache.flink.table.plan.nodes.physical.batch
 import org.apache.flink.api.dag.Transformation
 import org.apache.flink.runtime.operators.DamBehavior
 import org.apache.flink.streaming.api.transformations.TwoInputTransformation
-import org.apache.flink.table.api.ExecutionConfigOptions
+import org.apache.flink.table.api.config.ExecutionConfigOptions
 import org.apache.flink.table.calcite.FlinkTypeFactory
 import org.apache.flink.table.codegen.CodeGeneratorContext
 import org.apache.flink.table.codegen.ProjectionCodeGenerator.generateProjection
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecSortWindowAggregateBase.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecSortWindowAggregateBase.scala
index ab8be01..c0157c9 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecSortWindowAggregateBase.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecSortWindowAggregateBase.scala
@@ -20,7 +20,7 @@ package org.apache.flink.table.plan.nodes.physical.batch
 
 import org.apache.flink.api.dag.Transformation
 import org.apache.flink.streaming.api.transformations.OneInputTransformation
-import org.apache.flink.table.api.ExecutionConfigOptions
+import org.apache.flink.table.api.config.ExecutionConfigOptions
 import org.apache.flink.table.calcite.FlinkRelBuilder.PlannerNamedWindowProperty
 import org.apache.flink.table.calcite.FlinkTypeFactory
 import org.apache.flink.table.codegen.CodeGeneratorContext
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/stream/StreamExecDeduplicate.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/stream/StreamExecDeduplicate.scala
index 581a9a4..01cacf9 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/stream/StreamExecDeduplicate.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/stream/StreamExecDeduplicate.scala
@@ -21,7 +21,8 @@ package org.apache.flink.table.plan.nodes.physical.stream
 import org.apache.flink.api.dag.Transformation
 import org.apache.flink.streaming.api.operators.KeyedProcessOperator
 import org.apache.flink.streaming.api.transformations.OneInputTransformation
-import org.apache.flink.table.api.{ExecutionConfigOptions, TableException}
+import org.apache.flink.table.api.TableException
+import org.apache.flink.table.api.config.ExecutionConfigOptions
 import org.apache.flink.table.dataformat.BaseRow
 import org.apache.flink.table.plan.nodes.exec.{ExecNode, StreamExecNode}
 import org.apache.flink.table.plan.rules.physical.stream.StreamExecRetractionRules
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/stream/StreamExecGlobalGroupAggregate.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/stream/StreamExecGlobalGroupAggregate.scala
index a9ee7e8..1a9df7a 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/stream/StreamExecGlobalGroupAggregate.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/stream/StreamExecGlobalGroupAggregate.scala
@@ -19,7 +19,8 @@ package org.apache.flink.table.plan.nodes.physical.stream
 
 import org.apache.flink.api.dag.Transformation
 import org.apache.flink.streaming.api.transformations.OneInputTransformation
-import org.apache.flink.table.api.{ExecutionConfigOptions, TableConfig, TableException}
+import org.apache.flink.table.api.config.ExecutionConfigOptions
+import org.apache.flink.table.api.{TableConfig, TableException}
 import org.apache.flink.table.calcite.FlinkTypeFactory
 import org.apache.flink.table.codegen.agg.AggsHandlerCodeGenerator
 import org.apache.flink.table.codegen.{CodeGeneratorContext, EqualiserCodeGenerator}
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/stream/StreamExecGroupAggregate.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/stream/StreamExecGroupAggregate.scala
index a9cd9f3..30afbe3 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/stream/StreamExecGroupAggregate.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/stream/StreamExecGroupAggregate.scala
@@ -20,7 +20,7 @@ package org.apache.flink.table.plan.nodes.physical.stream
 import org.apache.flink.api.dag.Transformation
 import org.apache.flink.streaming.api.operators.KeyedProcessOperator
 import org.apache.flink.streaming.api.transformations.OneInputTransformation
-import org.apache.flink.table.api.ExecutionConfigOptions
+import org.apache.flink.table.api.config.ExecutionConfigOptions
 import org.apache.flink.table.calcite.FlinkTypeFactory
 import org.apache.flink.table.codegen.agg.AggsHandlerCodeGenerator
 import org.apache.flink.table.codegen.{CodeGeneratorContext, EqualiserCodeGenerator}
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/stream/StreamExecWatermarkAssigner.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/stream/StreamExecWatermarkAssigner.scala
index 84359d1..dae7285 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/stream/StreamExecWatermarkAssigner.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/stream/StreamExecWatermarkAssigner.scala
@@ -20,7 +20,8 @@ package org.apache.flink.table.plan.nodes.physical.stream
 
 import org.apache.flink.api.dag.Transformation
 import org.apache.flink.streaming.api.transformations.OneInputTransformation
-import org.apache.flink.table.api.{ExecutionConfigOptions, TableException}
+import org.apache.flink.table.api.TableException
+import org.apache.flink.table.api.config.ExecutionConfigOptions
 import org.apache.flink.table.calcite.{FlinkContext, FlinkTypeFactory}
 import org.apache.flink.table.dataformat.BaseRow
 import org.apache.flink.table.plan.`trait`.{MiniBatchIntervalTraitDef, MiniBatchMode}
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/optimize/StreamCommonSubGraphBasedOptimizer.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/optimize/StreamCommonSubGraphBasedOptimizer.scala
index c0196e9..20d4762 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/optimize/StreamCommonSubGraphBasedOptimizer.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/optimize/StreamCommonSubGraphBasedOptimizer.scala
@@ -18,7 +18,8 @@
 
 package org.apache.flink.table.plan.optimize
 
-import org.apache.flink.table.api.{ExecutionConfigOptions, TableConfig}
+import org.apache.flink.table.api.TableConfig
+import org.apache.flink.table.api.config.ExecutionConfigOptions
 import org.apache.flink.table.catalog.FunctionCatalog
 import org.apache.flink.table.plan.`trait`.{AccMode, AccModeTraitDef, MiniBatchInterval, MiniBatchIntervalTrait, MiniBatchIntervalTraitDef, MiniBatchMode, UpdateAsRetractionTraitDef}
 import org.apache.flink.table.plan.metadata.FlinkRelMetadataQuery
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/optimize/program/FlinkBatchProgram.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/optimize/program/FlinkBatchProgram.scala
index edc15e3..d26cb40 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/optimize/program/FlinkBatchProgram.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/optimize/program/FlinkBatchProgram.scala
@@ -19,7 +19,7 @@
 package org.apache.flink.table.plan.optimize.program
 
 import org.apache.flink.configuration.Configuration
-import org.apache.flink.table.api.OptimizerConfigOptions
+import org.apache.flink.table.api.config.OptimizerConfigOptions
 import org.apache.flink.table.plan.nodes.FlinkConventions
 import org.apache.flink.table.plan.rules.FlinkBatchRuleSets
 
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/optimize/program/FlinkStreamProgram.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/optimize/program/FlinkStreamProgram.scala
index 7fac4cd..6c57bdb 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/optimize/program/FlinkStreamProgram.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/optimize/program/FlinkStreamProgram.scala
@@ -19,7 +19,7 @@
 package org.apache.flink.table.plan.optimize.program
 
 import org.apache.flink.configuration.Configuration
-import org.apache.flink.table.api.OptimizerConfigOptions
+import org.apache.flink.table.api.config.OptimizerConfigOptions
 import org.apache.flink.table.plan.nodes.FlinkConventions
 import org.apache.flink.table.plan.rules.{FlinkBatchRuleSets, FlinkStreamRuleSets}
 
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/reuse/SubplanReuser.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/reuse/SubplanReuser.scala
index 548efa0..bb0c726 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/reuse/SubplanReuser.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/reuse/SubplanReuser.scala
@@ -18,7 +18,8 @@
 
 package org.apache.flink.table.plan.reuse
 
-import org.apache.flink.table.api.{OptimizerConfigOptions, TableConfig, TableException}
+import org.apache.flink.table.api.config.OptimizerConfigOptions
+import org.apache.flink.table.api.{TableConfig, TableException}
 import org.apache.flink.table.plan.nodes.calcite.Sink
 import org.apache.flink.table.plan.nodes.logical.FlinkLogicalTableSourceScan
 import org.apache.flink.table.plan.nodes.physical.PhysicalTableSourceScan
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/logical/PushFilterIntoTableSourceScanRule.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/logical/PushFilterIntoTableSourceScanRule.scala
index 14f173d..e73e8ec 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/logical/PushFilterIntoTableSourceScanRule.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/logical/PushFilterIntoTableSourceScanRule.scala
@@ -18,7 +18,7 @@
 
 package org.apache.flink.table.plan.rules.logical
 
-import org.apache.flink.table.api.OptimizerConfigOptions
+import org.apache.flink.table.api.config.OptimizerConfigOptions
 import org.apache.flink.table.calcite.FlinkContext
 import org.apache.flink.table.expressions.{Expression, RexNodeConverter}
 import org.apache.flink.table.plan.schema.{FlinkRelOptTable, TableSourceTable}
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/logical/SplitAggregateRule.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/logical/SplitAggregateRule.scala
index 3e86e44..5a31709 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/logical/SplitAggregateRule.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/logical/SplitAggregateRule.scala
@@ -18,7 +18,8 @@
 
 package org.apache.flink.table.plan.rules.logical
 
-import org.apache.flink.table.api.{OptimizerConfigOptions, ExecutionConfigOptions, TableException}
+import org.apache.flink.table.api.config.{ExecutionConfigOptions, OptimizerConfigOptions}
+import org.apache.flink.table.api.TableException
 import org.apache.flink.table.calcite.{FlinkContext, FlinkLogicalRelFactories, FlinkRelBuilder}
 import org.apache.flink.table.functions.sql.{FlinkSqlOperatorTable, SqlFirstLastValueAggFunction}
 import org.apache.flink.table.plan.PartialFinalType
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/physical/batch/BatchExecHashAggRule.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/physical/batch/BatchExecHashAggRule.scala
index dd2ddfd..9a24a16 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/physical/batch/BatchExecHashAggRule.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/physical/batch/BatchExecHashAggRule.scala
@@ -17,7 +17,7 @@
  */
 package org.apache.flink.table.plan.rules.physical.batch
 
-import org.apache.flink.table.api.OptimizerConfigOptions
+import org.apache.flink.table.api.config.OptimizerConfigOptions
 import org.apache.flink.table.calcite.FlinkContext
 import org.apache.flink.table.plan.`trait`.FlinkRelDistribution
 import org.apache.flink.table.plan.nodes.FlinkConventions
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/physical/batch/BatchExecHashJoinRule.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/physical/batch/BatchExecHashJoinRule.scala
index 82a280f..e1d2cea 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/physical/batch/BatchExecHashJoinRule.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/physical/batch/BatchExecHashJoinRule.scala
@@ -19,7 +19,8 @@
 package org.apache.flink.table.plan.rules.physical.batch
 
 import org.apache.flink.table.JDouble
-import org.apache.flink.table.api.{OptimizerConfigOptions, TableConfig}
+import org.apache.flink.table.api.TableConfig
+import org.apache.flink.table.api.config.OptimizerConfigOptions
 import org.apache.flink.table.calcite.FlinkContext
 import org.apache.flink.table.plan.`trait`.FlinkRelDistribution
 import org.apache.flink.table.plan.nodes.FlinkConventions
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/physical/batch/BatchExecSortAggRule.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/physical/batch/BatchExecSortAggRule.scala
index 40abde4..6a177f2 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/physical/batch/BatchExecSortAggRule.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/physical/batch/BatchExecSortAggRule.scala
@@ -17,7 +17,7 @@
  */
 package org.apache.flink.table.plan.rules.physical.batch
 
-import org.apache.flink.table.api.OptimizerConfigOptions
+import org.apache.flink.table.api.config.OptimizerConfigOptions
 import org.apache.flink.table.calcite.FlinkContext
 import org.apache.flink.table.plan.`trait`.FlinkRelDistribution
 import org.apache.flink.table.plan.nodes.FlinkConventions
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/physical/batch/BatchExecWindowAggregateRule.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/physical/batch/BatchExecWindowAggregateRule.scala
index 7d1f4e4..c41e775 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/physical/batch/BatchExecWindowAggregateRule.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/physical/batch/BatchExecWindowAggregateRule.scala
@@ -18,7 +18,8 @@
 
 package org.apache.flink.table.plan.rules.physical.batch
 
-import org.apache.flink.table.api.{OptimizerConfigOptions, TableConfig, TableException}
+import org.apache.flink.table.api.config.OptimizerConfigOptions
+import org.apache.flink.table.api.{TableConfig, TableException}
 import org.apache.flink.table.calcite.{FlinkContext, FlinkTypeFactory}
 import org.apache.flink.table.functions.aggfunctions.DeclarativeAggregateFunction
 import org.apache.flink.table.functions.{AggregateFunction, UserDefinedFunction}
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/physical/stream/MiniBatchIntervalInferRule.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/physical/stream/MiniBatchIntervalInferRule.scala
index 3b49fa1..07a7a0d 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/physical/stream/MiniBatchIntervalInferRule.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/physical/stream/MiniBatchIntervalInferRule.scala
@@ -18,7 +18,7 @@
 
 package org.apache.flink.table.plan.rules.physical.stream
 
-import org.apache.flink.table.api.ExecutionConfigOptions
+import org.apache.flink.table.api.config.ExecutionConfigOptions
 import org.apache.flink.table.plan.`trait`.{MiniBatchInterval, MiniBatchIntervalTrait, MiniBatchIntervalTraitDef, MiniBatchMode}
 import org.apache.flink.table.plan.nodes.physical.stream.{StreamExecDataStreamScan, StreamExecGroupWindowAggregate, StreamExecTableSourceScan, StreamExecWatermarkAssigner, StreamPhysicalRel}
 import org.apache.flink.table.plan.util.FlinkRelOptUtil
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/physical/stream/TwoStageOptimizedAggregateRule.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/physical/stream/TwoStageOptimizedAggregateRule.scala
index e008744..5bc5114 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/physical/stream/TwoStageOptimizedAggregateRule.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/physical/stream/TwoStageOptimizedAggregateRule.scala
@@ -18,7 +18,7 @@
 
 package org.apache.flink.table.plan.rules.physical.stream
 
-import org.apache.flink.table.api.ExecutionConfigOptions
+import org.apache.flink.table.api.config.ExecutionConfigOptions
 import org.apache.flink.table.calcite.{FlinkContext, FlinkTypeFactory}
 import org.apache.flink.table.plan.`trait`.{AccMode, AccModeTrait, FlinkRelDistribution, FlinkRelDistributionTraitDef}
 import org.apache.flink.table.plan.metadata.FlinkRelMetadataQuery
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/util/AggregateUtil.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/util/AggregateUtil.scala
index 933ec38..d73fe33 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/util/AggregateUtil.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/util/AggregateUtil.scala
@@ -19,7 +19,8 @@ package org.apache.flink.table.plan.util
 
 import org.apache.flink.api.common.typeinfo.Types
 import org.apache.flink.table.JLong
-import org.apache.flink.table.api.{DataTypes, ExecutionConfigOptions, TableConfig, TableException}
+import org.apache.flink.table.api.config.ExecutionConfigOptions
+import org.apache.flink.table.api.{DataTypes, TableConfig, TableException}
 import org.apache.flink.table.calcite.FlinkRelBuilder.PlannerNamedWindowProperty
 import org.apache.flink.table.calcite.{FlinkTypeFactory, FlinkTypeSystem}
 import org.apache.flink.table.dataformat.BaseRow
@@ -43,6 +44,7 @@ import org.apache.flink.table.types.logical.utils.LogicalTypeChecks.hasRoot
 import org.apache.flink.table.types.logical.{LogicalTypeRoot, _}
 import org.apache.flink.table.types.utils.TypeConversions.fromLegacyInfoToDataType
 import org.apache.flink.table.typeutils.BinaryStringTypeInfo
+
 import org.apache.calcite.rel.`type`._
 import org.apache.calcite.rel.core.{Aggregate, AggregateCall}
 import org.apache.calcite.rex.RexInputRef
@@ -50,6 +52,7 @@ import org.apache.calcite.sql.fun._
 import org.apache.calcite.sql.validate.SqlMonotonicity
 import org.apache.calcite.sql.{SqlKind, SqlRankFunction}
 import org.apache.calcite.tools.RelBuilder
+
 import java.time.Duration
 import java.util
 
diff --git a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/plan/nodes/resource/parallelism/ShuffleStageParallelismCalculatorTest.java b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/plan/nodes/resource/parallelism/ShuffleStageParallelismCalculatorTest.java
index 203920d..9e4f46e 100644
--- a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/plan/nodes/resource/parallelism/ShuffleStageParallelismCalculatorTest.java
+++ b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/plan/nodes/resource/parallelism/ShuffleStageParallelismCalculatorTest.java
@@ -19,7 +19,7 @@
 package org.apache.flink.table.plan.nodes.resource.parallelism;
 
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.table.api.ExecutionConfigOptions;
+import org.apache.flink.table.api.config.ExecutionConfigOptions;
 import org.apache.flink.table.plan.nodes.exec.ExecNode;
 import org.apache.flink.table.plan.nodes.physical.batch.BatchExecCalc;
 import org.apache.flink.table.plan.nodes.physical.batch.BatchExecTableSourceScan;
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/api/batch/ExplainTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/api/batch/ExplainTest.scala
index 12dd76a..f253a70 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/api/batch/ExplainTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/api/batch/ExplainTest.scala
@@ -19,7 +19,7 @@
 package org.apache.flink.table.api.batch
 
 import org.apache.flink.api.scala._
-import org.apache.flink.table.api.ExecutionConfigOptions
+import org.apache.flink.table.api.config.ExecutionConfigOptions
 import org.apache.flink.table.api.scala._
 import org.apache.flink.table.types.logical.{BigIntType, IntType, VarCharType}
 import org.apache.flink.table.util.TableTestBase
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/api/stream/ExplainTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/api/stream/ExplainTest.scala
index 83a51fb..e4a92d8 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/api/stream/ExplainTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/api/stream/ExplainTest.scala
@@ -19,7 +19,7 @@
 package org.apache.flink.table.api.stream
 
 import org.apache.flink.api.scala._
-import org.apache.flink.table.api.ExecutionConfigOptions
+import org.apache.flink.table.api.config.ExecutionConfigOptions
 import org.apache.flink.table.api.scala._
 import org.apache.flink.table.types.logical.{BigIntType, IntType, VarCharType}
 import org.apache.flink.table.util.TableTestBase
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/catalog/CatalogTableITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/catalog/CatalogTableITCase.scala
index b33ebfb..7805b63 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/catalog/CatalogTableITCase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/catalog/CatalogTableITCase.scala
@@ -18,8 +18,9 @@
 
 package org.apache.flink.table.catalog
 
+import org.apache.flink.table.api.config.ExecutionConfigOptions
 import org.apache.flink.table.api.internal.TableEnvironmentImpl
-import org.apache.flink.table.api.{EnvironmentSettings, ExecutionConfigOptions, TableEnvironment, TableException}
+import org.apache.flink.table.api.{EnvironmentSettings, TableEnvironment, TableException}
 import org.apache.flink.table.factories.utils.TestCollectionTableFactory
 import org.apache.flink.types.Row
 
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/batch/sql/DeadlockBreakupTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/batch/sql/DeadlockBreakupTest.scala
index c815497..6f2daa5 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/batch/sql/DeadlockBreakupTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/batch/sql/DeadlockBreakupTest.scala
@@ -20,7 +20,7 @@ package org.apache.flink.table.plan.batch.sql
 
 import org.apache.flink.api.scala._
 import org.apache.flink.table.api.scala._
-import org.apache.flink.table.api.{ExecutionConfigOptions, OptimizerConfigOptions}
+import org.apache.flink.table.api.config.{ExecutionConfigOptions, OptimizerConfigOptions}
 import org.apache.flink.table.util.TableTestBase
 
 import org.junit.{Before, Test}
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/batch/sql/RemoveCollationTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/batch/sql/RemoveCollationTest.scala
index 3969a6d..55823a4 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/batch/sql/RemoveCollationTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/batch/sql/RemoveCollationTest.scala
@@ -20,7 +20,8 @@ package org.apache.flink.table.plan.batch.sql
 
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.scala._
-import org.apache.flink.table.api.{ExecutionConfigOptions, Types}
+import org.apache.flink.table.api.Types
+import org.apache.flink.table.api.config.ExecutionConfigOptions
 import org.apache.flink.table.plan.rules.physical.batch.BatchExecSortMergeJoinRule
 import org.apache.flink.table.plan.rules.physical.batch.BatchExecSortRule.SQL_EXEC_SORT_RANGE_ENABLED
 import org.apache.flink.table.plan.stats.{FlinkStatistic, TableStats}
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/batch/sql/RemoveShuffleTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/batch/sql/RemoveShuffleTest.scala
index ad38673..db8ab5c 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/batch/sql/RemoveShuffleTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/batch/sql/RemoveShuffleTest.scala
@@ -20,7 +20,8 @@ package org.apache.flink.table.plan.batch.sql
 
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.scala._
-import org.apache.flink.table.api.{ExecutionConfigOptions, OptimizerConfigOptions, Types}
+import org.apache.flink.table.api.config.{ExecutionConfigOptions, OptimizerConfigOptions}
+import org.apache.flink.table.api.Types
 import org.apache.flink.table.plan.rules.physical.batch.{BatchExecJoinRuleBase, BatchExecSortMergeJoinRule}
 import org.apache.flink.table.plan.stats.{FlinkStatistic, TableStats}
 import org.apache.flink.table.util.{TableFunc1, TableTestBase}
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/batch/sql/SetOperatorsTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/batch/sql/SetOperatorsTest.scala
index 6998381..50ab944 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/batch/sql/SetOperatorsTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/batch/sql/SetOperatorsTest.scala
@@ -22,7 +22,8 @@ import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.java.typeutils.GenericTypeInfo
 import org.apache.flink.api.scala._
 import org.apache.flink.table.api.scala._
-import org.apache.flink.table.api.{ExecutionConfigOptions, ValidationException}
+import org.apache.flink.table.api.ValidationException
+import org.apache.flink.table.api.config.ExecutionConfigOptions
 import org.apache.flink.table.plan.util.NonPojo
 import org.apache.flink.table.util.TableTestBase
 
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/batch/sql/SortLimitTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/batch/sql/SortLimitTest.scala
index 72ec274..b9a119e 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/batch/sql/SortLimitTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/batch/sql/SortLimitTest.scala
@@ -19,7 +19,7 @@
 package org.apache.flink.table.plan.batch.sql
 
 import org.apache.flink.api.scala._
-import org.apache.flink.table.api.ExecutionConfigOptions
+import org.apache.flink.table.api.config.ExecutionConfigOptions
 import org.apache.flink.table.api.scala._
 import org.apache.flink.table.plan.rules.physical.batch.BatchExecSortRule.SQL_EXEC_SORT_RANGE_ENABLED
 import org.apache.flink.table.util.TableTestBase
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/batch/sql/SortTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/batch/sql/SortTest.scala
index 22f4908..546f965 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/batch/sql/SortTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/batch/sql/SortTest.scala
@@ -19,7 +19,7 @@
 package org.apache.flink.table.plan.batch.sql
 
 import org.apache.flink.api.scala._
-import org.apache.flink.table.api.ExecutionConfigOptions
+import org.apache.flink.table.api.config.ExecutionConfigOptions
 import org.apache.flink.table.api.scala._
 import org.apache.flink.table.plan.rules.physical.batch.BatchExecSortRule.SQL_EXEC_SORT_RANGE_ENABLED
 import org.apache.flink.table.util.TableTestBase
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/batch/sql/SubplanReuseTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/batch/sql/SubplanReuseTest.scala
index 7288a5b..867ee8f 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/batch/sql/SubplanReuseTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/batch/sql/SubplanReuseTest.scala
@@ -20,7 +20,7 @@ package org.apache.flink.table.plan.batch.sql
 
 import org.apache.flink.api.scala._
 import org.apache.flink.table.api.scala._
-import org.apache.flink.table.api.{ExecutionConfigOptions, OptimizerConfigOptions}
+import org.apache.flink.table.api.config.{ExecutionConfigOptions, OptimizerConfigOptions}
 import org.apache.flink.table.functions.aggfunctions.FirstValueAggFunction.IntFirstValueAggFunction
 import org.apache.flink.table.functions.aggfunctions.LastValueAggFunction.LongLastValueAggFunction
 import org.apache.flink.table.plan.rules.physical.batch.BatchExecSortMergeJoinRule
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/batch/sql/agg/HashAggregateTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/batch/sql/agg/HashAggregateTest.scala
index 2f1d077..2b8095c 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/batch/sql/agg/HashAggregateTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/batch/sql/agg/HashAggregateTest.scala
@@ -17,7 +17,8 @@
  */
 package org.apache.flink.table.plan.batch.sql.agg
 
-import org.apache.flink.table.api.{ExecutionConfigOptions, OptimizerConfigOptions, TableException}
+import org.apache.flink.table.api.config.{ExecutionConfigOptions, OptimizerConfigOptions}
+import org.apache.flink.table.api.TableException
 import org.apache.flink.table.plan.util.OperatorType
 import org.apache.flink.table.util.AggregatePhaseStrategy
 
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/batch/sql/agg/SortAggregateTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/batch/sql/agg/SortAggregateTest.scala
index 0bbed95..dfdff36 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/batch/sql/agg/SortAggregateTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/batch/sql/agg/SortAggregateTest.scala
@@ -17,7 +17,7 @@
  */
 package org.apache.flink.table.plan.batch.sql.agg
 
-import org.apache.flink.table.api.{ExecutionConfigOptions, OptimizerConfigOptions}
+import org.apache.flink.table.api.config.{ExecutionConfigOptions, OptimizerConfigOptions}
 import org.apache.flink.table.plan.util.OperatorType
 import org.apache.flink.table.util.AggregatePhaseStrategy
 
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/batch/sql/agg/WindowAggregateTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/batch/sql/agg/WindowAggregateTest.scala
index 52bfff5..43d5eab 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/batch/sql/agg/WindowAggregateTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/batch/sql/agg/WindowAggregateTest.scala
@@ -18,8 +18,9 @@
 package org.apache.flink.table.plan.batch.sql.agg
 
 import org.apache.flink.api.scala._
+import org.apache.flink.table.api.config.OptimizerConfigOptions
 import org.apache.flink.table.api.scala._
-import org.apache.flink.table.api.{OptimizerConfigOptions, TableException, ValidationException}
+import org.apache.flink.table.api.{TableException, ValidationException}
 import org.apache.flink.table.plan.util.JavaUserDefinedAggFunctions.WeightedAvgWithMerge
 import org.apache.flink.table.util.{AggregatePhaseStrategy, CountAggFunction, TableTestBase}
 
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/batch/sql/join/BroadcastHashJoinTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/batch/sql/join/BroadcastHashJoinTest.scala
index 756a151..2c1d226 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/batch/sql/join/BroadcastHashJoinTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/batch/sql/join/BroadcastHashJoinTest.scala
@@ -18,7 +18,8 @@
 
 package org.apache.flink.table.plan.batch.sql.join
 
-import org.apache.flink.table.api.{OptimizerConfigOptions, ExecutionConfigOptions, TableException}
+import org.apache.flink.table.api.config.{ExecutionConfigOptions, OptimizerConfigOptions}
+import org.apache.flink.table.api.TableException
 
 import org.junit.{Before, Test}
 
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/batch/sql/join/BroadcastHashSemiAntiJoinTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/batch/sql/join/BroadcastHashSemiAntiJoinTest.scala
index 69b93d5..391bd64 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/batch/sql/join/BroadcastHashSemiAntiJoinTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/batch/sql/join/BroadcastHashSemiAntiJoinTest.scala
@@ -18,7 +18,8 @@
 
 package org.apache.flink.table.plan.batch.sql.join
 
-import org.apache.flink.table.api.{OptimizerConfigOptions, ExecutionConfigOptions, TableException}
+import org.apache.flink.table.api.config.{ExecutionConfigOptions, OptimizerConfigOptions}
+import org.apache.flink.table.api.TableException
 
 import org.junit.{Before, Test}
 
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/batch/sql/join/LookupJoinTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/batch/sql/join/LookupJoinTest.scala
index 10ea8a3..ca5fcaa 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/batch/sql/join/LookupJoinTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/batch/sql/join/LookupJoinTest.scala
@@ -19,6 +19,7 @@ package org.apache.flink.table.plan.batch.sql.join
 
 import org.apache.flink.api.scala._
 import org.apache.flink.table.api._
+import org.apache.flink.table.api.config.OptimizerConfigOptions
 import org.apache.flink.table.api.scala._
 import org.apache.flink.table.plan.optimize.program.FlinkBatchProgram
 import org.apache.flink.table.plan.stream.sql.join.TestTemporalTable
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/batch/sql/join/NestedLoopJoinTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/batch/sql/join/NestedLoopJoinTest.scala
index 8df75db..6106320 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/batch/sql/join/NestedLoopJoinTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/batch/sql/join/NestedLoopJoinTest.scala
@@ -18,7 +18,7 @@
 
 package org.apache.flink.table.plan.batch.sql.join
 
-import org.apache.flink.table.api.ExecutionConfigOptions
+import org.apache.flink.table.api.config.ExecutionConfigOptions
 
 import org.junit.Before
 
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/batch/sql/join/NestedLoopSemiAntiJoinTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/batch/sql/join/NestedLoopSemiAntiJoinTest.scala
index d152961..7cf898e 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/batch/sql/join/NestedLoopSemiAntiJoinTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/batch/sql/join/NestedLoopSemiAntiJoinTest.scala
@@ -18,7 +18,7 @@
 
 package org.apache.flink.table.plan.batch.sql.join
 
-import org.apache.flink.table.api.ExecutionConfigOptions
+import org.apache.flink.table.api.config.ExecutionConfigOptions
 
 import org.junit.Before
 
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/batch/sql/join/ShuffledHashJoinTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/batch/sql/join/ShuffledHashJoinTest.scala
index e4703f7..6dba826 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/batch/sql/join/ShuffledHashJoinTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/batch/sql/join/ShuffledHashJoinTest.scala
@@ -18,7 +18,8 @@
 
 package org.apache.flink.table.plan.batch.sql.join
 
-import org.apache.flink.table.api.{ExecutionConfigOptions, TableException}
+import org.apache.flink.table.api.TableException
+import org.apache.flink.table.api.config.ExecutionConfigOptions
 
 import org.junit.{Before, Test}
 
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/batch/sql/join/ShuffledHashSemiAntiJoinTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/batch/sql/join/ShuffledHashSemiAntiJoinTest.scala
index 0089486..6fc8716 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/batch/sql/join/ShuffledHashSemiAntiJoinTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/batch/sql/join/ShuffledHashSemiAntiJoinTest.scala
@@ -18,7 +18,8 @@
 
 package org.apache.flink.table.plan.batch.sql.join
 
-import org.apache.flink.table.api.{ExecutionConfigOptions, TableException}
+import org.apache.flink.table.api.TableException
+import org.apache.flink.table.api.config.ExecutionConfigOptions
 
 import org.junit.{Before, Test}
 
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/batch/sql/join/SortMergeJoinTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/batch/sql/join/SortMergeJoinTest.scala
index 5a97ee7..332e3ee 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/batch/sql/join/SortMergeJoinTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/batch/sql/join/SortMergeJoinTest.scala
@@ -18,7 +18,8 @@
 
 package org.apache.flink.table.plan.batch.sql.join
 
-import org.apache.flink.table.api.{ExecutionConfigOptions, TableException}
+import org.apache.flink.table.api.TableException
+import org.apache.flink.table.api.config.ExecutionConfigOptions
 
 import org.junit.{Before, Test}
 
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/batch/sql/join/SortMergeSemiAntiJoinTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/batch/sql/join/SortMergeSemiAntiJoinTest.scala
index f676f03..10e8416 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/batch/sql/join/SortMergeSemiAntiJoinTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/batch/sql/join/SortMergeSemiAntiJoinTest.scala
@@ -18,7 +18,8 @@
 
 package org.apache.flink.table.plan.batch.sql.join
 
-import org.apache.flink.table.api.{ExecutionConfigOptions, TableException}
+import org.apache.flink.table.api.TableException
+import org.apache.flink.table.api.config.ExecutionConfigOptions
 
 import org.junit.{Before, Test}
 
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/common/JoinReorderTestBase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/common/JoinReorderTestBase.scala
index 32cba57..6465c2b 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/common/JoinReorderTestBase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/common/JoinReorderTestBase.scala
@@ -19,7 +19,8 @@
 package org.apache.flink.table.plan.common
 
 import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.table.api.{OptimizerConfigOptions, Types}
+import org.apache.flink.table.api.Types
+import org.apache.flink.table.api.config.OptimizerConfigOptions
 import org.apache.flink.table.plan.stats.{ColumnStats, FlinkStatistic, TableStats}
 import org.apache.flink.table.util.{TableTestBase, TableTestUtil}
 
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/nodes/resource/ExecNodeResourceTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/nodes/resource/ExecNodeResourceTest.scala
index 3d0d4d7..5b13415 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/nodes/resource/ExecNodeResourceTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/nodes/resource/ExecNodeResourceTest.scala
@@ -24,7 +24,8 @@ import org.apache.flink.streaming.api.datastream.{DataStream, DataStreamSink}
 import org.apache.flink.streaming.api.environment
 import org.apache.flink.streaming.api.operators.StreamOperatorFactory
 import org.apache.flink.streaming.api.transformations.{SinkTransformation, SourceTransformation}
-import org.apache.flink.table.api.{ExecutionConfigOptions, TableConfig, TableSchema, Types}
+import org.apache.flink.table.api.config.ExecutionConfigOptions
+import org.apache.flink.table.api.{TableConfig, TableSchema, Types}
 import org.apache.flink.table.dataformat.BaseRow
 import org.apache.flink.table.plan.stats.{FlinkStatistic, TableStats}
 import org.apache.flink.table.sinks.{AppendStreamTableSink, StreamTableSink, TableSink}
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/rules/logical/SplitAggregateRuleTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/rules/logical/SplitAggregateRuleTest.scala
index 75dc85e..869eb94 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/rules/logical/SplitAggregateRuleTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/rules/logical/SplitAggregateRuleTest.scala
@@ -19,7 +19,7 @@
 package org.apache.flink.table.plan.rules.logical
 
 import org.apache.flink.api.scala._
-import org.apache.flink.table.api.OptimizerConfigOptions
+import org.apache.flink.table.api.config.OptimizerConfigOptions
 import org.apache.flink.table.api.scala._
 import org.apache.flink.table.plan.optimize.program.FlinkStreamProgram
 import org.apache.flink.table.util.TableTestBase
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/rules/physical/batch/RemoveRedundantLocalHashAggRuleTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/rules/physical/batch/RemoveRedundantLocalHashAggRuleTest.scala
index 03564ba..b6dcd84 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/rules/physical/batch/RemoveRedundantLocalHashAggRuleTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/rules/physical/batch/RemoveRedundantLocalHashAggRuleTest.scala
@@ -19,7 +19,7 @@ package org.apache.flink.table.plan.rules.physical.batch
 
 import org.apache.flink.api.scala._
 import org.apache.flink.table.api.scala._
-import org.apache.flink.table.api.{ExecutionConfigOptions, OptimizerConfigOptions}
+import org.apache.flink.table.api.config.{ExecutionConfigOptions, OptimizerConfigOptions}
 import org.apache.flink.table.util.TableTestBase
 
 import org.junit.{Before, Test}
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/rules/physical/batch/RemoveRedundantLocalSortAggRuleTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/rules/physical/batch/RemoveRedundantLocalSortAggRuleTest.scala
index 775de9d..511b8a1 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/rules/physical/batch/RemoveRedundantLocalSortAggRuleTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/rules/physical/batch/RemoveRedundantLocalSortAggRuleTest.scala
@@ -19,7 +19,7 @@ package org.apache.flink.table.plan.rules.physical.batch
 
 import org.apache.flink.api.scala._
 import org.apache.flink.table.api.scala._
-import org.apache.flink.table.api.{ExecutionConfigOptions, OptimizerConfigOptions}
+import org.apache.flink.table.api.config.{ExecutionConfigOptions, OptimizerConfigOptions}
 import org.apache.flink.table.util.TableTestBase
 
 import org.junit.{Before, Test}
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/rules/physical/stream/RetractionRulesWithTwoStageAggTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/rules/physical/stream/RetractionRulesWithTwoStageAggTest.scala
index d5c3408..284e277 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/rules/physical/stream/RetractionRulesWithTwoStageAggTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/rules/physical/stream/RetractionRulesWithTwoStageAggTest.scala
@@ -20,8 +20,8 @@ package org.apache.flink.table.plan.rules.physical.stream
 
 import org.apache.flink.api.common.time.Time
 import org.apache.flink.api.scala._
+import org.apache.flink.table.api.config.OptimizerConfigOptions
 import org.apache.flink.table.api.scala._
-import org.apache.flink.table.api.OptimizerConfigOptions
 import org.apache.flink.table.util.{AggregatePhaseStrategy, TableTestBase}
 
 import org.junit.{Before, Test}
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/stream/sql/MiniBatchIntervalInferTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/stream/sql/MiniBatchIntervalInferTest.scala
index 4d79cfb..0640dec 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/stream/sql/MiniBatchIntervalInferTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/stream/sql/MiniBatchIntervalInferTest.scala
@@ -20,7 +20,8 @@ package org.apache.flink.table.plan.stream.sql
 
 import org.apache.flink.api.common.time.Time
 import org.apache.flink.api.scala._
-import org.apache.flink.table.api.{ExecutionConfigOptions, TableConfig}
+import org.apache.flink.table.api.TableConfig
+import org.apache.flink.table.api.config.ExecutionConfigOptions
 import org.apache.flink.table.api.scala._
 import org.apache.flink.table.plan.util.WindowEmitStrategy.{SQL_EXEC_EMIT_EARLY_FIRE_DELAY, SQL_EXEC_EMIT_EARLY_FIRE_ENABLED}
 import org.apache.flink.table.types.logical.{BigIntType, IntType, VarCharType}
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/stream/sql/ModifiedMonotonicityTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/stream/sql/ModifiedMonotonicityTest.scala
index dd11055..b9f4238 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/stream/sql/ModifiedMonotonicityTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/stream/sql/ModifiedMonotonicityTest.scala
@@ -19,7 +19,7 @@
 package org.apache.flink.table.plan.stream.sql
 
 import org.apache.flink.api.scala._
-import org.apache.flink.table.api.ExecutionConfigOptions
+import org.apache.flink.table.api.config.ExecutionConfigOptions
 import org.apache.flink.table.api.scala._
 import org.apache.flink.table.functions.ScalarFunction
 import org.apache.flink.table.plan.`trait`.RelModifiedMonotonicity
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/stream/sql/SubplanReuseTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/stream/sql/SubplanReuseTest.scala
index 53aa95c..c83b270 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/stream/sql/SubplanReuseTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/stream/sql/SubplanReuseTest.scala
@@ -20,7 +20,7 @@ package org.apache.flink.table.plan.stream.sql
 
 import org.apache.flink.api.scala._
 import org.apache.flink.table.api.scala._
-import org.apache.flink.table.api.{ExecutionConfigOptions, OptimizerConfigOptions}
+import org.apache.flink.table.api.config.{ExecutionConfigOptions, OptimizerConfigOptions}
 import org.apache.flink.table.functions.aggfunctions.FirstValueAggFunction.IntFirstValueAggFunction
 import org.apache.flink.table.functions.aggfunctions.LastValueAggFunction.LongLastValueAggFunction
 import org.apache.flink.table.runtime.utils.JavaUserDefinedScalarFunctions.NonDeterministicUdf
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/stream/sql/agg/AggregateTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/stream/sql/agg/AggregateTest.scala
index 0cca30a..527e0e7 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/stream/sql/agg/AggregateTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/stream/sql/agg/AggregateTest.scala
@@ -20,8 +20,9 @@ package org.apache.flink.table.plan.stream.sql.agg
 
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.scala._
+import org.apache.flink.table.api.config.ExecutionConfigOptions
 import org.apache.flink.table.api.scala._
-import org.apache.flink.table.api.{ExecutionConfigOptions, TableException, Types, ValidationException}
+import org.apache.flink.table.api.{TableException, Types, ValidationException}
 import org.apache.flink.table.typeutils.DecimalTypeInfo
 import org.apache.flink.table.util.{StreamTableTestUtil, TableTestBase}
 
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/stream/sql/agg/DistinctAggregateTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/stream/sql/agg/DistinctAggregateTest.scala
index 8772500..5e97fa1 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/stream/sql/agg/DistinctAggregateTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/stream/sql/agg/DistinctAggregateTest.scala
@@ -20,7 +20,7 @@ package org.apache.flink.table.plan.stream.sql.agg
 
 import org.apache.flink.api.common.time.Time
 import org.apache.flink.api.scala._
-import org.apache.flink.table.api.OptimizerConfigOptions
+import org.apache.flink.table.api.config.OptimizerConfigOptions
 import org.apache.flink.table.api.scala._
 import org.apache.flink.table.plan.rules.physical.stream.IncrementalAggregateRule
 import org.apache.flink.table.util.{AggregatePhaseStrategy, StreamTableTestUtil, TableTestBase}
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/stream/sql/agg/TwoStageAggregateTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/stream/sql/agg/TwoStageAggregateTest.scala
index 5eca0cb..3446a42 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/stream/sql/agg/TwoStageAggregateTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/stream/sql/agg/TwoStageAggregateTest.scala
@@ -20,7 +20,7 @@ package org.apache.flink.table.plan.stream.sql.agg
 
 import org.apache.flink.api.common.time.Time
 import org.apache.flink.api.scala._
-import org.apache.flink.table.api.OptimizerConfigOptions
+import org.apache.flink.table.api.config.OptimizerConfigOptions
 import org.apache.flink.table.api.scala._
 import org.apache.flink.table.util.{AggregatePhaseStrategy, TableTestBase}
 
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/stream/table/TwoStageAggregateTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/stream/table/TwoStageAggregateTest.scala
index 902387f..6bd2e96 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/stream/table/TwoStageAggregateTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/stream/table/TwoStageAggregateTest.scala
@@ -20,8 +20,9 @@ package org.apache.flink.table.plan.stream.table
 
 import org.apache.flink.api.common.time.Time
 import org.apache.flink.api.scala._
+import org.apache.flink.table.api.config.{ExecutionConfigOptions, OptimizerConfigOptions}
 import org.apache.flink.table.api.scala._
-import org.apache.flink.table.api.{DataTypes, ExecutionConfigOptions, OptimizerConfigOptions}
+import org.apache.flink.table.api.DataTypes
 import org.apache.flink.table.util.{AggregatePhaseStrategy, StreamTableTestUtil, TableTestBase}
 
 import org.junit.{Before, Test}
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/batch/sql/CalcITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/batch/sql/CalcITCase.scala
index 1e4279c..d28be4c 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/batch/sql/CalcITCase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/batch/sql/CalcITCase.scala
@@ -26,7 +26,8 @@ import org.apache.flink.api.common.typeinfo.Types
 import org.apache.flink.api.common.typeinfo.Types.INSTANT
 import org.apache.flink.api.java.typeutils._
 import org.apache.flink.api.scala._
-import org.apache.flink.table.api.{ExecutionConfigOptions, ValidationException}
+import org.apache.flink.table.api.ValidationException
+import org.apache.flink.table.api.config.ExecutionConfigOptions
 import org.apache.flink.table.dataformat.DataFormatConverters.{LocalDateConverter, LocalDateTimeConverter}
 import org.apache.flink.table.dataformat.Decimal
 import org.apache.flink.table.expressions.utils.{RichFunc1, RichFunc2, RichFunc3, SplitUDF}
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/batch/sql/DecimalITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/batch/sql/DecimalITCase.scala
index 137d277..82b5ecb 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/batch/sql/DecimalITCase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/batch/sql/DecimalITCase.scala
@@ -19,7 +19,8 @@
 package org.apache.flink.table.runtime.batch.sql
 
 import org.apache.flink.api.java.typeutils.RowTypeInfo
-import org.apache.flink.table.api.{DataTypes, ExecutionConfigOptions}
+import org.apache.flink.table.api.DataTypes
+import org.apache.flink.table.api.config.ExecutionConfigOptions
 import org.apache.flink.table.runtime.utils.BatchTestBase
 import org.apache.flink.table.runtime.utils.BatchTestBase.row
 import org.apache.flink.table.types.LogicalTypeDataTypeConverter.fromDataTypeToLogicalType
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/batch/sql/MiscITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/batch/sql/MiscITCase.scala
index 7fd55e3..275bd26 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/batch/sql/MiscITCase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/batch/sql/MiscITCase.scala
@@ -21,7 +21,7 @@ package org.apache.flink.table.runtime.batch.sql
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.java.typeutils.RowTypeInfo
 import org.apache.flink.api.scala._
-import org.apache.flink.table.api.ExecutionConfigOptions.SQL_RESOURCE_DEFAULT_PARALLELISM
+import org.apache.flink.table.api.config.ExecutionConfigOptions.SQL_RESOURCE_DEFAULT_PARALLELISM
 import org.apache.flink.table.runtime.batch.sql.join.JoinITCaseHelper
 import org.apache.flink.table.runtime.batch.sql.join.JoinType.SortMergeJoin
 import org.apache.flink.table.runtime.utils.BatchTestBase
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/batch/sql/OverWindowITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/batch/sql/OverWindowITCase.scala
index 342f82b..d48c918 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/batch/sql/OverWindowITCase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/batch/sql/OverWindowITCase.scala
@@ -22,7 +22,7 @@ import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.java.tuple.{Tuple1 => JTuple1}
 import org.apache.flink.api.java.typeutils.{RowTypeInfo, TupleTypeInfo}
 import org.apache.flink.api.scala._
-import org.apache.flink.table.api.{ExecutionConfigOptions, Types}
+import org.apache.flink.table.api.Types
 import org.apache.flink.table.functions.AggregateFunction
 import org.apache.flink.table.runtime.utils.BatchTestBase
 import org.apache.flink.table.runtime.utils.BatchTestBase.row
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/batch/sql/PartitionableSinkITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/batch/sql/PartitionableSinkITCase.scala
index fe3918e..2cf128d 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/batch/sql/PartitionableSinkITCase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/batch/sql/PartitionableSinkITCase.scala
@@ -26,7 +26,8 @@ import org.apache.flink.sql.parser.impl.FlinkSqlParserImpl
 import org.apache.flink.sql.parser.validate.FlinkSqlConformance
 import org.apache.flink.streaming.api.datastream.{DataStream, DataStreamSink}
 import org.apache.flink.streaming.api.functions.sink.RichSinkFunction
-import org.apache.flink.table.api.{ExecutionConfigOptions, TableConfig, TableException, TableSchema, ValidationException}
+import org.apache.flink.table.api.config.ExecutionConfigOptions
+import org.apache.flink.table.api.{TableConfig, TableSchema, ValidationException}
 import org.apache.flink.table.calcite.CalciteConfig
 import org.apache.flink.table.runtime.batch.sql.PartitionableSinkITCase._
 import org.apache.flink.table.runtime.utils.BatchTestBase
@@ -35,6 +36,7 @@ import org.apache.flink.table.runtime.utils.TestData._
 import org.apache.flink.table.sinks.{PartitionableTableSink, StreamTableSink, TableSink}
 import org.apache.flink.table.types.logical.{BigIntType, IntType, VarCharType}
 import org.apache.flink.types.Row
+
 import org.apache.calcite.config.Lex
 import org.apache.calcite.sql.parser.SqlParser
 import org.junit.Assert._
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/batch/sql/RankITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/batch/sql/RankITCase.scala
index 1e5975c..76f3d5e 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/batch/sql/RankITCase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/batch/sql/RankITCase.scala
@@ -18,7 +18,6 @@
 
 package org.apache.flink.table.runtime.batch.sql
 
-import org.apache.flink.table.api.ExecutionConfigOptions
 import org.apache.flink.table.runtime.utils.BatchTestBase
 import org.apache.flink.table.runtime.utils.BatchTestBase.row
 import org.apache.flink.table.runtime.utils.TestData._
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/batch/sql/SortLimitITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/batch/sql/SortLimitITCase.scala
index d9faca9..5e25f17 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/batch/sql/SortLimitITCase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/batch/sql/SortLimitITCase.scala
@@ -18,7 +18,7 @@
 
 package org.apache.flink.table.runtime.batch.sql
 
-import org.apache.flink.table.api.ExecutionConfigOptions
+import org.apache.flink.table.api.config.ExecutionConfigOptions
 import org.apache.flink.table.runtime.utils.BatchTestBase
 import org.apache.flink.table.runtime.utils.TestData._
 import org.apache.flink.types.Row
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/batch/sql/UnionITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/batch/sql/UnionITCase.scala
index e8d9082..43b2108 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/batch/sql/UnionITCase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/batch/sql/UnionITCase.scala
@@ -18,7 +18,7 @@
 
 package org.apache.flink.table.runtime.batch.sql
 
-import org.apache.flink.table.api.{ExecutionConfigOptions, OptimizerConfigOptions}
+import org.apache.flink.table.api.config.{ExecutionConfigOptions, OptimizerConfigOptions}
 import org.apache.flink.table.dataformat.BinaryString.fromString
 import org.apache.flink.table.runtime.utils.BatchTestBase
 import org.apache.flink.table.runtime.utils.BatchTestBase.{binaryRow, row}
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/batch/sql/agg/AggregateJoinTransposeITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/batch/sql/agg/AggregateJoinTransposeITCase.scala
index f6aa573..f523d0e 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/batch/sql/agg/AggregateJoinTransposeITCase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/batch/sql/agg/AggregateJoinTransposeITCase.scala
@@ -19,7 +19,7 @@
 package org.apache.flink.table.runtime.batch.sql.agg
 
 import org.apache.flink.api.java.typeutils.RowTypeInfo
-import org.apache.flink.table.api.ExecutionConfigOptions.SQL_EXEC_DISABLED_OPERATORS
+import org.apache.flink.table.api.config.ExecutionConfigOptions.SQL_EXEC_DISABLED_OPERATORS
 import org.apache.flink.table.api.{TableException, Types}
 import org.apache.flink.table.calcite.CalciteConfig
 import org.apache.flink.table.plan.optimize.program.{BatchOptimizeContext, FlinkBatchProgram, FlinkGroupProgramBuilder, FlinkHepRuleSetProgramBuilder, HEP_RULES_EXECUTION_TYPE}
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/batch/sql/agg/AggregateReduceGroupingITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/batch/sql/agg/AggregateReduceGroupingITCase.scala
index 7a4b648..fcbc52e 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/batch/sql/agg/AggregateReduceGroupingITCase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/batch/sql/agg/AggregateReduceGroupingITCase.scala
@@ -18,8 +18,9 @@
 package org.apache.flink.table.runtime.batch.sql.agg
 
 import org.apache.flink.api.java.typeutils.RowTypeInfo
-import org.apache.flink.table.api.{OptimizerConfigOptions, Types}
-import org.apache.flink.table.api.ExecutionConfigOptions.{SQL_EXEC_DISABLED_OPERATORS, SQL_RESOURCE_DEFAULT_PARALLELISM, SQL_RESOURCE_HASH_AGG_TABLE_MEM}
+import org.apache.flink.table.api.Types
+import org.apache.flink.table.api.config.ExecutionConfigOptions.{SQL_EXEC_DISABLED_OPERATORS, SQL_RESOURCE_DEFAULT_PARALLELISM, SQL_RESOURCE_HASH_AGG_TABLE_MEM}
+import org.apache.flink.table.api.config.OptimizerConfigOptions
 import org.apache.flink.table.plan.stats.FlinkStatistic
 import org.apache.flink.table.runtime.functions.SqlDateTimeUtils.unixTimestampToLocalDateTime
 import org.apache.flink.table.runtime.utils.BatchTestBase
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/batch/sql/agg/GroupingSetsITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/batch/sql/agg/GroupingSetsITCase.scala
index e7b8913c..eaa615c 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/batch/sql/agg/GroupingSetsITCase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/batch/sql/agg/GroupingSetsITCase.scala
@@ -19,7 +19,7 @@
 package org.apache.flink.table.runtime.batch.sql.agg
 
 import org.apache.flink.api.java.typeutils.RowTypeInfo
-import org.apache.flink.table.api.{ExecutionConfigOptions, TableException, Types}
+import org.apache.flink.table.api.{TableException, Types}
 import org.apache.flink.table.runtime.utils.BatchTestBase
 import org.apache.flink.table.runtime.utils.BatchTestBase.row
 import org.apache.flink.table.runtime.utils.TestData._
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/batch/sql/agg/HashAggITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/batch/sql/agg/HashAggITCase.scala
index 296e50d..416b7c5 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/batch/sql/agg/HashAggITCase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/batch/sql/agg/HashAggITCase.scala
@@ -18,8 +18,7 @@
 
 package org.apache.flink.table.runtime.batch.sql.agg
 
-import org.apache.flink.configuration.Configuration
-import org.apache.flink.table.api.ExecutionConfigOptions
+import org.apache.flink.table.api.config.ExecutionConfigOptions
 
 /**
   * AggregateITCase using HashAgg Operator.
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/batch/sql/agg/HashDistinctAggregateITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/batch/sql/agg/HashDistinctAggregateITCase.scala
index 9218e55..3d199ab 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/batch/sql/agg/HashDistinctAggregateITCase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/batch/sql/agg/HashDistinctAggregateITCase.scala
@@ -18,7 +18,7 @@
 
 package org.apache.flink.table.runtime.batch.sql.agg
 
-import org.apache.flink.table.api.ExecutionConfigOptions
+import org.apache.flink.table.api.config.ExecutionConfigOptions
 import org.apache.flink.table.plan.util.OperatorType
 
 /**
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/batch/sql/agg/SortAggITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/batch/sql/agg/SortAggITCase.scala
index f40d0af..8d3faad 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/batch/sql/agg/SortAggITCase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/batch/sql/agg/SortAggITCase.scala
@@ -21,7 +21,7 @@ package org.apache.flink.table.runtime.batch.sql.agg
 import org.apache.flink.api.common.typeinfo.{BasicArrayTypeInfo, PrimitiveArrayTypeInfo, TypeInformation}
 import org.apache.flink.api.java.typeutils.{MapTypeInfo, ObjectArrayTypeInfo, RowTypeInfo, TupleTypeInfo, TypeExtractor}
 import org.apache.flink.api.scala._
-import org.apache.flink.table.api.ExecutionConfigOptions.{SQL_EXEC_DISABLED_OPERATORS, SQL_RESOURCE_DEFAULT_PARALLELISM}
+import org.apache.flink.table.api.config.ExecutionConfigOptions.{SQL_EXEC_DISABLED_OPERATORS, SQL_RESOURCE_DEFAULT_PARALLELISM}
 import org.apache.flink.table.api.Types
 import org.apache.flink.table.functions.AggregateFunction
 import org.apache.flink.table.plan.util.JavaUserDefinedAggFunctions.WeightedAvgWithMergeAndReset
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/batch/sql/agg/SortDistinctAggregateITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/batch/sql/agg/SortDistinctAggregateITCase.scala
index 78ea69c..72f2fd0 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/batch/sql/agg/SortDistinctAggregateITCase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/batch/sql/agg/SortDistinctAggregateITCase.scala
@@ -19,7 +19,7 @@
 package org.apache.flink.table.runtime.batch.sql.agg
 
 import org.apache.flink.api.scala._
-import org.apache.flink.table.api.ExecutionConfigOptions
+import org.apache.flink.table.api.config.ExecutionConfigOptions
 import org.apache.flink.table.plan.util.JavaUserDefinedAggFunctions.WeightedAvgWithMergeAndReset
 import org.apache.flink.table.plan.util.OperatorType
 import org.apache.flink.table.runtime.utils.BatchTestBase.row
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/batch/sql/join/InnerJoinITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/batch/sql/join/InnerJoinITCase.scala
index c8c35aa..b544835 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/batch/sql/join/InnerJoinITCase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/batch/sql/join/InnerJoinITCase.scala
@@ -20,7 +20,7 @@ package org.apache.flink.table.runtime.batch.sql.join
 
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo.INT_TYPE_INFO
 import org.apache.flink.api.java.typeutils.RowTypeInfo
-import org.apache.flink.table.api.ExecutionConfigOptions
+import org.apache.flink.table.api.config.ExecutionConfigOptions
 import org.apache.flink.table.runtime.batch.sql.join.JoinITCaseHelper.disableOtherJoinOpForJoin
 import org.apache.flink.table.runtime.batch.sql.join.JoinType.{BroadcastHashJoin, HashJoin, JoinType, NestedLoopJoin, SortMergeJoin}
 import org.apache.flink.table.runtime.utils.BatchTestBase
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/batch/sql/join/JoinITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/batch/sql/join/JoinITCase.scala
index f4544f1..216dca8 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/batch/sql/join/JoinITCase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/batch/sql/join/JoinITCase.scala
@@ -22,7 +22,8 @@ import org.apache.flink.api.common.ExecutionConfig
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo.{INT_TYPE_INFO, LONG_TYPE_INFO}
 import org.apache.flink.api.common.typeutils.TypeComparator
 import org.apache.flink.api.java.typeutils.{GenericTypeInfo, RowTypeInfo}
-import org.apache.flink.table.api.{ExecutionConfigOptions, Types}
+import org.apache.flink.table.api.Types
+import org.apache.flink.table.api.config.ExecutionConfigOptions
 import org.apache.flink.table.expressions.utils.FuncWithOpen
 import org.apache.flink.table.runtime.CodeGenOperatorFactory
 import org.apache.flink.table.runtime.batch.sql.join.JoinType.{BroadcastHashJoin, HashJoin, JoinType, NestedLoopJoin, SortMergeJoin}
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/batch/sql/join/JoinITCaseHelper.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/batch/sql/join/JoinITCaseHelper.scala
index de9cb7c..998bb28 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/batch/sql/join/JoinITCaseHelper.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/batch/sql/join/JoinITCaseHelper.scala
@@ -18,7 +18,8 @@
 
 package org.apache.flink.table.runtime.batch.sql.join
 
-import org.apache.flink.table.api.{OptimizerConfigOptions, ExecutionConfigOptions, TableEnvironment}
+import org.apache.flink.table.api.config.{ExecutionConfigOptions, OptimizerConfigOptions}
+import org.apache.flink.table.api.TableEnvironment
 import org.apache.flink.table.runtime.batch.sql.join.JoinType.{BroadcastHashJoin, HashJoin, JoinType, NestedLoopJoin, SortMergeJoin}
 
 /**
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/batch/table/DecimalITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/batch/table/DecimalITCase.scala
index 8bb6054..aa345d1 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/batch/table/DecimalITCase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/batch/table/DecimalITCase.scala
@@ -19,7 +19,8 @@
 package org.apache.flink.table.runtime.batch.table
 
 import org.apache.flink.api.java.typeutils.RowTypeInfo
-import org.apache.flink.table.api.{DataTypes, ExecutionConfigOptions, Table}
+import org.apache.flink.table.api.config.ExecutionConfigOptions
+import org.apache.flink.table.api.{DataTypes, Table}
 import org.apache.flink.table.api.scala._
 import org.apache.flink.table.runtime.utils.BatchTestBase.row
 import org.apache.flink.table.runtime.utils.{BatchTableEnvUtil, BatchTestBase}
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/stream/sql/SplitAggregateITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/stream/sql/SplitAggregateITCase.scala
index 750086e..ee7580a 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/stream/sql/SplitAggregateITCase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/stream/sql/SplitAggregateITCase.scala
@@ -19,7 +19,7 @@
 package org.apache.flink.table.runtime.stream.sql
 
 import org.apache.flink.api.scala._
-import org.apache.flink.table.api.OptimizerConfigOptions
+import org.apache.flink.table.api.config.OptimizerConfigOptions
 import org.apache.flink.table.api.scala._
 import org.apache.flink.table.runtime.stream.sql.SplitAggregateITCase.PartialAggMode
 import org.apache.flink.table.runtime.utils.StreamingWithAggTestBase.{AggMode, LocalGlobalOff, LocalGlobalOn}
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/utils/BatchTestBase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/utils/BatchTestBase.scala
index 670bc94..e31971a 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/utils/BatchTestBase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/utils/BatchTestBase.scala
@@ -23,8 +23,9 @@ import org.apache.flink.api.java.tuple.Tuple
 import org.apache.flink.streaming.api.datastream.DataStream
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
 import org.apache.flink.streaming.api.transformations.ShuffleMode
+import org.apache.flink.table.api.config.ExecutionConfigOptions
 import org.apache.flink.table.api.internal.{TableEnvironmentImpl, TableImpl}
-import org.apache.flink.table.api.{EnvironmentSettings, ExecutionConfigOptions, SqlParserException, Table, TableConfig, TableEnvironment}
+import org.apache.flink.table.api.{EnvironmentSettings, SqlParserException, Table, TableConfig, TableEnvironment}
 import org.apache.flink.table.dataformat.{BaseRow, BinaryRow, BinaryRowWriter}
 import org.apache.flink.table.functions.{AggregateFunction, ScalarFunction, TableFunction}
 import org.apache.flink.table.plan.stats.FlinkStatistic
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/utils/StreamingWithAggTestBase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/utils/StreamingWithAggTestBase.scala
index 0c49792..751688a 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/utils/StreamingWithAggTestBase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/utils/StreamingWithAggTestBase.scala
@@ -19,7 +19,7 @@ package org.apache.flink.table.runtime.utils
 
 import java.util
 import org.apache.flink.api.common.time.Time
-import org.apache.flink.table.api.OptimizerConfigOptions
+import org.apache.flink.table.api.config.OptimizerConfigOptions
 import org.apache.flink.table.runtime.utils.StreamingWithAggTestBase._
 import org.apache.flink.table.runtime.utils.StreamingWithStateTestBase.{HEAP_BACKEND, ROCKSDB_BACKEND, StateBackendMode}
 import org.apache.flink.table.runtime.utils.StreamingWithMiniBatchTestBase.{MiniBatchMode, MiniBatchOff, MiniBatchOn}
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/utils/StreamingWithMiniBatchTestBase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/utils/StreamingWithMiniBatchTestBase.scala
index 0afccfe..4c90eb2 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/utils/StreamingWithMiniBatchTestBase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/utils/StreamingWithMiniBatchTestBase.scala
@@ -17,7 +17,7 @@
  */
 package org.apache.flink.table.runtime.utils
 
-import org.apache.flink.table.api.ExecutionConfigOptions.{SQL_EXEC_MINIBATCH_ALLOW_LATENCY, SQL_EXEC_MINIBATCH_SIZE, SQL_EXEC_MINIBATCH_ENABLED}
+import org.apache.flink.table.api.config.ExecutionConfigOptions.{SQL_EXEC_MINIBATCH_ALLOW_LATENCY, SQL_EXEC_MINIBATCH_SIZE, SQL_EXEC_MINIBATCH_ENABLED}
 import org.apache.flink.table.runtime.utils.StreamingWithStateTestBase.{HEAP_BACKEND, ROCKSDB_BACKEND, StateBackendMode}
 import org.apache.flink.table.runtime.utils.StreamingWithMiniBatchTestBase.{MiniBatchMode, MiniBatchOff, MiniBatchOn}
 
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/util/TableTestBase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/util/TableTestBase.scala
index 1e88629..08b5506 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/util/TableTestBase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/util/TableTestBase.scala
@@ -27,6 +27,7 @@ import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment => Scala
 import org.apache.flink.streaming.api.transformations.ShuffleMode
 import org.apache.flink.streaming.api.{TimeCharacteristic, environment}
 import org.apache.flink.table.api._
+import org.apache.flink.table.api.config.ExecutionConfigOptions
 import org.apache.flink.table.api.internal.{TableEnvironmentImpl, TableImpl}
 import org.apache.flink.table.api.java.internal.{StreamTableEnvironmentImpl => JavaStreamTableEnvImpl}
 import org.apache.flink.table.api.java.{StreamTableEnvironment => JavaStreamTableEnv}
diff --git a/flink-table/flink-table-runtime-blink/pom.xml b/flink-table/flink-table-runtime-blink/pom.xml
index 5a86c18..f16d5a2 100644
--- a/flink-table/flink-table-runtime-blink/pom.xml
+++ b/flink-table/flink-table-runtime-blink/pom.xml
@@ -50,6 +50,12 @@ under the License.
 
 		<dependency>
 			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-table-api-java</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
 			<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
 			<version>${project.version}</version>
 			<scope>provided</scope>
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/hashtable/BaseHybridHashTable.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/hashtable/BaseHybridHashTable.java
index 1ac0c0d..2e71061 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/hashtable/BaseHybridHashTable.java
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/hashtable/BaseHybridHashTable.java
@@ -28,7 +28,7 @@ import org.apache.flink.runtime.io.disk.iomanager.HeaderlessChannelReaderInputVi
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.memory.MemoryAllocationException;
 import org.apache.flink.runtime.memory.MemoryManager;
-import org.apache.flink.table.api.ExecutionConfigOptions;
+import org.apache.flink.table.api.config.ExecutionConfigOptions;
 import org.apache.flink.table.runtime.compression.BlockCompressionFactory;
 import org.apache.flink.table.runtime.util.FileChannelUtil;
 import org.apache.flink.table.runtime.util.MemorySegmentPool;
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/sort/BinaryExternalSorter.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/sort/BinaryExternalSorter.java
index 817a3f8..d752fdb 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/sort/BinaryExternalSorter.java
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/sort/BinaryExternalSorter.java
@@ -32,7 +32,7 @@ import org.apache.flink.runtime.operators.sort.IndexedSorter;
 import org.apache.flink.runtime.operators.sort.QuickSort;
 import org.apache.flink.runtime.operators.sort.Sorter;
 import org.apache.flink.runtime.util.EmptyMutableObjectIterator;
-import org.apache.flink.table.api.ExecutionConfigOptions;
+import org.apache.flink.table.api.config.ExecutionConfigOptions;
 import org.apache.flink.table.dataformat.BaseRow;
 import org.apache.flink.table.dataformat.BinaryRow;
 import org.apache.flink.table.generated.NormalizedKeyComputer;
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/sort/BufferedKVExternalSorter.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/sort/BufferedKVExternalSorter.java
index 326daf0..b8e7a69 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/sort/BufferedKVExternalSorter.java
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/sort/BufferedKVExternalSorter.java
@@ -25,7 +25,7 @@ import org.apache.flink.runtime.io.disk.iomanager.FileIOChannel;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.operators.sort.IndexedSorter;
 import org.apache.flink.runtime.operators.sort.QuickSort;
-import org.apache.flink.table.api.ExecutionConfigOptions;
+import org.apache.flink.table.api.config.ExecutionConfigOptions;
 import org.apache.flink.table.dataformat.BinaryRow;
 import org.apache.flink.table.generated.NormalizedKeyComputer;
 import org.apache.flink.table.generated.RecordComparator;
diff --git a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/hashtable/BinaryHashTableTest.java b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/hashtable/BinaryHashTableTest.java
index c760f02..1e4ebb0 100644
--- a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/hashtable/BinaryHashTableTest.java
+++ b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/hashtable/BinaryHashTableTest.java
@@ -28,7 +28,7 @@ import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
 import org.apache.flink.runtime.memory.MemoryAllocationException;
 import org.apache.flink.runtime.memory.MemoryManager;
 import org.apache.flink.runtime.operators.testutils.UnionIterator;
-import org.apache.flink.table.api.ExecutionConfigOptions;
+import org.apache.flink.table.api.config.ExecutionConfigOptions;
 import org.apache.flink.table.dataformat.BaseRow;
 import org.apache.flink.table.dataformat.BinaryRow;
 import org.apache.flink.table.dataformat.BinaryRowWriter;
diff --git a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/hashtable/LongHashTableTest.java b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/hashtable/LongHashTableTest.java
index 6dcad8c..1933493 100644
--- a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/hashtable/LongHashTableTest.java
+++ b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/hashtable/LongHashTableTest.java
@@ -26,7 +26,7 @@ import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
 import org.apache.flink.runtime.memory.MemoryAllocationException;
 import org.apache.flink.runtime.memory.MemoryManager;
 import org.apache.flink.runtime.operators.testutils.UnionIterator;
-import org.apache.flink.table.api.ExecutionConfigOptions;
+import org.apache.flink.table.api.config.ExecutionConfigOptions;
 import org.apache.flink.table.dataformat.BaseRow;
 import org.apache.flink.table.dataformat.BinaryRow;
 import org.apache.flink.table.runtime.util.RowIterator;
diff --git a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/sort/BinaryExternalSorterTest.java b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/sort/BinaryExternalSorterTest.java
index 70cb5fb..44d663f 100644
--- a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/sort/BinaryExternalSorterTest.java
+++ b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/sort/BinaryExternalSorterTest.java
@@ -23,7 +23,7 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
 import org.apache.flink.runtime.memory.MemoryManager;
-import org.apache.flink.table.api.ExecutionConfigOptions;
+import org.apache.flink.table.api.config.ExecutionConfigOptions;
 import org.apache.flink.table.dataformat.BaseRow;
 import org.apache.flink.table.dataformat.BinaryRow;
 import org.apache.flink.table.dataformat.BinaryRowWriter;
diff --git a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/sort/BufferedKVExternalSorterTest.java b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/sort/BufferedKVExternalSorterTest.java
index 7eedc34..15a4154b 100644
--- a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/sort/BufferedKVExternalSorterTest.java
+++ b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/sort/BufferedKVExternalSorterTest.java
@@ -25,7 +25,7 @@ import org.apache.flink.runtime.io.disk.SimpleCollectingOutputView;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
 import org.apache.flink.runtime.memory.MemoryManager;
-import org.apache.flink.table.api.ExecutionConfigOptions;
+import org.apache.flink.table.api.config.ExecutionConfigOptions;
 import org.apache.flink.table.dataformat.BinaryRow;
 import org.apache.flink.table.dataformat.BinaryRowWriter;
 import org.apache.flink.table.dataformat.BinaryString;