Posted to commits@flink.apache.org by tw...@apache.org on 2022/03/22 19:13:50 UTC

[flink] 02/08: [FLINK-26709][table] Replace TableConfig.getConfiguration.set()

This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch release-1.15
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 5bd6aeffd0e0aa3bdde155e2a6a84c091de79790
Author: Marios Trivyzas <ma...@gmail.com>
AuthorDate: Mon Mar 21 16:10:30 2022 +0200

    [FLINK-26709][table] Replace TableConfig.getConfiguration.set()
    
    Since `TableConfig` is a `WritableConfig`, callers should call
    `TableConfig.set()` directly instead of going through `#getConfiguration()`,
    which exists only for advanced, internal reads of configuration values.
    
    (cherry picked from commit 79110e987484a63184a5b978aea53ff402ba4c57)
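
For reference, a minimal sketch of the replacement pattern (the class name is
illustrative; it assumes the Flink 1.15 `TableConfig.set()` API and uses the
`TableConfigOptions.LOCAL_TIME_ZONE` option that also appears in the
Elasticsearch hunk below):

    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.TableEnvironment;
    import org.apache.flink.table.api.config.TableConfigOptions;

    public class TableConfigSetExample {
        public static void main(String[] args) {
            TableEnvironment tEnv =
                    TableEnvironment.create(EnvironmentSettings.inStreamingMode());

            // Before: detour through the underlying Configuration with a raw key.
            // tEnv.getConfig().getConfiguration()
            //         .setString("table.local-time-zone", "Asia/Shanghai");

            // After: TableConfig is a WritableConfig, so typed ConfigOptions
            // can be set on it directly.
            tEnv.getConfig().set(TableConfigOptions.LOCAL_TIME_ZONE, "Asia/Shanghai");
        }
    }

In the Scala planner tests below, primitive arguments are wrapped with
Boolean.box(...) / Long.box(...) because set(ConfigOption<T>, T) is typed over
the boxed Java types.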
---
 .../table/ElasticsearchDynamicSinkBaseITCase.java  |   6 +-
 .../flink/connectors/hive/HiveRunnerITCase.java    |   4 +-
 .../connectors/hive/HiveSinkCompactionITCase.java  |   4 +-
 .../flink/connectors/hive/HiveTableSinkITCase.java |   8 +-
 .../connectors/hive/HiveTableSourceITCase.java     |  31 +---
 .../hive/TableEnvHiveConnectorITCase.java          |   8 +-
 .../table/catalog/hive/HiveCatalogITCase.java      |  14 +-
 .../flink/table/catalog/hive/HiveTestUtils.java    |   8 +-
 .../connector/jdbc/catalog/MySqlCatalogITCase.java |   4 +-
 .../jdbc/catalog/PostgresCatalogITCase.java        |   4 +-
 .../kafka/table/KafkaChangelogTableITCase.java     |  37 +++--
 .../connectors/kafka/table/KafkaTableITCase.java   |  11 +-
 .../flink/python/tests/BatchPythonUdfSqlJob.java   |   2 +-
 .../apache/flink/table/tpcds/TpcdsTestProgram.java |  12 +-
 .../client/python/PythonFunctionFactoryTest.java   |   5 +-
 .../PythonScalarFunctionOperatorTestBase.java      |   5 +-
 .../table/planner/delegation/PlannerBase.scala     |   4 +-
 .../BuiltInAggregateFunctionTestBase.java          |   4 +-
 .../nodes/exec/serde/LogicalTypeJsonSerdeTest.java |  16 +-
 .../PushLocalAggIntoTableSourceScanRuleTest.java   |   3 +-
 .../plan/batch/sql/DeadlockBreakupTest.scala       | 110 +++++++-------
 .../planner/plan/batch/sql/LegacySinkTest.scala    |   5 +-
 .../plan/batch/sql/MultipleInputCreationTest.scala |  56 +++----
 .../plan/batch/sql/RemoveCollationTest.scala       |  40 ++---
 .../planner/plan/batch/sql/RemoveShuffleTest.scala | 168 ++++++++++-----------
 .../planner/plan/batch/sql/SetOperatorsTest.scala  |   2 +-
 .../planner/plan/batch/sql/SortLimitTest.scala     |  24 +--
 .../table/planner/plan/batch/sql/SortTest.scala    |  34 ++---
 .../planner/plan/batch/sql/SubplanReuseTest.scala  |  68 ++++-----
 .../planner/plan/batch/sql/TableSinkTest.scala     |   5 +-
 .../planner/plan/batch/sql/TableSourceTest.scala   |   5 +-
 .../plan/batch/sql/agg/GroupWindowTest.scala       |   2 +-
 .../plan/batch/sql/agg/HashAggregateTest.scala     |   4 +-
 .../plan/batch/sql/agg/SortAggregateTest.scala     |   4 +-
 .../batch/sql/join/BroadcastHashJoinTest.scala     |   7 +-
 .../sql/join/BroadcastHashSemiAntiJoinTest.scala   |   6 +-
 .../plan/batch/sql/join/LookupJoinTest.scala       |   4 +-
 .../plan/batch/sql/join/NestedLoopJoinTest.scala   |   2 +-
 .../sql/join/NestedLoopSemiAntiJoinTest.scala      |   2 +-
 .../plan/batch/sql/join/ShuffledHashJoinTest.scala |   2 +-
 .../sql/join/ShuffledHashSemiAntiJoinTest.scala    |   2 +-
 .../plan/batch/sql/join/SortMergeJoinTest.scala    |   2 +-
 .../batch/sql/join/SortMergeSemiAntiJoinTest.scala |   2 +-
 .../planner/plan/common/JoinReorderTestBase.scala  |   8 +-
 .../planner/plan/common/TableFactoryTest.scala     |   2 +-
 .../table/planner/plan/hint/OptionsHintTest.scala  |   4 +-
 .../logical/JoinDeriveNullFilterRuleTest.scala     |   4 +-
 .../rules/logical/SplitAggregateRuleTest.scala     |  16 +-
 .../batch/EnforceLocalHashAggRuleTest.scala        |   4 +-
 .../batch/EnforceLocalSortAggRuleTest.scala        |   4 +-
 .../RemoveRedundantLocalHashAggRuleTest.scala      |  18 +--
 .../stream/ChangelogModeInferenceTest.scala        |   8 +-
 ...xpandWindowTableFunctionTransposeRuleTest.scala |   4 +-
 .../plan/stream/sql/DagOptimizationTest.scala      |  55 +++----
 .../planner/plan/stream/sql/DeduplicateTest.scala  |  14 +-
 .../stream/sql/MiniBatchIntervalInferTest.scala    |  42 +++---
 .../plan/stream/sql/ModifiedMonotonicityTest.scala |  18 +--
 .../table/planner/plan/stream/sql/RankTest.scala   |   5 +-
 .../planner/plan/stream/sql/SubplanReuseTest.scala |  34 ++---
 .../planner/plan/stream/sql/TableScanTest.scala    |  12 +-
 .../planner/plan/stream/sql/TableSinkTest.scala    |  23 ++-
 .../plan/stream/sql/agg/AggregateTest.scala        |  24 +--
 .../stream/sql/agg/DistinctAggregateTest.scala     |  11 +-
 .../plan/stream/sql/agg/GroupWindowTest.scala      |  16 +-
 .../stream/sql/agg/IncrementalAggregateTest.scala  |   4 +-
 .../stream/sql/agg/TwoStageAggregateTest.scala     |   2 +-
 .../plan/stream/sql/agg/WindowAggregateTest.scala  |  38 ++---
 .../utils/StreamingWithMiniBatchTestBase.scala     |   6 +-
 68 files changed, 543 insertions(+), 579 deletions(-)

diff --git a/flink-connectors/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/connector/elasticsearch/table/ElasticsearchDynamicSinkBaseITCase.java b/flink-connectors/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/connector/elasticsearch/table/ElasticsearchDynamicSinkBaseITCase.java
index 21ad5fd..75dd9b9 100644
--- a/flink-connectors/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/connector/elasticsearch/table/ElasticsearchDynamicSinkBaseITCase.java
+++ b/flink-connectors/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/connector/elasticsearch/table/ElasticsearchDynamicSinkBaseITCase.java
@@ -25,6 +25,7 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.api.EnvironmentSettings;
 import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.config.TableConfigOptions;
 import org.apache.flink.table.catalog.Column;
 import org.apache.flink.table.catalog.ResolvedSchema;
 import org.apache.flink.table.catalog.UniqueConstraint;
@@ -310,10 +311,7 @@ abstract class ElasticsearchDynamicSinkBaseITCase {
                 TableEnvironment.create(EnvironmentSettings.inStreamingMode());
 
         DateTimeFormatter dateTimeFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd");
-        tableEnvironment
-                .getConfig()
-                .getConfiguration()
-                .setString("table.local-time-zone", "Asia/Shanghai");
+        tableEnvironment.getConfig().set(TableConfigOptions.LOCAL_TIME_ZONE, "Asia/Shanghai");
 
         String dynamicIndex1 =
                 "dynamic-index-"
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveRunnerITCase.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveRunnerITCase.java
index 2b2d793..606c351 100644
--- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveRunnerITCase.java
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveRunnerITCase.java
@@ -539,9 +539,7 @@ public class HiveRunnerITCase {
             tableEnv.executeSql("create table db1.src (x smallint,y int) stored as orc");
             hiveShell.execute("insert into table db1.src values (1,100),(2,200)");
 
-            tableEnv.getConfig()
-                    .getConfiguration()
-                    .setBoolean(HiveOptions.TABLE_EXEC_HIVE_FALLBACK_MAPRED_READER, true);
+            tableEnv.getConfig().set(HiveOptions.TABLE_EXEC_HIVE_FALLBACK_MAPRED_READER, true);
 
             tableEnv.executeSql("alter table db1.src change x x int");
             assertEquals(
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveSinkCompactionITCase.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveSinkCompactionITCase.java
index d2f6db5..ffe2222 100644
--- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveSinkCompactionITCase.java
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveSinkCompactionITCase.java
@@ -55,9 +55,7 @@ public class HiveSinkCompactionITCase extends CompactionITCaseBase {
         tEnv().useCatalog(hiveCatalog.getName());
 
         // avoid too large parallelism lead to scheduler dead lock in streaming mode
-        tEnv().getConfig()
-                .getConfiguration()
-                .set(HiveOptions.TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM, false);
+        tEnv().getConfig().set(HiveOptions.TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM, false);
 
         super.init();
     }
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveTableSinkITCase.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveTableSinkITCase.java
index 1d2b4f5..39f7950 100644
--- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveTableSinkITCase.java
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveTableSinkITCase.java
@@ -569,13 +569,9 @@ public class HiveTableSinkITCase {
         tEnv.useCatalog(hiveCatalog.getName());
         tEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
         if (useMr) {
-            tEnv.getConfig()
-                    .getConfiguration()
-                    .set(HiveOptions.TABLE_EXEC_HIVE_FALLBACK_MAPRED_WRITER, true);
+            tEnv.getConfig().set(HiveOptions.TABLE_EXEC_HIVE_FALLBACK_MAPRED_WRITER, true);
         } else {
-            tEnv.getConfig()
-                    .getConfiguration()
-                    .set(HiveOptions.TABLE_EXEC_HIVE_FALLBACK_MAPRED_WRITER, false);
+            tEnv.getConfig().set(HiveOptions.TABLE_EXEC_HIVE_FALLBACK_MAPRED_WRITER, false);
         }
 
         try {
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveTableSourceITCase.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveTableSourceITCase.java
index d9e1fad..df3023f 100644
--- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveTableSourceITCase.java
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveTableSourceITCase.java
@@ -473,9 +473,7 @@ public class HiveTableSourceITCase extends BatchAbstractTestBase {
                         "select * from hive.source_db.test_parallelism_setting_with_file_num");
         testParallelismSettingTranslateAndAssert(3, table, tEnv);
 
-        tEnv.getConfig()
-                .getConfiguration()
-                .setInteger(HiveOptions.TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM_MAX, 2);
+        tEnv.getConfig().set(HiveOptions.TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM_MAX, 2);
         testParallelismSettingTranslateAndAssert(2, table, tEnv);
     }
 
@@ -496,12 +494,8 @@ public class HiveTableSourceITCase extends BatchAbstractTestBase {
         final String dbName = "source_db";
         final String tblName = "test_parallelism_limit_pushdown";
         TableEnvironment tEnv = createTableEnv();
-        tEnv.getConfig()
-                .getConfiguration()
-                .setBoolean(HiveOptions.TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM, false);
-        tEnv.getConfig()
-                .getConfiguration()
-                .setInteger(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 2);
+        tEnv.getConfig().set(HiveOptions.TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM, false);
+        tEnv.getConfig().set(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 2);
         tEnv.executeSql(
                 "CREATE TABLE source_db.test_parallelism_limit_pushdown "
                         + "(`year` STRING, `value` INT) partitioned by (pt int)");
@@ -536,9 +530,7 @@ public class HiveTableSourceITCase extends BatchAbstractTestBase {
         tEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
         tEnv.registerCatalog("hive", hiveCatalog);
         tEnv.useCatalog("hive");
-        tEnv.getConfig()
-                .getConfiguration()
-                .setBoolean(HiveOptions.TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM, false);
+        tEnv.getConfig().set(HiveOptions.TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM, false);
         tEnv.executeSql(
                 "CREATE TABLE source_db.test_parallelism_no_infer "
                         + "(`year` STRING, `value` INT) partitioned by (pt int)");
@@ -773,9 +765,7 @@ public class HiveTableSourceITCase extends BatchAbstractTestBase {
         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
         StreamTableEnvironment tEnv =
                 HiveTestUtils.createTableEnvInStreamingMode(env, SqlDialect.HIVE);
-        tEnv.getConfig()
-                .getConfiguration()
-                .setBoolean(HiveOptions.TABLE_EXEC_HIVE_FALLBACK_MAPRED_READER, useMapredReader);
+        tEnv.getConfig().set(HiveOptions.TABLE_EXEC_HIVE_FALLBACK_MAPRED_READER, useMapredReader);
         tEnv.registerCatalog(catalogName, hiveCatalog);
         tEnv.useCatalog(catalogName);
         tEnv.executeSql(
@@ -838,15 +828,10 @@ public class HiveTableSourceITCase extends BatchAbstractTestBase {
         doReturn(Optional.of(tableFactorySpy)).when(catalogSpy).getTableFactory();
 
         TableEnvironment tableEnv = HiveTestUtils.createTableEnvInBatchMode();
+        tableEnv.getConfig().set(HiveOptions.TABLE_EXEC_HIVE_FALLBACK_MAPRED_READER, fallbackMR);
         tableEnv.getConfig()
-                .getConfiguration()
-                .setBoolean(HiveOptions.TABLE_EXEC_HIVE_FALLBACK_MAPRED_READER, fallbackMR);
-        tableEnv.getConfig()
-                .getConfiguration()
-                .setBoolean(HiveOptions.TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM, inferParallelism);
-        tableEnv.getConfig()
-                .getConfiguration()
-                .setInteger(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 2);
+                .set(HiveOptions.TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM, inferParallelism);
+        tableEnv.getConfig().set(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 2);
         tableEnv.registerCatalog(catalogSpy.getName(), catalogSpy);
         tableEnv.useCatalog(catalogSpy.getName());
 
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/TableEnvHiveConnectorITCase.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/TableEnvHiveConnectorITCase.java
index 1d760d0..349264d 100644
--- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/TableEnvHiveConnectorITCase.java
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/TableEnvHiveConnectorITCase.java
@@ -431,9 +431,7 @@ public class TableEnvHiveConnectorITCase {
                     String.format(
                             "create table db1.t2 (y int,x int) stored as parquet location '%s'",
                             location));
-            tableEnv.getConfig()
-                    .getConfiguration()
-                    .setBoolean(HiveOptions.TABLE_EXEC_HIVE_FALLBACK_MAPRED_READER, true);
+            tableEnv.getConfig().set(HiveOptions.TABLE_EXEC_HIVE_FALLBACK_MAPRED_READER, true);
             assertEquals(
                     "[+I[1], +I[2]]",
                     CollectionUtil.iteratorToList(
@@ -584,9 +582,7 @@ public class TableEnvHiveConnectorITCase {
             // test.parquet data: hehuiyuan	{}	[]
             String folderURI = this.getClass().getResource("/parquet").getPath();
 
-            tableEnv.getConfig()
-                    .getConfiguration()
-                    .set(HiveOptions.TABLE_EXEC_HIVE_FALLBACK_MAPRED_READER, true);
+            tableEnv.getConfig().set(HiveOptions.TABLE_EXEC_HIVE_FALLBACK_MAPRED_READER, true);
             tableEnv.executeSql(
                     String.format(
                             "create external table src_t (a string, b map<string, string>, c array<string>) stored as %s location 'file://%s'",
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogITCase.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogITCase.java
index b18e11d..77b289e 100644
--- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogITCase.java
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogITCase.java
@@ -218,9 +218,7 @@ public class HiveCatalogITCase {
     public void testReadWriteCsv() throws Exception {
         // similar to CatalogTableITCase::testReadWriteCsvUsingDDL but uses HiveCatalog
         TableEnvironment tableEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());
-        tableEnv.getConfig()
-                .getConfiguration()
-                .setInteger(TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 1);
+        tableEnv.getConfig().set(TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 1);
 
         tableEnv.registerCatalog("myhive", hiveCatalog);
         tableEnv.useCatalog("myhive");
@@ -304,9 +302,7 @@ public class HiveCatalogITCase {
             settings = EnvironmentSettings.inBatchMode();
         }
         TableEnvironment tableEnv = TableEnvironment.create(settings);
-        tableEnv.getConfig()
-                .getConfiguration()
-                .setInteger(TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 1);
+        tableEnv.getConfig().set(TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 1);
 
         tableEnv.registerCatalog("myhive", hiveCatalog);
         tableEnv.useCatalog("myhive");
@@ -335,9 +331,7 @@ public class HiveCatalogITCase {
     @Test
     public void testTableWithPrimaryKey() {
         TableEnvironment tableEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());
-        tableEnv.getConfig()
-                .getConfiguration()
-                .setInteger(TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 1);
+        tableEnv.getConfig().set(TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 1);
 
         tableEnv.registerCatalog("catalog1", hiveCatalog);
         tableEnv.useCatalog("catalog1");
@@ -386,7 +380,7 @@ public class HiveCatalogITCase {
                 TableEnvironment.create(EnvironmentSettings.newInstance().inBatchMode().build());
         tEnv.registerCatalog("myhive", hiveCatalog);
         tEnv.useCatalog("myhive");
-        tEnv.getConfig().getConfiguration().set(TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 1);
+        tEnv.getConfig().set(TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 1);
 
         String path = this.getClass().getResource("/csv/test.csv").getPath();
 
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveTestUtils.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveTestUtils.java
index b3717ae..4c8c5b7 100644
--- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveTestUtils.java
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveTestUtils.java
@@ -153,9 +153,7 @@ public class HiveTestUtils {
 
     public static TableEnvironment createTableEnvInBatchMode(SqlDialect dialect) {
         TableEnvironment tableEnv = TableEnvironment.create(EnvironmentSettings.inBatchMode());
-        tableEnv.getConfig()
-                .getConfiguration()
-                .setInteger(TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM.key(), 1);
+        tableEnv.getConfig().set(TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 1);
         tableEnv.getConfig().setSqlDialect(dialect);
         return tableEnv;
     }
@@ -168,9 +166,7 @@ public class HiveTestUtils {
     public static StreamTableEnvironment createTableEnvInStreamingMode(
             StreamExecutionEnvironment env, SqlDialect dialect) {
         StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
-        tableEnv.getConfig()
-                .getConfiguration()
-                .setInteger(TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM.key(), 1);
+        tableEnv.getConfig().set(TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 1);
         tableEnv.getConfig().setSqlDialect(dialect);
         return tableEnv;
     }
diff --git a/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/catalog/MySqlCatalogITCase.java b/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/catalog/MySqlCatalogITCase.java
index 9639792..0e2c486 100644
--- a/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/catalog/MySqlCatalogITCase.java
+++ b/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/catalog/MySqlCatalogITCase.java
@@ -155,9 +155,7 @@ public class MySqlCatalogITCase extends MySqlCatalogTestBase {
     @Before
     public void setup() {
         this.tEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());
-        tEnv.getConfig()
-                .getConfiguration()
-                .setInteger(TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM.key(), 1);
+        tEnv.getConfig().set(TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 1);
 
         // Use mysql catalog.
         tEnv.registerCatalog(TEST_CATALOG_NAME, catalog);
diff --git a/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/catalog/PostgresCatalogITCase.java b/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/catalog/PostgresCatalogITCase.java
index 936230b..020b7da 100644
--- a/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/catalog/PostgresCatalogITCase.java
+++ b/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/catalog/PostgresCatalogITCase.java
@@ -40,9 +40,7 @@ public class PostgresCatalogITCase extends PostgresCatalogTestBase {
     @Before
     public void setup() {
         this.tEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());
-        tEnv.getConfig()
-                .getConfiguration()
-                .setInteger(TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM.key(), 1);
+        tEnv.getConfig().set(TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 1);
 
         // use PG catalog
         tEnv.registerCatalog(TEST_CATALOG_NAME, catalog);
diff --git a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaChangelogTableITCase.java b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaChangelogTableITCase.java
index 62a7d63..85c4af6 100644
--- a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaChangelogTableITCase.java
+++ b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaChangelogTableITCase.java
@@ -20,14 +20,16 @@ package org.apache.flink.streaming.connectors.kafka.table;
 
 import org.apache.flink.api.common.serialization.SerializationSchema;
 import org.apache.flink.api.common.serialization.SimpleStringSchema;
-import org.apache.flink.configuration.Configuration;
 import org.apache.flink.connector.base.DeliveryGuarantee;
 import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
 import org.apache.flink.connector.kafka.sink.KafkaSink;
 import org.apache.flink.streaming.api.datastream.DataStreamSource;
 import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner;
 import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
+import org.apache.flink.table.api.TableConfig;
 import org.apache.flink.table.api.TableResult;
+import org.apache.flink.table.api.config.ExecutionConfigOptions;
+import org.apache.flink.table.api.config.OptimizerConfigOptions;
 
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.junit.Before;
@@ -58,11 +60,12 @@ public class KafkaChangelogTableITCase extends KafkaTableTestBase {
         createTestTopic(topic, 1, 1);
 
         // enables MiniBatch processing to verify MiniBatch + FLIP-95, see FLINK-18769
-        Configuration tableConf = tEnv.getConfig().getConfiguration();
-        tableConf.setString("table.exec.mini-batch.enabled", "true");
-        tableConf.setString("table.exec.mini-batch.allow-latency", "1s");
-        tableConf.setString("table.exec.mini-batch.size", "5000");
-        tableConf.setString("table.optimizer.agg-phase-strategy", "TWO_PHASE");
+        TableConfig tableConf = tEnv.getConfig();
+        tableConf.set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ENABLED, true);
+        tableConf.set(
+                ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ALLOW_LATENCY, Duration.ofSeconds(1));
+        tableConf.set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_SIZE, 5000L);
+        tableConf.set(OptimizerConfigOptions.TABLE_OPTIMIZER_AGG_PHASE_STRATEGY, "TWO_PHASE");
 
         // ---------- Write the Debezium json into Kafka -------------------
         List<String> lines = readLines("debezium-data-schema-exclude.txt");
@@ -186,11 +189,12 @@ public class KafkaChangelogTableITCase extends KafkaTableTestBase {
         // configure time zone of  the Canal Json metadata "ingestion-timestamp"
         tEnv.getConfig().setLocalTimeZone(ZoneId.of("UTC"));
         // enables MiniBatch processing to verify MiniBatch + FLIP-95, see FLINK-18769
-        Configuration tableConf = tEnv.getConfig().getConfiguration();
-        tableConf.setString("table.exec.mini-batch.enabled", "true");
-        tableConf.setString("table.exec.mini-batch.allow-latency", "1s");
-        tableConf.setString("table.exec.mini-batch.size", "5000");
-        tableConf.setString("table.optimizer.agg-phase-strategy", "TWO_PHASE");
+        TableConfig tableConf = tEnv.getConfig();
+        tableConf.set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ENABLED, true);
+        tableConf.set(
+                ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ALLOW_LATENCY, Duration.ofSeconds(1));
+        tableConf.set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_SIZE, 5000L);
+        tableConf.set(OptimizerConfigOptions.TABLE_OPTIMIZER_AGG_PHASE_STRATEGY, "TWO_PHASE");
 
         // ---------- Write the Canal json into Kafka -------------------
         List<String> lines = readLines("canal-data.txt");
@@ -326,11 +330,12 @@ public class KafkaChangelogTableITCase extends KafkaTableTestBase {
         // configure time zone of  the Maxwell Json metadata "ingestion-timestamp"
         tEnv.getConfig().setLocalTimeZone(ZoneId.of("UTC"));
         // enables MiniBatch processing to verify MiniBatch + FLIP-95, see FLINK-18769
-        Configuration tableConf = tEnv.getConfig().getConfiguration();
-        tableConf.setString("table.exec.mini-batch.enabled", "true");
-        tableConf.setString("table.exec.mini-batch.allow-latency", "1s");
-        tableConf.setString("table.exec.mini-batch.size", "5000");
-        tableConf.setString("table.optimizer.agg-phase-strategy", "TWO_PHASE");
+        TableConfig tableConf = tEnv.getConfig();
+        tableConf.set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ENABLED, true);
+        tableConf.set(
+                ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ALLOW_LATENCY, Duration.ofSeconds(1));
+        tableConf.set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_SIZE, 5000L);
+        tableConf.set(OptimizerConfigOptions.TABLE_OPTIMIZER_AGG_PHASE_STRATEGY, "TWO_PHASE");
 
         // ---------- Write the Maxwell json into Kafka -------------------
         List<String> lines = readLines("maxwell-data.txt");
diff --git a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaTableITCase.java b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaTableITCase.java
index c268453..cfd5a46 100644
--- a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaTableITCase.java
+++ b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaTableITCase.java
@@ -24,6 +24,7 @@ import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.functions.sink.SinkFunction;
 import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
 import org.apache.flink.table.api.TableResult;
+import org.apache.flink.table.api.config.TableConfigOptions;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.utils.EncodingUtils;
 import org.apache.flink.test.util.SuccessException;
@@ -524,7 +525,7 @@ public class KafkaTableITCase extends KafkaTableTestBase {
         // 'value.source.timestamp'` DDL
         // will use the session time zone when convert the changelog time from milliseconds to
         // timestamp
-        tEnv.getConfig().getConfiguration().setString("table.local-time-zone", "UTC");
+        tEnv.getConfig().set(TableConfigOptions.LOCAL_TIME_ZONE, "UTC");
 
         // we always use a different topic name for each parameterized topic,
         // in order to make sure the topic can be created.
@@ -760,9 +761,7 @@ public class KafkaTableITCase extends KafkaTableTestBase {
         // ---------- Produce an event time stream into Kafka -------------------
         String groupId = getStandardProps().getProperty("group.id");
         String bootstraps = getBootstrapServers();
-        tEnv.getConfig()
-                .getConfiguration()
-                .set(TABLE_EXEC_SOURCE_IDLE_TIMEOUT, Duration.ofMillis(100));
+        tEnv.getConfig().set(TABLE_EXEC_SOURCE_IDLE_TIMEOUT, Duration.ofMillis(100));
 
         final String createTable =
                 String.format(
@@ -878,9 +877,7 @@ public class KafkaTableITCase extends KafkaTableTestBase {
 
         // ---------- Produce an event time stream into Kafka -------------------
         String bootstraps = getBootstrapServers();
-        tEnv.getConfig()
-                .getConfiguration()
-                .set(TABLE_EXEC_SOURCE_IDLE_TIMEOUT, Duration.ofMillis(100));
+        tEnv.getConfig().set(TABLE_EXEC_SOURCE_IDLE_TIMEOUT, Duration.ofMillis(100));
 
         final String createTableSql =
                 "CREATE TABLE %s (\n"
diff --git a/flink-end-to-end-tests/flink-python-test/src/main/java/org/apache/flink/python/tests/BatchPythonUdfSqlJob.java b/flink-end-to-end-tests/flink-python-test/src/main/java/org/apache/flink/python/tests/BatchPythonUdfSqlJob.java
index 5639aca..e8d286c 100644
--- a/flink-end-to-end-tests/flink-python-test/src/main/java/org/apache/flink/python/tests/BatchPythonUdfSqlJob.java
+++ b/flink-end-to-end-tests/flink-python-test/src/main/java/org/apache/flink/python/tests/BatchPythonUdfSqlJob.java
@@ -32,7 +32,7 @@ public class BatchPythonUdfSqlJob {
 
     public static void main(String[] args) {
         TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inBatchMode());
-        tEnv.getConfig().getConfiguration().set(CoreOptions.DEFAULT_PARALLELISM, 1);
+        tEnv.getConfig().set(CoreOptions.DEFAULT_PARALLELISM, 1);
         tEnv.executeSql(
                 "create temporary system function add_one as 'add_one.add_one' language python");
 
diff --git a/flink-end-to-end-tests/flink-tpcds-test/src/main/java/org/apache/flink/table/tpcds/TpcdsTestProgram.java b/flink-end-to-end-tests/flink-tpcds-test/src/main/java/org/apache/flink/table/tpcds/TpcdsTestProgram.java
index c6251aa..7c40ae2 100644
--- a/flink-end-to-end-tests/flink-tpcds-test/src/main/java/org/apache/flink/table/tpcds/TpcdsTestProgram.java
+++ b/flink-end-to-end-tests/flink-tpcds-test/src/main/java/org/apache/flink/table/tpcds/TpcdsTestProgram.java
@@ -140,18 +140,14 @@ public class TpcdsTestProgram {
         // config Optimizer parameters
         // TODO use the default shuffle mode of batch runtime mode once FLINK-23470 is implemented
         tEnv.getConfig()
-                .getConfiguration()
-                .setString(
+                .set(
                         ExecutionConfigOptions.TABLE_EXEC_SHUFFLE_MODE,
                         GlobalStreamExchangeMode.POINTWISE_EDGES_PIPELINED.toString());
         tEnv.getConfig()
-                .getConfiguration()
-                .setLong(
+                .set(
                         OptimizerConfigOptions.TABLE_OPTIMIZER_BROADCAST_JOIN_THRESHOLD,
-                        10 * 1024 * 1024);
-        tEnv.getConfig()
-                .getConfiguration()
-                .setBoolean(OptimizerConfigOptions.TABLE_OPTIMIZER_JOIN_REORDER_ENABLED, true);
+                        10 * 1024 * 1024L);
+        tEnv.getConfig().set(OptimizerConfigOptions.TABLE_OPTIMIZER_JOIN_REORDER_ENABLED, true);
 
         // register TPC-DS tables
         TPCDS_TABLES.forEach(
diff --git a/flink-python/src/test/java/org/apache/flink/client/python/PythonFunctionFactoryTest.java b/flink-python/src/test/java/org/apache/flink/client/python/PythonFunctionFactoryTest.java
index b51cf07..87da19b 100644
--- a/flink-python/src/test/java/org/apache/flink/client/python/PythonFunctionFactoryTest.java
+++ b/flink-python/src/test/java/org/apache/flink/client/python/PythonFunctionFactoryTest.java
@@ -17,6 +17,7 @@
 
 package org.apache.flink.client.python;
 
+import org.apache.flink.configuration.MemorySize;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.table.api.Table;
 import org.apache.flink.table.api.TableEnvironment;
@@ -69,8 +70,8 @@ public class PythonFunctionFactoryTest {
         }
         StreamExecutionEnvironment sEnv = StreamExecutionEnvironment.getExecutionEnvironment();
         tableEnv = StreamTableEnvironment.create(sEnv);
-        tableEnv.getConfig().getConfiguration().set(PYTHON_FILES, pyFilePath.getAbsolutePath());
-        tableEnv.getConfig().getConfiguration().setString(TASK_OFF_HEAP_MEMORY.key(), "80mb");
+        tableEnv.getConfig().set(PYTHON_FILES, pyFilePath.getAbsolutePath());
+        tableEnv.getConfig().set(TASK_OFF_HEAP_MEMORY, MemorySize.parse("80mb"));
         sourceTable = tableEnv.fromDataStream(sEnv.fromElements("1", "2", "3")).as("str");
     }
 
diff --git a/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/scalar/PythonScalarFunctionOperatorTestBase.java b/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/scalar/PythonScalarFunctionOperatorTestBase.java
index 8abfbc6..cc752d3 100644
--- a/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/scalar/PythonScalarFunctionOperatorTestBase.java
+++ b/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/scalar/PythonScalarFunctionOperatorTestBase.java
@@ -22,6 +22,7 @@ import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.MemorySize;
 import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.core.memory.ManagedMemoryUseCase;
 import org.apache.flink.python.PythonOptions;
@@ -218,9 +219,7 @@ public abstract class PythonScalarFunctionOperatorTestBase<IN, OUT, UDFIN> {
         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
         env.setParallelism(1);
         StreamTableEnvironment tEnv = createTableEnvironment(env);
-        tEnv.getConfig()
-                .getConfiguration()
-                .setString(TaskManagerOptions.TASK_OFF_HEAP_MEMORY.key(), "80mb");
+        tEnv.getConfig().set(TaskManagerOptions.TASK_OFF_HEAP_MEMORY, MemorySize.parse("80mb"));
         tEnv.registerFunction("pyFunc", new PythonScalarFunction("pyFunc"));
         DataStream<Tuple2<Integer, Integer>> ds = env.fromElements(new Tuple2<>(1, 2));
         Table t = tEnv.fromDataStream(ds, $("a"), $("b")).select(call("pyFunc", $("a"), $("b")));
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/delegation/PlannerBase.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/delegation/PlannerBase.scala
index bf6871c..c4e7aae 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/delegation/PlannerBase.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/delegation/PlannerBase.scala
@@ -484,10 +484,10 @@ abstract class PlannerBase(
     // Add query start time to TableConfig, these config are used internally,
     // these configs will be used by temporal functions like CURRENT_TIMESTAMP,LOCALTIMESTAMP.
     val epochTime :JLong = System.currentTimeMillis()
-    configuration.set(TABLE_QUERY_START_EPOCH_TIME, epochTime)
+    tableConfig.set(TABLE_QUERY_START_EPOCH_TIME, epochTime)
     val localTime :JLong =  epochTime +
       TimeZone.getTimeZone(tableConfig.getLocalTimeZone).getOffset(epochTime)
-    configuration.set(TABLE_QUERY_START_LOCAL_TIME, localTime)
+    tableConfig.set(TABLE_QUERY_START_LOCAL_TIME, localTime)
 
     getExecEnv.configure(
       configuration,
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/BuiltInAggregateFunctionTestBase.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/BuiltInAggregateFunctionTestBase.java
index 352ed83..ba2f01e 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/BuiltInAggregateFunctionTestBase.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/BuiltInAggregateFunctionTestBase.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.table.planner.functions;
 
-import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.StateChangelogOptions;
 import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
@@ -79,9 +78,8 @@ public class BuiltInAggregateFunctionTestBase {
     public void testFunction() throws Exception {
         final TableEnvironment tEnv =
                 TableEnvironment.create(EnvironmentSettings.inStreamingMode());
-        Configuration configuration = tEnv.getConfig().getConfiguration();
         // see https://issues.apache.org/jira/browse/FLINK-26092
-        configuration.set(StateChangelogOptions.ENABLE_STATE_CHANGE_LOG, false);
+        tEnv.getConfig().set(StateChangelogOptions.ENABLE_STATE_CHANGE_LOG, false);
         final Table sourceTable = asTable(tEnv, testSpec.sourceRowType, testSpec.sourceRows);
 
         for (final TestItem testItem : testSpec.testItems) {
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/LogicalTypeJsonSerdeTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/LogicalTypeJsonSerdeTest.java
index 6e29c40..d27dbb5 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/LogicalTypeJsonSerdeTest.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/LogicalTypeJsonSerdeTest.java
@@ -19,7 +19,6 @@
 package org.apache.flink.table.planner.plan.nodes.exec.serde;
 
 import org.apache.flink.api.common.typeutils.base.LocalDateTimeSerializer;
-import org.apache.flink.configuration.Configuration;
 import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.api.TableConfig;
 import org.apache.flink.table.api.ValidationException;
@@ -112,18 +111,17 @@ public class LogicalTypeJsonSerdeTest {
     public void testIdentifierSerde() throws IOException {
         final DataTypeFactoryMock dataTypeFactoryMock = new DataTypeFactoryMock();
         final TableConfig tableConfig = TableConfig.getDefault();
-        final Configuration config = tableConfig.getConfiguration();
         final CatalogManager catalogManager =
                 preparedCatalogManager().dataTypeFactory(dataTypeFactoryMock).build();
         final SerdeContext serdeContext = configuredSerdeContext(catalogManager, tableConfig);
 
         // minimal plan content
-        config.set(TableConfigOptions.PLAN_COMPILE_CATALOG_OBJECTS, IDENTIFIER);
+        tableConfig.set(TableConfigOptions.PLAN_COMPILE_CATALOG_OBJECTS, IDENTIFIER);
         final String minimalJson = toJson(serdeContext, STRUCTURED_TYPE);
         assertThat(minimalJson).isEqualTo("\"`default_catalog`.`default_database`.`MyType`\"");
 
         // catalog lookup with miss
-        config.set(
+        tableConfig.set(
                 TableConfigOptions.PLAN_RESTORE_CATALOG_OBJECTS,
                 TableConfigOptions.CatalogPlanRestore.IDENTIFIER);
         dataTypeFactoryMock.logicalType = Optional.empty();
@@ -131,7 +129,7 @@ public class LogicalTypeJsonSerdeTest {
                 .satisfies(anyCauseMatches(ValidationException.class, "No type found."));
 
         // catalog lookup
-        config.set(
+        tableConfig.set(
                 TableConfigOptions.PLAN_RESTORE_CATALOG_OBJECTS,
                 TableConfigOptions.CatalogPlanRestore.IDENTIFIER);
         dataTypeFactoryMock.logicalType = Optional.of(STRUCTURED_TYPE);
@@ -139,7 +137,7 @@ public class LogicalTypeJsonSerdeTest {
                 .isEqualTo(STRUCTURED_TYPE);
 
         // maximum plan content
-        config.set(TableConfigOptions.PLAN_COMPILE_CATALOG_OBJECTS, ALL);
+        tableConfig.set(TableConfigOptions.PLAN_COMPILE_CATALOG_OBJECTS, ALL);
         final String maximumJson = toJson(serdeContext, STRUCTURED_TYPE);
         final ObjectMapper mapper = new ObjectMapper();
         final JsonNode maximumJsonNode = mapper.readTree(maximumJson);
@@ -149,7 +147,7 @@ public class LogicalTypeJsonSerdeTest {
                 .isEqualTo("My original type.");
 
         // catalog lookup with miss
-        config.set(
+        tableConfig.set(
                 TableConfigOptions.PLAN_RESTORE_CATALOG_OBJECTS,
                 TableConfigOptions.CatalogPlanRestore.IDENTIFIER);
         dataTypeFactoryMock.logicalType = Optional.empty();
@@ -157,7 +155,7 @@ public class LogicalTypeJsonSerdeTest {
                 .satisfies(anyCauseMatches(ValidationException.class, "No type found."));
 
         // catalog lookup
-        config.set(
+        tableConfig.set(
                 TableConfigOptions.PLAN_RESTORE_CATALOG_OBJECTS,
                 TableConfigOptions.CatalogPlanRestore.IDENTIFIER);
         dataTypeFactoryMock.logicalType = Optional.of(UPDATED_STRUCTURED_TYPE);
@@ -165,7 +163,7 @@ public class LogicalTypeJsonSerdeTest {
                 .isEqualTo(UPDATED_STRUCTURED_TYPE);
 
         // no lookup
-        config.set(
+        tableConfig.set(
                 TableConfigOptions.PLAN_RESTORE_CATALOG_OBJECTS,
                 TableConfigOptions.CatalogPlanRestore.ALL);
         dataTypeFactoryMock.logicalType = Optional.of(UPDATED_STRUCTURED_TYPE);
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/rules/physical/batch/PushLocalAggIntoTableSourceScanRuleTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/rules/physical/batch/PushLocalAggIntoTableSourceScanRuleTest.java
index 51aee49..b767162 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/rules/physical/batch/PushLocalAggIntoTableSourceScanRuleTest.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/rules/physical/batch/PushLocalAggIntoTableSourceScanRuleTest.java
@@ -124,8 +124,7 @@ public class PushLocalAggIntoTableSourceScanRuleTest extends TableTestBase {
         // disable push down local agg
         util.getTableEnv()
                 .getConfig()
-                .getConfiguration()
-                .setBoolean(
+                .set(
                         OptimizerConfigOptions.TABLE_OPTIMIZER_SOURCE_AGGREGATE_PUSHDOWN_ENABLED,
                         false);
 
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/DeadlockBreakupTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/DeadlockBreakupTest.scala
index 56a0a88..9f4c18f 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/DeadlockBreakupTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/DeadlockBreakupTest.scala
@@ -38,10 +38,10 @@ class DeadlockBreakupTest extends TableTestBase {
 
   @Test
   def testSubplanReuse_SetExchangeAsBatch(): Unit = {
-    util.tableEnv.getConfig.getConfiguration.setBoolean(
-      OptimizerConfigOptions.TABLE_OPTIMIZER_REUSE_SUB_PLAN_ENABLED, true)
-    util.tableEnv.getConfig.getConfiguration.setBoolean(
-      OptimizerConfigOptions.TABLE_OPTIMIZER_REUSE_SOURCE_ENABLED, true)
+    util.tableEnv.getConfig.set(
+      OptimizerConfigOptions.TABLE_OPTIMIZER_REUSE_SUB_PLAN_ENABLED, Boolean.box(true))
+    util.tableEnv.getConfig.set(
+      OptimizerConfigOptions.TABLE_OPTIMIZER_REUSE_SOURCE_ENABLED, Boolean.box(true))
     val sqlQuery =
       """
         |WITH t AS (SELECT x.a AS a, x.b AS b, y.d AS d, y.e AS e FROM x, y WHERE x.a = y.d)
@@ -52,11 +52,11 @@ class DeadlockBreakupTest extends TableTestBase {
 
   @Test
   def testSubplanReuse_AddExchangeAsBatch_HashJoin(): Unit = {
-    util.tableEnv.getConfig.getConfiguration.setBoolean(
-      OptimizerConfigOptions.TABLE_OPTIMIZER_REUSE_SUB_PLAN_ENABLED, true)
-    util.tableEnv.getConfig.getConfiguration.setBoolean(
-      OptimizerConfigOptions.TABLE_OPTIMIZER_REUSE_SOURCE_ENABLED, true)
-    util.tableEnv.getConfig.getConfiguration.setString(
+    util.tableEnv.getConfig.set(
+      OptimizerConfigOptions.TABLE_OPTIMIZER_REUSE_SUB_PLAN_ENABLED, Boolean.box(true))
+    util.tableEnv.getConfig.set(
+      OptimizerConfigOptions.TABLE_OPTIMIZER_REUSE_SOURCE_ENABLED, Boolean.box(true))
+    util.tableEnv.getConfig.set(
       ExecutionConfigOptions.TABLE_EXEC_DISABLED_OPERATORS, "NestedLoopJoin,SortMergeJoin")
     val sqlQuery =
       """
@@ -68,11 +68,11 @@ class DeadlockBreakupTest extends TableTestBase {
 
   @Test
   def testSubplanReuse_AddExchangeAsBatch_NestedLoopJoin(): Unit = {
-    util.tableEnv.getConfig.getConfiguration.setBoolean(
-      OptimizerConfigOptions.TABLE_OPTIMIZER_REUSE_SUB_PLAN_ENABLED, true)
-    util.tableEnv.getConfig.getConfiguration.setBoolean(
-      OptimizerConfigOptions.TABLE_OPTIMIZER_REUSE_SOURCE_ENABLED, false)
-    util.tableEnv.getConfig.getConfiguration.setString(
+    util.tableEnv.getConfig.set(
+      OptimizerConfigOptions.TABLE_OPTIMIZER_REUSE_SUB_PLAN_ENABLED, Boolean.box(true))
+    util.tableEnv.getConfig.set(
+      OptimizerConfigOptions.TABLE_OPTIMIZER_REUSE_SOURCE_ENABLED, Boolean.box(false))
+    util.tableEnv.getConfig.set(
       ExecutionConfigOptions.TABLE_EXEC_DISABLED_OPERATORS, "HashJoin,SortMergeJoin")
     val sqlQuery =
       """
@@ -84,12 +84,12 @@ class DeadlockBreakupTest extends TableTestBase {
 
   @Test
   def testSubplanReuse_SetExchangeAsBatch_SortMergeJoin(): Unit = {
-    util.tableEnv.getConfig.getConfiguration.setBoolean(
-      OptimizerConfigOptions.TABLE_OPTIMIZER_REUSE_SUB_PLAN_ENABLED, true)
-    util.tableEnv.getConfig.getConfiguration.setString(
+    util.tableEnv.getConfig.set(
+      OptimizerConfigOptions.TABLE_OPTIMIZER_REUSE_SUB_PLAN_ENABLED, Boolean.box(true))
+    util.tableEnv.getConfig.set(
       ExecutionConfigOptions.TABLE_EXEC_DISABLED_OPERATORS, "NestedLoopJoin,HashAgg")
-    util.tableEnv.getConfig.getConfiguration.setLong(
-      OptimizerConfigOptions.TABLE_OPTIMIZER_BROADCAST_JOIN_THRESHOLD, -1)
+    util.tableEnv.getConfig.set(
+      OptimizerConfigOptions.TABLE_OPTIMIZER_BROADCAST_JOIN_THRESHOLD, Long.box(-1))
     val sqlQuery =
       """
         |
@@ -104,13 +104,13 @@ class DeadlockBreakupTest extends TableTestBase {
 
   @Test
   def testSubplanReuse_AddExchangeAsBatch_BuildLeftSemiHashJoin(): Unit = {
-    util.tableEnv.getConfig.getConfiguration.setBoolean(
-      OptimizerConfigOptions.TABLE_OPTIMIZER_REUSE_SUB_PLAN_ENABLED, true)
-    util.tableEnv.getConfig.getConfiguration.setBoolean(
-      OptimizerConfigOptions.TABLE_OPTIMIZER_REUSE_SOURCE_ENABLED, true)
-    util.tableEnv.getConfig.getConfiguration.setLong(
-      OptimizerConfigOptions.TABLE_OPTIMIZER_BROADCAST_JOIN_THRESHOLD, -1)
-    util.tableEnv.getConfig.getConfiguration.setString(
+    util.tableEnv.getConfig.set(
+      OptimizerConfigOptions.TABLE_OPTIMIZER_REUSE_SUB_PLAN_ENABLED, Boolean.box(true))
+    util.tableEnv.getConfig.set(
+      OptimizerConfigOptions.TABLE_OPTIMIZER_REUSE_SOURCE_ENABLED, Boolean.box(true))
+    util.tableEnv.getConfig.set(
+      OptimizerConfigOptions.TABLE_OPTIMIZER_BROADCAST_JOIN_THRESHOLD, Long.box(-1))
+    util.tableEnv.getConfig.set(
       ExecutionConfigOptions.TABLE_EXEC_DISABLED_OPERATORS, "NestedLoopJoin,SortMergeJoin")
     val sqlQuery =
       """
@@ -124,8 +124,8 @@ class DeadlockBreakupTest extends TableTestBase {
 
   @Test
   def testSubplanReuse_SetExchangeAsBatch_OverAgg(): Unit = {
-    util.tableEnv.getConfig.getConfiguration.setBoolean(
-      OptimizerConfigOptions.TABLE_OPTIMIZER_REUSE_SUB_PLAN_ENABLED, true)
+    util.tableEnv.getConfig.set(
+      OptimizerConfigOptions.TABLE_OPTIMIZER_REUSE_SUB_PLAN_ENABLED, Boolean.box(true))
     val sqlQuery =
       """
         |WITH r1 AS (SELECT SUM(a) OVER (PARTITION BY b ORDER BY b) AS a, b, c FROM x),
@@ -141,11 +141,11 @@ class DeadlockBreakupTest extends TableTestBase {
 
   @Test
   def testReusedNodeIsBarrierNode(): Unit = {
-    util.tableEnv.getConfig.getConfiguration.setBoolean(
-      OptimizerConfigOptions.TABLE_OPTIMIZER_REUSE_SUB_PLAN_ENABLED, true)
-    util.tableEnv.getConfig.getConfiguration.setBoolean(
-      OptimizerConfigOptions.TABLE_OPTIMIZER_REUSE_SOURCE_ENABLED, false)
-    util.tableEnv.getConfig.getConfiguration.setString(
+    util.tableEnv.getConfig.set(
+      OptimizerConfigOptions.TABLE_OPTIMIZER_REUSE_SUB_PLAN_ENABLED, Boolean.box(true))
+    util.tableEnv.getConfig.set(
+      OptimizerConfigOptions.TABLE_OPTIMIZER_REUSE_SOURCE_ENABLED, Boolean.box(false))
+    util.tableEnv.getConfig.set(
       ExecutionConfigOptions.TABLE_EXEC_DISABLED_OPERATORS, "HashJoin,SortMergeJoin,SortAgg")
     val sqlQuery =
       """
@@ -157,35 +157,35 @@ class DeadlockBreakupTest extends TableTestBase {
 
   @Test
   def testDataStreamReuse_SetExchangeAsBatch(): Unit = {
-    util.tableEnv.getConfig.getConfiguration.setBoolean(
-      OptimizerConfigOptions.TABLE_OPTIMIZER_REUSE_SUB_PLAN_ENABLED, false)
-    util.tableEnv.getConfig.getConfiguration.setLong(
-      OptimizerConfigOptions.TABLE_OPTIMIZER_BROADCAST_JOIN_THRESHOLD, -1)
+    util.tableEnv.getConfig.set(
+      OptimizerConfigOptions.TABLE_OPTIMIZER_REUSE_SUB_PLAN_ENABLED, Boolean.box(false))
+    util.tableEnv.getConfig.set(
+      OptimizerConfigOptions.TABLE_OPTIMIZER_BROADCAST_JOIN_THRESHOLD, Long.box(-1))
     val sqlQuery = "SELECT * FROM t t1, t t2 WHERE t1.a = t2.a AND t1.b > 10 AND t2.c LIKE 'Test%'"
     util.verifyExecPlan(sqlQuery)
   }
 
   @Test
   def testDataStreamReuse_AddExchangeAsBatch_NestedLoopJoin(): Unit = {
-    util.tableEnv.getConfig.getConfiguration.setBoolean(
-      OptimizerConfigOptions.TABLE_OPTIMIZER_REUSE_SUB_PLAN_ENABLED, false)
+    util.tableEnv.getConfig.set(
+      OptimizerConfigOptions.TABLE_OPTIMIZER_REUSE_SUB_PLAN_ENABLED, Boolean.box(false))
     val sqlQuery = "SELECT * FROM t t1, t t2 WHERE t1.a = t2.b"
     util.verifyExecPlan(sqlQuery)
   }
 
   @Test
   def testDataStreamReuse_AddExchangeAsBatch_HashJoin(): Unit = {
-    util.tableEnv.getConfig.getConfiguration.setBoolean(
-      OptimizerConfigOptions.TABLE_OPTIMIZER_REUSE_SUB_PLAN_ENABLED, false)
+    util.tableEnv.getConfig.set(
+      OptimizerConfigOptions.TABLE_OPTIMIZER_REUSE_SUB_PLAN_ENABLED, Boolean.box(false))
     val sqlQuery = "SELECT * FROM t INTERSECT SELECT * FROM t"
     util.verifyExecPlan(sqlQuery)
   }
 
   @Test
   def testSubplanReuse_BuildAndProbeNoCommonSuccessors_HashJoin(): Unit = {
-    util.tableEnv.getConfig.getConfiguration.setBoolean(
-      OptimizerConfigOptions.TABLE_OPTIMIZER_REUSE_SUB_PLAN_ENABLED, true)
-    util.tableEnv.getConfig.getConfiguration.setString(
+    util.tableEnv.getConfig.set(
+      OptimizerConfigOptions.TABLE_OPTIMIZER_REUSE_SUB_PLAN_ENABLED, Boolean.box(true))
+    util.tableEnv.getConfig.set(
       ExecutionConfigOptions.TABLE_EXEC_DISABLED_OPERATORS, "NestedLoopJoin,SortMergeJoin,SortAgg")
     val sqlQuery =
       s"""
@@ -202,9 +202,9 @@ class DeadlockBreakupTest extends TableTestBase {
 
   @Test
   def testSubplanReuse_AddSingletonExchange(): Unit = {
-    util.tableEnv.getConfig.getConfiguration.setBoolean(
-      OptimizerConfigOptions.TABLE_OPTIMIZER_REUSE_SUB_PLAN_ENABLED, true)
-    util.tableEnv.getConfig.getConfiguration.setString(
+    util.tableEnv.getConfig.set(
+      OptimizerConfigOptions.TABLE_OPTIMIZER_REUSE_SUB_PLAN_ENABLED, Boolean.box(true))
+    util.tableEnv.getConfig.set(
       ExecutionConfigOptions.TABLE_EXEC_DISABLED_OPERATORS, "HashJoin,SortMergeJoin,HashAgg")
     val sqlQuery =
       s"""
@@ -219,9 +219,9 @@ class DeadlockBreakupTest extends TableTestBase {
 
   @Test
   def testSubplanReuse_DeadlockCausedByReusingExchange(): Unit = {
-    util.tableEnv.getConfig.getConfiguration.setBoolean(
-      OptimizerConfigOptions.TABLE_OPTIMIZER_REUSE_SUB_PLAN_ENABLED, true)
-    util.tableEnv.getConfig.getConfiguration.setString(
+    util.tableEnv.getConfig.set(
+      OptimizerConfigOptions.TABLE_OPTIMIZER_REUSE_SUB_PLAN_ENABLED, Boolean.box(true))
+    util.tableEnv.getConfig.set(
       ExecutionConfigOptions.TABLE_EXEC_DISABLED_OPERATORS, "NestedLoopJoin,SortMergeJoin")
     val sqlQuery =
       s"""
@@ -234,11 +234,11 @@ class DeadlockBreakupTest extends TableTestBase {
 
   @Test
   def testSubplanReuse_DeadlockCausedByReusingExchangeInAncestor(): Unit = {
-    util.tableEnv.getConfig.getConfiguration.setBoolean(
-      OptimizerConfigOptions.TABLE_OPTIMIZER_REUSE_SUB_PLAN_ENABLED, true)
-    util.tableEnv.getConfig.getConfiguration.setBoolean(
-      OptimizerConfigOptions.TABLE_OPTIMIZER_MULTIPLE_INPUT_ENABLED, false)
-    util.tableEnv.getConfig.getConfiguration.setString(
+    util.tableEnv.getConfig.set(
+      OptimizerConfigOptions.TABLE_OPTIMIZER_REUSE_SUB_PLAN_ENABLED, Boolean.box(true))
+    util.tableEnv.getConfig.set(
+      OptimizerConfigOptions.TABLE_OPTIMIZER_MULTIPLE_INPUT_ENABLED, Boolean.box(false))
+    util.tableEnv.getConfig.set(
       ExecutionConfigOptions.TABLE_EXEC_DISABLED_OPERATORS, "NestedLoopJoin,SortMergeJoin")
     val sqlQuery =
       """
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/LegacySinkTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/LegacySinkTest.scala
index 8266cda..7183c92 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/LegacySinkTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/LegacySinkTest.scala
@@ -46,8 +46,9 @@ class LegacySinkTest extends TableTestBase {
   @Test
   def testMultiSinks(): Unit = {
     val stmtSet = util.tableEnv.createStatementSet()
-    util.tableEnv.getConfig.getConfiguration.setBoolean(
-      RelNodeBlockPlanBuilder.TABLE_OPTIMIZER_REUSE_OPTIMIZE_BLOCK_WITH_DIGEST_ENABLED, true)
+    util.tableEnv.getConfig.set(
+      RelNodeBlockPlanBuilder.TABLE_OPTIMIZER_REUSE_OPTIMIZE_BLOCK_WITH_DIGEST_ENABLED,
+      Boolean.box(true))
     val table1 = util.tableEnv.sqlQuery("SELECT SUM(a) AS sum_a, c FROM MyTable GROUP BY c")
     util.tableEnv.registerTable("table1", table1)
     val table2 = util.tableEnv.sqlQuery("SELECT SUM(sum_a) AS total_sum FROM table1")
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/MultipleInputCreationTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/MultipleInputCreationTest.scala
index c4f2bbb..90024e8 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/MultipleInputCreationTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/MultipleInputCreationTest.scala
@@ -44,12 +44,12 @@ class MultipleInputCreationTest(shuffleMode: BatchShuffleMode) extends TableTest
     util.addTableSource[(Int, Long, String, Int)]("y", 'd, 'e, 'f, 'ny)
     util.addTableSource[(Int, Long, String, Int)]("z", 'g, 'h, 'i, 'nz)
     util.addDataStream[(Int, Long, String)]("t", 'a, 'b, 'c)
-    util.tableConfig.getConfiguration.set(ExecutionOptions.BATCH_SHUFFLE_MODE, shuffleMode)
+    util.tableConfig.set(ExecutionOptions.BATCH_SHUFFLE_MODE, shuffleMode)
   }
 
   @Test
   def testBasicMultipleInput(): Unit = {
-    util.tableEnv.getConfig.getConfiguration.setString(
+    util.tableEnv.getConfig.set(
       ExecutionConfigOptions.TABLE_EXEC_DISABLED_OPERATORS, "NestedLoopJoin,SortMergeJoin")
     val sql =
       """
@@ -71,9 +71,9 @@ class MultipleInputCreationTest(shuffleMode: BatchShuffleMode) extends TableTest
     //                 \-> [J -> J] -> [Agg -> J -/
     //                      |    |             |
     //                      y    t             y
-    util.tableEnv.getConfig.getConfiguration.setBoolean(
-      OptimizerConfigOptions.TABLE_OPTIMIZER_REUSE_SOURCE_ENABLED, false)
-    util.tableEnv.getConfig.getConfiguration.setString(
+    util.tableEnv.getConfig.set(
+      OptimizerConfigOptions.TABLE_OPTIMIZER_REUSE_SOURCE_ENABLED, Boolean.box(false))
+    util.tableEnv.getConfig.set(
       ExecutionConfigOptions.TABLE_EXEC_DISABLED_OPERATORS, "HashJoin,SortMergeJoin,SortAgg")
     val sql =
       """
@@ -102,7 +102,7 @@ class MultipleInputCreationTest(shuffleMode: BatchShuffleMode) extends TableTest
 
   @Test
   def testJoinWithAggAsProbe(): Unit = {
-    util.tableEnv.getConfig.getConfiguration.setString(
+    util.tableEnv.getConfig.set(
       ExecutionConfigOptions.TABLE_EXEC_DISABLED_OPERATORS, "NestedLoopJoin,SortMergeJoin,SortAgg")
     val sql =
       """
@@ -119,7 +119,7 @@ class MultipleInputCreationTest(shuffleMode: BatchShuffleMode) extends TableTest
   @Test
   def testKeepMultipleInputWithOneMemberForChainableSource(): Unit = {
     createChainableTableSource()
-    util.tableEnv.getConfig.getConfiguration.setString(
+    util.tableEnv.getConfig.set(
       ExecutionConfigOptions.TABLE_EXEC_DISABLED_OPERATORS, "HashJoin,SortMergeJoin")
     val sql = "SELECT * FROM chainable LEFT JOIN x ON chainable.a = x.a"
     util.verifyExecPlan(sql)
@@ -127,7 +127,7 @@ class MultipleInputCreationTest(shuffleMode: BatchShuffleMode) extends TableTest
 
   @Test
   def testAvoidIncludingUnionFromInputSide(): Unit = {
-    util.tableEnv.getConfig.getConfiguration.setString(
+    util.tableEnv.getConfig.set(
       ExecutionConfigOptions.TABLE_EXEC_DISABLED_OPERATORS, "HashJoin,SortMergeJoin")
     val sql =
       """
@@ -141,7 +141,7 @@ class MultipleInputCreationTest(shuffleMode: BatchShuffleMode) extends TableTest
   @Test
   def testIncludeUnionForChainableSource(): Unit = {
     createChainableTableSource()
-    util.tableEnv.getConfig.getConfiguration.setString(
+    util.tableEnv.getConfig.set(
       ExecutionConfigOptions.TABLE_EXEC_DISABLED_OPERATORS, "HashJoin,SortMergeJoin")
     val sql =
       """
@@ -154,7 +154,7 @@ class MultipleInputCreationTest(shuffleMode: BatchShuffleMode) extends TableTest
 
   @Test
   def testAvoidIncludingCalcAfterNonChainableSource(): Unit = {
-    util.tableEnv.getConfig.getConfiguration.setString(
+    util.tableEnv.getConfig.set(
       ExecutionConfigOptions.TABLE_EXEC_DISABLED_OPERATORS, "HashJoin,SortMergeJoin")
     val sql =
       """
@@ -169,7 +169,7 @@ class MultipleInputCreationTest(shuffleMode: BatchShuffleMode) extends TableTest
   @Test
   def testIncludeCalcForChainableSource(): Unit = {
     createChainableTableSource()
-    util.tableEnv.getConfig.getConfiguration.setString(
+    util.tableEnv.getConfig.set(
       ExecutionConfigOptions.TABLE_EXEC_DISABLED_OPERATORS, "HashJoin,SortMergeJoin")
     val sql =
       """
@@ -183,7 +183,7 @@ class MultipleInputCreationTest(shuffleMode: BatchShuffleMode) extends TableTest
 
   @Test
   def testAvoidIncludingSingleton(): Unit = {
-    util.tableEnv.getConfig.getConfiguration.setString(
+    util.tableEnv.getConfig.set(
       ExecutionConfigOptions.TABLE_EXEC_DISABLED_OPERATORS, "HashJoin,SortMergeJoin,HashAgg")
     val sql =
       """
@@ -201,7 +201,7 @@ class MultipleInputCreationTest(shuffleMode: BatchShuffleMode) extends TableTest
 
   @Test
   def testNoPriorityConstraint(): Unit = {
-    util.tableEnv.getConfig.getConfiguration.setString(
+    util.tableEnv.getConfig.set(
       ExecutionConfigOptions.TABLE_EXEC_DISABLED_OPERATORS, "HashJoin,NestedLoopJoin")
     val sql =
       """
@@ -214,7 +214,7 @@ class MultipleInputCreationTest(shuffleMode: BatchShuffleMode) extends TableTest
 
   @Test
   def testRelatedInputs(): Unit = {
-    util.tableEnv.getConfig.getConfiguration.setString(
+    util.tableEnv.getConfig.set(
       ExecutionConfigOptions.TABLE_EXEC_DISABLED_OPERATORS, "HashJoin,SortMergeJoin")
     val sql =
       """
@@ -232,7 +232,7 @@ class MultipleInputCreationTest(shuffleMode: BatchShuffleMode) extends TableTest
 
   @Test
   def testRelatedInputsWithAgg(): Unit = {
-    util.tableEnv.getConfig.getConfiguration.setString(
+    util.tableEnv.getConfig.set(
       ExecutionConfigOptions.TABLE_EXEC_DISABLED_OPERATORS, "HashJoin,SortMergeJoin,SortAgg")
     val sql =
       """
@@ -250,9 +250,9 @@ class MultipleInputCreationTest(shuffleMode: BatchShuffleMode) extends TableTest
 
   @Test
   def testRemoveRedundantUnion(): Unit = {
-    util.tableEnv.getConfig.getConfiguration.setBoolean(
-      OptimizerConfigOptions.TABLE_OPTIMIZER_REUSE_SOURCE_ENABLED, false)
-    util.tableEnv.getConfig.getConfiguration.setString(
+    util.tableEnv.getConfig.set(
+      OptimizerConfigOptions.TABLE_OPTIMIZER_REUSE_SOURCE_ENABLED, Boolean.box(false))
+    util.tableEnv.getConfig.set(
       ExecutionConfigOptions.TABLE_EXEC_DISABLED_OPERATORS, "NestedLoopJoin,SortMergeJoin,SortAgg")
     val sql =
       """
@@ -271,9 +271,9 @@ class MultipleInputCreationTest(shuffleMode: BatchShuffleMode) extends TableTest
 
   @Test
   def testRemoveOneInputOperatorFromRoot(): Unit = {
-    util.tableEnv.getConfig.getConfiguration.setBoolean(
-      OptimizerConfigOptions.TABLE_OPTIMIZER_REUSE_SOURCE_ENABLED, false)
-    util.tableEnv.getConfig.getConfiguration.setString(
+    util.tableEnv.getConfig.set(
+      OptimizerConfigOptions.TABLE_OPTIMIZER_REUSE_SOURCE_ENABLED, Boolean.box(false))
+    util.tableEnv.getConfig.set(
       ExecutionConfigOptions.TABLE_EXEC_DISABLED_OPERATORS, "NestedLoopJoin,SortMergeJoin")
     val sql =
       """
@@ -290,7 +290,7 @@ class MultipleInputCreationTest(shuffleMode: BatchShuffleMode) extends TableTest
 
   @Test
   def testCleanUpMultipleInputWithOneMember(): Unit = {
-    util.tableEnv.getConfig.getConfiguration.setString(
+    util.tableEnv.getConfig.set(
       ExecutionConfigOptions.TABLE_EXEC_DISABLED_OPERATORS, "NestedLoopJoin,SortMergeJoin")
     val sql =
       """
@@ -307,9 +307,9 @@ class MultipleInputCreationTest(shuffleMode: BatchShuffleMode) extends TableTest
   @Test
   def testKeepUsefulUnion(): Unit = {
     createChainableTableSource()
-    util.tableEnv.getConfig.getConfiguration.setBoolean(
-      OptimizerConfigOptions.TABLE_OPTIMIZER_REUSE_SOURCE_ENABLED, true)
-    util.tableEnv.getConfig.getConfiguration.setString(
+    util.tableEnv.getConfig.set(
+      OptimizerConfigOptions.TABLE_OPTIMIZER_REUSE_SOURCE_ENABLED, Boolean.box(true))
+    util.tableEnv.getConfig.set(
       ExecutionConfigOptions.TABLE_EXEC_DISABLED_OPERATORS, "HashJoin,SortMergeJoin")
     val sql =
       """
@@ -326,9 +326,9 @@ class MultipleInputCreationTest(shuffleMode: BatchShuffleMode) extends TableTest
 
   @Test
   def testDeadlockCausedByExchangeInAncestor(): Unit = {
-    util.tableEnv.getConfig.getConfiguration.setBoolean(
-      OptimizerConfigOptions.TABLE_OPTIMIZER_REUSE_SOURCE_ENABLED, true)
-    util.tableEnv.getConfig.getConfiguration.setString(
+    util.tableEnv.getConfig.set(
+      OptimizerConfigOptions.TABLE_OPTIMIZER_REUSE_SOURCE_ENABLED, Boolean.box(true))
+    util.tableEnv.getConfig.set(
       ExecutionConfigOptions.TABLE_EXEC_DISABLED_OPERATORS, "NestedLoopJoin,SortMergeJoin")
     val sql =
       """
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/RemoveCollationTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/RemoveCollationTest.scala
index ad1f32b..0038f52 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/RemoveCollationTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/RemoveCollationTest.scala
@@ -59,13 +59,13 @@ class RemoveCollationTest extends TableTestBase {
       FlinkStatistic.builder().tableStats(new TableStats(100L)).build()
     )
 
-    util.tableEnv.getConfig.getConfiguration.setBoolean(
-      BatchPhysicalSortMergeJoinRule.TABLE_OPTIMIZER_SMJ_REMOVE_SORT_ENABLED, true)
+    util.tableEnv.getConfig.set(
+      BatchPhysicalSortMergeJoinRule.TABLE_OPTIMIZER_SMJ_REMOVE_SORT_ENABLED, Boolean.box(true))
   }
 
   @Test
   def testRemoveCollation_OverWindowAgg(): Unit = {
-    util.tableEnv.getConfig.getConfiguration.setString(
+    util.tableEnv.getConfig.set(
       ExecutionConfigOptions.TABLE_EXEC_DISABLED_OPERATORS, "NestedLoopJoin,SortMergeJoin,HashAgg")
     val sqlQuery =
       """
@@ -81,7 +81,7 @@ class RemoveCollationTest extends TableTestBase {
 
   @Test
   def testRemoveCollation_Aggregate(): Unit = {
-    util.tableEnv.getConfig.getConfiguration.setString(
+    util.tableEnv.getConfig.set(
       ExecutionConfigOptions.TABLE_EXEC_DISABLED_OPERATORS, "HashJoin,NestedLoopJoin")
     val sqlQuery =
       """
@@ -93,7 +93,7 @@ class RemoveCollationTest extends TableTestBase {
 
   @Test
   def testRemoveCollation_Aggregate_1(): Unit = {
-    util.tableEnv.getConfig.getConfiguration.setString(
+    util.tableEnv.getConfig.set(
       ExecutionConfigOptions.TABLE_EXEC_DISABLED_OPERATORS, "HashJoin,NestedLoopJoin")
     val sqlQuery =
       """
@@ -105,7 +105,7 @@ class RemoveCollationTest extends TableTestBase {
 
   @Test
   def testRemoveCollation_Sort(): Unit = {
-    util.tableEnv.getConfig.getConfiguration.setBoolean(TABLE_EXEC_RANGE_SORT_ENABLED, true)
+    util.tableEnv.getConfig.set(TABLE_EXEC_RANGE_SORT_ENABLED, Boolean.box(true))
     val sqlQuery =
       """
         |WITH r AS (SELECT a, b, COUNT(c) AS cnt FROM x GROUP BY a, b)
@@ -117,9 +117,9 @@ class RemoveCollationTest extends TableTestBase {
 
   @Test
   def testRemoveCollation_Aggregate_3(): Unit = {
-    util.tableEnv.getConfig.getConfiguration.setString(
+    util.tableEnv.getConfig.set(
       ExecutionConfigOptions.TABLE_EXEC_DISABLED_OPERATORS, "HashAgg")
-    util.tableEnv.getConfig.getConfiguration.setBoolean(TABLE_EXEC_RANGE_SORT_ENABLED, true)
+    util.tableEnv.getConfig.set(TABLE_EXEC_RANGE_SORT_ENABLED, Boolean.box(true))
     val sqlQuery =
       """
         |WITH r AS (SELECT * FROM x ORDER BY a, b)
@@ -130,7 +130,7 @@ class RemoveCollationTest extends TableTestBase {
 
   @Test
   def testRemoveCollation_Rank_1(): Unit = {
-    util.tableEnv.getConfig.getConfiguration.setString(
+    util.tableEnv.getConfig.set(
       ExecutionConfigOptions.TABLE_EXEC_DISABLED_OPERATORS, "HashAgg")
     val sqlQuery =
       """
@@ -145,7 +145,7 @@ class RemoveCollationTest extends TableTestBase {
 
   @Test
   def testRemoveCollation_Rank_2(): Unit = {
-    util.tableEnv.getConfig.getConfiguration.setString(
+    util.tableEnv.getConfig.set(
       ExecutionConfigOptions.TABLE_EXEC_DISABLED_OPERATORS, "HashAgg")
     val sqlQuery =
       """
@@ -174,7 +174,7 @@ class RemoveCollationTest extends TableTestBase {
 
   @Test
   def testRemoveCollation_Rank_4(): Unit = {
-    util.tableEnv.getConfig.getConfiguration.setString(
+    util.tableEnv.getConfig.set(
       ExecutionConfigOptions.TABLE_EXEC_DISABLED_OPERATORS, "HashAgg")
     val sqlQuery =
       """
@@ -189,7 +189,7 @@ class RemoveCollationTest extends TableTestBase {
 
   @Test
   def testRemoveCollation_Rank_Singleton(): Unit = {
-    util.tableEnv.getConfig.getConfiguration.setString(
+    util.tableEnv.getConfig.set(
       ExecutionConfigOptions.TABLE_EXEC_DISABLED_OPERATORS, "HashAgg")
     val sqlQuery =
       """
@@ -204,7 +204,7 @@ class RemoveCollationTest extends TableTestBase {
 
   @Test
   def testRemoveCollation_MultipleSortMergeJoins1(): Unit = {
-    util.tableEnv.getConfig.getConfiguration.setString(
+    util.tableEnv.getConfig.set(
       ExecutionConfigOptions.TABLE_EXEC_DISABLED_OPERATORS, "HashJoin,NestedLoopJoin")
 
     val sql =
@@ -220,7 +220,7 @@ class RemoveCollationTest extends TableTestBase {
 
   @Test
   def testRemoveCollation_MultipleSortMergeJoins_MultiJoinKeys1(): Unit = {
-    util.tableEnv.getConfig.getConfiguration.setString(
+    util.tableEnv.getConfig.set(
       ExecutionConfigOptions.TABLE_EXEC_DISABLED_OPERATORS, "HashJoin,NestedLoopJoin")
 
     val sql =
@@ -236,7 +236,7 @@ class RemoveCollationTest extends TableTestBase {
 
   @Test
   def testRemoveCollation_MultipleSortMergeJoins2(): Unit = {
-    util.tableEnv.getConfig.getConfiguration.setString(
+    util.tableEnv.getConfig.set(
       ExecutionConfigOptions.TABLE_EXEC_DISABLED_OPERATORS, "HashJoin,NestedLoopJoin")
 
     val sql =
@@ -252,7 +252,7 @@ class RemoveCollationTest extends TableTestBase {
 
   @Test
   def testRemoveCollation_MultipleSortMergeJoins_MultiJoinKeys2(): Unit = {
-    util.tableEnv.getConfig.getConfiguration.setString(
+    util.tableEnv.getConfig.set(
       ExecutionConfigOptions.TABLE_EXEC_DISABLED_OPERATORS, "HashJoin,NestedLoopJoin")
 
     val sql =
@@ -268,7 +268,7 @@ class RemoveCollationTest extends TableTestBase {
 
   @Test
   def testRemoveCollation_MultipleSortMergeJoins3(): Unit = {
-    util.tableEnv.getConfig.getConfiguration.setString(
+    util.tableEnv.getConfig.set(
       ExecutionConfigOptions.TABLE_EXEC_DISABLED_OPERATORS, "HashJoin,NestedLoopJoin")
     util.addTableSource("tb1",
       Array[TypeInformation[_]](
@@ -344,7 +344,7 @@ class RemoveCollationTest extends TableTestBase {
 
   @Test
   def testRemoveCollation_Correlate1(): Unit = {
-    util.tableEnv.getConfig.getConfiguration.setString(
+    util.tableEnv.getConfig.set(
       ExecutionConfigOptions.TABLE_EXEC_DISABLED_OPERATORS, "HashJoin,NestedLoopJoin,HashAgg")
     util.addFunction("split", new TableFunc1)
     val sqlQuery =
@@ -358,7 +358,7 @@ class RemoveCollationTest extends TableTestBase {
 
   @Test
   def testRemoveCollation_Correlate2(): Unit = {
-    util.tableEnv.getConfig.getConfiguration.setString(
+    util.tableEnv.getConfig.set(
       ExecutionConfigOptions.TABLE_EXEC_DISABLED_OPERATORS, "HashJoin,NestedLoopJoin,HashAgg")
     util.addFunction("split", new TableFunc1)
     val sqlQuery =
@@ -373,7 +373,7 @@ class RemoveCollationTest extends TableTestBase {
   @Test
   def testRemoveCollation_Correlate3(): Unit = {
     // do not remove shuffle
-    util.tableEnv.getConfig.getConfiguration.setString(
+    util.tableEnv.getConfig.set(
       ExecutionConfigOptions.TABLE_EXEC_DISABLED_OPERATORS, "HashJoin,NestedLoopJoin,HashAgg")
     util.addFunction("split", new TableFunc1)
     val sqlQuery =
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/RemoveShuffleTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/RemoveShuffleTest.scala
index 39951be..6d0bb65 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/RemoveShuffleTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/RemoveShuffleTest.scala
@@ -45,13 +45,13 @@ class RemoveShuffleTest extends TableTestBase {
       Array("d", "e", "f"),
       FlinkStatistic.builder().tableStats(new TableStats(100L)).build()
     )
-    util.tableEnv.getConfig.getConfiguration.setBoolean(
-      OptimizerConfigOptions.TABLE_OPTIMIZER_REUSE_SUB_PLAN_ENABLED, false)
+    util.tableEnv.getConfig.set(
+      OptimizerConfigOptions.TABLE_OPTIMIZER_REUSE_SUB_PLAN_ENABLED, Boolean.box(false))
   }
 
   @Test
   def testRemoveHashShuffle_OverWindowAgg(): Unit = {
-    util.tableEnv.getConfig.getConfiguration.setString(
+    util.tableEnv.getConfig.set(
       ExecutionConfigOptions.TABLE_EXEC_DISABLED_OPERATORS, "NestedLoopJoin,SortMergeJoin,SortAgg")
     val sqlQuery =
       """
@@ -68,7 +68,7 @@ class RemoveShuffleTest extends TableTestBase {
 
   @Test
   def testRemoveHashShuffle_MultiOverWindowAgg(): Unit = {
-    util.tableEnv.getConfig.getConfiguration.setString(
+    util.tableEnv.getConfig.set(
       ExecutionConfigOptions.TABLE_EXEC_DISABLED_OPERATORS, "NestedLoopJoin,SortMergeJoin,SortAgg")
     val sqlQuery =
       """
@@ -85,10 +85,10 @@ class RemoveShuffleTest extends TableTestBase {
 
   @Test
   def testRemoveHashShuffle_OverWindowAgg_PartialKey(): Unit = {
-    util.tableEnv.getConfig.getConfiguration.setString(
+    util.tableEnv.getConfig.set(
       ExecutionConfigOptions.TABLE_EXEC_DISABLED_OPERATORS, "NestedLoopJoin,SortMergeJoin,SortAgg")
-    util.tableEnv.getConfig.getConfiguration.setBoolean(
-      BatchPhysicalJoinRuleBase.TABLE_OPTIMIZER_SHUFFLE_BY_PARTIAL_KEY_ENABLED, true)
+    util.tableEnv.getConfig.set(
+      BatchPhysicalJoinRuleBase.TABLE_OPTIMIZER_SHUFFLE_BY_PARTIAL_KEY_ENABLED, Boolean.box(true))
     // push down HashExchange[c] into HashAgg
     val sqlQuery =
       """
@@ -105,10 +105,10 @@ class RemoveShuffleTest extends TableTestBase {
 
   @Test
   def testRemoveHashShuffle_Agg_PartialKey(): Unit = {
-    util.tableEnv.getConfig.getConfiguration.setString(
+    util.tableEnv.getConfig.set(
       ExecutionConfigOptions.TABLE_EXEC_DISABLED_OPERATORS, "NestedLoopJoin,SortMergeJoin,SortAgg")
-    util.tableEnv.getConfig.getConfiguration.setBoolean(
-      BatchPhysicalJoinRuleBase.TABLE_OPTIMIZER_SHUFFLE_BY_PARTIAL_KEY_ENABLED, true)
+    util.tableEnv.getConfig.set(
+      BatchPhysicalJoinRuleBase.TABLE_OPTIMIZER_SHUFFLE_BY_PARTIAL_KEY_ENABLED, Boolean.box(true))
     // push down HashExchange[c] into HashAgg
     val sqlQuery =
       """
@@ -120,11 +120,11 @@ class RemoveShuffleTest extends TableTestBase {
 
   @Test
   def testRemoveHashShuffle_HashAggregate(): Unit = {
-    util.tableEnv.getConfig.getConfiguration.setString(
+    util.tableEnv.getConfig.set(
       ExecutionConfigOptions.TABLE_EXEC_DISABLED_OPERATORS, "SortMergeJoin,NestedLoopJoin,SortAgg")
     // disable BroadcastHashJoin
-    util.tableEnv.getConfig.getConfiguration.setLong(
-      OptimizerConfigOptions.TABLE_OPTIMIZER_BROADCAST_JOIN_THRESHOLD, -1)
+    util.tableEnv.getConfig.set(
+      OptimizerConfigOptions.TABLE_OPTIMIZER_BROADCAST_JOIN_THRESHOLD, Long.box(-1))
     val sqlQuery =
       """
         |WITH r AS (SELECT * FROM x, y WHERE a = d AND c LIKE 'He%')
@@ -135,11 +135,11 @@ class RemoveShuffleTest extends TableTestBase {
 
   @Test
   def testRemoveHashShuffle_HashAggregate_1(): Unit = {
-    util.tableEnv.getConfig.getConfiguration.setString(
+    util.tableEnv.getConfig.set(
       ExecutionConfigOptions.TABLE_EXEC_DISABLED_OPERATORS, "SortMergeJoin,NestedLoopJoin,SortAgg")
     // disable BroadcastHashJoin
-    util.tableEnv.getConfig.getConfiguration.setLong(
-      OptimizerConfigOptions.TABLE_OPTIMIZER_BROADCAST_JOIN_THRESHOLD, -1)
+    util.tableEnv.getConfig.set(
+      OptimizerConfigOptions.TABLE_OPTIMIZER_BROADCAST_JOIN_THRESHOLD, Long.box(-1))
     val sqlQuery =
       """
         |WITH r AS (SELECT * FROM x, y WHERE a = d AND c LIKE 'He%')
@@ -150,11 +150,11 @@ class RemoveShuffleTest extends TableTestBase {
 
   @Test
   def testRemoveHashShuffle_HashAggregate_2(): Unit = {
-    util.tableEnv.getConfig.getConfiguration.setString(
+    util.tableEnv.getConfig.set(
       ExecutionConfigOptions.TABLE_EXEC_DISABLED_OPERATORS, "SortMergeJoin,NestedLoopJoin,SortAgg")
     // disable BroadcastHashJoin
-    util.tableEnv.getConfig.getConfiguration.setLong(
-      OptimizerConfigOptions.TABLE_OPTIMIZER_BROADCAST_JOIN_THRESHOLD, -1)
+    util.tableEnv.getConfig.set(
+      OptimizerConfigOptions.TABLE_OPTIMIZER_BROADCAST_JOIN_THRESHOLD, Long.box(-1))
     val sqlQuery =
       """
         |WITH r AS (SELECT * FROM x, y WHERE a = d AND c LIKE 'He%')
@@ -165,11 +165,11 @@ class RemoveShuffleTest extends TableTestBase {
 
   @Test
   def testRemoveHashShuffle_SortAggregate(): Unit = {
-    util.tableEnv.getConfig.getConfiguration.setString(
+    util.tableEnv.getConfig.set(
       ExecutionConfigOptions.TABLE_EXEC_DISABLED_OPERATORS, "SortMergeJoin,NestedLoopJoin,HashAgg")
     // disable BroadcastHashJoin
-    util.tableEnv.getConfig.getConfiguration.setLong(
-      OptimizerConfigOptions.TABLE_OPTIMIZER_BROADCAST_JOIN_THRESHOLD, -1)
+    util.tableEnv.getConfig.set(
+      OptimizerConfigOptions.TABLE_OPTIMIZER_BROADCAST_JOIN_THRESHOLD, Long.box(-1))
     val sqlQuery =
       """
         |WITH r AS (SELECT * FROM x, y WHERE a = d AND c LIKE 'He%')
@@ -180,11 +180,11 @@ class RemoveShuffleTest extends TableTestBase {
 
   @Test
   def testRemoveHashShuffle_SortAggregate_1(): Unit = {
-    util.tableEnv.getConfig.getConfiguration.setString(
+    util.tableEnv.getConfig.set(
       ExecutionConfigOptions.TABLE_EXEC_DISABLED_OPERATORS, "SortMergeJoin,NestedLoopJoin,HashAgg")
     // disable BroadcastHashJoin
-    util.tableEnv.getConfig.getConfiguration.setLong(
-      OptimizerConfigOptions.TABLE_OPTIMIZER_BROADCAST_JOIN_THRESHOLD, -1)
+    util.tableEnv.getConfig.set(
+      OptimizerConfigOptions.TABLE_OPTIMIZER_BROADCAST_JOIN_THRESHOLD, Long.box(-1))
     val sqlQuery =
       """
         |WITH r AS (SELECT * FROM x, y WHERE a = d AND c LIKE 'He%')
@@ -195,11 +195,11 @@ class RemoveShuffleTest extends TableTestBase {
 
   @Test
   def testRemoveHashShuffle_SortAggregate_2(): Unit = {
-    util.tableEnv.getConfig.getConfiguration.setString(
+    util.tableEnv.getConfig.set(
       ExecutionConfigOptions.TABLE_EXEC_DISABLED_OPERATORS, "SortMergeJoin,NestedLoopJoin,HashAgg")
     // disable BroadcastHashJoin
-    util.tableEnv.getConfig.getConfiguration.setLong(
-      OptimizerConfigOptions.TABLE_OPTIMIZER_BROADCAST_JOIN_THRESHOLD, -1)
+    util.tableEnv.getConfig.set(
+      OptimizerConfigOptions.TABLE_OPTIMIZER_BROADCAST_JOIN_THRESHOLD, Long.box(-1))
     val sqlQuery =
       """
         |WITH r AS (SELECT * FROM x, y WHERE a = d AND c LIKE 'He%')
@@ -210,10 +210,10 @@ class RemoveShuffleTest extends TableTestBase {
 
   @Test
   def testRemoveHashShuffle_SortMergeJoin(): Unit = {
-    util.tableEnv.getConfig.getConfiguration.setString(
+    util.tableEnv.getConfig.set(
       ExecutionConfigOptions.TABLE_EXEC_DISABLED_OPERATORS, "HashJoin,NestedLoopJoin")
-    util.tableEnv.getConfig.getConfiguration.setBoolean(
-      BatchPhysicalSortMergeJoinRule.TABLE_OPTIMIZER_SMJ_REMOVE_SORT_ENABLED, true)
+    util.tableEnv.getConfig.set(
+      BatchPhysicalSortMergeJoinRule.TABLE_OPTIMIZER_SMJ_REMOVE_SORT_ENABLED, Boolean.box(true))
     val sqlQuery =
       """
         |WITH r AS (SELECT * FROM x, y WHERE a = d AND c LIKE 'He%')
@@ -224,10 +224,10 @@ class RemoveShuffleTest extends TableTestBase {
 
   @Test
   def testRemoveHashShuffle_SortMergeJoin_LOJ(): Unit = {
-    util.tableEnv.getConfig.getConfiguration.setString(
+    util.tableEnv.getConfig.set(
       ExecutionConfigOptions.TABLE_EXEC_DISABLED_OPERATORS, "HashJoin,NestedLoopJoin")
-    util.tableEnv.getConfig.getConfiguration.setBoolean(
-      BatchPhysicalSortMergeJoinRule.TABLE_OPTIMIZER_SMJ_REMOVE_SORT_ENABLED, true)
+    util.tableEnv.getConfig.set(
+      BatchPhysicalSortMergeJoinRule.TABLE_OPTIMIZER_SMJ_REMOVE_SORT_ENABLED, Boolean.box(true))
     val sqlQuery =
       """
         |WITH r AS (SELECT * FROM x left join (SELECT * FROM y WHERE e = 2) r on a = d)
@@ -238,10 +238,10 @@ class RemoveShuffleTest extends TableTestBase {
 
   @Test
   def testRemoveHashShuffle_SortMergeJoin_ROJ(): Unit = {
-    util.tableEnv.getConfig.getConfiguration.setString(
+    util.tableEnv.getConfig.set(
       ExecutionConfigOptions.TABLE_EXEC_DISABLED_OPERATORS, "HashJoin,NestedLoopJoin")
-    util.tableEnv.getConfig.getConfiguration.setBoolean(
-      BatchPhysicalSortMergeJoinRule.TABLE_OPTIMIZER_SMJ_REMOVE_SORT_ENABLED, true)
+    util.tableEnv.getConfig.set(
+      BatchPhysicalSortMergeJoinRule.TABLE_OPTIMIZER_SMJ_REMOVE_SORT_ENABLED, Boolean.box(true))
     val sqlQuery =
       """
         |WITH r AS (SELECT * FROM x right join (SELECT * FROM y WHERE e = 2) r on a = d)
@@ -252,7 +252,7 @@ class RemoveShuffleTest extends TableTestBase {
 
   @Test
   def testRemoveHashShuffle_SortMergeJoin_FOJ(): Unit = {
-    util.tableEnv.getConfig.getConfiguration.setString(
+    util.tableEnv.getConfig.set(
       ExecutionConfigOptions.TABLE_EXEC_DISABLED_OPERATORS, "HashJoin,NestedLoopJoin")
     val sqlQuery =
       """
@@ -264,11 +264,11 @@ class RemoveShuffleTest extends TableTestBase {
 
   @Test
   def testRemoveHashShuffle_HashJoin(): Unit = {
-    util.tableEnv.getConfig.getConfiguration.setString(
+    util.tableEnv.getConfig.set(
       ExecutionConfigOptions.TABLE_EXEC_DISABLED_OPERATORS, "NestedLoopJoin,SortMergeJoin")
     // disable BroadcastHashJoin
-    util.tableEnv.getConfig.getConfiguration.setLong(
-      OptimizerConfigOptions.TABLE_OPTIMIZER_BROADCAST_JOIN_THRESHOLD, -1)
+    util.tableEnv.getConfig.set(
+      OptimizerConfigOptions.TABLE_OPTIMIZER_BROADCAST_JOIN_THRESHOLD, Long.box(-1))
     val sqlQuery =
       """
         |WITH r AS (SELECT * FROM x, y WHERE a = d AND c LIKE 'He%')
@@ -279,7 +279,7 @@ class RemoveShuffleTest extends TableTestBase {
 
   @Test
   def testRemoveHashShuffle_BroadcastHashJoin(): Unit = {
-    util.tableEnv.getConfig.getConfiguration.setString(
+    util.tableEnv.getConfig.set(
       ExecutionConfigOptions.TABLE_EXEC_DISABLED_OPERATORS, "NestedLoopJoin,SortMergeJoin")
     val sqlQuery =
       """
@@ -291,11 +291,11 @@ class RemoveShuffleTest extends TableTestBase {
 
   @Test
   def testRemoveHashShuffle_HashJoin_LOJ(): Unit = {
-    util.tableEnv.getConfig.getConfiguration.setString(
+    util.tableEnv.getConfig.set(
       ExecutionConfigOptions.TABLE_EXEC_DISABLED_OPERATORS, "NestedLoopJoin,SortMergeJoin")
     // disable BroadcastHashJoin
-    util.tableEnv.getConfig.getConfiguration.setLong(
-      OptimizerConfigOptions.TABLE_OPTIMIZER_BROADCAST_JOIN_THRESHOLD, -1)
+    util.tableEnv.getConfig.set(
+      OptimizerConfigOptions.TABLE_OPTIMIZER_BROADCAST_JOIN_THRESHOLD, Long.box(-1))
     val sqlQuery =
       """
         |WITH r AS (SELECT * FROM x left join (SELECT * FROM y WHERE e = 2) r on a = d)
@@ -306,11 +306,11 @@ class RemoveShuffleTest extends TableTestBase {
 
   @Test
   def testRemoveHashShuffle_HashJoin_ROJ(): Unit = {
-    util.tableEnv.getConfig.getConfiguration.setString(
+    util.tableEnv.getConfig.set(
       ExecutionConfigOptions.TABLE_EXEC_DISABLED_OPERATORS, "NestedLoopJoin,SortMergeJoin")
     // disable BroadcastHashJoin
-    util.tableEnv.getConfig.getConfiguration.setLong(
-      OptimizerConfigOptions.TABLE_OPTIMIZER_BROADCAST_JOIN_THRESHOLD, -1)
+    util.tableEnv.getConfig.set(
+      OptimizerConfigOptions.TABLE_OPTIMIZER_BROADCAST_JOIN_THRESHOLD, Long.box(-1))
     val sqlQuery =
       """
         |WITH r AS (SELECT * FROM x right join (SELECT * FROM y WHERE e = 2) r on a = d)
@@ -321,11 +321,11 @@ class RemoveShuffleTest extends TableTestBase {
 
   @Test
   def testRemoveHashShuffle_HashJoin_FOJ(): Unit = {
-    util.tableEnv.getConfig.getConfiguration.setString(
+    util.tableEnv.getConfig.set(
       ExecutionConfigOptions.TABLE_EXEC_DISABLED_OPERATORS, "NestedLoopJoin,SortMergeJoin")
     // disable BroadcastHashJoin
-    util.tableEnv.getConfig.getConfiguration.setLong(
-      OptimizerConfigOptions.TABLE_OPTIMIZER_BROADCAST_JOIN_THRESHOLD, -1)
+    util.tableEnv.getConfig.set(
+      OptimizerConfigOptions.TABLE_OPTIMIZER_BROADCAST_JOIN_THRESHOLD, Long.box(-1))
     val sqlQuery =
       """
         |WITH r AS (SELECT * FROM x full join (SELECT * FROM y WHERE e = 2) r on a = d)
@@ -336,11 +336,11 @@ class RemoveShuffleTest extends TableTestBase {
 
   @Test
   def testRemoveHashShuffle_HashJoin_1(): Unit = {
-    util.tableEnv.getConfig.getConfiguration.setString(
+    util.tableEnv.getConfig.set(
       ExecutionConfigOptions.TABLE_EXEC_DISABLED_OPERATORS, "NestedLoopJoin,SortMergeJoin")
     // disable BroadcastHashJoin
-    util.tableEnv.getConfig.getConfiguration.setLong(
-      OptimizerConfigOptions.TABLE_OPTIMIZER_BROADCAST_JOIN_THRESHOLD, -1)
+    util.tableEnv.getConfig.set(
+      OptimizerConfigOptions.TABLE_OPTIMIZER_BROADCAST_JOIN_THRESHOLD, Long.box(-1))
     val sqlQuery =
       """
         |WITH r1 AS (SELECT a, c, sum(b) FROM x group by a, c),
@@ -352,7 +352,7 @@ class RemoveShuffleTest extends TableTestBase {
 
   @Test
   def testRemoveHashShuffle_NestedLoopJoin(): Unit = {
-    util.tableEnv.getConfig.getConfiguration.setString(
+    util.tableEnv.getConfig.set(
       ExecutionConfigOptions.TABLE_EXEC_DISABLED_OPERATORS, "HashJoin,SortMergeJoin")
     val sqlQuery =
       """
@@ -364,13 +364,13 @@ class RemoveShuffleTest extends TableTestBase {
 
   @Test
   def testRemoveHashShuffle_Join_PartialKey(): Unit = {
-    util.tableEnv.getConfig.getConfiguration.setString(
+    util.tableEnv.getConfig.set(
       ExecutionConfigOptions.TABLE_EXEC_DISABLED_OPERATORS, "SortMergeJoin,NestedLoopJoin,SortAgg")
     // disable BroadcastHashJoin
-    util.tableEnv.getConfig.getConfiguration.setLong(
-      OptimizerConfigOptions.TABLE_OPTIMIZER_BROADCAST_JOIN_THRESHOLD, -1)
-    util.tableEnv.getConfig.getConfiguration.setBoolean(
-      BatchPhysicalJoinRuleBase.TABLE_OPTIMIZER_SHUFFLE_BY_PARTIAL_KEY_ENABLED, true)
+    util.tableEnv.getConfig.set(
+      OptimizerConfigOptions.TABLE_OPTIMIZER_BROADCAST_JOIN_THRESHOLD, Long.box(-1))
+    util.tableEnv.getConfig.set(
+      BatchPhysicalJoinRuleBase.TABLE_OPTIMIZER_SHUFFLE_BY_PARTIAL_KEY_ENABLED, Boolean.box(true))
     val sqlQuery =
       """
         |WITH r AS (SELECT d, count(f) as cnt FROM y GROUP BY d)
@@ -387,7 +387,7 @@ class RemoveShuffleTest extends TableTestBase {
 
   @Test
   def testRemoveHashShuffle_Union(): Unit = {
-    util.tableEnv.getConfig.getConfiguration.setString(
+    util.tableEnv.getConfig.set(
       ExecutionConfigOptions.TABLE_EXEC_DISABLED_OPERATORS, "NestedLoopJoin,SortMergeJoin,SortAgg")
     val sqlQuery =
       """
@@ -402,7 +402,7 @@ class RemoveShuffleTest extends TableTestBase {
 
   @Test
   def testRemoveHashShuffle_Rank(): Unit = {
-    util.tableEnv.getConfig.getConfiguration.setString(
+    util.tableEnv.getConfig.set(
       ExecutionConfigOptions.TABLE_EXEC_DISABLED_OPERATORS, "SortAgg")
     val sqlQuery =
       """
@@ -417,10 +417,10 @@ class RemoveShuffleTest extends TableTestBase {
 
   @Test
   def testRemoveHashShuffle_Rank_PartialKey1(): Unit = {
-    util.tableEnv.getConfig.getConfiguration.setString(
+    util.tableEnv.getConfig.set(
       ExecutionConfigOptions.TABLE_EXEC_DISABLED_OPERATORS, "SortAgg")
-    util.tableEnv.getConfig.getConfiguration.setBoolean(
-      BatchPhysicalJoinRuleBase.TABLE_OPTIMIZER_SHUFFLE_BY_PARTIAL_KEY_ENABLED, true)
+    util.tableEnv.getConfig.set(
+      BatchPhysicalJoinRuleBase.TABLE_OPTIMIZER_SHUFFLE_BY_PARTIAL_KEY_ENABLED, Boolean.box(true))
     val sqlQuery =
       """
         |SELECT a, SUM(b) FROM (
@@ -434,10 +434,10 @@ class RemoveShuffleTest extends TableTestBase {
 
   @Test
   def testRemoveHashShuffle_Rank_PartialKey2(): Unit = {
-    util.tableEnv.getConfig.getConfiguration.setString(
+    util.tableEnv.getConfig.set(
       ExecutionConfigOptions.TABLE_EXEC_DISABLED_OPERATORS, "SortAgg")
-    util.tableEnv.getConfig.getConfiguration.setBoolean(
-      BatchPhysicalJoinRuleBase.TABLE_OPTIMIZER_SHUFFLE_BY_PARTIAL_KEY_ENABLED, false)
+    util.tableEnv.getConfig.set(
+      BatchPhysicalJoinRuleBase.TABLE_OPTIMIZER_SHUFFLE_BY_PARTIAL_KEY_ENABLED, Boolean.box(false))
     val sqlQuery =
       """
         |SELECT * FROM (
@@ -451,10 +451,10 @@ class RemoveShuffleTest extends TableTestBase {
 
   @Test
   def testRemoveHashShuffle_Rank_PartialKey3(): Unit = {
-    util.tableEnv.getConfig.getConfiguration.setString(
+    util.tableEnv.getConfig.set(
       ExecutionConfigOptions.TABLE_EXEC_DISABLED_OPERATORS, "SortAgg")
-    util.tableEnv.getConfig.getConfiguration.setBoolean(
-      BatchPhysicalJoinRuleBase.TABLE_OPTIMIZER_SHUFFLE_BY_PARTIAL_KEY_ENABLED, true)
+    util.tableEnv.getConfig.set(
+      BatchPhysicalJoinRuleBase.TABLE_OPTIMIZER_SHUFFLE_BY_PARTIAL_KEY_ENABLED, Boolean.box(true))
     val sqlQuery =
       """
         |SELECT * FROM (
@@ -468,7 +468,7 @@ class RemoveShuffleTest extends TableTestBase {
 
   @Test
   def testRemoveHashShuffle_Rank_Singleton1(): Unit = {
-    util.tableEnv.getConfig.getConfiguration.setString(
+    util.tableEnv.getConfig.set(
       ExecutionConfigOptions.TABLE_EXEC_DISABLED_OPERATORS, "SortAgg")
     val sqlQuery =
       """
@@ -483,7 +483,7 @@ class RemoveShuffleTest extends TableTestBase {
 
   @Test
   def testRemoveHashShuffle_Rank_Singleton2(): Unit = {
-    util.tableEnv.getConfig.getConfiguration.setString(
+    util.tableEnv.getConfig.set(
       ExecutionConfigOptions.TABLE_EXEC_DISABLED_OPERATORS, "SortAgg")
     val sqlQuery =
       """
@@ -498,11 +498,11 @@ class RemoveShuffleTest extends TableTestBase {
 
   @Test
   def testRemoveHashShuffle_Correlate1(): Unit = {
-    util.tableEnv.getConfig.getConfiguration.setString(
+    util.tableEnv.getConfig.set(
       ExecutionConfigOptions.TABLE_EXEC_DISABLED_OPERATORS, "SortMergeJoin,NestedLoopJoin,SortAgg")
     // disable BroadcastHashJoin
-    util.tableEnv.getConfig.getConfiguration.setLong(
-      OptimizerConfigOptions.TABLE_OPTIMIZER_BROADCAST_JOIN_THRESHOLD, -1)
+    util.tableEnv.getConfig.set(
+      OptimizerConfigOptions.TABLE_OPTIMIZER_BROADCAST_JOIN_THRESHOLD, Long.box(-1))
     util.addFunction("split", new TableFunc1)
     val sqlQuery =
       """
@@ -515,11 +515,11 @@ class RemoveShuffleTest extends TableTestBase {
 
   @Test
   def testRemoveHashShuffle_Correlate2(): Unit = {
-    util.tableEnv.getConfig.getConfiguration.setString(
+    util.tableEnv.getConfig.set(
       ExecutionConfigOptions.TABLE_EXEC_DISABLED_OPERATORS, "SortMergeJoin,NestedLoopJoin,SortAgg")
     // disable BroadcastHashJoin
-    util.tableEnv.getConfig.getConfiguration.setLong(
-      OptimizerConfigOptions.TABLE_OPTIMIZER_BROADCAST_JOIN_THRESHOLD, -1)
+    util.tableEnv.getConfig.set(
+      OptimizerConfigOptions.TABLE_OPTIMIZER_BROADCAST_JOIN_THRESHOLD, Long.box(-1))
     util.addFunction("split", new TableFunc1)
     val sqlQuery =
       """
@@ -533,11 +533,11 @@ class RemoveShuffleTest extends TableTestBase {
   @Test
   def testRemoveHashShuffle_Correlate3(): Unit = {
     // do not remove shuffle
-    util.tableEnv.getConfig.getConfiguration.setString(
+    util.tableEnv.getConfig.set(
       ExecutionConfigOptions.TABLE_EXEC_DISABLED_OPERATORS, "SortMergeJoin,NestedLoopJoin,SortAgg")
     // disable BroadcastHashJoin
-    util.tableEnv.getConfig.getConfiguration.setLong(
-      OptimizerConfigOptions.TABLE_OPTIMIZER_BROADCAST_JOIN_THRESHOLD, -1)
+    util.tableEnv.getConfig.set(
+      OptimizerConfigOptions.TABLE_OPTIMIZER_BROADCAST_JOIN_THRESHOLD, Long.box(-1))
     util.addFunction("split", new TableFunc1)
     val sqlQuery =
       """
@@ -555,14 +555,14 @@ class RemoveShuffleTest extends TableTestBase {
 
   @Test
   def testRemoveSingletonShuffle_HashAgg(): Unit = {
-    util.tableEnv.getConfig.getConfiguration.setString(
+    util.tableEnv.getConfig.set(
       ExecutionConfigOptions.TABLE_EXEC_DISABLED_OPERATORS, "SortAgg")
     util.verifyExecPlan("SELECT MAX(b) FROM (SELECT SUM(b) AS b FROM x)")
   }
 
   @Test
   def testRemoveSingletonShuffle_SortAgg(): Unit = {
-    util.tableEnv.getConfig.getConfiguration.setString(
+    util.tableEnv.getConfig.set(
       ExecutionConfigOptions.TABLE_EXEC_DISABLED_OPERATORS, "HashAgg")
     util.verifyExecPlan("SELECT MAX(b) FROM (SELECT SUM(b) AS b FROM x)")
   }
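
Numeric options follow the same rule. Disabling the broadcast hash join in RemoveShuffleTest above means writing -1 into a ConfigOption[java.lang.Long], so the literal is boxed with Long.box(-1). A short sketch under the same assumptions (the object name is illustrative):

    import org.apache.flink.table.api.{EnvironmentSettings, TableEnvironment}
    import org.apache.flink.table.api.config.OptimizerConfigOptions

    object DisableBroadcastJoinSketch {
      def main(args: Array[String]): Unit = {
        val tEnv = TableEnvironment.create(EnvironmentSettings.inBatchMode())
        // A threshold of -1 disables BroadcastHashJoin, as the comments in the
        // hunks above note; Long.box(-1) produces the java.lang.Long that the
        // ConfigOption expects.
        tEnv.getConfig.set(
          OptimizerConfigOptions.TABLE_OPTIMIZER_BROADCAST_JOIN_THRESHOLD, Long.box(-1))
      }
    }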
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/SetOperatorsTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/SetOperatorsTest.scala
index 5bf4639..b5b7abe 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/SetOperatorsTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/SetOperatorsTest.scala
@@ -34,7 +34,7 @@ class SetOperatorsTest extends TableTestBase {
 
   @Before
   def before(): Unit = {
-    util.tableEnv.getConfig.getConfiguration.setString(
+    util.tableEnv.getConfig.set(
       ExecutionConfigOptions.TABLE_EXEC_DISABLED_OPERATORS, "SortAgg")
     util.addTableSource[(Int, Long, String)]("T1", 'a, 'b, 'c)
     util.addTableSource[(Int, Long, String)]("T2", 'd, 'e, 'f)
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/SortLimitTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/SortLimitTest.scala
index 070de14d..8cf961d 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/SortLimitTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/SortLimitTest.scala
@@ -30,66 +30,66 @@ class SortLimitTest extends TableTestBase {
 
   private val util = batchTestUtil()
   util.addTableSource[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
-  util.tableEnv.getConfig.getConfiguration.setInteger(
-    ExecutionConfigOptions.TABLE_EXEC_SORT_DEFAULT_LIMIT, 200)
+  util.tableEnv.getConfig.set(
+    ExecutionConfigOptions.TABLE_EXEC_SORT_DEFAULT_LIMIT, Integer.valueOf(200))
 
   @Test
   def testNonRangeSortWithoutOffset(): Unit = {
-    util.tableEnv.getConfig.getConfiguration.setBoolean(TABLE_EXEC_RANGE_SORT_ENABLED, false)
+    util.tableEnv.getConfig.set(TABLE_EXEC_RANGE_SORT_ENABLED, Boolean.box(false))
     util.verifyExecPlan("SELECT * FROM MyTable ORDER BY a DESC LIMIT 5")
   }
 
   @Test
   def testNonRangeSortWithLimit0(): Unit = {
-    util.tableEnv.getConfig.getConfiguration.setBoolean(TABLE_EXEC_RANGE_SORT_ENABLED, false)
+    util.tableEnv.getConfig.set(TABLE_EXEC_RANGE_SORT_ENABLED, Boolean.box(false))
     util.verifyExecPlan("SELECT * FROM MyTable ORDER BY a DESC LIMIT 0")
   }
 
   @Test
   def testNonRangeSortOnlyWithOffset(): Unit = {
-    util.tableEnv.getConfig.getConfiguration.setBoolean(TABLE_EXEC_RANGE_SORT_ENABLED, false)
+    util.tableEnv.getConfig.set(TABLE_EXEC_RANGE_SORT_ENABLED, Boolean.box(false))
     util.verifyExecPlan("SELECT * FROM MyTable ORDER BY a DESC OFFSET 10 ROWS")
   }
 
   @Test
   def testNoneRangeSortWithOffsetLimit(): Unit = {
-    util.tableEnv.getConfig.getConfiguration.setBoolean(TABLE_EXEC_RANGE_SORT_ENABLED, false)
+    util.tableEnv.getConfig.set(TABLE_EXEC_RANGE_SORT_ENABLED, Boolean.box(false))
     util.verifyExecPlan("SELECT * FROM MyTable ORDER BY a DESC, b LIMIT 10 OFFSET 1")
   }
 
   @Test
   def testNoneRangeSortWithOffsetLimit0(): Unit = {
-    util.tableEnv.getConfig.getConfiguration.setBoolean(TABLE_EXEC_RANGE_SORT_ENABLED, false)
+    util.tableEnv.getConfig.set(TABLE_EXEC_RANGE_SORT_ENABLED, Boolean.box(false))
     util.verifyExecPlan("SELECT * FROM MyTable ORDER BY a DESC, b LIMIT 0 OFFSET 1")
   }
 
   @Test
   def testRangeSortOnWithoutOffset(): Unit = {
-    util.tableEnv.getConfig.getConfiguration.setBoolean(TABLE_EXEC_RANGE_SORT_ENABLED, true)
+    util.tableEnv.getConfig.set(TABLE_EXEC_RANGE_SORT_ENABLED, Boolean.box(true))
     util.verifyExecPlan("SELECT * FROM MyTable ORDER BY a DESC LIMIT 5")
   }
 
   @Test
   def testRangeSortOnWithLimit0(): Unit = {
-    util.tableEnv.getConfig.getConfiguration.setBoolean(TABLE_EXEC_RANGE_SORT_ENABLED, true)
+    util.tableEnv.getConfig.set(TABLE_EXEC_RANGE_SORT_ENABLED, Boolean.box(true))
     util.verifyExecPlan("SELECT * FROM MyTable ORDER BY a DESC LIMIT 0")
   }
 
   @Test
   def testRangeSortOnlyWithOffset(): Unit = {
-    util.tableEnv.getConfig.getConfiguration.setBoolean(TABLE_EXEC_RANGE_SORT_ENABLED, true)
+    util.tableEnv.getConfig.set(TABLE_EXEC_RANGE_SORT_ENABLED, Boolean.box(true))
     util.verifyExecPlan("SELECT * FROM MyTable ORDER BY a DESC OFFSET 10 ROWS")
   }
 
   @Test
   def testRangeSortWithOffsetLimit(): Unit = {
-    util.tableEnv.getConfig.getConfiguration.setBoolean(TABLE_EXEC_RANGE_SORT_ENABLED, true)
+    util.tableEnv.getConfig.set(TABLE_EXEC_RANGE_SORT_ENABLED, Boolean.box(true))
     util.verifyExecPlan("SELECT * FROM MyTable ORDER BY a DESC, b LIMIT 10 OFFSET 1")
   }
 
   @Test
   def testRangeSortWithOffsetLimit0(): Unit = {
-    util.tableEnv.getConfig.getConfiguration.setBoolean(TABLE_EXEC_RANGE_SORT_ENABLED, true)
+    util.tableEnv.getConfig.set(TABLE_EXEC_RANGE_SORT_ENABLED, Boolean.box(true))
     util.verifyExecPlan("SELECT * FROM MyTable ORDER BY a DESC, b LIMIT 0 OFFSET 1")
   }
 }
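
Integer-valued options are boxed the same way; the patch spells it Integer.valueOf(200) rather than Int.box(200), though both produce a java.lang.Integer. A minimal sketch, same assumptions as before:

    import org.apache.flink.table.api.{EnvironmentSettings, TableEnvironment}
    import org.apache.flink.table.api.config.ExecutionConfigOptions

    object SortDefaultLimitSketch {
      def main(args: Array[String]): Unit = {
        val tEnv = TableEnvironment.create(EnvironmentSettings.inBatchMode())
        // The boxed -1 matches ConfigOption[java.lang.Integer] and, as in the
        // tests above, means no forced default limit on sorts.
        tEnv.getConfig.set(
          ExecutionConfigOptions.TABLE_EXEC_SORT_DEFAULT_LIMIT, Integer.valueOf(-1))
      }
    }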
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/SortTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/SortTest.scala
index 55401f3..d2f13cc 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/SortTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/SortTest.scala
@@ -33,44 +33,44 @@ class SortTest extends TableTestBase {
 
   @Test
   def testNonRangeSortOnSingleFieldWithoutForceLimit(): Unit = {
-    util.tableEnv.getConfig.getConfiguration.setBoolean(TABLE_EXEC_RANGE_SORT_ENABLED, false)
-    util.tableEnv.getConfig.getConfiguration.setInteger(
-      ExecutionConfigOptions.TABLE_EXEC_SORT_DEFAULT_LIMIT, -1)
+    util.tableEnv.getConfig.set(TABLE_EXEC_RANGE_SORT_ENABLED, Boolean.box(false))
+    util.tableEnv.getConfig.set(
+      ExecutionConfigOptions.TABLE_EXEC_SORT_DEFAULT_LIMIT, Integer.valueOf(-1))
     util.verifyExecPlan("SELECT * FROM MyTable ORDER BY a DESC")
   }
 
   @Test
   def testNonRangeSortOnMultiFieldsWithoutForceLimit(): Unit = {
-    util.tableEnv.getConfig.getConfiguration.setBoolean(
-      TABLE_EXEC_RANGE_SORT_ENABLED, false)
-    util.tableEnv.getConfig.getConfiguration.setInteger(
-      ExecutionConfigOptions.TABLE_EXEC_SORT_DEFAULT_LIMIT, -1)
+    util.tableEnv.getConfig.set(
+      TABLE_EXEC_RANGE_SORT_ENABLED, Boolean.box(false))
+    util.tableEnv.getConfig.set(
+      ExecutionConfigOptions.TABLE_EXEC_SORT_DEFAULT_LIMIT, Integer.valueOf(-1))
     util.verifyExecPlan("SELECT * FROM MyTable ORDER BY a DESC, b")
   }
 
   @Test
   def testNonRangeSortWithForceLimit(): Unit = {
-    util.tableEnv.getConfig.getConfiguration.setBoolean(
-      TABLE_EXEC_RANGE_SORT_ENABLED, false)
-    util.tableEnv.getConfig.getConfiguration.setInteger(
-      ExecutionConfigOptions.TABLE_EXEC_SORT_DEFAULT_LIMIT, 200)
+    util.tableEnv.getConfig.set(
+      TABLE_EXEC_RANGE_SORT_ENABLED, Boolean.box(false))
+    util.tableEnv.getConfig.set(
+      ExecutionConfigOptions.TABLE_EXEC_SORT_DEFAULT_LIMIT, Integer.valueOf(200))
     util.verifyExecPlan("SELECT * FROM MyTable ORDER BY a DESC")
   }
 
   @Test
   def testRangeSortWithoutForceLimit(): Unit = {
-    util.tableEnv.getConfig.getConfiguration.setBoolean(TABLE_EXEC_RANGE_SORT_ENABLED, true)
-    util.tableEnv.getConfig.getConfiguration.setInteger(
-      ExecutionConfigOptions.TABLE_EXEC_SORT_DEFAULT_LIMIT, -1)
+    util.tableEnv.getConfig.set(TABLE_EXEC_RANGE_SORT_ENABLED, Boolean.box(true))
+    util.tableEnv.getConfig.set(
+      ExecutionConfigOptions.TABLE_EXEC_SORT_DEFAULT_LIMIT, Integer.valueOf(-1))
     // exec node does not support range sort yet, so we verify rel plan here
     util.verifyRelPlan("SELECT * FROM MyTable ORDER BY a DESC")
   }
 
   @Test
   def testRangeSortWithForceLimit(): Unit = {
-    util.tableEnv.getConfig.getConfiguration.setBoolean(TABLE_EXEC_RANGE_SORT_ENABLED, true)
-    util.tableEnv.getConfig.getConfiguration.setInteger(
-      ExecutionConfigOptions.TABLE_EXEC_SORT_DEFAULT_LIMIT, 200)
+    util.tableEnv.getConfig.set(TABLE_EXEC_RANGE_SORT_ENABLED, Boolean.box(true))
+    util.tableEnv.getConfig.set(
+      ExecutionConfigOptions.TABLE_EXEC_SORT_DEFAULT_LIMIT, Integer.valueOf(200))
     // exec node does not support range sort yet, so we verify rel plan here
     util.verifyRelPlan("SELECT * FROM MyTable ORDER BY a DESC")
   }
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/SubplanReuseTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/SubplanReuseTest.scala
index 33466b3..5180190 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/SubplanReuseTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/SubplanReuseTest.scala
@@ -37,18 +37,18 @@ class SubplanReuseTest extends TableTestBase {
 
   @Before
   def before(): Unit = {
-    util.tableEnv.getConfig.getConfiguration.setBoolean(
-      OptimizerConfigOptions.TABLE_OPTIMIZER_REUSE_SUB_PLAN_ENABLED, true)
-    util.tableEnv.getConfig.getConfiguration.setBoolean(
-      OptimizerConfigOptions.TABLE_OPTIMIZER_REUSE_SOURCE_ENABLED, false)
+    util.tableEnv.getConfig.set(
+      OptimizerConfigOptions.TABLE_OPTIMIZER_REUSE_SUB_PLAN_ENABLED, Boolean.box(true))
+    util.tableEnv.getConfig.set(
+      OptimizerConfigOptions.TABLE_OPTIMIZER_REUSE_SOURCE_ENABLED, Boolean.box(false))
     util.addTableSource[(Int, Long, String)]("x", 'a, 'b, 'c)
     util.addTableSource[(Int, Long, String)]("y", 'd, 'e, 'f)
   }
 
   @Test
   def testDisableSubplanReuse(): Unit = {
-    util.tableEnv.getConfig.getConfiguration.setBoolean(
-      OptimizerConfigOptions.TABLE_OPTIMIZER_REUSE_SUB_PLAN_ENABLED, false)
+    util.tableEnv.getConfig.set(
+      OptimizerConfigOptions.TABLE_OPTIMIZER_REUSE_SUB_PLAN_ENABLED, Boolean.box(false))
     val sqlQuery =
       """
         |WITH r AS (
@@ -61,8 +61,8 @@ class SubplanReuseTest extends TableTestBase {
 
   @Test
   def testSubplanReuseWithDifferentRowType(): Unit = {
-    util.tableEnv.getConfig.getConfiguration.setBoolean(
-      OptimizerConfigOptions.TABLE_OPTIMIZER_REUSE_SOURCE_ENABLED, false)
+    util.tableEnv.getConfig.set(
+      OptimizerConfigOptions.TABLE_OPTIMIZER_REUSE_SOURCE_ENABLED, Boolean.box(false))
     // can not reuse because of different row-type
     val sqlQuery =
       """
@@ -75,8 +75,8 @@ class SubplanReuseTest extends TableTestBase {
 
   @Test
   def testEnableReuseTableSource(): Unit = {
-    util.tableEnv.getConfig.getConfiguration.setBoolean(
-      OptimizerConfigOptions.TABLE_OPTIMIZER_REUSE_SOURCE_ENABLED, true)
+    util.tableEnv.getConfig.set(
+      OptimizerConfigOptions.TABLE_OPTIMIZER_REUSE_SOURCE_ENABLED, Boolean.box(true))
     val sqlQuery =
       """
         |WITH t AS (SELECT x.a AS a, x.b AS b, y.d AS d, y.e AS e FROM x, y WHERE x.a = y.d)
@@ -87,8 +87,8 @@ class SubplanReuseTest extends TableTestBase {
 
   @Test
   def testDisableReuseTableSource(): Unit = {
-    util.tableEnv.getConfig.getConfiguration.setBoolean(
-      OptimizerConfigOptions.TABLE_OPTIMIZER_REUSE_SOURCE_ENABLED, false)
+    util.tableEnv.getConfig.set(
+      OptimizerConfigOptions.TABLE_OPTIMIZER_REUSE_SOURCE_ENABLED, Boolean.box(false))
     val sqlQuery =
       """
         |WITH t AS (SELECT * FROM x, y WHERE x.a = y.d)
@@ -100,9 +100,9 @@ class SubplanReuseTest extends TableTestBase {
   @Test
   def testSubplanReuseOnSourceWithLimit(): Unit = {
     // TODO re-check this plan after PushLimitIntoTableSourceScanRule is introduced
-    util.tableEnv.getConfig.getConfiguration.setBoolean(
-      OptimizerConfigOptions.TABLE_OPTIMIZER_REUSE_SOURCE_ENABLED, true)
-    util.tableEnv.getConfig.getConfiguration.setString(
+    util.tableEnv.getConfig.set(
+      OptimizerConfigOptions.TABLE_OPTIMIZER_REUSE_SOURCE_ENABLED, Boolean.box(true))
+    util.tableEnv.getConfig.set(
       ExecutionConfigOptions.TABLE_EXEC_DISABLED_OPERATORS, "HashJoin,SortMergeJoin")
     val sqlQuery =
       """
@@ -126,7 +126,7 @@ class SubplanReuseTest extends TableTestBase {
 
   @Test
   def testSubplanReuseOnCalc(): Unit = {
-    util.tableEnv.getConfig.getConfiguration.setString(
+    util.tableEnv.getConfig.set(
       ExecutionConfigOptions.TABLE_EXEC_DISABLED_OPERATORS, "NestedLoopJoin,SortMergeJoin")
     val sqlQuery =
       """
@@ -166,7 +166,7 @@ class SubplanReuseTest extends TableTestBase {
 
   @Test
   def testSubplanReuseOnExchange(): Unit = {
-    util.tableEnv.getConfig.getConfiguration.setString(
+    util.tableEnv.getConfig.set(
       ExecutionConfigOptions.TABLE_EXEC_DISABLED_OPERATORS, "NestedLoopJoin,SortMergeJoin")
     val sqlQuery =
       """
@@ -180,7 +180,7 @@ class SubplanReuseTest extends TableTestBase {
 
   @Test
   def testSubplanReuseOnHashAggregate(): Unit = {
-    util.tableEnv.getConfig.getConfiguration.setString(
+    util.tableEnv.getConfig.set(
       ExecutionConfigOptions.TABLE_EXEC_DISABLED_OPERATORS, OperatorType.SortAgg.toString)
     val sqlQuery =
       """
@@ -192,7 +192,7 @@ class SubplanReuseTest extends TableTestBase {
 
   @Test
   def testSubplanReuseOnSortAggregate(): Unit = {
-    util.tableEnv.getConfig.getConfiguration.setString(
+    util.tableEnv.getConfig.set(
       ExecutionConfigOptions.TABLE_EXEC_DISABLED_OPERATORS, OperatorType.HashAgg.toString)
     val sqlQuery =
       """
@@ -222,7 +222,7 @@ class SubplanReuseTest extends TableTestBase {
 
   @Test
   def testSubplanReuseOnSort(): Unit = {
-    util.tableEnv.getConfig.getConfiguration.setBoolean(TABLE_EXEC_RANGE_SORT_ENABLED, true)
+    util.tableEnv.getConfig.set(TABLE_EXEC_RANGE_SORT_ENABLED, Boolean.box(true))
     val sqlQuery =
       """
         |WITH r AS (SELECT c, SUM(a) a, SUM(b) b FROM x GROUP BY c ORDER BY a, b DESC)
@@ -233,7 +233,7 @@ class SubplanReuseTest extends TableTestBase {
 
   @Test
   def testSubplanReuseOnLimit(): Unit = {
-    util.tableEnv.getConfig.getConfiguration.setString(
+    util.tableEnv.getConfig.set(
       ExecutionConfigOptions.TABLE_EXEC_DISABLED_OPERATORS, "HashJoin,SortMergeJoin")
     val sqlQuery =
       """
@@ -276,10 +276,10 @@ class SubplanReuseTest extends TableTestBase {
 
   @Test
   def testSubplanReuseOnSortMergeJoin(): Unit = {
-    util.tableEnv.getConfig.getConfiguration.setString(
+    util.tableEnv.getConfig.set(
       ExecutionConfigOptions.TABLE_EXEC_DISABLED_OPERATORS, "HashJoin,NestedLoopJoin")
-    util.tableEnv.getConfig.getConfiguration.setBoolean(
-      BatchPhysicalSortMergeJoinRule.TABLE_OPTIMIZER_SMJ_REMOVE_SORT_ENABLED, true)
+    util.tableEnv.getConfig.set(
+      BatchPhysicalSortMergeJoinRule.TABLE_OPTIMIZER_SMJ_REMOVE_SORT_ENABLED, Boolean.box(true))
     val sqlQuery =
       """
         |WITH r AS (SELECT * FROM x, y WHERE a = d AND c LIKE 'He%')
@@ -290,7 +290,7 @@ class SubplanReuseTest extends TableTestBase {
 
   @Test
   def testSubplanReuseOnHashJoin(): Unit = {
-    util.tableEnv.getConfig.getConfiguration.setString(
+    util.tableEnv.getConfig.set(
       ExecutionConfigOptions.TABLE_EXEC_DISABLED_OPERATORS, "NestedLoopJoin,SortMergeJoin")
     val sqlQuery =
       """
@@ -302,7 +302,7 @@ class SubplanReuseTest extends TableTestBase {
 
   @Test
   def testSubplanReuseOnNestedLoopJoin(): Unit = {
-    util.tableEnv.getConfig.getConfiguration.setString(
+    util.tableEnv.getConfig.set(
       ExecutionConfigOptions.TABLE_EXEC_DISABLED_OPERATORS, "HashJoin,SortMergeJoin")
     val sqlQuery =
       """
@@ -352,7 +352,7 @@ class SubplanReuseTest extends TableTestBase {
   @Test
   def testSubplanReuseOnCorrelate(): Unit = {
     util.addFunction("str_split", new StringSplit())
-    util.tableEnv.getConfig.getConfiguration.setString(
+    util.tableEnv.getConfig.set(
       ExecutionConfigOptions.TABLE_EXEC_DISABLED_OPERATORS, "NestedLoopJoin,SortMergeJoin")
     val sqlQuery =
       """
@@ -391,7 +391,7 @@ class SubplanReuseTest extends TableTestBase {
 
   @Test
   def testNestedSubplanReuse(): Unit = {
-    util.tableEnv.getConfig.getConfiguration.setString(
+    util.tableEnv.getConfig.set(
       ExecutionConfigOptions.TABLE_EXEC_DISABLED_OPERATORS, "NestedLoopJoin,SortMergeJoin,SortAgg")
     val sqlQuery =
       """
@@ -427,7 +427,7 @@ class SubplanReuseTest extends TableTestBase {
 
   @Test
   def testBreakupDeadlockOnHashJoin(): Unit = {
-    util.tableEnv.getConfig.getConfiguration.setString(
+    util.tableEnv.getConfig.set(
       ExecutionConfigOptions.TABLE_EXEC_DISABLED_OPERATORS, "NestedLoopJoin,SortMergeJoin")
     val sqlQuery =
       """
@@ -439,7 +439,7 @@ class SubplanReuseTest extends TableTestBase {
 
   @Test
   def testBreakupDeadlockOnNestedLoopJoin(): Unit = {
-    util.tableEnv.getConfig.getConfiguration.setString(
+    util.tableEnv.getConfig.set(
       ExecutionConfigOptions.TABLE_EXEC_DISABLED_OPERATORS, "HashJoin,SortMergeJoin")
     val sqlQuery =
       """
@@ -451,15 +451,15 @@ class SubplanReuseTest extends TableTestBase {
 
   @Test
   def testEnableReuseTableSourceOnNewSource(): Unit = {
-    util.tableEnv.getConfig.getConfiguration.setBoolean(
-      OptimizerConfigOptions.TABLE_OPTIMIZER_REUSE_SOURCE_ENABLED, true)
+    util.tableEnv.getConfig.set(
+      OptimizerConfigOptions.TABLE_OPTIMIZER_REUSE_SOURCE_ENABLED, Boolean.box(true))
     testReuseOnNewSource()
   }
 
   @Test
   def testDisableReuseTableSourceOnNewSource(): Unit = {
-    util.tableEnv.getConfig.getConfiguration.setBoolean(
-      OptimizerConfigOptions.TABLE_OPTIMIZER_REUSE_SOURCE_ENABLED, false)
+    util.tableEnv.getConfig.set(
+      OptimizerConfigOptions.TABLE_OPTIMIZER_REUSE_SOURCE_ENABLED, Boolean.box(false))
     testReuseOnNewSource()
   }
 
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/TableSinkTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/TableSinkTest.scala
index 8373789..7a9d275 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/TableSinkTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/TableSinkTest.scala
@@ -72,8 +72,9 @@ class TableSinkTest extends TableTestBase {
          |)
          |""".stripMargin)
 
-    util.tableEnv.getConfig.getConfiguration.setBoolean(
-      RelNodeBlockPlanBuilder.TABLE_OPTIMIZER_REUSE_OPTIMIZE_BLOCK_WITH_DIGEST_ENABLED, true)
+    util.tableEnv.getConfig.set(
+      RelNodeBlockPlanBuilder.TABLE_OPTIMIZER_REUSE_OPTIMIZE_BLOCK_WITH_DIGEST_ENABLED,
+      Boolean.box(true))
     val table1 = util.tableEnv.sqlQuery("SELECT SUM(a) AS sum_a, c FROM MyTable GROUP BY c")
     util.tableEnv.createTemporaryView("table1", table1)
     val stmtSet = util.tableEnv.createStatementSet()
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/TableSourceTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/TableSourceTest.scala
index 0edefde..ca28de3 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/TableSourceTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/TableSourceTest.scala
@@ -220,8 +220,9 @@ class TableSourceTest extends TableTestBase {
 
   @Test
   def testTableHintWithDigestReuseForLogicalTableScan(): Unit = {
-    util.tableEnv.getConfig.getConfiguration.setBoolean(
-      RelNodeBlockPlanBuilder.TABLE_OPTIMIZER_REUSE_OPTIMIZE_BLOCK_WITH_DIGEST_ENABLED, true)
+    util.tableEnv.getConfig.set(
+      RelNodeBlockPlanBuilder.TABLE_OPTIMIZER_REUSE_OPTIMIZE_BLOCK_WITH_DIGEST_ENABLED,
+      Boolean.box(true))
     util.tableEnv.executeSql(
       s"""
          |CREATE TABLE MySink (
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/agg/GroupWindowTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/agg/GroupWindowTest.scala
index 7b82fee..35fef12 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/agg/GroupWindowTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/agg/GroupWindowTest.scala
@@ -39,7 +39,7 @@ class GroupWindowTest(aggStrategy: AggregatePhaseStrategy) extends TableTestBase
 
   @Before
   def before(): Unit = {
-    util.tableEnv.getConfig.getConfiguration.setString(
+    util.tableEnv.getConfig.set(
       OptimizerConfigOptions.TABLE_OPTIMIZER_AGG_PHASE_STRATEGY, aggStrategy.toString)
     util.addFunction("countFun", new CountAggFunction)
     util.addTableSource[(Int, Timestamp, Int, Long)]("MyTable", 'a, 'b, 'c, 'd)
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/agg/HashAggregateTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/agg/HashAggregateTest.scala
index c972b67..1239ce0 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/agg/HashAggregateTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/agg/HashAggregateTest.scala
@@ -36,9 +36,9 @@ class HashAggregateTest(aggStrategy: AggregatePhaseStrategy) extends AggregateTe
   @Before
   def before(): Unit = {
     // disable sort agg
-    util.tableEnv.getConfig.getConfiguration.setString(
+    util.tableEnv.getConfig.set(
       ExecutionConfigOptions.TABLE_EXEC_DISABLED_OPERATORS, OperatorType.SortAgg.toString)
-    util.tableEnv.getConfig.getConfiguration.setString(
+    util.tableEnv.getConfig.set(
       OptimizerConfigOptions.TABLE_OPTIMIZER_AGG_PHASE_STRATEGY, aggStrategy.toString)
   }
 
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/agg/SortAggregateTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/agg/SortAggregateTest.scala
index e0dfe72..af072fa 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/agg/SortAggregateTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/agg/SortAggregateTest.scala
@@ -35,9 +35,9 @@ class SortAggregateTest(aggStrategy: AggregatePhaseStrategy) extends AggregateTe
   @Before
   def before(): Unit = {
     // disable hash agg
-    util.tableEnv.getConfig.getConfiguration.setString(
+    util.tableEnv.getConfig.set(
       ExecutionConfigOptions.TABLE_EXEC_DISABLED_OPERATORS, OperatorType.HashAgg.toString)
-    util.tableEnv.getConfig.getConfiguration.setString(
+    util.tableEnv.getConfig.set(
       OptimizerConfigOptions.TABLE_OPTIMIZER_AGG_PHASE_STRATEGY, aggStrategy.toString)
   }
 }
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/join/BroadcastHashJoinTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/join/BroadcastHashJoinTest.scala
index 07d9b85..f738ec8 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/join/BroadcastHashJoinTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/join/BroadcastHashJoinTest.scala
@@ -27,9 +27,9 @@ class BroadcastHashJoinTest extends JoinTestBase {
 
   @Before
   def before(): Unit = {
-    util.tableEnv.getConfig.getConfiguration.setLong(
-      OptimizerConfigOptions.TABLE_OPTIMIZER_BROADCAST_JOIN_THRESHOLD, Long.MaxValue)
-    util.tableEnv.getConfig.getConfiguration.setString(
+    util.tableEnv.getConfig.set(
+      OptimizerConfigOptions.TABLE_OPTIMIZER_BROADCAST_JOIN_THRESHOLD, Long.box(Long.MaxValue))
+    util.tableEnv.getConfig.set(
       ExecutionConfigOptions.TABLE_EXEC_DISABLED_OPERATORS,
       "SortMergeJoin, NestedLoopJoin, ShuffleHashJoin")
   }
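
By contrast, string-typed options such as TABLE_EXEC_DISABLED_OPERATORS take
their value as-is, since it is already a reference type; only primitive-valued
options need boxing. A sketch under the same assumption of a TableEnvironment
named tEnv:

    import org.apache.flink.table.api.config.ExecutionConfigOptions

    // String values pass through TableConfig.set unchanged, no boxing involved.
    tEnv.getConfig.set(
      ExecutionConfigOptions.TABLE_EXEC_DISABLED_OPERATORS,
      "SortMergeJoin, NestedLoopJoin, ShuffleHashJoin")
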
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/join/BroadcastHashSemiAntiJoinTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/join/BroadcastHashSemiAntiJoinTest.scala
index 6678332..5b7730b 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/join/BroadcastHashSemiAntiJoinTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/join/BroadcastHashSemiAntiJoinTest.scala
@@ -27,9 +27,9 @@ class BroadcastHashSemiAntiJoinTest extends SemiAntiJoinTestBase {
 
   @Before
   def before(): Unit = {
-    util.tableEnv.getConfig.getConfiguration.setLong(
-      OptimizerConfigOptions.TABLE_OPTIMIZER_BROADCAST_JOIN_THRESHOLD, Long.MaxValue)
-    util.tableEnv.getConfig.getConfiguration.setString(
+    util.tableEnv.getConfig.set(
+      OptimizerConfigOptions.TABLE_OPTIMIZER_BROADCAST_JOIN_THRESHOLD, Long.box(Long.MaxValue))
+    util.tableEnv.getConfig.set(
       ExecutionConfigOptions.TABLE_EXEC_DISABLED_OPERATORS,
       "SortMergeJoin, NestedLoopJoin, ShuffleHashJoin")
     // the result plan may contain NestedLoopJoin (singleRowJoin)
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/join/LookupJoinTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/join/LookupJoinTest.scala
index 3dc3e7d..6028eb4 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/join/LookupJoinTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/join/LookupJoinTest.scala
@@ -329,8 +329,8 @@ class LookupJoinTest(legacyTableSource: Boolean) extends TableTestBase {
 
   @Test
   def testReusing(): Unit = {
-    testUtil.tableEnv.getConfig.getConfiguration.setBoolean(
-      OptimizerConfigOptions.TABLE_OPTIMIZER_REUSE_SUB_PLAN_ENABLED, true)
+    testUtil.tableEnv.getConfig.set(
+      OptimizerConfigOptions.TABLE_OPTIMIZER_REUSE_SUB_PLAN_ENABLED, Boolean.box(true))
     val sql1 =
       """
         |SELECT b, a, sum(c) c, sum(d) d, PROCTIME() as proctime
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/join/NestedLoopJoinTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/join/NestedLoopJoinTest.scala
index ff6b724..3b45288 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/join/NestedLoopJoinTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/join/NestedLoopJoinTest.scala
@@ -26,7 +26,7 @@ class NestedLoopJoinTest extends JoinTestBase {
 
   @Before
   def before(): Unit = {
-    util.tableEnv.getConfig.getConfiguration.setString(
+    util.tableEnv.getConfig.set(
       ExecutionConfigOptions.TABLE_EXEC_DISABLED_OPERATORS, "SortMergeJoin, HashJoin")
   }
 }
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/join/NestedLoopSemiAntiJoinTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/join/NestedLoopSemiAntiJoinTest.scala
index c8548f8..b884b31 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/join/NestedLoopSemiAntiJoinTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/join/NestedLoopSemiAntiJoinTest.scala
@@ -26,7 +26,7 @@ class NestedLoopSemiAntiJoinTest extends SemiAntiJoinTestBase {
 
   @Before
   def before(): Unit = {
-    util.tableEnv.getConfig.getConfiguration.setString(
+    util.tableEnv.getConfig.set(
       ExecutionConfigOptions.TABLE_EXEC_DISABLED_OPERATORS, "SortMergeJoin, HashJoin")
   }
 }
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/join/ShuffledHashJoinTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/join/ShuffledHashJoinTest.scala
index c262b51..4b7966b 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/join/ShuffledHashJoinTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/join/ShuffledHashJoinTest.scala
@@ -27,7 +27,7 @@ class ShuffledHashJoinTest extends JoinTestBase {
 
   @Before
   def before(): Unit = {
-    util.tableEnv.getConfig.getConfiguration.setString(
+    util.tableEnv.getConfig.set(
       ExecutionConfigOptions.TABLE_EXEC_DISABLED_OPERATORS,
       "SortMergeJoin, NestedLoopJoin, BroadcastHashJoin")
   }
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/join/ShuffledHashSemiAntiJoinTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/join/ShuffledHashSemiAntiJoinTest.scala
index f4814c4..8315cfe 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/join/ShuffledHashSemiAntiJoinTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/join/ShuffledHashSemiAntiJoinTest.scala
@@ -27,7 +27,7 @@ class ShuffledHashSemiAntiJoinTest extends SemiAntiJoinTestBase {
 
   @Before
   def before(): Unit = {
-    util.tableEnv.getConfig.getConfiguration.setString(
+    util.tableEnv.getConfig.set(
       ExecutionConfigOptions.TABLE_EXEC_DISABLED_OPERATORS,
       "SortMergeJoin, NestedLoopJoin, BroadcastHashJoin")
     // the result plan may contain NestedLoopJoin (singleRowJoin)
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/join/SortMergeJoinTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/join/SortMergeJoinTest.scala
index b8b0a70..2eacfc9 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/join/SortMergeJoinTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/join/SortMergeJoinTest.scala
@@ -27,7 +27,7 @@ class SortMergeJoinTest extends JoinTestBase {
 
   @Before
   def before(): Unit = {
-    util.tableEnv.getConfig.getConfiguration.setString(
+    util.tableEnv.getConfig.set(
       ExecutionConfigOptions.TABLE_EXEC_DISABLED_OPERATORS, "HashJoin, NestedLoopJoin")
   }
 
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/join/SortMergeSemiAntiJoinTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/join/SortMergeSemiAntiJoinTest.scala
index e358c7b..4a79b98 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/join/SortMergeSemiAntiJoinTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/join/SortMergeSemiAntiJoinTest.scala
@@ -27,7 +27,7 @@ class SortMergeSemiAntiJoinTest extends SemiAntiJoinTestBase {
 
   @Before
   def before(): Unit = {
-    util.tableEnv.getConfig.getConfiguration.setString(
+    util.tableEnv.getConfig.set(
       ExecutionConfigOptions.TABLE_EXEC_DISABLED_OPERATORS, "HashJoin, NestedLoopJoin")
     // the result plan may contain NestedLoopJoin (singleRowJoin)
     // which is converted by BatchExecSingleRowJoinRule
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/common/JoinReorderTestBase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/common/JoinReorderTestBase.scala
index 0cf6659..2137811 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/common/JoinReorderTestBase.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/common/JoinReorderTestBase.scala
@@ -70,8 +70,8 @@ abstract class JoinReorderTestBase extends TableTestBase {
         "b5" -> new ColumnStats(200L, 0L, 8.0, 8, null, null)
       ))).build())
 
-    util.getTableEnv.getConfig.getConfiguration.setBoolean(
-      OptimizerConfigOptions.TABLE_OPTIMIZER_JOIN_REORDER_ENABLED, true)
+    util.getTableEnv.getConfig.set(
+      OptimizerConfigOptions.TABLE_OPTIMIZER_JOIN_REORDER_ENABLED, Boolean.box(true))
   }
 
   @Test
@@ -266,8 +266,8 @@ abstract class JoinReorderTestBase extends TableTestBase {
         "b8" -> builderB.build()
       ))).build())
 
-    util.getTableEnv.getConfig.getConfiguration.setLong(
-      JoinDeriveNullFilterRule.TABLE_OPTIMIZER_JOIN_NULL_FILTER_THRESHOLD, 10000L)
+    util.getTableEnv.getConfig.set(
+      JoinDeriveNullFilterRule.TABLE_OPTIMIZER_JOIN_NULL_FILTER_THRESHOLD, Long.box(10000))
     val sql =
       s"""
          |SELECT * FROM T6
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/common/TableFactoryTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/common/TableFactoryTest.scala
index c33a9d2..50fc563 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/common/TableFactoryTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/common/TableFactoryTest.scala
@@ -47,7 +47,7 @@ class TableFactoryTest(isBatch: Boolean) extends TableTestBase {
       ObjectIdentifier.of("cat", "default", "t1"),
       ObjectIdentifier.of("cat", "default", "t2"),
       isBatch)
-    util.tableEnv.getConfig.getConfiguration.setBoolean(TestContextTableFactory.REQUIRED_KEY, true)
+    util.tableEnv.getConfig.set(TestContextTableFactory.REQUIRED_KEY, Boolean.box(true))
     util.tableEnv.registerCatalog("cat", new GenericInMemoryCatalog("default") {
       override def getTableFactory: Optional[TableFactory] = Optional.of(factory)
     })
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/hint/OptionsHintTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/hint/OptionsHintTest.scala
index 0ff55d1..ed7e224 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/hint/OptionsHintTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/hint/OptionsHintTest.scala
@@ -75,9 +75,9 @@ class OptionsHintTest(param: Param)
 
   @Test
   def testOptionsWithGlobalConfDisabled(): Unit = {
-    util.tableEnv.getConfig.getConfiguration.setBoolean(
+    util.tableEnv.getConfig.set(
       TableConfigOptions.TABLE_DYNAMIC_TABLE_OPTIONS_ENABLED,
-      false)
+      Boolean.box(false))
     expectedException.expect(isA(classOf[ValidationException]))
     expectedException.expectMessage(s"OPTIONS hint is allowed only when "
       + s"${TableConfigOptions.TABLE_DYNAMIC_TABLE_OPTIONS_ENABLED.key} is set to true")
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/rules/logical/JoinDeriveNullFilterRuleTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/rules/logical/JoinDeriveNullFilterRuleTest.scala
index b459371..8513973 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/rules/logical/JoinDeriveNullFilterRuleTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/rules/logical/JoinDeriveNullFilterRuleTest.scala
@@ -52,8 +52,8 @@ class JoinDeriveNullFilterRuleTest extends TableTestBase {
         .build()
     )
 
-    util.tableEnv.getConfig.getConfiguration.setLong(
-      JoinDeriveNullFilterRule.TABLE_OPTIMIZER_JOIN_NULL_FILTER_THRESHOLD, 2000000)
+    util.tableEnv.getConfig.set(
+      JoinDeriveNullFilterRule.TABLE_OPTIMIZER_JOIN_NULL_FILTER_THRESHOLD, Long.box(2000000))
     util.addTableSource("MyTable1",
       Array[TypeInformation[_]](Types.INT, Types.LONG, Types.STRING, Types.INT, Types.LONG),
       Array("a1", "b1", "c1", "d1", "e1"),
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/rules/logical/SplitAggregateRuleTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/rules/logical/SplitAggregateRuleTest.scala
index d809dc4..26d4324 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/rules/logical/SplitAggregateRuleTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/rules/logical/SplitAggregateRuleTest.scala
@@ -33,8 +33,8 @@ class SplitAggregateRuleTest extends TableTestBase {
   private val util = streamTestUtil()
   util.addTableSource[(Long, Int, String)]("MyTable", 'a, 'b, 'c)
   util.buildStreamProgram(FlinkStreamProgram.PHYSICAL)
-  util.tableEnv.getConfig.getConfiguration.setBoolean(
-    OptimizerConfigOptions.TABLE_OPTIMIZER_DISTINCT_AGG_SPLIT_ENABLED, true)
+  util.tableEnv.getConfig.set(
+    OptimizerConfigOptions.TABLE_OPTIMIZER_DISTINCT_AGG_SPLIT_ENABLED, Boolean.box(true))
 
   @Test
   def testSingleDistinctAgg(): Unit = {
@@ -163,16 +163,16 @@ class SplitAggregateRuleTest extends TableTestBase {
 
   @Test
   def testBucketsConfiguration(): Unit = {
-    util.tableEnv.getConfig.getConfiguration.setInteger(
-      OptimizerConfigOptions.TABLE_OPTIMIZER_DISTINCT_AGG_SPLIT_BUCKET_NUM, 100)
+    util.tableEnv.getConfig.set(
+      OptimizerConfigOptions.TABLE_OPTIMIZER_DISTINCT_AGG_SPLIT_BUCKET_NUM, Integer.valueOf(100))
     val sqlQuery = "SELECT COUNT(DISTINCT c) FROM MyTable"
     util.verifyRelPlan(sqlQuery)
   }
 
   @Test
   def testMultipleDistinctAggOnSameColumn(): Unit = {
-    util.tableEnv.getConfig.getConfiguration.setBoolean(
-      OptimizerConfigOptions.TABLE_OPTIMIZER_DISTINCT_AGG_SPLIT_ENABLED, true)
+    util.tableEnv.getConfig.set(
+      OptimizerConfigOptions.TABLE_OPTIMIZER_DISTINCT_AGG_SPLIT_ENABLED, Boolean.box(true))
     val sqlQuery =
       s"""
          |SELECT
@@ -189,8 +189,8 @@ class SplitAggregateRuleTest extends TableTestBase {
 
   @Test
   def testAggFilterClauseBothWithAvgAndCount(): Unit = {
-    util.tableEnv.getConfig.getConfiguration.setBoolean(
-      OptimizerConfigOptions.TABLE_OPTIMIZER_DISTINCT_AGG_SPLIT_ENABLED, true)
+    util.tableEnv.getConfig.set(
+      OptimizerConfigOptions.TABLE_OPTIMIZER_DISTINCT_AGG_SPLIT_ENABLED, Boolean.box(true))
     val sqlQuery =
       s"""
          |SELECT
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/rules/physical/batch/EnforceLocalHashAggRuleTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/rules/physical/batch/EnforceLocalHashAggRuleTest.scala
index c0c1128..9d93773 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/rules/physical/batch/EnforceLocalHashAggRuleTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/rules/physical/batch/EnforceLocalHashAggRuleTest.scala
@@ -51,9 +51,9 @@ class EnforceLocalHashAggRuleTest extends EnforceLocalAggRuleTestBase {
       .replaceBatchProgram(program).build()
     util.tableEnv.getConfig.setPlannerConfig(calciteConfig)
     // only enable HashAgg
-    util.tableEnv.getConfig.getConfiguration.setString(
+    util.tableEnv.getConfig.set(
       ExecutionConfigOptions.TABLE_EXEC_DISABLED_OPERATORS, "SortAgg")
-    util.tableEnv.getConfig.getConfiguration.setString(
+    util.tableEnv.getConfig.set(
       OptimizerConfigOptions.TABLE_OPTIMIZER_AGG_PHASE_STRATEGY, "TWO_PHASE")
   }
 
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/rules/physical/batch/EnforceLocalSortAggRuleTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/rules/physical/batch/EnforceLocalSortAggRuleTest.scala
index e0eeca2..29520c5 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/rules/physical/batch/EnforceLocalSortAggRuleTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/rules/physical/batch/EnforceLocalSortAggRuleTest.scala
@@ -55,9 +55,9 @@ class EnforceLocalSortAggRuleTest extends EnforceLocalAggRuleTestBase {
       .replaceBatchProgram(program).build()
     util.tableEnv.getConfig.setPlannerConfig(calciteConfig)
     // only enable SortAgg
-    util.tableEnv.getConfig.getConfiguration.setString(
+    util.tableEnv.getConfig.set(
       ExecutionConfigOptions.TABLE_EXEC_DISABLED_OPERATORS, "HashAgg")
-    util.tableEnv.getConfig.getConfiguration.setString(
+    util.tableEnv.getConfig.set(
       OptimizerConfigOptions.TABLE_OPTIMIZER_AGG_PHASE_STRATEGY, "TWO_PHASE")
   }
 
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/rules/physical/batch/RemoveRedundantLocalHashAggRuleTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/rules/physical/batch/RemoveRedundantLocalHashAggRuleTest.scala
index 1060fa8..fba9798 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/rules/physical/batch/RemoveRedundantLocalHashAggRuleTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/rules/physical/batch/RemoveRedundantLocalHashAggRuleTest.scala
@@ -40,11 +40,11 @@ class RemoveRedundantLocalHashAggRuleTest extends TableTestBase {
 
   @Test
   def testRemoveRedundantLocalHashAgg_ShuffleKeyFromJoin(): Unit = {
-    util.tableEnv.getConfig.getConfiguration.setString(
+    util.tableEnv.getConfig.set(
       ExecutionConfigOptions.TABLE_EXEC_DISABLED_OPERATORS, "SortMergeJoin,NestedLoopJoin,SortAgg")
     // disable BroadcastHashJoin
-    util.tableEnv.getConfig.getConfiguration.setLong(
-      OptimizerConfigOptions.TABLE_OPTIMIZER_BROADCAST_JOIN_THRESHOLD, -1)
+    util.tableEnv.getConfig.set(
+      OptimizerConfigOptions.TABLE_OPTIMIZER_BROADCAST_JOIN_THRESHOLD, Long.box(-1))
     val sqlQuery =
       """
         |WITH r AS (SELECT * FROM x, y WHERE a = d AND c LIKE 'He%')
@@ -55,10 +55,10 @@ class RemoveRedundantLocalHashAggRuleTest extends TableTestBase {
 
   @Test
   def testRemoveRedundantLocalHashAgg_ShuffleKeyFromRank(): Unit = {
-    util.tableEnv.getConfig.getConfiguration.setString(
+    util.tableEnv.getConfig.set(
       ExecutionConfigOptions.TABLE_EXEC_DISABLED_OPERATORS, "SortAgg")
-    util.tableEnv.getConfig.getConfiguration.setBoolean(
-      BatchPhysicalJoinRuleBase.TABLE_OPTIMIZER_SHUFFLE_BY_PARTIAL_KEY_ENABLED, true)
+    util.tableEnv.getConfig.set(
+      BatchPhysicalJoinRuleBase.TABLE_OPTIMIZER_SHUFFLE_BY_PARTIAL_KEY_ENABLED, Boolean.box(true))
     val sqlQuery =
       """
         |SELECT a, SUM(b) FROM (
@@ -72,10 +72,10 @@ class RemoveRedundantLocalHashAggRuleTest extends TableTestBase {
 
   @Test
   def testUsingLocalAggCallFilters(): Unit = {
-    util.tableEnv.getConfig.getConfiguration.setString(
+    util.tableEnv.getConfig.set(
       ExecutionConfigOptions.TABLE_EXEC_DISABLED_OPERATORS, "SortAgg")
-    util.tableEnv.getConfig.getConfiguration.setBoolean(
-      BatchPhysicalJoinRuleBase.TABLE_OPTIMIZER_SHUFFLE_BY_PARTIAL_KEY_ENABLED, true)
+    util.tableEnv.getConfig.set(
+      BatchPhysicalJoinRuleBase.TABLE_OPTIMIZER_SHUFFLE_BY_PARTIAL_KEY_ENABLED, Boolean.box(true))
     val sqlQuery = "SELECT d, MAX(e), MAX(e) FILTER (WHERE a < 10), COUNT(DISTINCT c),\n" +
       "COUNT(DISTINCT c) FILTER (WHERE a > 5), COUNT(DISTINCT b) FILTER (WHERE b > 3)\n" +
       "FROM z GROUP BY d"
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/rules/physical/stream/ChangelogModeInferenceTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/rules/physical/stream/ChangelogModeInferenceTest.scala
index 15f4293..a7fbbee 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/rules/physical/stream/ChangelogModeInferenceTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/rules/physical/stream/ChangelogModeInferenceTest.scala
@@ -19,7 +19,7 @@
 package org.apache.flink.table.planner.plan.rules.physical.stream
 
 import org.apache.flink.api.common.time.Time
-import org.apache.flink.table.api.{ExplainDetail, _}
+import org.apache.flink.table.api.ExplainDetail
 import org.apache.flink.table.api.config.OptimizerConfigOptions
 import org.apache.flink.table.planner.plan.optimize.RelNodeBlockPlanBuilder
 import org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram
@@ -162,7 +162,7 @@ class ChangelogModeInferenceTest extends TableTestBase {
   def testTwoLevelGroupByLocalGlobalOn(): Unit = {
       util.enableMiniBatch()
       util.tableEnv.getConfig.setIdleStateRetentionTime(Time.hours(1), Time.hours(2))
-      util.tableEnv.getConfig.getConfiguration.setString(
+      util.tableEnv.getConfig.set(
         OptimizerConfigOptions.TABLE_OPTIMIZER_AGG_PHASE_STRATEGY,
         AggregatePhaseStrategy.TWO_PHASE.toString)
     // two level unbounded groupBy
@@ -223,9 +223,9 @@ class ChangelogModeInferenceTest extends TableTestBase {
 
   @Test
   def testPropagateUpdateKindAmongRelNodeBlocks(): Unit = {
-    util.tableEnv.getConfig.getConfiguration.setBoolean(
+    util.tableEnv.getConfig.set(
       RelNodeBlockPlanBuilder.TABLE_OPTIMIZER_REUSE_OPTIMIZE_BLOCK_WITH_DIGEST_ENABLED,
-      true)
+      Boolean.box(true))
     util.addTable(
       """
         |create table sink1 (
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/rules/physical/stream/ExpandWindowTableFunctionTransposeRuleTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/rules/physical/stream/ExpandWindowTableFunctionTransposeRuleTest.scala
index a3ec3d7..e1cdf31 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/rules/physical/stream/ExpandWindowTableFunctionTransposeRuleTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/rules/physical/stream/ExpandWindowTableFunctionTransposeRuleTest.scala
@@ -83,8 +83,8 @@ class ExpandWindowTableFunctionTransposeRuleTest extends TableTestBase {
          |)
          |""".stripMargin)
 
-    util.tableEnv.getConfig.getConfiguration.setBoolean(
-      OptimizerConfigOptions.TABLE_OPTIMIZER_DISTINCT_AGG_SPLIT_ENABLED, true)
+    util.tableEnv.getConfig.set(
+      OptimizerConfigOptions.TABLE_OPTIMIZER_DISTINCT_AGG_SPLIT_ENABLED, Boolean.box(true))
   }
 
   @Test
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/DagOptimizationTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/DagOptimizationTest.scala
index 414b044..1a39444 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/DagOptimizationTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/DagOptimizationTest.scala
@@ -115,8 +115,8 @@ class DagOptimizationTest extends TableTestBase {
 
   @Test
   def testSingleSinkSplitOnUnion(): Unit = {
-    util.tableEnv.getConfig.getConfiguration.setBoolean(
-      RelNodeBlockPlanBuilder.TABLE_OPTIMIZER_UNIONALL_AS_BREAKPOINT_ENABLED, false)
+    util.tableEnv.getConfig.set(
+      RelNodeBlockPlanBuilder.TABLE_OPTIMIZER_UNIONALL_AS_BREAKPOINT_ENABLED, Boolean.box(false))
 
     val sqlQuery = "SELECT SUM(a) AS total_sum FROM " +
       "(SELECT a, c FROM MyTable UNION ALL SELECT d, f FROM MyTable1)"
@@ -128,8 +128,9 @@ class DagOptimizationTest extends TableTestBase {
   @Test
   def testMultiSinks1(): Unit = {
     val stmtSet = util.tableEnv.createStatementSet()
-    util.tableEnv.getConfig.getConfiguration.setBoolean(
-      RelNodeBlockPlanBuilder.TABLE_OPTIMIZER_REUSE_OPTIMIZE_BLOCK_WITH_DIGEST_ENABLED, true)
+    util.tableEnv.getConfig.set(
+      RelNodeBlockPlanBuilder.TABLE_OPTIMIZER_REUSE_OPTIMIZE_BLOCK_WITH_DIGEST_ENABLED,
+      Boolean.box(true))
     val table1 = util.tableEnv.sqlQuery("SELECT SUM(a) AS sum_a, c FROM MyTable GROUP BY c")
     util.tableEnv.registerTable("table1", table1)
     val table2 = util.tableEnv.sqlQuery("SELECT SUM(sum_a) AS total_sum FROM table1")
@@ -151,8 +152,8 @@ class DagOptimizationTest extends TableTestBase {
   @Test
   def testMultiSinks2(): Unit = {
     val stmtSet = util.tableEnv.createStatementSet()
-    util.tableEnv.getConfig.getConfiguration.setBoolean(
-      RelNodeBlockPlanBuilder.TABLE_OPTIMIZER_UNIONALL_AS_BREAKPOINT_ENABLED, true)
+    util.tableEnv.getConfig.set(
+      RelNodeBlockPlanBuilder.TABLE_OPTIMIZER_UNIONALL_AS_BREAKPOINT_ENABLED, Boolean.box(true))
     util.addTableSource[(Int, Long, String, Double, Boolean)]("MyTable2", 'a, 'b, 'c, 'd, 'e)
 
     val table1 = util.tableEnv.sqlQuery("SELECT a as a1, b as b1 FROM MyTable WHERE a <= 10")
@@ -177,8 +178,8 @@ class DagOptimizationTest extends TableTestBase {
   @Test
   def testMultiSinks3(): Unit = {
     val stmtSet = util.tableEnv.createStatementSet()
-    util.tableEnv.getConfig.getConfiguration.setBoolean(
-      RelNodeBlockPlanBuilder.TABLE_OPTIMIZER_UNIONALL_AS_BREAKPOINT_ENABLED, true)
+    util.tableEnv.getConfig.set(
+      RelNodeBlockPlanBuilder.TABLE_OPTIMIZER_UNIONALL_AS_BREAKPOINT_ENABLED, Boolean.box(true))
     util.addTableSource[(Int, Long, String, Double, Boolean)]("MyTable2", 'a, 'b, 'c, 'd, 'e)
 
     val table1 = util.tableEnv.sqlQuery("SELECT a AS a1, b AS b1 FROM MyTable WHERE a <= 10")
@@ -231,8 +232,9 @@ class DagOptimizationTest extends TableTestBase {
   @Test
   def testMultiSinks5(): Unit = {
     val stmtSet = util.tableEnv.createStatementSet()
-    util.tableEnv.getConfig.getConfiguration.setBoolean(
-      RelNodeBlockPlanBuilder.TABLE_OPTIMIZER_REUSE_OPTIMIZE_BLOCK_WITH_DIGEST_ENABLED, true)
+    util.tableEnv.getConfig.set(
+      RelNodeBlockPlanBuilder.TABLE_OPTIMIZER_REUSE_OPTIMIZE_BLOCK_WITH_DIGEST_ENABLED,
+      Boolean.box(true))
     // test with non-deterministic udf
     util.tableEnv.registerFunction("random_udf", new NonDeterministicUdf())
     val table1 = util.tableEnv.sqlQuery("SELECT random_udf(a) AS a, c FROM MyTable")
@@ -256,8 +258,8 @@ class DagOptimizationTest extends TableTestBase {
   @Test
   def testMultiSinksWithUDTF(): Unit = {
     val stmtSet = util.tableEnv.createStatementSet()
-    util.tableEnv.getConfig.getConfiguration.setBoolean(
-      RelNodeBlockPlanBuilder.TABLE_OPTIMIZER_UNIONALL_AS_BREAKPOINT_ENABLED, false)
+    util.tableEnv.getConfig.set(
+      RelNodeBlockPlanBuilder.TABLE_OPTIMIZER_UNIONALL_AS_BREAKPOINT_ENABLED, Boolean.box(false))
     util.addFunction("split", new TableFunc1)
     val sqlQuery1 =
       """
@@ -300,8 +302,8 @@ class DagOptimizationTest extends TableTestBase {
   @Test
   def testMultiSinksSplitOnUnion1(): Unit = {
     val stmtSet = util.tableEnv.createStatementSet()
-    util.tableEnv.getConfig.getConfiguration.setBoolean(
-      RelNodeBlockPlanBuilder.TABLE_OPTIMIZER_UNIONALL_AS_BREAKPOINT_ENABLED, false)
+    util.tableEnv.getConfig.set(
+      RelNodeBlockPlanBuilder.TABLE_OPTIMIZER_UNIONALL_AS_BREAKPOINT_ENABLED, Boolean.box(false))
 
     val table = util.tableEnv.sqlQuery(
       "SELECT a, c FROM MyTable UNION ALL SELECT d, f FROM MyTable1")
@@ -325,10 +327,11 @@ class DagOptimizationTest extends TableTestBase {
   @Test
   def testMultiSinksSplitOnUnion2(): Unit = {
     val stmtSet = util.tableEnv.createStatementSet()
-    util.tableEnv.getConfig.getConfiguration.setBoolean(
-      RelNodeBlockPlanBuilder.TABLE_OPTIMIZER_REUSE_OPTIMIZE_BLOCK_WITH_DIGEST_ENABLED, true)
-    util.tableEnv.getConfig.getConfiguration.setBoolean(
-      RelNodeBlockPlanBuilder.TABLE_OPTIMIZER_UNIONALL_AS_BREAKPOINT_ENABLED, false)
+    util.tableEnv.getConfig.set(
+      RelNodeBlockPlanBuilder.TABLE_OPTIMIZER_REUSE_OPTIMIZE_BLOCK_WITH_DIGEST_ENABLED,
+      Boolean.box(true))
+    util.tableEnv.getConfig.set(
+      RelNodeBlockPlanBuilder.TABLE_OPTIMIZER_UNIONALL_AS_BREAKPOINT_ENABLED, Boolean.box(false))
     util.addTableSource[(Int, Long, String)]("MyTable2", 'a, 'b, 'c)
 
     val sqlQuery1 =
@@ -367,8 +370,8 @@ class DagOptimizationTest extends TableTestBase {
   @Test
   def testMultiSinksSplitOnUnion3(): Unit = {
     val stmtSet = util.tableEnv.createStatementSet()
-    util.tableEnv.getConfig.getConfiguration.setBoolean(
-      RelNodeBlockPlanBuilder.TABLE_OPTIMIZER_UNIONALL_AS_BREAKPOINT_ENABLED, false)
+    util.tableEnv.getConfig.set(
+      RelNodeBlockPlanBuilder.TABLE_OPTIMIZER_UNIONALL_AS_BREAKPOINT_ENABLED, Boolean.box(false))
     util.addTableSource[(Int, Long, String)]("MyTable2", 'a, 'b, 'c)
 
     val sqlQuery1 = "SELECT a, c FROM MyTable UNION ALL SELECT d, f FROM MyTable1"
@@ -402,8 +405,8 @@ class DagOptimizationTest extends TableTestBase {
   @Test
   def testMultiSinksSplitOnUnion4(): Unit = {
     val stmtSet = util.tableEnv.createStatementSet()
-    util.tableEnv.getConfig.getConfiguration.setBoolean(
-      RelNodeBlockPlanBuilder.TABLE_OPTIMIZER_UNIONALL_AS_BREAKPOINT_ENABLED, false)
+    util.tableEnv.getConfig.set(
+      RelNodeBlockPlanBuilder.TABLE_OPTIMIZER_UNIONALL_AS_BREAKPOINT_ENABLED, Boolean.box(false))
     util.addTableSource[(Int, Long, String)]("MyTable2", 'a, 'b, 'c)
 
     val sqlQuery =
@@ -509,8 +512,8 @@ class DagOptimizationTest extends TableTestBase {
   @Test
   def testMultiLevelViews(): Unit = {
     val stmtSet = util.tableEnv.createStatementSet()
-    util.tableEnv.getConfig.getConfiguration.setBoolean(
-      RelNodeBlockPlanBuilder.TABLE_OPTIMIZER_UNIONALL_AS_BREAKPOINT_ENABLED, false)
+    util.tableEnv.getConfig.set(
+      RelNodeBlockPlanBuilder.TABLE_OPTIMIZER_UNIONALL_AS_BREAKPOINT_ENABLED, Boolean.box(false))
 
     val table1 = util.tableEnv.sqlQuery("SELECT a, b, c FROM MyTable WHERE c LIKE '%hello%'")
     util.tableEnv.registerTable("TempTable1", table1)
@@ -552,8 +555,8 @@ class DagOptimizationTest extends TableTestBase {
   @Test
   def testSharedUnionNode(): Unit = {
     val stmtSet = util.tableEnv.createStatementSet()
-    util.tableEnv.getConfig.getConfiguration.setBoolean(
-      RelNodeBlockPlanBuilder.TABLE_OPTIMIZER_UNIONALL_AS_BREAKPOINT_ENABLED, false)
+    util.tableEnv.getConfig.set(
+      RelNodeBlockPlanBuilder.TABLE_OPTIMIZER_UNIONALL_AS_BREAKPOINT_ENABLED, Boolean.box(false))
 
     val table1 = util.tableEnv.sqlQuery("SELECT a, b, c FROM MyTable WHERE c LIKE '%hello%'")
     util.tableEnv.registerTable("TempTable1", table1)
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/DeduplicateTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/DeduplicateTest.scala
index a6ec4e8..027dfac 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/DeduplicateTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/DeduplicateTest.scala
@@ -70,7 +70,7 @@ class DeduplicateTest extends TableTestBase {
 
   @Test
   def testLastRowWithWindowOnRowtime(): Unit = {
-    util.tableEnv.getConfig.getConfiguration
+    util.tableEnv.getConfig
       .set(TABLE_EXEC_MINIBATCH_ALLOW_LATENCY, Duration.ofMillis(500))
     util.addTable(
       """
@@ -125,9 +125,9 @@ class DeduplicateTest extends TableTestBase {
 
   @Test
   def testMiniBatchInferFirstRowOnRowtime(): Unit = {
-    util.tableEnv.getConfig.getConfiguration.setBoolean(TABLE_EXEC_MINIBATCH_ENABLED, true)
-    util.tableEnv.getConfig.getConfiguration.setLong(TABLE_EXEC_MINIBATCH_SIZE, 3L)
-    util.tableEnv.getConfig.getConfiguration.set(
+    util.tableEnv.getConfig.set(TABLE_EXEC_MINIBATCH_ENABLED, Boolean.box(true))
+    util.tableEnv.getConfig.set(TABLE_EXEC_MINIBATCH_SIZE, Long.box(3L))
+    util.tableEnv.getConfig.set(
       TABLE_EXEC_MINIBATCH_ALLOW_LATENCY, Duration.ofSeconds(1))
     val ddl =
       s"""
@@ -175,9 +175,9 @@ class DeduplicateTest extends TableTestBase {
 
   @Test
   def testMiniBatchInferLastRowOnRowtime(): Unit = {
-    util.tableEnv.getConfig.getConfiguration.setBoolean(TABLE_EXEC_MINIBATCH_ENABLED, true)
-    util.tableEnv.getConfig.getConfiguration.setLong(TABLE_EXEC_MINIBATCH_SIZE, 3L)
-    util.tableEnv.getConfig.getConfiguration.set(
+    util.tableEnv.getConfig.set(TABLE_EXEC_MINIBATCH_ENABLED, Boolean.box(true))
+    util.tableEnv.getConfig.set(TABLE_EXEC_MINIBATCH_SIZE, Long.box(3L))
+    util.tableEnv.getConfig.set(
       TABLE_EXEC_MINIBATCH_ALLOW_LATENCY, Duration.ofSeconds(1))
     val ddl =
       s"""
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/MiniBatchIntervalInferTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/MiniBatchIntervalInferTest.scala
index 0cf621c..988ac69 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/MiniBatchIntervalInferTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/MiniBatchIntervalInferTest.scala
@@ -72,13 +72,13 @@ class MiniBatchIntervalInferTest extends TableTestBase {
          |""".stripMargin)
 
     // enable mini-batch
-    util.tableEnv.getConfig.getConfiguration.setBoolean(
-      ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ENABLED, true)
+    util.tableEnv.getConfig.set(
+      ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ENABLED, Boolean.box(true))
   }
 
   @Test
   def testMiniBatchOnly(): Unit = {
-    util.tableEnv.getConfig.getConfiguration
+    util.tableEnv.getConfig
       .set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ALLOW_LATENCY, Duration.ofSeconds(1))
     val sql = "SELECT b, COUNT(DISTINCT a), MAX(b), SUM(c) FROM MyTable1 GROUP BY b"
     util.verifyExecPlan(sql)
@@ -86,7 +86,7 @@ class MiniBatchIntervalInferTest extends TableTestBase {
 
   @Test
   def testMiniBatchOnlyForDataStream(): Unit = {
-    util.tableEnv.getConfig.getConfiguration
+    util.tableEnv.getConfig
         .set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ALLOW_LATENCY, Duration.ofSeconds(1))
     val sql = "SELECT b, COUNT(DISTINCT a), MAX(b), SUM(c) FROM MyDataStream1 GROUP BY b"
     util.verifyExecPlan(sql)
@@ -94,7 +94,7 @@ class MiniBatchIntervalInferTest extends TableTestBase {
 
   @Test
   def testRedundantWatermarkDefinition(): Unit = {
-    util.tableEnv.getConfig.getConfiguration
+    util.tableEnv.getConfig
         .set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ALLOW_LATENCY, Duration.ofSeconds(1))
     val sql = "SELECT b, COUNT(DISTINCT a), MAX(b), SUM(c) FROM wmTable1 GROUP BY b"
     util.verifyExecPlan(sql)
@@ -103,7 +103,7 @@ class MiniBatchIntervalInferTest extends TableTestBase {
   @Test
   def testWindowWithEarlyFire(): Unit = {
     val tableConfig = util.tableEnv.getConfig
-    tableConfig.getConfiguration
+    tableConfig
         .set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ALLOW_LATENCY, Duration.ofSeconds(1))
     withEarlyFireDelay(tableConfig, Time.milliseconds(500))
     val sql =
@@ -124,7 +124,7 @@ class MiniBatchIntervalInferTest extends TableTestBase {
 
   @Test
   def testWindowCascade(): Unit = {
-    util.tableEnv.getConfig.getConfiguration
+    util.tableEnv.getConfig
         .set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ALLOW_LATENCY, Duration.ofSeconds(3))
     val sql =
       """
@@ -144,7 +144,7 @@ class MiniBatchIntervalInferTest extends TableTestBase {
 
   @Test
   def testIntervalJoinWithMiniBatch(): Unit = {
-    util.tableEnv.getConfig.getConfiguration.set(
+    util.tableEnv.getConfig.set(
       ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ALLOW_LATENCY, Duration.ofSeconds(1))
 
     val sql =
@@ -165,7 +165,7 @@ class MiniBatchIntervalInferTest extends TableTestBase {
 
   @Test
   def testRowtimeRowsOverWithMiniBatch(): Unit = {
-    util.tableEnv.getConfig.getConfiguration.set(
+    util.tableEnv.getConfig.set(
       ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ALLOW_LATENCY, Duration.ofSeconds(1))
 
     val sql =
@@ -187,7 +187,7 @@ class MiniBatchIntervalInferTest extends TableTestBase {
     util.addTableWithWatermark("Orders", util.tableEnv.from("MyDataStream1"), "rowtime", 0)
     util.addTableWithWatermark("RatesHistory", util.tableEnv.from("MyDataStream2"), "rowtime", 0)
 
-    util.tableEnv.getConfig.getConfiguration.set(
+    util.tableEnv.getConfig.set(
       ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ALLOW_LATENCY, Duration.ofSeconds(1))
 
     util.addTemporarySystemFunction(
@@ -212,7 +212,7 @@ class MiniBatchIntervalInferTest extends TableTestBase {
   @Test
   def testMultiOperatorNeedsWatermark1(): Unit = {
     // infer result: miniBatchInterval=[Rowtime, 0ms]
-    util.tableEnv.getConfig.getConfiguration.set(
+    util.tableEnv.getConfig.set(
       ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ALLOW_LATENCY, Duration.ofSeconds(1))
 
     val sql =
@@ -236,7 +236,7 @@ class MiniBatchIntervalInferTest extends TableTestBase {
 
   @Test
   def testMultiOperatorNeedsWatermark2(): Unit = {
-    util.tableEnv.getConfig.getConfiguration.set(
+    util.tableEnv.getConfig.set(
       ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ALLOW_LATENCY, Duration.ofSeconds(6))
 
     val sql =
@@ -271,7 +271,7 @@ class MiniBatchIntervalInferTest extends TableTestBase {
 
   @Test
   def testMultiOperatorNeedsWatermark3(): Unit = {
-    util.tableEnv.getConfig.getConfiguration.set(
+    util.tableEnv.getConfig.set(
       ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ALLOW_LATENCY, Duration.ofSeconds(6))
 
     val sql =
@@ -326,10 +326,9 @@ class MiniBatchIntervalInferTest extends TableTestBase {
          |)
          |""".stripMargin)
 
-    util.tableEnv.getConfig.getConfiguration.set(
+    util.tableEnv.getConfig.set(
       ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ALLOW_LATENCY, Duration.ofMillis(500))
-    util.tableEnv.getConfig.getConfiguration.setLong(
-      ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_SIZE, 300L)
+    util.tableEnv.getConfig.set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_SIZE, Long.box(300L))
 
     val table1 = util.tableEnv.sqlQuery(
       """
@@ -393,7 +392,7 @@ class MiniBatchIntervalInferTest extends TableTestBase {
   @Test
   def testMiniBatchOnDataStreamWithRowTime(): Unit = {
     util.addDataStream[(Long, Int, String)]("T1", 'long, 'int, 'str, 'rowtime.rowtime)
-    util.tableEnv.getConfig.getConfiguration
+    util.tableEnv.getConfig
       .set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ALLOW_LATENCY, Duration.ofSeconds(1))
     val sql =
       """
@@ -409,7 +408,7 @@ class MiniBatchIntervalInferTest extends TableTestBase {
   @Test
   def testOverWindowMiniBatchOnDataStreamWithRowTime(): Unit = {
     util.addDataStream[(Long, Int, String)]("T1", 'long, 'int, 'str, 'rowtime.rowtime)
-    util.tableEnv.getConfig.getConfiguration
+    util.tableEnv.getConfig
       .set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ALLOW_LATENCY, Duration.ofSeconds(3))
     val sql =
       """
@@ -428,7 +427,7 @@ class MiniBatchIntervalInferTest extends TableTestBase {
 
   private def withEarlyFireDelay(tableConfig: TableConfig, interval: Time): Unit = {
     val intervalInMillis = interval.toMilliseconds
-    val earlyFireDelay: Duration = tableConfig.getConfiguration
+    val earlyFireDelay: Duration = tableConfig
       .getOptional(TABLE_EXEC_EMIT_EARLY_FIRE_DELAY)
       .orElse(null)
     if (earlyFireDelay != null && (earlyFireDelay.toMillis != intervalInMillis)) { //
@@ -436,8 +435,7 @@ class MiniBatchIntervalInferTest extends TableTestBase {
       throw new RuntimeException("Currently not support different earlyFireInterval configs in " +
         "one job")
     }
-    tableConfig.getConfiguration.setBoolean(TABLE_EXEC_EMIT_EARLY_FIRE_ENABLED, Boolean.box(true))
-    tableConfig.getConfiguration.set(
-      TABLE_EXEC_EMIT_EARLY_FIRE_DELAY, Duration.ofMillis(intervalInMillis))
+    tableConfig.set(TABLE_EXEC_EMIT_EARLY_FIRE_ENABLED, Boolean.box(true))
+    tableConfig.set(TABLE_EXEC_EMIT_EARLY_FIRE_DELAY, Duration.ofMillis(intervalInMillis))
   }
 }
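
The withEarlyFireDelay helper above also exercises the read side: TableConfig is
readable as well, so getOptional can be called on it directly rather than through
getConfiguration. A sketch of the same read-then-write shape, with a hypothetical
helper name and a Duration-typed option taken from the hunks above:

    import java.time.Duration
    import org.apache.flink.table.api.TableConfig
    import org.apache.flink.table.api.config.ExecutionConfigOptions

    // Hypothetical helper: set the mini-batch latency only if it is still unset.
    def ensureMiniBatchLatency(config: TableConfig, latency: Duration): Unit = {
      val current =
        config.getOptional(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ALLOW_LATENCY)
      if (!current.isPresent) {
        config.set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ALLOW_LATENCY, latency)
      }
    }
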
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/ModifiedMonotonicityTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/ModifiedMonotonicityTest.scala
index 4cb7b66..c4b14ac 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/ModifiedMonotonicityTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/ModifiedMonotonicityTest.scala
@@ -67,9 +67,9 @@ class ModifiedMonotonicityTest extends TableTestBase {
 
   @Test
   def testMaxWithRetractOptimizeWithLocalGlobal(): Unit = {
-    util.tableEnv.getConfig.getConfiguration.setBoolean(
-      ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ENABLED, true)
-    util.tableEnv.getConfig.getConfiguration
+    util.tableEnv.getConfig.set(
+      ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ENABLED, Boolean.box(true))
+    util.tableEnv.getConfig
       .set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ALLOW_LATENCY, Duration.ofMillis(100))
     val query = "SELECT a1, max(a3) from (SELECT a1, a2, max(a3) as a3 FROM A GROUP BY a1, a2) " +
       "group by a1"
@@ -78,9 +78,9 @@ class ModifiedMonotonicityTest extends TableTestBase {
 
   @Test
   def testMinWithRetractOptimizeWithLocalGlobal(): Unit = {
-    util.tableEnv.getConfig.getConfiguration.setBoolean(
-      ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ENABLED, true)
-    util.tableEnv.getConfig.getConfiguration
+    util.tableEnv.getConfig.set(
+      ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ENABLED, Boolean.box(true))
+    util.tableEnv.getConfig
       .set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ALLOW_LATENCY, Duration.ofMillis(100))
     val query = "SELECT min(a3) from (SELECT a1, a2, min(a3) as a3 FROM A GROUP BY a1, a2)"
     util.verifyRelPlan(query, ExplainDetail.CHANGELOG_MODE)
@@ -88,9 +88,9 @@ class ModifiedMonotonicityTest extends TableTestBase {
 
   @Test
   def testMinCanNotOptimizeWithLocalGlobal(): Unit = {
-    util.tableEnv.getConfig.getConfiguration.setBoolean(
-      ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ENABLED, true)
-    util.tableEnv.getConfig.getConfiguration
+    util.tableEnv.getConfig.set(
+      ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ENABLED, Boolean.box(true))
+    util.tableEnv.getConfig
       .set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ALLOW_LATENCY, Duration.ofMillis(100))
     val query =
       "SELECT a1, MIN(a3) FROM (SELECT a1, a2, MAX(a3) AS a3 FROM A GROUP BY a1, a2) t GROUP BY a1"
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/RankTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/RankTest.scala
index 3963652..18d292f 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/RankTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/RankTest.scala
@@ -840,8 +840,9 @@ class RankTest extends TableTestBase {
 
   @Test
   def testUpdatableRankAfterIntermediateScan(): Unit = {
-    util.tableEnv.getConfig.getConfiguration.setBoolean(
-      RelNodeBlockPlanBuilder.TABLE_OPTIMIZER_REUSE_OPTIMIZE_BLOCK_WITH_DIGEST_ENABLED, true)
+    util.tableEnv.getConfig.set(
+      RelNodeBlockPlanBuilder.TABLE_OPTIMIZER_REUSE_OPTIMIZE_BLOCK_WITH_DIGEST_ENABLED,
+      Boolean.box(true))
     util.tableEnv.executeSql(
       """
         |CREATE VIEW v1 AS
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/SubplanReuseTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/SubplanReuseTest.scala
index 12a851e..c6b0922 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/SubplanReuseTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/SubplanReuseTest.scala
@@ -34,18 +34,18 @@ class SubplanReuseTest extends TableTestBase {
 
   @Before
   def before(): Unit = {
-    util.tableEnv.getConfig.getConfiguration.setBoolean(
-      OptimizerConfigOptions.TABLE_OPTIMIZER_REUSE_SUB_PLAN_ENABLED, true)
-    util.tableEnv.getConfig.getConfiguration.setBoolean(
-      OptimizerConfigOptions.TABLE_OPTIMIZER_REUSE_SOURCE_ENABLED, false)
+    util.tableEnv.getConfig.set(
+      OptimizerConfigOptions.TABLE_OPTIMIZER_REUSE_SUB_PLAN_ENABLED, Boolean.box(true))
+    util.tableEnv.getConfig.set(
+      OptimizerConfigOptions.TABLE_OPTIMIZER_REUSE_SOURCE_ENABLED, Boolean.box(false))
     util.addTableSource[(Int, Long, String)]("x", 'a, 'b, 'c)
     util.addTableSource[(Int, Long, String)]("y", 'd, 'e, 'f)
   }
 
   @Test
   def testDisableSubplanReuse(): Unit = {
-    util.tableEnv.getConfig.getConfiguration.setBoolean(
-      OptimizerConfigOptions.TABLE_OPTIMIZER_REUSE_SUB_PLAN_ENABLED, false)
+    util.tableEnv.getConfig.set(
+      OptimizerConfigOptions.TABLE_OPTIMIZER_REUSE_SUB_PLAN_ENABLED, Boolean.box(false))
     val sqlQuery =
       """
         |WITH r AS (
@@ -58,8 +58,8 @@ class SubplanReuseTest extends TableTestBase {
 
   @Test
   def testSubplanReuseWithDifferentRowType(): Unit = {
-    util.tableEnv.getConfig.getConfiguration.setBoolean(
-      OptimizerConfigOptions.TABLE_OPTIMIZER_REUSE_SOURCE_ENABLED, false)
+    util.tableEnv.getConfig.set(
+      OptimizerConfigOptions.TABLE_OPTIMIZER_REUSE_SOURCE_ENABLED, Boolean.box(false))
     // cannot reuse because of different row types
     val sqlQuery =
       """
@@ -72,8 +72,8 @@ class SubplanReuseTest extends TableTestBase {
 
   @Test
   def testEnableReuseTableSource(): Unit = {
-    util.tableEnv.getConfig.getConfiguration.setBoolean(
-      OptimizerConfigOptions.TABLE_OPTIMIZER_REUSE_SOURCE_ENABLED, true)
+    util.tableEnv.getConfig.set(
+      OptimizerConfigOptions.TABLE_OPTIMIZER_REUSE_SOURCE_ENABLED, Boolean.box(true))
     val sqlQuery =
       """
         |WITH t AS (SELECT x.a AS a, x.b AS b, y.d AS d, y.e AS e FROM x, y WHERE x.a = y.d)
@@ -84,8 +84,8 @@ class SubplanReuseTest extends TableTestBase {
 
   @Test
   def testDisableReuseTableSource(): Unit = {
-    util.tableEnv.getConfig.getConfiguration.setBoolean(
-      OptimizerConfigOptions.TABLE_OPTIMIZER_REUSE_SOURCE_ENABLED, false)
+    util.tableEnv.getConfig.set(
+      OptimizerConfigOptions.TABLE_OPTIMIZER_REUSE_SOURCE_ENABLED, Boolean.box(false))
     val sqlQuery =
       """
         |WITH t AS (SELECT * FROM x, y WHERE x.a = y.d)
@@ -184,7 +184,7 @@ class SubplanReuseTest extends TableTestBase {
 
   @Test
   def testSubplanReuseOnLimit(): Unit = {
-    util.tableEnv.getConfig.getConfiguration.setString(
+    util.tableEnv.getConfig.set(
       ExecutionConfigOptions.TABLE_EXEC_DISABLED_OPERATORS, "HashJoin,SortMergeJoin")
     val sqlQuery =
       """
@@ -303,15 +303,15 @@ class SubplanReuseTest extends TableTestBase {
 
   @Test
   def testEnableReuseTableSourceOnNewSource(): Unit = {
-    util.tableEnv.getConfig.getConfiguration.setBoolean(
-      OptimizerConfigOptions.TABLE_OPTIMIZER_REUSE_SOURCE_ENABLED, true)
+    util.tableEnv.getConfig.set(
+      OptimizerConfigOptions.TABLE_OPTIMIZER_REUSE_SOURCE_ENABLED, Boolean.box(true))
     testReuseOnNewSource()
   }
 
   @Test
   def testDisableReuseTableSourceOnNewSource(): Unit = {
-    util.tableEnv.getConfig.getConfiguration.setBoolean(
-      OptimizerConfigOptions.TABLE_OPTIMIZER_REUSE_SOURCE_ENABLED, false)
+    util.tableEnv.getConfig.set(
+      OptimizerConfigOptions.TABLE_OPTIMIZER_REUSE_SOURCE_ENABLED, Boolean.box(false))
     testReuseOnNewSource()
   }
 
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/TableScanTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/TableScanTest.scala
index 598a2dd..63051cb 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/TableScanTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/TableScanTest.scala
@@ -288,8 +288,8 @@ class TableScanTest extends TableTestBase {
 
   @Test
   def testJoinOnChangelogSourceWithEventsDuplicate(): Unit = {
-    util.tableEnv.getConfig.getConfiguration.setBoolean(
-      ExecutionConfigOptions.TABLE_EXEC_SOURCE_CDC_EVENTS_DUPLICATE, true)
+    util.tableEnv.getConfig.set(
+      ExecutionConfigOptions.TABLE_EXEC_SOURCE_CDC_EVENTS_DUPLICATE, Boolean.box(true))
     verifyJoinOnSource("I,UB,UA")
   }
 
@@ -356,8 +356,8 @@ class TableScanTest extends TableTestBase {
 
   @Test
   def testChangelogSourceWithEventsDuplicate(): Unit = {
-    util.tableEnv.getConfig.getConfiguration.setBoolean(
-      ExecutionConfigOptions.TABLE_EXEC_SOURCE_CDC_EVENTS_DUPLICATE, true)
+    util.tableEnv.getConfig.set(
+      ExecutionConfigOptions.TABLE_EXEC_SOURCE_CDC_EVENTS_DUPLICATE, Boolean.box(true))
     util.addTable(
       """
         |CREATE TABLE src (
@@ -668,8 +668,8 @@ class TableScanTest extends TableTestBase {
         |  'changelog-mode' = 'I,UB,UA,D'
         |)
       """.stripMargin)
-    util.tableEnv.getConfig.getConfiguration.setBoolean(
-      ExecutionConfigOptions.TABLE_EXEC_SOURCE_CDC_EVENTS_DUPLICATE, true)
+    util.tableEnv.getConfig.set(
+      ExecutionConfigOptions.TABLE_EXEC_SOURCE_CDC_EVENTS_DUPLICATE, Boolean.box(true))
 
     thrown.expect(classOf[TableException])
     thrown.expectMessage("Configuration 'table.exec.source.cdc-events-duplicate' is enabled " +
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/TableSinkTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/TableSinkTest.scala
index d5ef3f3..f760fb5 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/TableSinkTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/TableSinkTest.scala
@@ -504,7 +504,7 @@ class TableSinkTest extends TableTestBase {
 
   @Test def testAppendStreamToSinkWithPkNoKeyBy(): Unit = {
     val tEnv = util.tableEnv
-    tEnv.getConfig.getConfiguration.set(ExecutionConfigOptions.TABLE_EXEC_SINK_KEYED_SHUFFLE,
+    tEnv.getConfig.set(ExecutionConfigOptions.TABLE_EXEC_SINK_KEYED_SHUFFLE,
       ExecutionConfigOptions.SinkKeyedShuffle.NONE)
     tEnv.executeSql(
       """
@@ -535,7 +535,8 @@ class TableSinkTest extends TableTestBase {
   @Test def testAppendStreamToSinkWithPkForceKeyBy(): Unit = {
     util.getStreamEnv.setParallelism(4)
     val tEnv = util.tableEnv
-    tEnv.getConfig.getConfiguration.set(ExecutionConfigOptions.TABLE_EXEC_SINK_KEYED_SHUFFLE,
+    tEnv.getConfig.set(
+      ExecutionConfigOptions.TABLE_EXEC_SINK_KEYED_SHUFFLE,
       ExecutionConfigOptions.SinkKeyedShuffle.FORCE)
     tEnv.executeSql(
       """
@@ -566,7 +567,8 @@ class TableSinkTest extends TableTestBase {
   @Test def testSingleParallelismAppendStreamToSinkWithPkForceKeyBy(): Unit = {
     util.getStreamEnv.setParallelism(1)
     val tEnv = util.tableEnv
-    tEnv.getConfig.getConfiguration.set(ExecutionConfigOptions.TABLE_EXEC_SINK_KEYED_SHUFFLE,
+    tEnv.getConfig.set(
+      ExecutionConfigOptions.TABLE_EXEC_SINK_KEYED_SHUFFLE,
       ExecutionConfigOptions.SinkKeyedShuffle.FORCE)
     tEnv.executeSql(
       """
@@ -597,7 +599,8 @@ class TableSinkTest extends TableTestBase {
   @Test def testAppendStreamToSinkWithoutPkForceKeyBy(): Unit = {
     util.getStreamEnv.setParallelism(4)
     val tEnv = util.tableEnv
-    tEnv.getConfig.getConfiguration.set(ExecutionConfigOptions.TABLE_EXEC_SINK_KEYED_SHUFFLE,
+    tEnv.getConfig.set(
+      ExecutionConfigOptions.TABLE_EXEC_SINK_KEYED_SHUFFLE,
       ExecutionConfigOptions.SinkKeyedShuffle.FORCE)
     tEnv.executeSql(
       """
@@ -627,7 +630,8 @@ class TableSinkTest extends TableTestBase {
   @Test def testAppendStreamToSinkWithoutPkForceKeyBySingleParallelism(): Unit = {
     util.getStreamEnv.setParallelism(4)
     val tEnv = util.tableEnv
-    tEnv.getConfig.getConfiguration.set(ExecutionConfigOptions.TABLE_EXEC_SINK_KEYED_SHUFFLE,
+    tEnv.getConfig.set(
+      ExecutionConfigOptions.TABLE_EXEC_SINK_KEYED_SHUFFLE,
       ExecutionConfigOptions.SinkKeyedShuffle.FORCE)
     tEnv.executeSql(
       """
@@ -656,7 +660,8 @@ class TableSinkTest extends TableTestBase {
   @Test def testChangelogStreamToSinkWithPkDifferentParallelism(): Unit = {
     util.getStreamEnv.setParallelism(1)
     val tEnv = util.tableEnv
-    tEnv.getConfig.getConfiguration.set(ExecutionConfigOptions.TABLE_EXEC_SINK_KEYED_SHUFFLE,
+    tEnv.getConfig.set(
+      ExecutionConfigOptions.TABLE_EXEC_SINK_KEYED_SHUFFLE,
       ExecutionConfigOptions.SinkKeyedShuffle.AUTO)
     tEnv.executeSql(
       """
@@ -685,10 +690,12 @@ class TableSinkTest extends TableTestBase {
     util.verifyExplain(stmtSet, ExplainDetail.JSON_EXECUTION_PLAN)
   }
 
-  @Test def testChangelogStreamToSinkWithPkSingleParallelism(): Unit = {
+  @Test
+  def testChangelogStreamToSinkWithPkSingleParallelism(): Unit = {
     util.getStreamEnv.setParallelism(4)
     val tEnv = util.tableEnv
-    tEnv.getConfig.getConfiguration.set(ExecutionConfigOptions.TABLE_EXEC_SINK_KEYED_SHUFFLE,
+    tEnv.getConfig.set(
+      ExecutionConfigOptions.TABLE_EXEC_SINK_KEYED_SHUFFLE,
       ExecutionConfigOptions.SinkKeyedShuffle.FORCE)
     tEnv.executeSql(
       """
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/agg/AggregateTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/agg/AggregateTest.scala
index 883415d..48274d4 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/agg/AggregateTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/agg/AggregateTest.scala
@@ -64,18 +64,18 @@ class AggregateTest extends TableTestBase {
 
   @Test
   def testAggWithMiniBatch(): Unit = {
-    util.tableEnv.getConfig.getConfiguration.setBoolean(
-      ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ENABLED, true)
-    util.tableEnv.getConfig.getConfiguration.set(
+    util.tableEnv.getConfig.set(
+      ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ENABLED, Boolean.box(true))
+    util.tableEnv.getConfig.set(
       ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ALLOW_LATENCY, Duration.ofSeconds(1))
     util.verifyExecPlan("SELECT b, COUNT(DISTINCT a), MAX(b), SUM(c)  FROM MyTable GROUP BY b")
   }
 
   @Test
   def testAggAfterUnionWithMiniBatch(): Unit = {
-    util.tableEnv.getConfig.getConfiguration.setBoolean(
-      ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ENABLED, true)
-    util.tableEnv.getConfig.getConfiguration.set(
+    util.tableEnv.getConfig.set(
+      ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ENABLED, Boolean.box(true))
+    util.tableEnv.getConfig.set(
       ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ALLOW_LATENCY, Duration.ofSeconds(1))
     val query =
       """
@@ -97,9 +97,9 @@ class AggregateTest extends TableTestBase {
   @Test
   def testLocalGlobalAggAfterUnion(): Unit = {
     // enable local global optimize
-    util.tableEnv.getConfig.getConfiguration.setBoolean(
-      ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ENABLED, true)
-    util.tableEnv.getConfig.getConfiguration.set(
+    util.tableEnv.getConfig.set(
+      ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ENABLED, Boolean.box(true))
+    util.tableEnv.getConfig.set(
       ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ALLOW_LATENCY, Duration.ofSeconds(1))
 
     val sql =
@@ -130,9 +130,9 @@ class AggregateTest extends TableTestBase {
 
   @Test
   def testAggWithFilterClauseWithLocalGlobal(): Unit = {
-    util.tableEnv.getConfig.getConfiguration.setBoolean(
-      ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ENABLED, true)
-    util.tableEnv.getConfig.getConfiguration.set(
+    util.tableEnv.getConfig.set(
+      ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ENABLED, Boolean.box(true))
+    util.tableEnv.getConfig.set(
       ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ALLOW_LATENCY, Duration.ofSeconds(1))
 
     val sql =
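
The mini-batch hunks set the enable flag and its allow-latency together;
the planner expects a positive latency (and batch size) whenever the flag
is on. A runnable sketch against a streaming `TableEnvironment` (the
`EnvironmentSettings.inStreamingMode()` setup is illustrative, not part of
the diff):

    import java.time.Duration

    import org.apache.flink.table.api.{EnvironmentSettings, TableEnvironment}
    import org.apache.flink.table.api.config.ExecutionConfigOptions

    // Sketch: enable mini-batch aggregation with a 1s latency budget and
    // a batch size cap; Duration is a reference type, so only the Boolean
    // and Long values need boxing from Scala.
    val tEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode())
    tEnv.getConfig.set(
      ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ENABLED, Boolean.box(true))
    tEnv.getConfig.set(
      ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ALLOW_LATENCY,
      Duration.ofSeconds(1))
    tEnv.getConfig.set(
      ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_SIZE, Long.box(1000L))
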
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/agg/DistinctAggregateTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/agg/DistinctAggregateTest.scala
index bc661d1..795d268 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/agg/DistinctAggregateTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/agg/DistinctAggregateTest.scala
@@ -44,13 +44,14 @@ class DistinctAggregateTest(
   def before(): Unit = {
     util.tableEnv.getConfig.setIdleStateRetentionTime(Time.hours(1), Time.hours(2))
     util.enableMiniBatch()
-    util.tableEnv.getConfig.getConfiguration.setString(
+    util.tableEnv.getConfig.set(
       OptimizerConfigOptions.TABLE_OPTIMIZER_AGG_PHASE_STRATEGY, aggPhaseEnforcer.toString)
-    util.tableEnv.getConfig.getConfiguration.setBoolean(
-      OptimizerConfigOptions.TABLE_OPTIMIZER_DISTINCT_AGG_SPLIT_ENABLED, splitDistinctAggEnabled)
+    util.tableEnv.getConfig.set(
+      OptimizerConfigOptions.TABLE_OPTIMIZER_DISTINCT_AGG_SPLIT_ENABLED,
+      Boolean.box(splitDistinctAggEnabled))
     // disable incremental agg
-    util.tableEnv.getConfig.getConfiguration.setBoolean(
-      IncrementalAggregateRule.TABLE_OPTIMIZER_INCREMENTAL_AGG_ENABLED, false)
+    util.tableEnv.getConfig.set(
+      IncrementalAggregateRule.TABLE_OPTIMIZER_INCREMENTAL_AGG_ENABLED, Boolean.box(false))
   }
 
   @Test
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/agg/GroupWindowTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/agg/GroupWindowTest.scala
index 90739d5..d0a039f 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/agg/GroupWindowTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/agg/GroupWindowTest.scala
@@ -433,8 +433,8 @@ class GroupWindowTest extends TableTestBase {
 
   @Test
   def testWindowAggregateWithLateFire(): Unit = {
-    util.conf.getConfiguration.setBoolean(TABLE_EXEC_EMIT_LATE_FIRE_ENABLED, true)
-    util.conf.getConfiguration.set(TABLE_EXEC_EMIT_LATE_FIRE_DELAY, Duration.ofSeconds(5))
+    util.conf.set(TABLE_EXEC_EMIT_LATE_FIRE_ENABLED, Boolean.box(true))
+    util.conf.set(TABLE_EXEC_EMIT_LATE_FIRE_DELAY, Duration.ofSeconds(5))
     util.conf.setIdleStateRetentionTime(Time.hours(1), Time.hours(2))
     val sql =
       """
@@ -447,9 +447,9 @@ class GroupWindowTest extends TableTestBase {
 
   @Test
   def testWindowAggregateWithAllowLateness(): Unit = {
-    util.conf.getConfiguration.setBoolean(TABLE_EXEC_EMIT_LATE_FIRE_ENABLED, true)
-    util.conf.getConfiguration.set(TABLE_EXEC_EMIT_LATE_FIRE_DELAY, Duration.ofSeconds(5))
-    util.conf.getConfiguration.set(TABLE_EXEC_EMIT_ALLOW_LATENESS, Duration.ofHours(1))
+    util.conf.set(TABLE_EXEC_EMIT_LATE_FIRE_ENABLED, Boolean.box(true))
+    util.conf.set(TABLE_EXEC_EMIT_LATE_FIRE_DELAY, Duration.ofSeconds(5))
+    util.conf.set(TABLE_EXEC_EMIT_ALLOW_LATENESS, Duration.ofHours(1))
     val sql =
       """
         |SELECT TUMBLE_START(`rowtime`, INTERVAL '1' SECOND), COUNT(*) cnt
@@ -461,9 +461,9 @@ class GroupWindowTest extends TableTestBase {
 
   @Test
   def testWindowAggregateWithInvalidAllowLateness(): Unit = {
-    util.conf.getConfiguration.setBoolean(TABLE_EXEC_EMIT_LATE_FIRE_ENABLED, true)
-    util.conf.getConfiguration.set(TABLE_EXEC_EMIT_LATE_FIRE_DELAY, Duration.ofSeconds(5))
-    util.conf.getConfiguration.set(TABLE_EXEC_EMIT_ALLOW_LATENESS, Duration.ofSeconds(1))
+    util.conf.set(TABLE_EXEC_EMIT_LATE_FIRE_ENABLED, Boolean.box(true))
+    util.conf.set(TABLE_EXEC_EMIT_LATE_FIRE_DELAY, Duration.ofSeconds(5))
+    util.conf.set(TABLE_EXEC_EMIT_ALLOW_LATENESS, Duration.ofSeconds(1))
     val sql =
       """
         |SELECT TUMBLE_START(`rowtime`, INTERVAL '1' SECOND), COUNT(*) cnt
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/agg/IncrementalAggregateTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/agg/IncrementalAggregateTest.scala
index 75ebee1..8863204 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/agg/IncrementalAggregateTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/agg/IncrementalAggregateTest.scala
@@ -37,8 +37,8 @@ class IncrementalAggregateTest(
   override def before(): Unit = {
     super.before()
     // enable incremental agg
-    util.tableEnv.getConfig.getConfiguration.setBoolean(
-      IncrementalAggregateRule.TABLE_OPTIMIZER_INCREMENTAL_AGG_ENABLED, true)
+    util.tableEnv.getConfig.set(
+      IncrementalAggregateRule.TABLE_OPTIMIZER_INCREMENTAL_AGG_ENABLED, Boolean.box(true))
   }
 }
 
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/agg/TwoStageAggregateTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/agg/TwoStageAggregateTest.scala
index 93c157d..9675761 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/agg/TwoStageAggregateTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/agg/TwoStageAggregateTest.scala
@@ -35,7 +35,7 @@ class TwoStageAggregateTest extends TableTestBase {
   def before(): Unit = {
     util.enableMiniBatch()
     util.tableEnv.getConfig.setIdleStateRetentionTime(Time.hours(1), Time.hours(2))
-    util.tableEnv.getConfig.getConfiguration.setString(
+    util.tableEnv.getConfig.set(
       OptimizerConfigOptions.TABLE_OPTIMIZER_AGG_PHASE_STRATEGY,
       AggregatePhaseStrategy.TWO_PHASE.toString)
   }
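
`TABLE_OPTIMIZER_AGG_PHASE_STRATEGY` is string-typed in this version,
which is why the enum value is rendered with `toString` instead of being
passed directly. A sketch using the literal that
`AggregatePhaseStrategy.TWO_PHASE.toString` produces:

    import org.apache.flink.table.api.TableConfig
    import org.apache.flink.table.api.config.OptimizerConfigOptions

    // Sketch: string-typed option, so the strategy name is set as text.
    val config = TableConfig.getDefault
    config.set(
      OptimizerConfigOptions.TABLE_OPTIMIZER_AGG_PHASE_STRATEGY,
      "TWO_PHASE")
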
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/agg/WindowAggregateTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/agg/WindowAggregateTest.scala
index d1b9a62..5eddd71 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/agg/WindowAggregateTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/agg/WindowAggregateTest.scala
@@ -69,7 +69,7 @@ class WindowAggregateTest(aggPhaseEnforcer: AggregatePhaseStrategy) extends Tabl
          |""".stripMargin)
 
     // set agg-phase strategy
-    util.tableEnv.getConfig.getConfiguration.setString(
+    util.tableEnv.getConfig.set(
       OptimizerConfigOptions.TABLE_OPTIMIZER_AGG_PHASE_STRATEGY,
       aggPhaseEnforcer.toString)
   }
@@ -284,8 +284,8 @@ class WindowAggregateTest(aggPhaseEnforcer: AggregatePhaseStrategy) extends Tabl
 
   @Test
   def testTumble_DistinctSplitEnabled(): Unit = {
-    util.tableEnv.getConfig.getConfiguration.setBoolean(
-      OptimizerConfigOptions.TABLE_OPTIMIZER_DISTINCT_AGG_SPLIT_ENABLED, true)
+    util.tableEnv.getConfig.set(
+      OptimizerConfigOptions.TABLE_OPTIMIZER_DISTINCT_AGG_SPLIT_ENABLED, Boolean.box(true))
     val sql =
       """
         |SELECT
@@ -304,8 +304,8 @@ class WindowAggregateTest(aggPhaseEnforcer: AggregatePhaseStrategy) extends Tabl
 
   @Test
   def testTumble_DistinctOnWindowColumns(): Unit = {
-    util.tableEnv.getConfig.getConfiguration.setBoolean(
-      OptimizerConfigOptions.TABLE_OPTIMIZER_DISTINCT_AGG_SPLIT_ENABLED, true)
+    util.tableEnv.getConfig.set(
+      OptimizerConfigOptions.TABLE_OPTIMIZER_DISTINCT_AGG_SPLIT_ENABLED, Boolean.box(true))
     // window_time is used in agg arg, thus we shouldn't merge WindowTVF into WindowAggregate.
     // actually, after expanded, there's HASH_CODE(window_time),
     // and thus we shouldn't transpose WindowTVF and Expand too.
@@ -328,8 +328,8 @@ class WindowAggregateTest(aggPhaseEnforcer: AggregatePhaseStrategy) extends Tabl
   def testTumble_DoNotSplitProcessingTimeWindow(): Unit = {
     assumeTrue(isTwoPhase)
     // the processing-time window aggregate with distinct shouldn't be split into two-level agg
-    util.tableEnv.getConfig.getConfiguration.setBoolean(
-      OptimizerConfigOptions.TABLE_OPTIMIZER_DISTINCT_AGG_SPLIT_ENABLED, true)
+    util.tableEnv.getConfig.set(
+      OptimizerConfigOptions.TABLE_OPTIMIZER_DISTINCT_AGG_SPLIT_ENABLED, Boolean.box(true))
     val sql =
       """
         |SELECT
@@ -426,8 +426,8 @@ class WindowAggregateTest(aggPhaseEnforcer: AggregatePhaseStrategy) extends Tabl
 
   @Test
   def testCumulate_DistinctSplitEnabled(): Unit = {
-    util.tableEnv.getConfig.getConfiguration.setBoolean(
-      OptimizerConfigOptions.TABLE_OPTIMIZER_DISTINCT_AGG_SPLIT_ENABLED, true)
+    util.tableEnv.getConfig.set(
+      OptimizerConfigOptions.TABLE_OPTIMIZER_DISTINCT_AGG_SPLIT_ENABLED, Boolean.box(true))
     val sql =
       """
         |SELECT
@@ -488,8 +488,8 @@ class WindowAggregateTest(aggPhaseEnforcer: AggregatePhaseStrategy) extends Tabl
 
   @Test
   def testHop_DistinctSplitEnabled(): Unit = {
-    util.tableEnv.getConfig.getConfiguration.setBoolean(
-      OptimizerConfigOptions.TABLE_OPTIMIZER_DISTINCT_AGG_SPLIT_ENABLED, true)
+    util.tableEnv.getConfig.set(
+      OptimizerConfigOptions.TABLE_OPTIMIZER_DISTINCT_AGG_SPLIT_ENABLED, Boolean.box(true))
     val sql =
       """
         |SELECT
@@ -792,8 +792,8 @@ class WindowAggregateTest(aggPhaseEnforcer: AggregatePhaseStrategy) extends Tabl
 
   @Test
   def testTumble_GroupingSetsDistinctSplitEnabled(): Unit = {
-    util.tableEnv.getConfig.getConfiguration.setBoolean(
-      OptimizerConfigOptions.TABLE_OPTIMIZER_DISTINCT_AGG_SPLIT_ENABLED, true)
+    util.tableEnv.getConfig.set(
+      OptimizerConfigOptions.TABLE_OPTIMIZER_DISTINCT_AGG_SPLIT_ENABLED, Boolean.box(true))
     val sql =
       """
         |SELECT
@@ -855,8 +855,8 @@ class WindowAggregateTest(aggPhaseEnforcer: AggregatePhaseStrategy) extends Tabl
 
   @Test
   def testCantMergeWindowTVF_GroupingSetsDistinctOnWindowColumns(): Unit = {
-    util.tableEnv.getConfig.getConfiguration.setBoolean(
-      OptimizerConfigOptions.TABLE_OPTIMIZER_DISTINCT_AGG_SPLIT_ENABLED, true)
+    util.tableEnv.getConfig.set(
+      OptimizerConfigOptions.TABLE_OPTIMIZER_DISTINCT_AGG_SPLIT_ENABLED, Boolean.box(true))
     // window_time is used in agg arg, thus we shouldn't merge WindowTVF into WindowAggregate.
     // actually, after expanded, there's HASH_CODE(window_time),
     // and thus we shouldn't transpose WindowTVF and Expand too.
@@ -891,8 +891,8 @@ class WindowAggregateTest(aggPhaseEnforcer: AggregatePhaseStrategy) extends Tabl
 
   @Test
   def testHop_GroupingSets_DistinctSplitEnabled(): Unit = {
-    util.tableEnv.getConfig.getConfiguration.setBoolean(
-      OptimizerConfigOptions.TABLE_OPTIMIZER_DISTINCT_AGG_SPLIT_ENABLED, true)
+    util.tableEnv.getConfig.set(
+      OptimizerConfigOptions.TABLE_OPTIMIZER_DISTINCT_AGG_SPLIT_ENABLED, Boolean.box(true))
     val sql =
       """
         |SELECT
@@ -954,8 +954,8 @@ class WindowAggregateTest(aggPhaseEnforcer: AggregatePhaseStrategy) extends Tabl
 
   @Test
   def testCumulate_GroupingSets_DistinctSplitEnabled(): Unit = {
-    util.tableEnv.getConfig.getConfiguration.setBoolean(
-      OptimizerConfigOptions.TABLE_OPTIMIZER_DISTINCT_AGG_SPLIT_ENABLED, true)
+    util.tableEnv.getConfig.set(
+      OptimizerConfigOptions.TABLE_OPTIMIZER_DISTINCT_AGG_SPLIT_ENABLED, Boolean.box(true))
     val sql =
       """
         |SELECT
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/utils/StreamingWithMiniBatchTestBase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/utils/StreamingWithMiniBatchTestBase.scala
index 9324b92..583c4cb 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/utils/StreamingWithMiniBatchTestBase.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/utils/StreamingWithMiniBatchTestBase.scala
@@ -39,9 +39,9 @@ abstract class StreamingWithMiniBatchTestBase(
     val tableConfig = tEnv.getConfig
     miniBatch match {
       case MiniBatchOn =>
-        tableConfig.getConfiguration.setBoolean(TABLE_EXEC_MINIBATCH_ENABLED, true)
-        tableConfig.getConfiguration.set(TABLE_EXEC_MINIBATCH_ALLOW_LATENCY, Duration.ofSeconds(1))
-        tableConfig.getConfiguration.setLong(TABLE_EXEC_MINIBATCH_SIZE, 3L)
+        tableConfig.set(TABLE_EXEC_MINIBATCH_ENABLED, Boolean.box(true))
+        tableConfig.set(TABLE_EXEC_MINIBATCH_ALLOW_LATENCY, Duration.ofSeconds(1))
+        tableConfig.set(TABLE_EXEC_MINIBATCH_SIZE, Long.box(3))
       case MiniBatchOff =>
         tableConfig.getConfiguration.removeConfig(TABLE_EXEC_MINIBATCH_ALLOW_LATENCY)
     }
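
Note the asymmetry left in the `MiniBatchOff` branch: typed writes move to
`TableConfig.set`, but removing a previously set option is still a
`Configuration`-level operation, so `removeConfig` keeps going through
`getConfiguration`. A sketch of both directions, with the same static
imports the test uses:

    import java.time.Duration

    import org.apache.flink.table.api.TableConfig
    import org.apache.flink.table.api.config.ExecutionConfigOptions._

    val config = TableConfig.getDefault
    // Typed writes go directly through TableConfig.
    config.set(TABLE_EXEC_MINIBATCH_ENABLED, Boolean.box(true))
    config.set(TABLE_EXEC_MINIBATCH_ALLOW_LATENCY, Duration.ofSeconds(1))
    config.set(TABLE_EXEC_MINIBATCH_SIZE, Long.box(3))
    // Removal has no TableConfig-level counterpart here, so it stays on
    // the underlying Configuration.
    config.getConfiguration.removeConfig(TABLE_EXEC_MINIBATCH_ALLOW_LATENCY)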