Posted to commits@flink.apache.org by tw...@apache.org on 2022/02/25 14:51:34 UTC

[flink] 02/02: [FLINK-26287][table] Update some test classes for the new TableConfig.set

This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 66af14beb296797a3e35a2d338e86d846ac130df
Author: Timo Walther <tw...@apache.org>
AuthorDate: Tue Feb 22 17:30:54 2022 +0100

    [FLINK-26287][table] Update some test classes for the new TableConfig.set
    
    This closes #18895.
---
 .../file/table/FileSystemTableSinkTest.java        |  8 +--
 .../apache/flink/table/api/CompiledPlanITCase.java |  2 +-
 .../flink/table/planner/codegen/CodeSplitTest.java | 16 ++---
 .../plan/batch/sql/ForwardHashExchangeTest.java    | 70 +++++++---------------
 .../nodes/exec/common/CommonExecSinkITCase.java    | 18 ++----
 .../nodes/exec/operator/BatchOperatorNameTest.java | 10 +---
 .../nodes/exec/operator/OperatorNameTestBase.java  |  1 -
 .../exec/operator/StreamOperatorNameTest.java      |  2 -
 .../exec/serde/ContextResolvedTableSerdeTest.java  | 13 ++--
 .../exec/stream/ChangelogSourceJsonPlanTest.java   |  4 +-
 .../plan/nodes/exec/stream/ExpandJsonPlanTest.java |  9 +--
 .../exec/stream/GroupAggregateJsonPlanTest.java    | 13 +---
 .../stream/IncrementalAggregateJsonPlanTest.java   | 21 ++-----
 .../exec/stream/WindowAggregateJsonPlanTest.java   |  4 +-
 .../PushWatermarkIntoTableSourceScanRuleTest.java  |  5 +-
 .../PushLocalAggIntoTableSourceScanRuleTest.java   | 25 +++-----
 .../batch/sql/ForwardHashExchangeITCase.java       |  9 +--
 .../sql/agg/LocalAggregatePushDownITCase.java      | 14 ++---
 .../jsonplan/DeduplicationJsonPlanITCase.java      |  4 +-
 .../stream/jsonplan/ExpandJsonPlanITCase.java      |  9 +--
 .../jsonplan/GroupAggregateJsonPlanITCase.java     | 13 +---
 .../IncrementalAggregateJsonPlanITCase.java        | 21 ++-----
 .../stream/jsonplan/WindowAggregateJsonITCase.java |  7 +--
 .../flink/table/api/TableEnvironmentITCase.scala   |  8 +--
 .../apache/flink/table/api/batch/ExplainTest.scala |  8 +--
 .../flink/table/api/stream/ExplainTest.scala       | 14 +++--
 .../table/planner/catalog/CatalogTableITCase.scala |  3 +-
 .../table/planner/catalog/CatalogViewITCase.scala  |  3 +-
 .../expressions/NonDeterministicTests.scala        | 11 ++--
 .../expressions/utils/ExpressionTestBase.scala     |  2 +-
 .../plan/batch/sql/DagOptimizationTest.scala       | 53 ++++++++--------
 .../RemoveRedundantLocalSortAggRuleTest.scala      | 10 ++--
 .../table/PythonGroupWindowAggregateTest.scala     |  4 +-
 .../plan/stream/table/TwoStageAggregateTest.scala  | 14 ++---
 .../GroupWindowTableAggregateValidationTest.scala  |  4 +-
 .../planner/runtime/FileSystemITCaseBase.scala     |  4 +-
 .../planner/runtime/batch/sql/CalcITCase.scala     |  6 +-
 .../runtime/batch/sql/CodeSplitITCase.scala        |  6 +-
 .../planner/runtime/batch/sql/DecimalITCase.scala  | 28 ++++-----
 .../planner/runtime/batch/sql/MiscITCase.scala     |  2 +-
 .../runtime/batch/sql/MultipleInputITCase.scala    | 10 ++--
 .../batch/sql/PartitionableSinkITCase.scala        |  4 +-
 .../runtime/batch/sql/SortLimitITCase.scala        |  3 +-
 .../runtime/batch/sql/TableSinkITCase.scala        |  6 +-
 .../runtime/batch/sql/TableSourceITCase.scala      |  5 +-
 .../planner/runtime/batch/sql/UnionITCase.scala    | 10 ++--
 .../sql/agg/AggregateJoinTransposeITCase.scala     |  2 +-
 .../sql/agg/AggregateReduceGroupingITCase.scala    | 17 +++---
 .../runtime/batch/sql/agg/HashAggITCase.scala      |  2 +-
 .../sql/agg/HashDistinctAggregateITCase.scala      |  2 +-
 .../runtime/batch/sql/agg/SortAggITCase.scala      |  6 +-
 .../sql/agg/SortDistinctAggregateITCase.scala      |  2 +-
 .../runtime/batch/sql/join/InnerJoinITCase.scala   |  5 +-
 .../runtime/batch/sql/join/JoinITCaseHelper.scala  | 13 ++--
 .../runtime/batch/table/DecimalITCase.scala        | 28 ++++-----
 .../harness/GroupAggregateHarnessTest.scala        |  8 +--
 .../harness/WindowAggregateHarnessTest.scala       |  3 +-
 .../planner/runtime/stream/sql/CalcITCase.scala    |  2 +-
 .../runtime/stream/sql/ChangelogSourceITCase.scala |  4 +-
 .../runtime/stream/sql/DeduplicateITCase.scala     | 10 ++--
 .../runtime/stream/sql/GroupWindowITCase.scala     | 15 ++---
 .../planner/runtime/stream/sql/SortITCase.scala    | 25 ++++----
 .../runtime/stream/sql/SplitAggregateITCase.scala  |  8 +--
 .../runtime/stream/sql/TemporalJoinITCase.scala    | 12 ++--
 .../runtime/stream/sql/WindowAggregateITCase.scala |  2 +-
 .../stream/sql/WindowDistinctAggregateITCase.scala |  5 +-
 .../runtime/stream/table/TableSinkITCase.scala     |  3 +-
 .../planner/runtime/utils/BatchTestBase.scala      |  6 +-
 .../runtime/utils/StreamingWithAggTestBase.scala   |  4 +-
 .../flink/table/planner/utils/TableTestBase.scala  | 11 ++--
 70 files changed, 286 insertions(+), 425 deletions(-)
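
The change is mechanical throughout: calls of the form
tEnv.getConfig().getConfiguration().set(...), including the typed
setString/setBoolean/setInteger variants, become tEnv.getConfig().set(...).
A minimal before/after sketch, assuming a Flink 1.15-era table API (the
class name is illustrative; the option names and values are taken from the
hunks below, and the chained form works because TableConfig.set returns the
TableConfig itself):

    import java.time.Duration;

    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.TableEnvironment;
    import org.apache.flink.table.api.config.ExecutionConfigOptions;

    public class TableConfigSetExample {
        public static void main(String[] args) {
            TableEnvironment tEnv =
                    TableEnvironment.create(EnvironmentSettings.inStreamingMode());

            // Old style: reach through TableConfig into the underlying
            // Configuration before setting the option.
            tEnv.getConfig()
                    .getConfiguration()
                    .set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ENABLED, true);

            // New style: TableConfig.set writes the option directly and
            // returns the TableConfig, so several options can be chained
            // in one statement.
            tEnv.getConfig()
                    .set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ENABLED, true)
                    .set(
                            ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ALLOW_LATENCY,
                            Duration.ofSeconds(10))
                    .set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_SIZE, 5L);
        }
    }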

diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/connector/file/table/FileSystemTableSinkTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/connector/file/table/FileSystemTableSinkTest.java
index 9c9d28b..d329688 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/connector/file/table/FileSystemTableSinkTest.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/connector/file/table/FileSystemTableSinkTest.java
@@ -62,9 +62,7 @@ public class FileSystemTableSinkTest {
         final int parallelism = 5;
         final TableEnvironment tEnv =
                 TableEnvironment.create(EnvironmentSettings.inStreamingMode());
-        tEnv.getConfig()
-                .getConfiguration()
-                .set(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 8);
+        tEnv.getConfig().set(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 8);
 
         final String testSourceTableName = "test_source_table";
         tEnv.executeSql(buildSourceTableSql(testSourceTableName, false));
@@ -98,9 +96,7 @@ public class FileSystemTableSinkTest {
     public void testFileSystemTableSinkWithParallelismInBatch() {
         final int parallelism = 5;
         final TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inBatchMode());
-        tEnv.getConfig()
-                .getConfiguration()
-                .set(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 8);
+        tEnv.getConfig().set(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 8);
 
         final String testSourceTableName = "test_source_table";
         final String testSinkTableName = "test_sink_table";
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/api/CompiledPlanITCase.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/api/CompiledPlanITCase.java
index c9c8b31..b43b6a6 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/api/CompiledPlanITCase.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/api/CompiledPlanITCase.java
@@ -218,7 +218,7 @@ public class CompiledPlanITCase extends JsonPlanTestBase {
 
     @Test
     public void testCompilePlanOverwrite() throws Exception {
-        tableEnv.getConfig().getConfiguration().set(TableConfigOptions.PLAN_FORCE_RECOMPILE, true);
+        tableEnv.getConfig().set(TableConfigOptions.PLAN_FORCE_RECOMPILE, true);
 
         Path planPath =
                 Paths.get(URI.create(getTempDirPath("plan")).getPath(), "plan.json")
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/codegen/CodeSplitTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/codegen/CodeSplitTest.java
index 1a1d7de..2d166c6 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/codegen/CodeSplitTest.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/codegen/CodeSplitTest.java
@@ -220,21 +220,13 @@ public class CodeSplitTest {
 
     private void runTest(Consumer<TableConfig> consumer) {
         TableConfig splitTableConfig = new TableConfig();
-        splitTableConfig
-                .getConfiguration()
-                .setInteger(TableConfigOptions.MAX_LENGTH_GENERATED_CODE, 4000);
-        splitTableConfig
-                .getConfiguration()
-                .setInteger(TableConfigOptions.MAX_MEMBERS_GENERATED_CODE, 10000);
+        splitTableConfig.set(TableConfigOptions.MAX_LENGTH_GENERATED_CODE, 4000);
+        splitTableConfig.set(TableConfigOptions.MAX_MEMBERS_GENERATED_CODE, 10000);
         consumer.accept(splitTableConfig);
 
         TableConfig noSplitTableConfig = new TableConfig();
-        noSplitTableConfig
-                .getConfiguration()
-                .setInteger(TableConfigOptions.MAX_LENGTH_GENERATED_CODE, Integer.MAX_VALUE);
-        noSplitTableConfig
-                .getConfiguration()
-                .setInteger(TableConfigOptions.MAX_MEMBERS_GENERATED_CODE, Integer.MAX_VALUE);
+        noSplitTableConfig.set(TableConfigOptions.MAX_LENGTH_GENERATED_CODE, Integer.MAX_VALUE);
+        noSplitTableConfig.set(TableConfigOptions.MAX_MEMBERS_GENERATED_CODE, Integer.MAX_VALUE);
         PrintStream originalStdOut = System.out;
         try {
             // redirect stdout to a null output stream to silence compile error in CompileUtils
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/batch/sql/ForwardHashExchangeTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/batch/sql/ForwardHashExchangeTest.java
index 3297879..fd68753 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/batch/sql/ForwardHashExchangeTest.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/batch/sql/ForwardHashExchangeTest.java
@@ -75,8 +75,7 @@ public class ForwardHashExchangeTest extends TableTestBase {
     public void testOverAggOnHashAggWithHashShuffle() {
         util.tableEnv()
                 .getConfig()
-                .getConfiguration()
-                .setString(ExecutionConfigOptions.TABLE_EXEC_DISABLED_OPERATORS, "SortAgg");
+                .set(ExecutionConfigOptions.TABLE_EXEC_DISABLED_OPERATORS, "SortAgg");
         util.verifyExecPlan(
                 " SELECT\n"
                         + "   SUM(b) sum_b,\n"
@@ -91,8 +90,7 @@ public class ForwardHashExchangeTest extends TableTestBase {
     public void testOverAggOnHashAggWithGlobalShuffle() {
         util.tableEnv()
                 .getConfig()
-                .getConfiguration()
-                .setString(ExecutionConfigOptions.TABLE_EXEC_DISABLED_OPERATORS, "SortAgg");
+                .set(ExecutionConfigOptions.TABLE_EXEC_DISABLED_OPERATORS, "SortAgg");
         util.verifyExecPlan("SELECT b, RANK() OVER (ORDER BY b) FROM (SELECT SUM(b) AS b FROM T)");
     }
 
@@ -100,8 +98,7 @@ public class ForwardHashExchangeTest extends TableTestBase {
     public void testOverAggOnSortAggWithHashShuffle() {
         util.tableEnv()
                 .getConfig()
-                .getConfiguration()
-                .setString(ExecutionConfigOptions.TABLE_EXEC_DISABLED_OPERATORS, "HashAgg");
+                .set(ExecutionConfigOptions.TABLE_EXEC_DISABLED_OPERATORS, "HashAgg");
         util.verifyExecPlan(
                 " SELECT\n"
                         + "   SUM(b) sum_b,\n"
@@ -116,8 +113,7 @@ public class ForwardHashExchangeTest extends TableTestBase {
     public void testOverAggOnSortAggWithGlobalShuffle() {
         util.tableEnv()
                 .getConfig()
-                .getConfiguration()
-                .setString(ExecutionConfigOptions.TABLE_EXEC_DISABLED_OPERATORS, "HashAgg");
+                .set(ExecutionConfigOptions.TABLE_EXEC_DISABLED_OPERATORS, "HashAgg");
         util.verifyExecPlan("SELECT b, RANK() OVER (ORDER BY b) FROM (SELECT SUM(b) AS b FROM T)");
     }
 
@@ -125,8 +121,7 @@ public class ForwardHashExchangeTest extends TableTestBase {
     public void testHashAggOnHashJoinWithHashShuffle() {
         util.tableEnv()
                 .getConfig()
-                .getConfiguration()
-                .setString(
+                .set(
                         ExecutionConfigOptions.TABLE_EXEC_DISABLED_OPERATORS,
                         "SortMergeJoin,NestedLoopJoin,SortAgg");
         util.verifyExecPlan(
@@ -138,8 +133,7 @@ public class ForwardHashExchangeTest extends TableTestBase {
     public void testSortAggOnSortMergeJoinWithHashShuffle() {
         util.tableEnv()
                 .getConfig()
-                .getConfiguration()
-                .setString(
+                .set(
                         ExecutionConfigOptions.TABLE_EXEC_DISABLED_OPERATORS,
                         "HashJoin,NestedLoopJoin,HashAgg");
         util.verifyExecPlan(
@@ -151,12 +145,10 @@ public class ForwardHashExchangeTest extends TableTestBase {
     public void testHashAggOnNestedLoopJoinWithGlobalShuffle() {
         util.tableEnv()
                 .getConfig()
-                .getConfiguration()
-                .setString(ExecutionConfigOptions.TABLE_EXEC_DISABLED_OPERATORS, "SortAgg");
+                .set(ExecutionConfigOptions.TABLE_EXEC_DISABLED_OPERATORS, "SortAgg");
         util.tableEnv()
                 .getConfig()
-                .getConfiguration()
-                .setString(OptimizerConfigOptions.TABLE_OPTIMIZER_AGG_PHASE_STRATEGY, "ONE_PHASE");
+                .set(OptimizerConfigOptions.TABLE_OPTIMIZER_AGG_PHASE_STRATEGY, "ONE_PHASE");
         // TODO the shuffle between join and agg can be removed
         util.verifyExecPlan(
                 "WITH r AS (SELECT * FROM T1 FULL OUTER JOIN T2 ON true)\n"
@@ -167,12 +159,10 @@ public class ForwardHashExchangeTest extends TableTestBase {
     public void testSortAggOnNestedLoopJoinWithGlobalShuffle() {
         util.tableEnv()
                 .getConfig()
-                .getConfiguration()
-                .setString(ExecutionConfigOptions.TABLE_EXEC_DISABLED_OPERATORS, "HashAgg");
+                .set(ExecutionConfigOptions.TABLE_EXEC_DISABLED_OPERATORS, "HashAgg");
         util.tableEnv()
                 .getConfig()
-                .getConfiguration()
-                .setString(OptimizerConfigOptions.TABLE_OPTIMIZER_AGG_PHASE_STRATEGY, "ONE_PHASE");
+                .set(OptimizerConfigOptions.TABLE_OPTIMIZER_AGG_PHASE_STRATEGY, "ONE_PHASE");
         // TODO the shuffle between join and agg can be removed
         util.verifyExecPlan(
                 "WITH r AS (SELECT * FROM T1 FULL OUTER JOIN T2 ON true)\n"
@@ -183,8 +173,7 @@ public class ForwardHashExchangeTest extends TableTestBase {
     public void testRankOnHashAggWithHashShuffle() {
         util.tableEnv()
                 .getConfig()
-                .getConfiguration()
-                .setString(ExecutionConfigOptions.TABLE_EXEC_DISABLED_OPERATORS, "SortAgg");
+                .set(ExecutionConfigOptions.TABLE_EXEC_DISABLED_OPERATORS, "SortAgg");
         util.verifyExecPlan(
                 "SELECT * FROM (\n"
                         + "                SELECT a, b, RANK() OVER(PARTITION BY a ORDER BY b) rk FROM (\n"
@@ -197,8 +186,7 @@ public class ForwardHashExchangeTest extends TableTestBase {
     public void testRankOnHashAggWithGlobalShuffle() {
         util.tableEnv()
                 .getConfig()
-                .getConfiguration()
-                .setString(ExecutionConfigOptions.TABLE_EXEC_DISABLED_OPERATORS, "SortAgg");
+                .set(ExecutionConfigOptions.TABLE_EXEC_DISABLED_OPERATORS, "SortAgg");
         util.verifyExecPlan(
                 "SELECT * FROM (\n"
                         + "                SELECT b, RANK() OVER(ORDER BY b) rk FROM (\n"
@@ -211,8 +199,7 @@ public class ForwardHashExchangeTest extends TableTestBase {
     public void testRankOnSortAggWithHashShuffle() {
         util.tableEnv()
                 .getConfig()
-                .getConfiguration()
-                .setString(ExecutionConfigOptions.TABLE_EXEC_DISABLED_OPERATORS, "SortAgg");
+                .set(ExecutionConfigOptions.TABLE_EXEC_DISABLED_OPERATORS, "SortAgg");
         util.verifyExecPlan(
                 "SELECT * FROM (\n"
                         + "                SELECT a, b, RANK() OVER(PARTITION BY a ORDER BY b) rk FROM (\n"
@@ -225,8 +212,7 @@ public class ForwardHashExchangeTest extends TableTestBase {
     public void testRankOnSortAggWithGlobalShuffle() {
         util.tableEnv()
                 .getConfig()
-                .getConfiguration()
-                .setString(ExecutionConfigOptions.TABLE_EXEC_DISABLED_OPERATORS, "SortAgg");
+                .set(ExecutionConfigOptions.TABLE_EXEC_DISABLED_OPERATORS, "SortAgg");
         util.verifyExecPlan(
                 "SELECT * FROM (\n"
                         + "                SELECT b, RANK() OVER(ORDER BY b) rk FROM (\n"
@@ -239,14 +225,10 @@ public class ForwardHashExchangeTest extends TableTestBase {
     public void testHashJoinWithMultipleInputDisabled() {
         util.tableEnv()
                 .getConfig()
-                .getConfiguration()
-                .setString(
+                .set(
                         ExecutionConfigOptions.TABLE_EXEC_DISABLED_OPERATORS,
-                        "SortMergeJoin,NestedLoopJoin");
-        util.tableEnv()
-                .getConfig()
-                .getConfiguration()
-                .setBoolean(OptimizerConfigOptions.TABLE_OPTIMIZER_MULTIPLE_INPUT_ENABLED, false);
+                        "SortMergeJoin,NestedLoopJoin")
+                .set(OptimizerConfigOptions.TABLE_OPTIMIZER_MULTIPLE_INPUT_ENABLED, false);
         util.verifyExecPlan(
                 "SELECT * FROM\n"
                         + "  (SELECT a FROM T1 JOIN T ON a = a1) t1\n"
@@ -259,14 +241,10 @@ public class ForwardHashExchangeTest extends TableTestBase {
     public void testSortJoinWithMultipleInputDisabled() {
         util.tableEnv()
                 .getConfig()
-                .getConfiguration()
-                .setString(
+                .set(
                         ExecutionConfigOptions.TABLE_EXEC_DISABLED_OPERATORS,
-                        "HashJoin,NestedLoopJoin");
-        util.tableEnv()
-                .getConfig()
-                .getConfiguration()
-                .setBoolean(OptimizerConfigOptions.TABLE_OPTIMIZER_MULTIPLE_INPUT_ENABLED, false);
+                        "HashJoin,NestedLoopJoin")
+                .set(OptimizerConfigOptions.TABLE_OPTIMIZER_MULTIPLE_INPUT_ENABLED, false);
         util.verifyExecPlan(
                 "SELECT * FROM\n"
                         + "  (SELECT a FROM T1 JOIN T ON a = a1) t1\n"
@@ -279,12 +257,8 @@ public class ForwardHashExchangeTest extends TableTestBase {
     public void testMultipleInputs() {
         util.getTableEnv()
                 .getConfig()
-                .getConfiguration()
-                .setBoolean(OptimizerConfigOptions.TABLE_OPTIMIZER_JOIN_REORDER_ENABLED, false);
-        util.getTableEnv()
-                .getConfig()
-                .getConfiguration()
-                .setString(
+                .set(OptimizerConfigOptions.TABLE_OPTIMIZER_JOIN_REORDER_ENABLED, false)
+                .set(
                         ExecutionConfigOptions.TABLE_EXEC_DISABLED_OPERATORS,
                         "NestedLoopJoin,SortMergeJoin,SortAgg");
 
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSinkITCase.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSinkITCase.java
index 56d90b3..44f36a2 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSinkITCase.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSinkITCase.java
@@ -235,8 +235,7 @@ public class CommonExecSinkITCase extends AbstractTestBase {
         // accordingly, based on their type length
         try {
             tableEnv.getConfig()
-                    .getConfiguration()
-                    .setString(
+                    .set(
                             TABLE_EXEC_SINK_TYPE_LENGTH_ENFORCER.key(),
                             ExecutionConfigOptions.TypeLengthEnforcer.TRIM_PAD.name());
 
@@ -255,8 +254,7 @@ public class CommonExecSinkITCase extends AbstractTestBase {
 
         } finally {
             tableEnv.getConfig()
-                    .getConfiguration()
-                    .setString(
+                    .set(
                             TABLE_EXEC_SINK_TYPE_LENGTH_ENFORCER.key(),
                             ExecutionConfigOptions.TypeLengthEnforcer.IGNORE.name());
         }
@@ -315,8 +313,7 @@ public class CommonExecSinkITCase extends AbstractTestBase {
         // accordingly, based on their type length
         try {
             tableEnv.getConfig()
-                    .getConfiguration()
-                    .setString(
+                    .set(
                             TABLE_EXEC_SINK_TYPE_LENGTH_ENFORCER.key(),
                             ExecutionConfigOptions.TypeLengthEnforcer.TRIM_PAD.name());
 
@@ -359,8 +356,7 @@ public class CommonExecSinkITCase extends AbstractTestBase {
 
         } finally {
             tableEnv.getConfig()
-                    .getConfiguration()
-                    .setString(
+                    .set(
                             TABLE_EXEC_SINK_TYPE_LENGTH_ENFORCER.key(),
                             ExecutionConfigOptions.TypeLengthEnforcer.IGNORE.name());
         }
@@ -415,8 +411,7 @@ public class CommonExecSinkITCase extends AbstractTestBase {
         // Change config option to "drop", to drop the columns instead of throwing errors
         try {
             tableEnv.getConfig()
-                    .getConfiguration()
-                    .setString(
+                    .set(
                             TABLE_EXEC_SINK_NOT_NULL_ENFORCER.key(),
                             ExecutionConfigOptions.NotNullEnforcer.DROP.name());
 
@@ -431,8 +426,7 @@ public class CommonExecSinkITCase extends AbstractTestBase {
             assertThat(results.get().get(1).getInt(2)).isEqualTo(33);
         } finally {
             tableEnv.getConfig()
-                    .getConfiguration()
-                    .setString(
+                    .set(
                             TABLE_EXEC_SINK_NOT_NULL_ENFORCER.key(),
                             ExecutionConfigOptions.NotNullEnforcer.ERROR.name());
         }
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/operator/BatchOperatorNameTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/operator/BatchOperatorNameTest.java
index 1d55870..a336a42 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/operator/BatchOperatorNameTest.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/operator/BatchOperatorNameTest.java
@@ -65,9 +65,7 @@ public class BatchOperatorNameTest extends OperatorNameTestBase {
     /** Verify Sort, SortAggregate. */
     @Test
     public void testSortAggregate() {
-        tEnv.getConfig()
-                .getConfiguration()
-                .setString(ExecutionConfigOptions.TABLE_EXEC_DISABLED_OPERATORS, "HashAgg");
+        tEnv.getConfig().set(ExecutionConfigOptions.TABLE_EXEC_DISABLED_OPERATORS, "HashAgg");
         createTestSource();
         verifyQuery("SELECT a, " + "count(distinct b) as b " + "FROM MyTable GROUP BY a");
     }
@@ -95,8 +93,7 @@ public class BatchOperatorNameTest extends OperatorNameTestBase {
     @Test
     public void testNestedLoopJoin() {
         tEnv.getConfig()
-                .getConfiguration()
-                .setString(
+                .set(
                         ExecutionConfigOptions.TABLE_EXEC_DISABLED_OPERATORS,
                         "HashJoin, SortMergeJoin");
         testJoinInternal();
@@ -106,8 +103,7 @@ public class BatchOperatorNameTest extends OperatorNameTestBase {
     @Test
     public void testSortMergeJoin() {
         tEnv.getConfig()
-                .getConfiguration()
-                .setString(
+                .set(
                         ExecutionConfigOptions.TABLE_EXEC_DISABLED_OPERATORS,
                         "HashJoin, NestedLoopJoin");
         testJoinInternal();
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/operator/OperatorNameTestBase.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/operator/OperatorNameTestBase.java
index 82f8519..82c3fbf 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/operator/OperatorNameTestBase.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/operator/OperatorNameTestBase.java
@@ -53,7 +53,6 @@ public abstract class OperatorNameTestBase extends TableTestBase {
         util.getStreamEnv().setParallelism(2);
         tEnv = util.getTableEnv();
         tEnv.getConfig()
-                .getConfiguration()
                 .set(
                         ExecutionConfigOptions.TABLE_EXEC_SIMPLIFY_OPERATOR_NAME_ENABLED,
                         isNameSimplifyEnabled);
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/operator/StreamOperatorNameTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/operator/StreamOperatorNameTest.java
index fb4fad5..a6c8eac 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/operator/StreamOperatorNameTest.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/operator/StreamOperatorNameTest.java
@@ -146,7 +146,6 @@ public class StreamOperatorNameTest extends OperatorNameTestBase {
     public void testIncrementalAggregate() {
         util.enableMiniBatch();
         tEnv.getConfig()
-                .getConfiguration()
                 .set(OptimizerConfigOptions.TABLE_OPTIMIZER_DISTINCT_AGG_SPLIT_ENABLED, true);
         createTestSource();
         verifyQuery("SELECT a, " + "count(distinct b) as b " + "FROM MyTable GROUP BY a");
@@ -274,7 +273,6 @@ public class StreamOperatorNameTest extends OperatorNameTestBase {
     @Test
     public void testWindowAggregate() {
         tEnv.getConfig()
-                .getConfiguration()
                 .set(OptimizerConfigOptions.TABLE_OPTIMIZER_AGG_PHASE_STRATEGY, "ONE_PHASE");
         createSourceWithTimeAttribute();
         verifyQuery(
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/ContextResolvedTableSerdeTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/ContextResolvedTableSerdeTest.java
index ba059dd..414c954 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/ContextResolvedTableSerdeTest.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/ContextResolvedTableSerdeTest.java
@@ -811,13 +811,12 @@ public class ContextResolvedTableSerdeTest {
             TableConfigOptions.CatalogPlanCompilation planCompilationOption,
             TableConfigOptions.CatalogPlanRestore planRestoreOption) {
         // Create table options
-        TableConfig tableConfig = TableConfig.getDefault();
-        tableConfig
-                .getConfiguration()
-                .set(TableConfigOptions.PLAN_RESTORE_CATALOG_OBJECTS, planRestoreOption);
-        tableConfig
-                .getConfiguration()
-                .set(TableConfigOptions.PLAN_COMPILE_CATALOG_OBJECTS, planCompilationOption);
+        TableConfig tableConfig =
+                TableConfig.getDefault()
+                        .set(TableConfigOptions.PLAN_RESTORE_CATALOG_OBJECTS, planRestoreOption)
+                        .set(
+                                TableConfigOptions.PLAN_COMPILE_CATALOG_OBJECTS,
+                                planCompilationOption);
 
         return JsonSerdeTestUtil.configuredSerdeContext(CATALOG_MANAGER, tableConfig);
     }
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ChangelogSourceJsonPlanTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ChangelogSourceJsonPlanTest.java
index cc9791a..1ec1567 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ChangelogSourceJsonPlanTest.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ChangelogSourceJsonPlanTest.java
@@ -37,9 +37,7 @@ public class ChangelogSourceJsonPlanTest extends TableTestBase {
     public void setup() {
         util = streamTestUtil(TableConfig.getDefault());
         tEnv = util.getTableEnv();
-        tEnv.getConfig()
-                .getConfiguration()
-                .setBoolean(ExecutionConfigOptions.TABLE_EXEC_SOURCE_CDC_EVENTS_DUPLICATE, true);
+        tEnv.getConfig().set(ExecutionConfigOptions.TABLE_EXEC_SOURCE_CDC_EVENTS_DUPLICATE, true);
     }
 
     @Test
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ExpandJsonPlanTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ExpandJsonPlanTest.java
index 33c3293..b55cf03 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ExpandJsonPlanTest.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ExpandJsonPlanTest.java
@@ -40,15 +40,10 @@ public class ExpandJsonPlanTest extends TableTestBase {
         util = streamTestUtil(TableConfig.getDefault());
         tEnv = util.getTableEnv();
         tEnv.getConfig()
-                .getConfiguration()
                 .set(
                         OptimizerConfigOptions.TABLE_OPTIMIZER_AGG_PHASE_STRATEGY,
-                        AggregatePhaseStrategy.ONE_PHASE.name());
-        tEnv.getConfig()
-                .getConfiguration()
-                .set(OptimizerConfigOptions.TABLE_OPTIMIZER_DISTINCT_AGG_SPLIT_ENABLED, true);
-        tEnv.getConfig()
-                .getConfiguration()
+                        AggregatePhaseStrategy.ONE_PHASE.name())
+                .set(OptimizerConfigOptions.TABLE_OPTIMIZER_DISTINCT_AGG_SPLIT_ENABLED, true)
                 .set(IncrementalAggregateRule.TABLE_OPTIMIZER_INCREMENTAL_AGG_ENABLED(), false);
 
         String srcTableDdl =
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/GroupAggregateJsonPlanTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/GroupAggregateJsonPlanTest.java
index 86238e0..27b7296 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/GroupAggregateJsonPlanTest.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/GroupAggregateJsonPlanTest.java
@@ -57,20 +57,13 @@ public class GroupAggregateJsonPlanTest extends TableTestBase {
         tEnv = util.getTableEnv();
         if (isMiniBatchEnabled) {
             tEnv.getConfig()
-                    .getConfiguration()
-                    .set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ENABLED, true);
-            tEnv.getConfig()
-                    .getConfiguration()
+                    .set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ENABLED, true)
                     .set(
                             ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ALLOW_LATENCY,
-                            Duration.ofSeconds(10));
-            tEnv.getConfig()
-                    .getConfiguration()
+                            Duration.ofSeconds(10))
                     .set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_SIZE, 5L);
         } else {
-            tEnv.getConfig()
-                    .getConfiguration()
-                    .set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ENABLED, false);
+            tEnv.getConfig().set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ENABLED, false);
         }
 
         String srcTableDdl =
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/IncrementalAggregateJsonPlanTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/IncrementalAggregateJsonPlanTest.java
index 5eecb92..cc210a6 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/IncrementalAggregateJsonPlanTest.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/IncrementalAggregateJsonPlanTest.java
@@ -43,26 +43,15 @@ public class IncrementalAggregateJsonPlanTest extends TableTestBase {
         util = streamTestUtil(TableConfig.getDefault());
         tEnv = util.getTableEnv();
         tEnv.getConfig()
-                .getConfiguration()
                 .set(
                         OptimizerConfigOptions.TABLE_OPTIMIZER_AGG_PHASE_STRATEGY,
-                        AggregatePhaseStrategy.TWO_PHASE.name());
-        tEnv.getConfig()
-                .getConfiguration()
-                .set(OptimizerConfigOptions.TABLE_OPTIMIZER_DISTINCT_AGG_SPLIT_ENABLED, true);
-        tEnv.getConfig()
-                .getConfiguration()
-                .set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ENABLED, true);
-        tEnv.getConfig()
-                .getConfiguration()
+                        AggregatePhaseStrategy.TWO_PHASE.name())
+                .set(OptimizerConfigOptions.TABLE_OPTIMIZER_DISTINCT_AGG_SPLIT_ENABLED, true)
+                .set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ENABLED, true)
                 .set(
                         ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ALLOW_LATENCY,
-                        Duration.ofSeconds(10));
-        tEnv.getConfig()
-                .getConfiguration()
-                .set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_SIZE, 5L);
-        tEnv.getConfig()
-                .getConfiguration()
+                        Duration.ofSeconds(10))
+                .set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_SIZE, 5L)
                 .set(IncrementalAggregateRule.TABLE_OPTIMIZER_INCREMENTAL_AGG_ENABLED(), true);
 
         String srcTableDdl =
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/WindowAggregateJsonPlanTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/WindowAggregateJsonPlanTest.java
index 1ac216b..7f6672f 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/WindowAggregateJsonPlanTest.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/WindowAggregateJsonPlanTest.java
@@ -275,9 +275,7 @@ public class WindowAggregateJsonPlanTest extends TableTestBase {
     @Test
     public void testDistinctSplitEnabled() {
         tEnv.getConfig()
-                .getConfiguration()
-                .setBoolean(
-                        OptimizerConfigOptions.TABLE_OPTIMIZER_DISTINCT_AGG_SPLIT_ENABLED, true);
+                .set(OptimizerConfigOptions.TABLE_OPTIMIZER_DISTINCT_AGG_SPLIT_ENABLED, true);
         String sinkTableDdl =
                 "CREATE TABLE MySink (\n"
                         + "  a bigint,\n"
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/rules/logical/PushWatermarkIntoTableSourceScanRuleTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/rules/logical/PushWatermarkIntoTableSourceScanRuleTest.java
index e47a110..58215cb 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/rules/logical/PushWatermarkIntoTableSourceScanRuleTest.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/rules/logical/PushWatermarkIntoTableSourceScanRuleTest.java
@@ -237,10 +237,7 @@ public class PushWatermarkIntoTableSourceScanRuleTest extends TableTestBase {
 
     @Test
     public void testWatermarkWithIdleSource() {
-        util.tableEnv()
-                .getConfig()
-                .getConfiguration()
-                .set(TABLE_EXEC_SOURCE_IDLE_TIMEOUT, Duration.ofMillis(1000));
+        util.tableEnv().getConfig().set(TABLE_EXEC_SOURCE_IDLE_TIMEOUT, Duration.ofMillis(1000));
         String ddl =
                 "CREATE TABLE MyTable("
                         + "  a INT,\n"
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/rules/physical/batch/PushLocalAggIntoTableSourceScanRuleTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/rules/physical/batch/PushLocalAggIntoTableSourceScanRuleTest.java
index a3f91ab..51aee49 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/rules/physical/batch/PushLocalAggIntoTableSourceScanRuleTest.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/rules/physical/batch/PushLocalAggIntoTableSourceScanRuleTest.java
@@ -39,11 +39,8 @@ public class PushLocalAggIntoTableSourceScanRuleTest extends TableTestBase {
     @Before
     public void setup() {
         TableConfig tableConfig = util.tableEnv().getConfig();
-        tableConfig
-                .getConfiguration()
-                .setBoolean(
-                        OptimizerConfigOptions.TABLE_OPTIMIZER_SOURCE_AGGREGATE_PUSHDOWN_ENABLED,
-                        true);
+        tableConfig.set(
+                OptimizerConfigOptions.TABLE_OPTIMIZER_SOURCE_AGGREGATE_PUSHDOWN_ENABLED, true);
         String ddl =
                 "CREATE TABLE inventory (\n"
                         + "  id BIGINT,\n"
@@ -143,8 +140,7 @@ public class PushLocalAggIntoTableSourceScanRuleTest extends TableTestBase {
         // reset config
         util.getTableEnv()
                 .getConfig()
-                .getConfiguration()
-                .setBoolean(
+                .set(
                         OptimizerConfigOptions.TABLE_OPTIMIZER_SOURCE_AGGREGATE_PUSHDOWN_ENABLED,
                         true);
     }
@@ -166,8 +162,7 @@ public class PushLocalAggIntoTableSourceScanRuleTest extends TableTestBase {
         // enable sort agg
         util.getTableEnv()
                 .getConfig()
-                .getConfiguration()
-                .setString(ExecutionConfigOptions.TABLE_EXEC_DISABLED_OPERATORS, "HashAgg");
+                .set(ExecutionConfigOptions.TABLE_EXEC_DISABLED_OPERATORS, "HashAgg");
 
         util.verifyRelPlan(
                 "SELECT\n"
@@ -181,8 +176,7 @@ public class PushLocalAggIntoTableSourceScanRuleTest extends TableTestBase {
         // reset config
         util.getTableEnv()
                 .getConfig()
-                .getConfiguration()
-                .setString(ExecutionConfigOptions.TABLE_EXEC_DISABLED_OPERATORS, "");
+                .set(ExecutionConfigOptions.TABLE_EXEC_DISABLED_OPERATORS, "");
     }
 
     @Test
@@ -190,8 +184,7 @@ public class PushLocalAggIntoTableSourceScanRuleTest extends TableTestBase {
         // enable sort agg
         util.getTableEnv()
                 .getConfig()
-                .getConfiguration()
-                .setString(ExecutionConfigOptions.TABLE_EXEC_DISABLED_OPERATORS, "HashAgg");
+                .set(ExecutionConfigOptions.TABLE_EXEC_DISABLED_OPERATORS, "HashAgg");
 
         util.verifyRelPlan(
                 "SELECT\n"
@@ -204,8 +197,7 @@ public class PushLocalAggIntoTableSourceScanRuleTest extends TableTestBase {
         // reset config
         util.getTableEnv()
                 .getConfig()
-                .getConfiguration()
-                .setString(ExecutionConfigOptions.TABLE_EXEC_DISABLED_OPERATORS, "");
+                .set(ExecutionConfigOptions.TABLE_EXEC_DISABLED_OPERATORS, "");
     }
 
     @Test
@@ -263,8 +255,7 @@ public class PushLocalAggIntoTableSourceScanRuleTest extends TableTestBase {
         // enable two-phase aggregate, otherwise there is no local aggregate
         util.getTableEnv()
                 .getConfig()
-                .getConfiguration()
-                .setString(OptimizerConfigOptions.TABLE_OPTIMIZER_AGG_PHASE_STRATEGY, "TWO_PHASE");
+                .set(OptimizerConfigOptions.TABLE_OPTIMIZER_AGG_PHASE_STRATEGY, "TWO_PHASE");
 
         util.verifyRelPlan(
                 "SELECT\n"
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/batch/sql/ForwardHashExchangeITCase.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/batch/sql/ForwardHashExchangeITCase.java
index cda6090..d7737b6 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/batch/sql/ForwardHashExchangeITCase.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/batch/sql/ForwardHashExchangeITCase.java
@@ -40,7 +40,6 @@ public class ForwardHashExchangeITCase extends BatchTestBase {
         env().getConfig().setDynamicGraph(true);
         env().disableOperatorChaining();
         tEnv().getConfig()
-                .getConfiguration()
                 .set(ExecutionOptions.BATCH_SHUFFLE_MODE, BatchShuffleMode.ALL_EXCHANGES_BLOCKING);
 
         String testDataId = TestValuesTableFactory.registerData(TestData.data3());
@@ -61,9 +60,7 @@ public class ForwardHashExchangeITCase extends BatchTestBase {
 
     @Test
     public void testOverAggWithHashAgg() {
-        tEnv().getConfig()
-                .getConfiguration()
-                .setString(ExecutionConfigOptions.TABLE_EXEC_DISABLED_OPERATORS, "SortAgg");
+        tEnv().getConfig().set(ExecutionConfigOptions.TABLE_EXEC_DISABLED_OPERATORS, "SortAgg");
         checkResult(
                 "SELECT\n"
                         + "   b,\n"
@@ -85,9 +82,7 @@ public class ForwardHashExchangeITCase extends BatchTestBase {
 
     @Test
     public void testOverAggWithSortAgg() {
-        tEnv().getConfig()
-                .getConfiguration()
-                .setString(ExecutionConfigOptions.TABLE_EXEC_DISABLED_OPERATORS, "HashAgg");
+        tEnv().getConfig().set(ExecutionConfigOptions.TABLE_EXEC_DISABLED_OPERATORS, "HashAgg");
         checkResult(
                 "SELECT\n"
                         + "   b,\n"
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/batch/sql/agg/LocalAggregatePushDownITCase.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/batch/sql/agg/LocalAggregatePushDownITCase.java
index b6522c0..778bb09 100755
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/batch/sql/agg/LocalAggregatePushDownITCase.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/batch/sql/agg/LocalAggregatePushDownITCase.java
@@ -134,8 +134,7 @@ public class LocalAggregatePushDownITCase extends BatchTestBase {
     public void testDisablePushDownLocalAgg() {
         // disable push down local agg
         tEnv().getConfig()
-                .getConfiguration()
-                .setBoolean(
+                .set(
                         OptimizerConfigOptions.TABLE_OPTIMIZER_SOURCE_AGGREGATE_PUSHDOWN_ENABLED,
                         false);
 
@@ -170,9 +169,7 @@ public class LocalAggregatePushDownITCase extends BatchTestBase {
     @Test
     public void testPushDownLocalSortAggWithoutSort() {
         // enable sort agg
-        tEnv().getConfig()
-                .getConfiguration()
-                .setString(ExecutionConfigOptions.TABLE_EXEC_DISABLED_OPERATORS, "HashAgg");
+        tEnv().getConfig().set(ExecutionConfigOptions.TABLE_EXEC_DISABLED_OPERATORS, "HashAgg");
 
         checkResult(
                 "SELECT\n"
@@ -188,9 +185,7 @@ public class LocalAggregatePushDownITCase extends BatchTestBase {
     @Test
     public void testPushDownLocalSortAggWithSort() {
         // enable sort agg
-        tEnv().getConfig()
-                .getConfiguration()
-                .setString(ExecutionConfigOptions.TABLE_EXEC_DISABLED_OPERATORS, "HashAgg");
+        tEnv().getConfig().set(ExecutionConfigOptions.TABLE_EXEC_DISABLED_OPERATORS, "HashAgg");
 
         checkResult(
                 "SELECT\n"
@@ -296,8 +291,7 @@ public class LocalAggregatePushDownITCase extends BatchTestBase {
     public void testPushDownLocalAggWithoutAuxGrouping() {
         // enable two-phase aggregate, otherwise there is no local aggregate
         tEnv().getConfig()
-                .getConfiguration()
-                .setString(OptimizerConfigOptions.TABLE_OPTIMIZER_AGG_PHASE_STRATEGY, "TWO_PHASE");
+                .set(OptimizerConfigOptions.TABLE_OPTIMIZER_AGG_PHASE_STRATEGY, "TWO_PHASE");
 
         checkResult(
                 "SELECT\n"
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/DeduplicationJsonPlanITCase.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/DeduplicationJsonPlanITCase.java
index d9673f9..68bf98a 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/DeduplicationJsonPlanITCase.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/DeduplicationJsonPlanITCase.java
@@ -68,9 +68,7 @@ public class DeduplicationJsonPlanITCase extends JsonPlanTestBase {
                                 + "    ROW_NUMBER() OVER (PARTITION BY product ORDER BY event_time ASC) AS row_num\n"
                                 + "  FROM MyTable)\n"
                                 + "WHERE row_num = 1 \n");
-        tableEnv.getConfig()
-                .getConfiguration()
-                .setInteger(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 1);
+        tableEnv.getConfig().set(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 1);
         checkTransformationUids(compiledPlan);
         tableEnv.executePlan(compiledPlan).await();
 
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/ExpandJsonPlanITCase.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/ExpandJsonPlanITCase.java
index 46188c8..4ebf82e 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/ExpandJsonPlanITCase.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/ExpandJsonPlanITCase.java
@@ -37,15 +37,10 @@ public class ExpandJsonPlanITCase extends JsonPlanTestBase {
     @Test
     public void testExpand() throws Exception {
         tableEnv.getConfig()
-                .getConfiguration()
                 .set(
                         OptimizerConfigOptions.TABLE_OPTIMIZER_AGG_PHASE_STRATEGY,
-                        AggregatePhaseStrategy.ONE_PHASE.name());
-        tableEnv.getConfig()
-                .getConfiguration()
-                .set(OptimizerConfigOptions.TABLE_OPTIMIZER_DISTINCT_AGG_SPLIT_ENABLED, true);
-        tableEnv.getConfig()
-                .getConfiguration()
+                        AggregatePhaseStrategy.ONE_PHASE.name())
+                .set(OptimizerConfigOptions.TABLE_OPTIMIZER_DISTINCT_AGG_SPLIT_ENABLED, true)
                 .set(IncrementalAggregateRule.TABLE_OPTIMIZER_INCREMENTAL_AGG_ENABLED(), false);
 
         createTestValuesSourceTable(
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/GroupAggregateJsonPlanITCase.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/GroupAggregateJsonPlanITCase.java
index 924735a..3450478 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/GroupAggregateJsonPlanITCase.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/GroupAggregateJsonPlanITCase.java
@@ -53,20 +53,13 @@ public class GroupAggregateJsonPlanITCase extends JsonPlanTestBase {
         super.setup();
         if (isMiniBatchEnabled) {
             tableEnv.getConfig()
-                    .getConfiguration()
-                    .set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ENABLED, true);
-            tableEnv.getConfig()
-                    .getConfiguration()
+                    .set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ENABLED, true)
                     .set(
                             ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ALLOW_LATENCY,
-                            Duration.ofSeconds(10));
-            tableEnv.getConfig()
-                    .getConfiguration()
+                            Duration.ofSeconds(10))
                     .set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_SIZE, 5L);
         } else {
-            tableEnv.getConfig()
-                    .getConfiguration()
-                    .set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ENABLED, false);
+            tableEnv.getConfig().set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ENABLED, false);
         }
     }
 
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/IncrementalAggregateJsonPlanITCase.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/IncrementalAggregateJsonPlanITCase.java
index d6b73a9..ea76839 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/IncrementalAggregateJsonPlanITCase.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/IncrementalAggregateJsonPlanITCase.java
@@ -43,26 +43,15 @@ public class IncrementalAggregateJsonPlanITCase extends JsonPlanTestBase {
     public void setup() throws Exception {
         super.setup();
         tableEnv.getConfig()
-                .getConfiguration()
                 .set(
                         OptimizerConfigOptions.TABLE_OPTIMIZER_AGG_PHASE_STRATEGY,
-                        AggregatePhaseStrategy.TWO_PHASE.name());
-        tableEnv.getConfig()
-                .getConfiguration()
-                .set(OptimizerConfigOptions.TABLE_OPTIMIZER_DISTINCT_AGG_SPLIT_ENABLED, true);
-        tableEnv.getConfig()
-                .getConfiguration()
-                .set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ENABLED, true);
-        tableEnv.getConfig()
-                .getConfiguration()
+                        AggregatePhaseStrategy.TWO_PHASE.name())
+                .set(OptimizerConfigOptions.TABLE_OPTIMIZER_DISTINCT_AGG_SPLIT_ENABLED, true)
+                .set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ENABLED, true)
                 .set(
                         ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ALLOW_LATENCY,
-                        Duration.ofSeconds(10));
-        tableEnv.getConfig()
-                .getConfiguration()
-                .set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_SIZE, 5L);
-        tableEnv.getConfig()
-                .getConfiguration()
+                        Duration.ofSeconds(10))
+                .set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_SIZE, 5L)
                 .set(IncrementalAggregateRule.TABLE_OPTIMIZER_INCREMENTAL_AGG_ENABLED(), true);
     }
 
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/WindowAggregateJsonITCase.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/WindowAggregateJsonITCase.java
index f9b3f26..6e7e848 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/WindowAggregateJsonITCase.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/WindowAggregateJsonITCase.java
@@ -72,8 +72,7 @@ public class WindowAggregateJsonITCase extends JsonPlanTestBase {
                     }
                 });
         tableEnv.getConfig()
-                .getConfiguration()
-                .setString(
+                .set(
                         OptimizerConfigOptions.TABLE_OPTIMIZER_AGG_PHASE_STRATEGY,
                         aggPhase.toString());
     }
@@ -181,9 +180,7 @@ public class WindowAggregateJsonITCase extends JsonPlanTestBase {
     @Test
     public void testDistinctSplitEnabled() throws Exception {
         tableEnv.getConfig()
-                .getConfiguration()
-                .setBoolean(
-                        OptimizerConfigOptions.TABLE_OPTIMIZER_DISTINCT_AGG_SPLIT_ENABLED, true);
+                .set(OptimizerConfigOptions.TABLE_OPTIMIZER_DISTINCT_AGG_SPLIT_ENABLED, true);
         createTestValuesSinkTable(
                 "MySink", "name STRING", "max_double DOUBLE", "cnt_distinct_int BIGINT");
 
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/TableEnvironmentITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/TableEnvironmentITCase.scala
index 9d5cb95..25048d8 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/TableEnvironmentITCase.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/TableEnvironmentITCase.scala
@@ -86,7 +86,7 @@ class TableEnvironmentITCase(tableEnvName: String, isStreaming: Boolean) extends
 
   @Test
   def testSetPlannerType: Unit = {
-    tEnv.getConfig.getConfiguration.set(TableConfigOptions.TABLE_PLANNER, PlannerType.OLD)
+    tEnv.getConfig.set(TableConfigOptions.TABLE_PLANNER, PlannerType.OLD)
 
     TestTableSourceSinks.createCsvTemporarySinkTable(
       tEnv, new TableSchema(Array("first"), Array(STRING)), "MySink1")
@@ -106,9 +106,9 @@ class TableEnvironmentITCase(tableEnvName: String, isStreaming: Boolean) extends
   @Test
   def testSetExecutionMode(): Unit = {
     if (isStreaming) {
-      tEnv.getConfig.getConfiguration.set(ExecutionOptions.RUNTIME_MODE, RuntimeExecutionMode.BATCH)
+      tEnv.getConfig.set(ExecutionOptions.RUNTIME_MODE, RuntimeExecutionMode.BATCH)
     } else {
-      tEnv.getConfig.getConfiguration.set(ExecutionOptions.RUNTIME_MODE,
+      tEnv.getConfig.set(ExecutionOptions.RUNTIME_MODE,
         RuntimeExecutionMode.STREAMING)
     }
 
@@ -377,7 +377,7 @@ class TableEnvironmentITCase(tableEnvName: String, isStreaming: Boolean) extends
 
   @Test
   def testTableDMLSync(): Unit = {
-    tEnv.getConfig.getConfiguration.set(TableConfigOptions.TABLE_DML_SYNC, Boolean.box(true))
+    tEnv.getConfig.set(TableConfigOptions.TABLE_DML_SYNC, Boolean.box(true))
     val sink1Path = _tempFolder.newFolder().toString
     tEnv.executeSql(
       s"""
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/batch/ExplainTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/batch/ExplainTest.scala
index 8314f67..f5ecaf2 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/batch/ExplainTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/batch/ExplainTest.scala
@@ -49,8 +49,8 @@ class ExplainTest(extended: Boolean) extends TableTestBase {
 
   @Before
   def before(): Unit = {
-    util.tableEnv.getConfig.getConfiguration.setInteger(
-      ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 4)
+    util.tableEnv.getConfig
+      .set(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, Int.box(4))
   }
 
   @Test
@@ -76,7 +76,7 @@ class ExplainTest(extended: Boolean) extends TableTestBase {
   @Test
   def testExplainWithJoin(): Unit = {
    // TODO support other join operators when they are supported
-    util.tableEnv.getConfig.getConfiguration.setString(
+    util.tableEnv.getConfig.set(
       ExecutionConfigOptions.TABLE_EXEC_DISABLED_OPERATORS, "HashJoin, NestedLoopJoin")
     util.verifyExplain("SELECT a, b, c, e, f FROM MyTable1, MyTable2 WHERE a = d", extraDetails: _*)
   }
@@ -119,7 +119,7 @@ class ExplainTest(extended: Boolean) extends TableTestBase {
 
   @Test
   def testExplainMultipleInput(): Unit = {
-    util.tableEnv.getConfig.getConfiguration.setString(
+    util.tableEnv.getConfig.set(
       ExecutionConfigOptions.TABLE_EXEC_DISABLED_OPERATORS, "NestedLoopJoin,SortMergeJoin")
     val sql =
       """
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/ExplainTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/ExplainTest.scala
index a66db23..88197ed 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/ExplainTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/ExplainTest.scala
@@ -52,8 +52,8 @@ class ExplainTest(extended: Boolean) extends TableTestBase {
 
   @Before
   def before(): Unit = {
-    util.tableEnv.getConfig.getConfiguration.setInteger(
-      ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 4)
+    util.tableEnv.getConfig
+      .set(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, Int.box(4))
   }
 
   @Test
@@ -128,10 +128,12 @@ class ExplainTest(extended: Boolean) extends TableTestBase {
       "T2", 'id2, 'cnt, 'name, 'goods, 'rowtime.rowtime)
     util.addTableWithWatermark("T3", util.tableEnv.from("T1"), "rowtime", 0)
     util.addTableWithWatermark("T4", util.tableEnv.from("T2"), "rowtime", 0)
-    util.tableEnv.getConfig.getConfiguration.setBoolean(
-      ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ENABLED, true)
-    util.tableEnv.getConfig.getConfiguration.set(
-      ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ALLOW_LATENCY, Duration.ofSeconds(3))
+    util.tableEnv.getConfig
+      .set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ENABLED, Boolean.box(true))
+    util.tableEnv.getConfig
+      .set(
+        ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ALLOW_LATENCY,
+        Duration.ofSeconds(3))
     val table = util.tableEnv.sqlQuery(
       """
         |SELECT id1, T3.rowtime AS ts, text
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/catalog/CatalogTableITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/catalog/CatalogTableITCase.scala
index 25ed7ca..049931d 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/catalog/CatalogTableITCase.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/catalog/CatalogTableITCase.scala
@@ -62,8 +62,7 @@ class CatalogTableITCase(isStreamingMode: Boolean) extends AbstractTestBase {
   @Before
   def before(): Unit = {
     tableEnv.getConfig
-      .getConfiguration
-      .setInteger(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 1)
+      .set(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, Int.box(1))
     TestCollectionTableFactory.reset()
 
     val func = new CatalogFunctionImpl(
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/catalog/CatalogViewITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/catalog/CatalogViewITCase.scala
index 5bfd016..634fac1 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/catalog/CatalogViewITCase.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/catalog/CatalogViewITCase.scala
@@ -58,8 +58,7 @@ class CatalogViewITCase(isStreamingMode: Boolean) extends AbstractTestBase {
   @Before
   def before(): Unit = {
     tableEnv.getConfig
-      .getConfiguration
-      .setInteger(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 1)
+      .set(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, Int.box(1))
     TestCollectionTableFactory.reset()
   }
 
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/expressions/NonDeterministicTests.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/expressions/NonDeterministicTests.scala
index b76822d..b779f72 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/expressions/NonDeterministicTests.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/expressions/NonDeterministicTests.scala
@@ -80,11 +80,12 @@ class NonDeterministicTests extends ExpressionTestBase {
   def testTemporalFunctionsInBatchMode(): Unit = {
     val zoneId = ZoneId.of("Asia/Shanghai")
     config.setLocalTimeZone(zoneId)
-    config.getConfiguration.set(ExecutionOptions.RUNTIME_MODE, RuntimeExecutionMode.BATCH)
+    config.set(ExecutionOptions.RUNTIME_MODE, RuntimeExecutionMode.BATCH)
 
-    config.getConfiguration.setLong(InternalConfigOptions.TABLE_QUERY_START_EPOCH_TIME, 1123L)
-    config.getConfiguration.setLong(InternalConfigOptions.TABLE_QUERY_START_LOCAL_TIME,
-      1123L + TimeZone.getTimeZone(zoneId).getOffset(1123L))
+    config.set(InternalConfigOptions.TABLE_QUERY_START_EPOCH_TIME, Long.box(1123L))
+    config.set(
+      InternalConfigOptions.TABLE_QUERY_START_LOCAL_TIME,
+      Long.box(1123L + TimeZone.getTimeZone(zoneId).getOffset(1123L)))
 
     val temporalFunctions = getCodeGenFunctions(List(
       "CURRENT_DATE",
@@ -109,7 +110,7 @@ class NonDeterministicTests extends ExpressionTestBase {
 
   @Test
   def testCurrentRowTimestampFunctionsInBatchMode(): Unit = {
-    config.getConfiguration.set(ExecutionOptions.RUNTIME_MODE, RuntimeExecutionMode.BATCH)
+    config.set(ExecutionOptions.RUNTIME_MODE, RuntimeExecutionMode.BATCH)
     val temporalFunctions = getCodeGenFunctions(List("CURRENT_ROW_TIMESTAMP()"))
 
     val round1 = evaluateFunctionResult(temporalFunctions)
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/expressions/utils/ExpressionTestBase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/expressions/utils/ExpressionTestBase.scala
index 3f9b558..07dff77 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/expressions/utils/ExpressionTestBase.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/expressions/utils/ExpressionTestBase.scala
@@ -100,7 +100,7 @@ abstract class ExpressionTestBase {
 
   @Before
   def prepare(): Unit = {
-    config.getConfiguration.set(
+    config.set(
       ExecutionConfigOptions.TABLE_EXEC_LEGACY_CAST_BEHAVIOUR,
       ExecutionConfigOptions.LegacyCastBehaviour.DISABLED
     )
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/DagOptimizationTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/DagOptimizationTest.scala
index b6fc21e..46e824a 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/DagOptimizationTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/DagOptimizationTest.scala
@@ -129,8 +129,9 @@ class DagOptimizationTest extends TableTestBase {
   @Test
   def testMultiSinks1(): Unit = {
     val stmtSet = util.tableEnv.createStatementSet()
-    util.tableEnv.getConfig.getConfiguration.setBoolean(
-      RelNodeBlockPlanBuilder.TABLE_OPTIMIZER_REUSE_OPTIMIZE_BLOCK_WITH_DIGEST_ENABLED, true)
+    util.tableEnv.getConfig.set(
+      RelNodeBlockPlanBuilder.TABLE_OPTIMIZER_REUSE_OPTIMIZE_BLOCK_WITH_DIGEST_ENABLED,
+      Boolean.box(true))
     val table1 = util.tableEnv.sqlQuery("SELECT SUM(a) AS sum_a, c FROM MyTable GROUP BY c")
     util.tableEnv.registerTable("table1", table1)
     val table2 = util.tableEnv.sqlQuery("SELECT SUM(sum_a) AS total_sum FROM table1")
@@ -150,8 +151,8 @@ class DagOptimizationTest extends TableTestBase {
   @Test
   def testMultiSinks2(): Unit = {
     val stmtSet = util.tableEnv.createStatementSet()
-    util.tableEnv.getConfig.getConfiguration.setBoolean(
-      RelNodeBlockPlanBuilder.TABLE_OPTIMIZER_UNIONALL_AS_BREAKPOINT_ENABLED, true)
+    util.tableEnv.getConfig.set(
+      RelNodeBlockPlanBuilder.TABLE_OPTIMIZER_UNIONALL_AS_BREAKPOINT_ENABLED, Boolean.box(true))
     util.addTableSource[(Int, Long, String, Double, Boolean)]("MyTable2", 'a, 'b, 'c, 'd, 'e)
 
     val table1 = util.tableEnv.sqlQuery("SELECT a as a1, b as b1 FROM MyTable WHERE a <= 10")
@@ -174,8 +175,8 @@ class DagOptimizationTest extends TableTestBase {
   @Test
   def testMultiSinks3(): Unit = {
     val stmtSet = util.tableEnv.createStatementSet()
-    util.tableEnv.getConfig.getConfiguration.setBoolean(
-      RelNodeBlockPlanBuilder.TABLE_OPTIMIZER_UNIONALL_AS_BREAKPOINT_ENABLED, false)
+    util.tableEnv.getConfig.set(
+      RelNodeBlockPlanBuilder.TABLE_OPTIMIZER_UNIONALL_AS_BREAKPOINT_ENABLED, Boolean.box(false))
     util.addTableSource[(Int, Long, String, Double, Boolean)]("MyTable2", 'a, 'b, 'c, 'd, 'e)
 
     val table1 = util.tableEnv.sqlQuery("SELECT a AS a1, b AS b1 FROM MyTable WHERE a <= 10")
@@ -198,8 +199,9 @@ class DagOptimizationTest extends TableTestBase {
   @Test
   def testMultiSinks4(): Unit = {
     val stmtSet = util.tableEnv.createStatementSet()
-    util.tableEnv.getConfig.getConfiguration.setBoolean(
-      RelNodeBlockPlanBuilder.TABLE_OPTIMIZER_REUSE_OPTIMIZE_BLOCK_WITH_DIGEST_ENABLED, true)
+    util.tableEnv.getConfig.set(
+      RelNodeBlockPlanBuilder.TABLE_OPTIMIZER_REUSE_OPTIMIZE_BLOCK_WITH_DIGEST_ENABLED,
+      Boolean.box(true))
 
     val table1 = util.tableEnv.sqlQuery("SELECT a as a1, b FROM MyTable WHERE a <= 10")
     util.tableEnv.registerTable("table1", table1)
@@ -227,8 +229,9 @@ class DagOptimizationTest extends TableTestBase {
   @Test
   def testMultiSinks5(): Unit = {
     val stmtSet = util.tableEnv.createStatementSet()
-    util.tableEnv.getConfig.getConfiguration.setBoolean(
-      RelNodeBlockPlanBuilder.TABLE_OPTIMIZER_REUSE_OPTIMIZE_BLOCK_WITH_DIGEST_ENABLED, true)
+    util.tableEnv.getConfig.set(
+      RelNodeBlockPlanBuilder.TABLE_OPTIMIZER_REUSE_OPTIMIZE_BLOCK_WITH_DIGEST_ENABLED,
+      Boolean.box(true))
     // test with non-deterministic udf
     util.tableEnv.registerFunction("random_udf", new NonDeterministicUdf())
     val table1 = util.tableEnv.sqlQuery("SELECT random_udf(a) AS a, c FROM MyTable")
@@ -250,8 +253,8 @@ class DagOptimizationTest extends TableTestBase {
   @Test
   def testMultiLevelViews(): Unit = {
     val stmtSet = util.tableEnv.createStatementSet()
-    util.tableEnv.getConfig.getConfiguration.setBoolean(
-      RelNodeBlockPlanBuilder.TABLE_OPTIMIZER_UNIONALL_AS_BREAKPOINT_ENABLED, false)
+    util.tableEnv.getConfig.set(
+      RelNodeBlockPlanBuilder.TABLE_OPTIMIZER_UNIONALL_AS_BREAKPOINT_ENABLED, Boolean.box(false))
 
     val table1 = util.tableEnv.sqlQuery("SELECT a, b, c FROM MyTable WHERE c LIKE '%hello%'")
     util.tableEnv.registerTable("TempTable1", table1)
@@ -290,8 +293,8 @@ class DagOptimizationTest extends TableTestBase {
   @Test
   def testMultiSinksWithUDTF(): Unit = {
     val stmtSet = util.tableEnv.createStatementSet()
-    util.tableEnv.getConfig.getConfiguration.setBoolean(
-      RelNodeBlockPlanBuilder.TABLE_OPTIMIZER_UNIONALL_AS_BREAKPOINT_ENABLED, false)
+    util.tableEnv.getConfig.set(
+      RelNodeBlockPlanBuilder.TABLE_OPTIMIZER_UNIONALL_AS_BREAKPOINT_ENABLED, Boolean.box(false))
     util.addFunction("split", new TableFunc1)
     val sqlQuery1 =
       """
@@ -332,8 +335,8 @@ class DagOptimizationTest extends TableTestBase {
   @Test
   def testMultiSinksSplitOnUnion1(): Unit = {
     val stmtSet = util.tableEnv.createStatementSet()
-    util.tableEnv.getConfig.getConfiguration.setBoolean(
-      RelNodeBlockPlanBuilder.TABLE_OPTIMIZER_UNIONALL_AS_BREAKPOINT_ENABLED, false)
+    util.tableEnv.getConfig.set(
+      RelNodeBlockPlanBuilder.TABLE_OPTIMIZER_UNIONALL_AS_BREAKPOINT_ENABLED, Boolean.box(false))
 
     val table = util.tableEnv.sqlQuery(
       "SELECT a, c FROM MyTable UNION ALL SELECT d, f FROM MyTable1")
@@ -355,10 +358,12 @@ class DagOptimizationTest extends TableTestBase {
   @Test
   def testMultiSinksSplitOnUnion2(): Unit = {
     val stmtSet = util.tableEnv.createStatementSet()
-    util.tableEnv.getConfig.getConfiguration.setBoolean(
-      RelNodeBlockPlanBuilder.TABLE_OPTIMIZER_REUSE_OPTIMIZE_BLOCK_WITH_DIGEST_ENABLED, true)
-    util.tableEnv.getConfig.getConfiguration.setBoolean(
-      RelNodeBlockPlanBuilder.TABLE_OPTIMIZER_UNIONALL_AS_BREAKPOINT_ENABLED, false)
+    util.tableEnv.getConfig.set(
+      RelNodeBlockPlanBuilder.TABLE_OPTIMIZER_REUSE_OPTIMIZE_BLOCK_WITH_DIGEST_ENABLED,
+      Boolean.box(true))
+    util.tableEnv.getConfig.set(
+      RelNodeBlockPlanBuilder.TABLE_OPTIMIZER_UNIONALL_AS_BREAKPOINT_ENABLED,
+      Boolean.box(false))
     util.addTableSource[(Int, Long, String)]("MyTable2", 'a, 'b, 'c)
 
     val sqlQuery1 =
@@ -394,8 +399,8 @@ class DagOptimizationTest extends TableTestBase {
   @Test
   def testMultiSinksSplitOnUnion3(): Unit = {
     val stmtSet = util.tableEnv.createStatementSet()
-    util.tableEnv.getConfig.getConfiguration.setBoolean(
-      RelNodeBlockPlanBuilder.TABLE_OPTIMIZER_UNIONALL_AS_BREAKPOINT_ENABLED, false)
+    util.tableEnv.getConfig.set(
+      RelNodeBlockPlanBuilder.TABLE_OPTIMIZER_UNIONALL_AS_BREAKPOINT_ENABLED, Boolean.box(false))
     util.addTableSource[(Int, Long, String)]("MyTable2", 'a, 'b, 'c)
 
     val sqlQuery1 = "SELECT a, c FROM MyTable UNION ALL SELECT d, f FROM MyTable1"
@@ -426,8 +431,8 @@ class DagOptimizationTest extends TableTestBase {
   @Test
   def testMultiSinksSplitOnUnion4(): Unit = {
     val stmtSet = util.tableEnv.createStatementSet()
-    util.tableEnv.getConfig.getConfiguration.setBoolean(
-      RelNodeBlockPlanBuilder.TABLE_OPTIMIZER_UNIONALL_AS_BREAKPOINT_ENABLED, false)
+    util.tableEnv.getConfig.set(
+      RelNodeBlockPlanBuilder.TABLE_OPTIMIZER_UNIONALL_AS_BREAKPOINT_ENABLED, Boolean.box(false))
     util.addTableSource[(Int, Long, String)]("MyTable2", 'a, 'b, 'c)
 
     val sqlQuery =
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/rules/physical/batch/RemoveRedundantLocalSortAggRuleTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/rules/physical/batch/RemoveRedundantLocalSortAggRuleTest.scala
index 8e3e999..5420ae9 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/rules/physical/batch/RemoveRedundantLocalSortAggRuleTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/rules/physical/batch/RemoveRedundantLocalSortAggRuleTest.scala
@@ -40,11 +40,11 @@ class RemoveRedundantLocalSortAggRuleTest extends TableTestBase {
 
   @Test
   def testRemoveRedundantLocalSortAggWithSort(): Unit = {
-    util.tableEnv.getConfig.getConfiguration.setString(
+    util.tableEnv.getConfig.set(
       ExecutionConfigOptions.TABLE_EXEC_DISABLED_OPERATORS, "SortMergeJoin,NestedLoopJoin,HashAgg")
     // disable BroadcastHashJoin
-    util.tableEnv.getConfig.getConfiguration.setLong(
-      OptimizerConfigOptions.TABLE_OPTIMIZER_BROADCAST_JOIN_THRESHOLD, -1)
+    util.tableEnv.getConfig.set(
+      OptimizerConfigOptions.TABLE_OPTIMIZER_BROADCAST_JOIN_THRESHOLD, Long.box(-1))
     val sqlQuery =
       """
         |WITH r AS (SELECT * FROM x, y WHERE a = d AND c LIKE 'He%')
@@ -55,7 +55,7 @@ class RemoveRedundantLocalSortAggRuleTest extends TableTestBase {
 
   @Test
   def testRemoveRedundantLocalSortAggWithoutSort(): Unit = {
-    util.tableEnv.getConfig.getConfiguration.setString(
+    util.tableEnv.getConfig.set(
       ExecutionConfigOptions.TABLE_EXEC_DISABLED_OPERATORS, "HashJoin,NestedLoopJoin,HashAgg")
     val sqlQuery =
       """
@@ -67,7 +67,7 @@ class RemoveRedundantLocalSortAggRuleTest extends TableTestBase {
 
   @Test
   def testUsingLocalAggCallFilters(): Unit = {
-    util.tableEnv.getConfig.getConfiguration.setString(
+    util.tableEnv.getConfig.set(
       ExecutionConfigOptions.TABLE_EXEC_DISABLED_OPERATORS, "HashAgg")
     val sqlQuery = "SELECT d, MAX(e), MAX(e) FILTER (WHERE a < 10), COUNT(DISTINCT c),\n" +
       "COUNT(DISTINCT c) FILTER (WHERE a > 5), COUNT(DISTINCT b) FILTER (WHERE b > 3)\n" +
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/table/PythonGroupWindowAggregateTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/table/PythonGroupWindowAggregateTest.scala
index 15bd134..99d6431 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/table/PythonGroupWindowAggregateTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/table/PythonGroupWindowAggregateTest.scala
@@ -182,8 +182,8 @@ class PythonGroupWindowAggregateTest extends TableTestBase {
   def testEmitStrategyNotSupported(): Unit = {
     val util = streamTestUtil()
     val tableConf = util.getTableEnv.getConfig
-    tableConf.getConfiguration.setBoolean(TABLE_EXEC_EMIT_EARLY_FIRE_ENABLED, true)
-    tableConf.getConfiguration.set(TABLE_EXEC_EMIT_EARLY_FIRE_DELAY, Duration.ofMillis(10))
+    tableConf.set(TABLE_EXEC_EMIT_EARLY_FIRE_ENABLED, Boolean.box(true))
+    tableConf.set(TABLE_EXEC_EMIT_EARLY_FIRE_DELAY, Duration.ofMillis(10))
 
     val sourceTable = util.addTableSource[(Int, Long, Int, Long)](
       "MyTable", 'a, 'b, 'c, 'rowtime.rowtime)
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/table/TwoStageAggregateTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/table/TwoStageAggregateTest.scala
index 7d34a64..92cbea6 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/table/TwoStageAggregateTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/table/TwoStageAggregateTest.scala
@@ -36,15 +36,13 @@ class TwoStageAggregateTest extends TableTestBase {
     util = streamTestUtil()
     util.tableEnv.getConfig
       .setIdleStateRetentionTime(Time.hours(1), Time.hours(2))
-    util.tableEnv.getConfig.getConfiguration
+    util.tableEnv.getConfig
       .set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ALLOW_LATENCY, Duration.ofSeconds(1))
-    util.tableEnv.getConfig.getConfiguration
-        .setBoolean(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ENABLED, true)
-    util.tableEnv.getConfig.getConfiguration
-      .setLong(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_SIZE, 3)
-    util.tableEnv.getConfig.getConfiguration.setString(
-      OptimizerConfigOptions.TABLE_OPTIMIZER_AGG_PHASE_STRATEGY,
-      AggregatePhaseStrategy.TWO_PHASE.toString)
+      .set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ENABLED, Boolean.box(true))
+      .set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_SIZE, Long.box(3))
+      .set(
+        OptimizerConfigOptions.TABLE_OPTIMIZER_AGG_PHASE_STRATEGY,
+        AggregatePhaseStrategy.TWO_PHASE.toString)
   }
 
   @Test
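Because the new setter returns the TableConfig instance itself, consecutive options can be chained into one fluent expression, which is what the TwoStageAggregateTest rewrite above relies on. A short sketch of the chained form (values mirror the hunk above; illustration only, not part of the patch):

    util.tableEnv.getConfig
      .set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ALLOW_LATENCY, Duration.ofSeconds(1))
      .set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ENABLED, Boolean.box(true))
      .set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_SIZE, Long.box(3))
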
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/table/validation/GroupWindowTableAggregateValidationTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/table/validation/GroupWindowTableAggregateValidationTest.scala
index 9a7894c..9708cda 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/table/validation/GroupWindowTableAggregateValidationTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/table/validation/GroupWindowTableAggregateValidationTest.scala
@@ -72,8 +72,8 @@ class GroupWindowTableAggregateValidationTest extends TableTestBase {
     val table = util.addTableSource[(Long, Int, String)]('long, 'int, 'string, 'proctime.proctime)
 
     val tableConf = util.getTableEnv.getConfig
-    tableConf.getConfiguration.setBoolean(TABLE_EXEC_EMIT_EARLY_FIRE_ENABLED, true)
-    tableConf.getConfiguration.set(TABLE_EXEC_EMIT_EARLY_FIRE_DELAY, Duration.ofMillis(10))
+    tableConf.set(TABLE_EXEC_EMIT_EARLY_FIRE_ENABLED, Boolean.box(true))
+    tableConf.set(TABLE_EXEC_EMIT_EARLY_FIRE_DELAY, Duration.ofMillis(10))
 
     val result = table
       .window(Tumble over 2.hours on 'proctime as 'w)
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/FileSystemITCaseBase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/FileSystemITCaseBase.scala
index bded741..3f1743d 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/FileSystemITCaseBase.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/FileSystemITCaseBase.scala
@@ -432,8 +432,8 @@ trait FileSystemITCaseBase {
 
   @Test
   def testLimitPushDown(): Unit = {
-    tableEnv.getConfig.getConfiguration.setInteger(
-      ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 1)
+    tableEnv.getConfig
+      .set(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, Int.box(1))
     tableEnv.executeSql("insert into nonPartitionedTable select x, y, a, b from originalT").await()
 
     check(
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/CalcITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/CalcITCase.scala
index 9ba2247..e591f10 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/CalcITCase.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/CalcITCase.scala
@@ -1333,10 +1333,8 @@ class CalcITCase extends BatchTestBase {
       new RowTypeInfo(INT_TYPE_INFO, LONG_TYPE_INFO, BYTE_PRIMITIVE_ARRAY_TYPE_INFO),
       "a, b, c",
       nullablesOfNullData3)
-    tableConfig.getConfiguration.setInteger(
-      ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 1)
-    tableConfig.getConfiguration.setBoolean(
-      BatchPhysicalSortRule.TABLE_EXEC_RANGE_SORT_ENABLED, true)
+    tableConfig.set(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, Int.box(1))
+    tableConfig.set(BatchPhysicalSortRule.TABLE_EXEC_RANGE_SORT_ENABLED, Boolean.box(true))
     checkResult(
       "select * from BinaryT order by c",
       nullData3.sortBy((x : Row) =>
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/CodeSplitITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/CodeSplitITCase.scala
index caad536..3a3534c 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/CodeSplitITCase.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/CodeSplitITCase.scala
@@ -136,10 +136,8 @@ class CodeSplitITCase extends BatchTestBase {
   }
 
   private[flink] def runTest(sql: String, results: Seq[Row]): Unit = {
-    tEnv.getConfig.getConfiguration.setInteger(
-      TableConfigOptions.MAX_LENGTH_GENERATED_CODE, 4000)
-    tEnv.getConfig.getConfiguration.setInteger(
-      TableConfigOptions.MAX_MEMBERS_GENERATED_CODE, 10000)
+    tEnv.getConfig.set(TableConfigOptions.MAX_LENGTH_GENERATED_CODE, Int.box(4000))
+    tEnv.getConfig.set(TableConfigOptions.MAX_MEMBERS_GENERATED_CODE, Int.box(10000))
     checkResult(sql.mkString, results)
   }
 }
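
Note the Int.box/Long.box/Boolean.box wrappers that appear throughout the Scala call sites: these options are declared as ConfigOptions over Java wrapper types (java.lang.Integer and friends), so Scala primitives are boxed explicitly where the old setInteger/setLong/setBoolean overloads used to do it implicitly. Sketch under that assumption (option names from the hunk above; illustration only):

    tEnv.getConfig
      .set(TableConfigOptions.MAX_LENGTH_GENERATED_CODE, Int.box(4000))
      .set(TableConfigOptions.MAX_MEMBERS_GENERATED_CODE, Int.box(10000))
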
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/DecimalITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/DecimalITCase.scala
index c1c49be..459f0aa 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/DecimalITCase.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/DecimalITCase.scala
@@ -248,8 +248,8 @@ class DecimalITCase extends BatchTestBase {
 
   @Test
   def testJoin1(): Unit = {
-    tEnv.getConfig.getConfiguration.setString(
-      ExecutionConfigOptions.TABLE_EXEC_DISABLED_OPERATORS, "HashJoin, NestedLoopJoin")
+    tEnv.getConfig
+      .set(ExecutionConfigOptions.TABLE_EXEC_DISABLED_OPERATORS, "HashJoin, NestedLoopJoin")
 
     checkQuery1(
       Seq(DECIMAL(8, 2), DECIMAL(8, 4), INT, DOUBLE),
@@ -261,8 +261,8 @@ class DecimalITCase extends BatchTestBase {
 
   @Test
   def testJoin2(): Unit = {
-    tEnv.getConfig.getConfiguration.setString(
-      ExecutionConfigOptions.TABLE_EXEC_DISABLED_OPERATORS, "HashJoin, NestedLoopJoin")
+    tEnv.getConfig
+      .set(ExecutionConfigOptions.TABLE_EXEC_DISABLED_OPERATORS, "HashJoin, NestedLoopJoin")
 
     checkQuery1(
       Seq(DECIMAL(8, 2), DECIMAL(8, 4), INT, DOUBLE),
@@ -274,8 +274,8 @@ class DecimalITCase extends BatchTestBase {
 
   @Test
   def testJoin3(): Unit = {
-    tEnv.getConfig.getConfiguration.setString(
-      ExecutionConfigOptions.TABLE_EXEC_DISABLED_OPERATORS, "HashJoin, NestedLoopJoin")
+    tEnv.getConfig
+      .set(ExecutionConfigOptions.TABLE_EXEC_DISABLED_OPERATORS, "HashJoin, NestedLoopJoin")
 
     checkQuery1(
       Seq(DECIMAL(8, 2), DECIMAL(8, 4), INT, DOUBLE),
@@ -288,8 +288,8 @@ class DecimalITCase extends BatchTestBase {
 
   @Test
   def testJoin4(): Unit = {
-    tEnv.getConfig.getConfiguration.setString(
-      ExecutionConfigOptions.TABLE_EXEC_DISABLED_OPERATORS, "HashJoin, NestedLoopJoin")
+    tEnv.getConfig
+      .set(ExecutionConfigOptions.TABLE_EXEC_DISABLED_OPERATORS, "HashJoin, NestedLoopJoin")
 
     checkQuery1(
       Seq(DECIMAL(8, 2), DECIMAL(8, 4), INT, DOUBLE),
@@ -301,8 +301,8 @@ class DecimalITCase extends BatchTestBase {
 
   @Test
   def testJoin5(): Unit = {
-    tEnv.getConfig.getConfiguration.setString(
-      ExecutionConfigOptions.TABLE_EXEC_DISABLED_OPERATORS, "HashJoin, NestedLoopJoin")
+    tEnv.getConfig
+      .set(ExecutionConfigOptions.TABLE_EXEC_DISABLED_OPERATORS, "HashJoin, NestedLoopJoin")
 
     checkQuery1(
       Seq(DECIMAL(8, 2), DECIMAL(8, 4), INT, DOUBLE),
@@ -314,8 +314,8 @@ class DecimalITCase extends BatchTestBase {
 
   @Test
   def testJoin6(): Unit = {
-    tEnv.getConfig.getConfiguration.setString(
-      ExecutionConfigOptions.TABLE_EXEC_DISABLED_OPERATORS, "HashJoin, NestedLoopJoin")
+    tEnv.getConfig
+      .set(ExecutionConfigOptions.TABLE_EXEC_DISABLED_OPERATORS, "HashJoin, NestedLoopJoin")
 
     checkQuery1(
       Seq(DECIMAL(8, 2), DECIMAL(8, 4), INT, DOUBLE),
@@ -327,8 +327,8 @@ class DecimalITCase extends BatchTestBase {
 
   @Test
   def testJoin7(): Unit = {
-    tEnv.getConfig.getConfiguration.setString(
-      ExecutionConfigOptions.TABLE_EXEC_DISABLED_OPERATORS, "HashJoin, NestedLoopJoin")
+    tEnv.getConfig
+      .set(ExecutionConfigOptions.TABLE_EXEC_DISABLED_OPERATORS, "HashJoin, NestedLoopJoin")
     checkQuery1(
       Seq(DECIMAL(8, 2), DECIMAL(8, 4), INT, DOUBLE),
       s1r(d"1", d"1", 1, 1.0),
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/MiscITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/MiscITCase.scala
index 9cbca51..16f29d4 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/MiscITCase.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/MiscITCase.scala
@@ -415,7 +415,7 @@ class MiscITCase extends BatchTestBase {
 
   @Test
   def testOrderByAgg(): Unit = {
-    tEnv.getConfig.getConfiguration.setInteger(TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 1)
+    tEnv.getConfig.set(TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, Int.box(1))
     env.setParallelism(1)
     checkQuery(
       Seq((1, 10), (1, 20), (10, 1), (10, 2)),
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/MultipleInputITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/MultipleInputITCase.scala
index bc115d5..99e1985 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/MultipleInputITCase.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/MultipleInputITCase.scala
@@ -74,7 +74,7 @@ class MultipleInputITCase(shuffleMode: BatchShuffleMode) extends BatchTestBase {
       "a, b, c, nt",
       MultipleInputITCase.nullables)
 
-    tEnv.getConfig.getConfiguration.set(ExecutionOptions.BATCH_SHUFFLE_MODE, shuffleMode)
+    tEnv.getConfig.set(ExecutionOptions.BATCH_SHUFFLE_MODE, shuffleMode)
   }
 
   @Test
@@ -206,11 +206,11 @@ class MultipleInputITCase(shuffleMode: BatchShuffleMode) extends BatchTestBase {
   }
 
   def checkMultipleInputResult(sql: String): Unit = {
-    tEnv.getConfig.getConfiguration.setBoolean(
-      OptimizerConfigOptions.TABLE_OPTIMIZER_MULTIPLE_INPUT_ENABLED, false)
+    tEnv.getConfig
+      .set(OptimizerConfigOptions.TABLE_OPTIMIZER_MULTIPLE_INPUT_ENABLED, Boolean.box(false))
     val expected = executeQuery(sql)
-    tEnv.getConfig.getConfiguration.setBoolean(
-      OptimizerConfigOptions.TABLE_OPTIMIZER_MULTIPLE_INPUT_ENABLED, true)
+    tEnv.getConfig
+      .set(OptimizerConfigOptions.TABLE_OPTIMIZER_MULTIPLE_INPUT_ENABLED, Boolean.box(true))
     checkResult(sql, expected)
   }
 }
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/PartitionableSinkITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/PartitionableSinkITCase.scala
index 13bced6..0c3f3f1 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/PartitionableSinkITCase.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/PartitionableSinkITCase.scala
@@ -67,9 +67,7 @@ class PartitionableSinkITCase extends BatchTestBase {
   override def before(): Unit = {
     super.before()
     env.setParallelism(3)
-    tEnv.getConfig
-      .getConfiguration
-      .setInteger(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 3)
+    tEnv.getConfig.set(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, Int.box(3))
     registerCollection("nonSortTable", testData, type3, "a, b, c", dataNullables)
     registerCollection("sortTable", testData1, type3, "a, b, c", dataNullables)
     registerCollection("starTable", testData2, type_int_string, "b, c", Array(true, true))
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/SortLimitITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/SortLimitITCase.scala
index 5817fdb..c7445b6 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/SortLimitITCase.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/SortLimitITCase.scala
@@ -109,8 +109,7 @@ class SortLimitITCase extends BatchTestBase {
 
   @Test
   def testOrderBehindField(): Unit = {
-    tableConfig.getConfiguration.setInteger(
-      ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 1)
+    tableConfig.set(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, Int.box(1))
     val expected = data3.sortBy((x : Row) => x.getField(2).asInstanceOf[String])
 
     checkResult(
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/TableSinkITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/TableSinkITCase.scala
index 0e6b735..97e6534 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/TableSinkITCase.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/TableSinkITCase.scala
@@ -81,8 +81,7 @@ class TableSinkITCase extends BatchTestBase {
 
   @Test
   def testCollectSinkConfiguration(): Unit = {
-    tEnv.getConfig.getConfiguration.set(
-      CollectSinkOperatorFactory.MAX_BATCH_SIZE, MemorySize.parse("1b"))
+    tEnv.getConfig.set(CollectSinkOperatorFactory.MAX_BATCH_SIZE, MemorySize.parse("1b"))
     try {
       checkResult("SELECT 1", Seq(row(1)))
       Assert.fail("Expecting exception thrown from collect sink")
@@ -95,8 +94,7 @@ class TableSinkITCase extends BatchTestBase {
               "by setting collect-sink.batch-size.max"))
     }
 
-    tEnv.getConfig.getConfiguration.set(
-      CollectSinkOperatorFactory.MAX_BATCH_SIZE, MemorySize.parse("1kb"))
+    tEnv.getConfig.set(CollectSinkOperatorFactory.MAX_BATCH_SIZE, MemorySize.parse("1kb"))
     checkResult("SELECT 1", Seq(row(1)))
   }
 }
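
Options whose value type is already a reference type need no boxing helper; MemorySize, Duration, enum and plain string values pass through unchanged, as the collect-sink change above shows. Sketch (values taken from hunks in this commit; illustration only):

    tEnv.getConfig
      .set(CollectSinkOperatorFactory.MAX_BATCH_SIZE, MemorySize.parse("1kb"))
      .set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ALLOW_LATENCY, Duration.ofSeconds(1))
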
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/TableSourceITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/TableSourceITCase.scala
index 59620b1..4631dad 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/TableSourceITCase.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/TableSourceITCase.scala
@@ -342,8 +342,9 @@ class TableSourceITCase extends BatchTestBase {
 
   @Test
   def testTableHintWithLogicalTableScanReuse(): Unit = {
-    tEnv.getConfig.getConfiguration.setBoolean(
-      RelNodeBlockPlanBuilder.TABLE_OPTIMIZER_REUSE_OPTIMIZE_BLOCK_WITH_DIGEST_ENABLED, true)
+    tEnv.getConfig.set(
+      RelNodeBlockPlanBuilder.TABLE_OPTIMIZER_REUSE_OPTIMIZE_BLOCK_WITH_DIGEST_ENABLED,
+      Boolean.box(true))
     val resultPath = TEMPORARY_FOLDER.newFolder().getAbsolutePath
     tEnv.executeSql(
       s"""
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/UnionITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/UnionITCase.scala
index 27451d4..5f7a5d8 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/UnionITCase.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/UnionITCase.scala
@@ -48,8 +48,7 @@ class UnionITCase extends BatchTestBase {
     registerCollection("Table3", smallData3, type3, "a, b, c", nullablesOfSmallData3)
     registerCollection("Table5", data5, type5, "d, e, f, g, h", nullablesOfData5)
     registerCollection("Table6", data6, type6, "a, b, c", Array(false, false, false))
-    tEnv.getConfig.getConfiguration.setString(
-      ExecutionConfigOptions.TABLE_EXEC_DISABLED_OPERATORS, "HashAgg")
+    tEnv.getConfig.set(ExecutionConfigOptions.TABLE_EXEC_DISABLED_OPERATORS, "HashAgg")
   }
 
   @Test
@@ -113,10 +112,9 @@ class UnionITCase extends BatchTestBase {
     */
   @Test
   def testJoinAfterDifferentTypeUnionAll(): Unit = {
-    tEnv.getConfig.getConfiguration.setLong(
-      OptimizerConfigOptions.TABLE_OPTIMIZER_BROADCAST_JOIN_THRESHOLD, -1)
-    tEnv.getConfig.getConfiguration.setString(
-      ExecutionConfigOptions.TABLE_EXEC_DISABLED_OPERATORS, "HashJoin, NestedLoopJoin")
+    tEnv.getConfig
+      .set(OptimizerConfigOptions.TABLE_OPTIMIZER_BROADCAST_JOIN_THRESHOLD, Long.box(-1))
+      .set(ExecutionConfigOptions.TABLE_EXEC_DISABLED_OPERATORS, "HashJoin, NestedLoopJoin")
     checkResult(
       "SELECT a, c, g FROM (SELECT t1.a, t1.b, t1.c FROM Table3 t1 UNION ALL" +
           "(SELECT a, b, c FROM Table3 ORDER BY a, b, c)), Table5 WHERE b = e",
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/agg/AggregateJoinTransposeITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/agg/AggregateJoinTransposeITCase.scala
index 52ce45c..c125235 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/agg/AggregateJoinTransposeITCase.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/agg/AggregateJoinTransposeITCase.scala
@@ -79,7 +79,7 @@ class AggregateJoinTransposeITCase extends BatchTestBase {
     tEnv.getConfig.setPlannerConfig(calciteConfig)
 
    // HashJoin is disabled because the translateToPlanInternal method is not implemented yet
-    tEnv.getConfig.getConfiguration.setString(TABLE_EXEC_DISABLED_OPERATORS, "HashJoin")
+    tEnv.getConfig.set(TABLE_EXEC_DISABLED_OPERATORS, "HashJoin")
     registerCollection("T3", data3, type3, "a, b, c", nullablesOfData3)
 
     registerCollection("MyTable",
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/agg/AggregateReduceGroupingITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/agg/AggregateReduceGroupingITCase.scala
index 88813e6..7bcbf40 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/agg/AggregateReduceGroupingITCase.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/agg/AggregateReduceGroupingITCase.scala
@@ -113,12 +113,12 @@ class AggregateReduceGroupingITCase extends BatchTestBase {
       FlinkStatistic.builder().uniqueKeys(Set(Set("a6").asJava).asJava).build()
     )
    // HashJoin is disabled because the translateToPlanInternal method is not implemented yet
-    tEnv.getConfig.getConfiguration.setString(TABLE_EXEC_DISABLED_OPERATORS, "HashJoin")
+    tEnv.getConfig.set(TABLE_EXEC_DISABLED_OPERATORS, "HashJoin")
   }
 
   @Test
   def testSingleAggOnTable_SortAgg(): Unit = {
-    tEnv.getConfig.getConfiguration.setString(TABLE_EXEC_DISABLED_OPERATORS, "HashAgg")
+    tEnv.getConfig.set(TABLE_EXEC_DISABLED_OPERATORS, "HashAgg")
     testSingleAggOnTable()
     checkResult("SELECT a6, b6, max(c6), count(d6), sum(e6) FROM T6 GROUP BY a6, b6",
       (0 until 50000).map(i => row(i, 1L, if (i % 500 == 0) null else s"Hello$i", 1L, 10))
@@ -127,19 +127,18 @@ class AggregateReduceGroupingITCase extends BatchTestBase {
 
   @Test
   def testSingleAggOnTable_HashAgg_WithLocalAgg(): Unit = {
-    tEnv.getConfig.getConfiguration.setString(TABLE_EXEC_DISABLED_OPERATORS, "SortAgg")
-    tEnv.getConfig.getConfiguration.setString(
-      OptimizerConfigOptions.TABLE_OPTIMIZER_AGG_PHASE_STRATEGY, "TWO_PHASE")
+    tEnv.getConfig.set(TABLE_EXEC_DISABLED_OPERATORS, "SortAgg")
+    tEnv.getConfig.set(OptimizerConfigOptions.TABLE_OPTIMIZER_AGG_PHASE_STRATEGY, "TWO_PHASE")
     // set smaller parallelism to avoid MemoryAllocationException
-    tEnv.getConfig.getConfiguration.setInteger(TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 2)
+    tEnv.getConfig.set(TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, Int.box(2))
     testSingleAggOnTable()
   }
 
   @Test
   def testSingleAggOnTable_HashAgg_WithoutLocalAgg(): Unit = {
-    tEnv.getConfig.getConfiguration.setString(TABLE_EXEC_DISABLED_OPERATORS, "SortAgg")
-    tEnv.getConfig.getConfiguration.setString(
-      OptimizerConfigOptions.TABLE_OPTIMIZER_AGG_PHASE_STRATEGY, "ONE_PHASE")
+    tEnv.getConfig
+      .set(TABLE_EXEC_DISABLED_OPERATORS, "SortAgg")
+      .set(OptimizerConfigOptions.TABLE_OPTIMIZER_AGG_PHASE_STRATEGY, "ONE_PHASE")
     testSingleAggOnTable()
   }
 
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/agg/HashAggITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/agg/HashAggITCase.scala
index 3cc1358..a3b013a 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/agg/HashAggITCase.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/agg/HashAggITCase.scala
@@ -27,7 +27,7 @@ class HashAggITCase
     extends AggregateITCaseBase("HashAggregate") {
 
   override def prepareAggOp(): Unit = {
-    tEnv.getConfig.getConfiguration.setString(
+    tEnv.getConfig.set(
       ExecutionConfigOptions.TABLE_EXEC_DISABLED_OPERATORS, "SortAgg")
   }
 }
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/agg/HashDistinctAggregateITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/agg/HashDistinctAggregateITCase.scala
index 34c592a..2cfd255 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/agg/HashDistinctAggregateITCase.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/agg/HashDistinctAggregateITCase.scala
@@ -27,7 +27,7 @@ import org.apache.flink.table.planner.plan.utils.OperatorType
 class HashDistinctAggregateITCase extends DistinctAggregateITCaseBase {
 
   override def prepareAggOp(): Unit = {
-    tEnv.getConfig.getConfiguration.setString(
+    tEnv.getConfig.set(
       ExecutionConfigOptions.TABLE_EXEC_DISABLED_OPERATORS, OperatorType.SortAgg.toString)
   }
 }
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/agg/SortAggITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/agg/SortAggITCase.scala
index 4c082fc..3aaa1e6 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/agg/SortAggITCase.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/agg/SortAggITCase.scala
@@ -45,7 +45,7 @@ import scala.collection.Seq
 class SortAggITCase
     extends AggregateITCaseBase("SortAggregate") {
   override def prepareAggOp(): Unit = {
-    tEnv.getConfig.getConfiguration.setString(
+    tEnv.getConfig.set(
       TABLE_EXEC_DISABLED_OPERATORS, "HashAgg")
 
     registerFunction("countFun", new CountAggFunction())
@@ -64,7 +64,7 @@ class SortAggITCase
 
   @Test
   def testBigDataSimpleArrayUDAF(): Unit = {
-    tEnv.getConfig.getConfiguration.setInteger(TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 1)
+    tEnv.getConfig.set(TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, Int.box(1))
     registerFunction("simplePrimitiveArrayUdaf", new SimplePrimitiveArrayUdaf())
     registerRange("RangeT", 1000000)
     env.setParallelism(1)
@@ -265,7 +265,7 @@ class SortAggITCase
 
   @Test
   def testArrayUdaf(): Unit = {
-    tEnv.getConfig.getConfiguration.setInteger(TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 1)
+    tEnv.getConfig.set(TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, Int.box(1))
     env.setParallelism(1)
     checkResult(
       "SELECT myPrimitiveArrayUdaf(a, b) FROM Table3",
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/agg/SortDistinctAggregateITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/agg/SortDistinctAggregateITCase.scala
index d9ccb89..fd8b445 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/agg/SortDistinctAggregateITCase.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/agg/SortDistinctAggregateITCase.scala
@@ -35,7 +35,7 @@ import scala.collection.Seq
 class SortDistinctAggregateITCase extends DistinctAggregateITCaseBase {
 
   override def prepareAggOp(): Unit = {
-    tEnv.getConfig.getConfiguration.setString(
+    tEnv.getConfig.set(
       ExecutionConfigOptions.TABLE_EXEC_DISABLED_OPERATORS,  OperatorType.HashAgg.toString)
 
     registerFunction("countFun", new CountAggFunction())
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/join/InnerJoinITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/join/InnerJoinITCase.scala
index b16d4e7..2c3c7f2 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/join/InnerJoinITCase.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/join/InnerJoinITCase.scala
@@ -153,8 +153,7 @@ class InnerJoinITCase(expectedJoinType: JoinType) extends BatchTestBase {
 
   @Test
   def testBigForSpill(): Unit = {
-
-    tableConfig.getConfiguration.setInteger(TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 1)
+    tableConfig.set(TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, Int.box(1))
 
     val bigData = Random.shuffle(
       bigIntStringData.union(bigIntStringData).union(bigIntStringData).union(bigIntStringData))
@@ -169,7 +168,7 @@ class InnerJoinITCase(expectedJoinType: JoinType) extends BatchTestBase {
   @Test
   def testSortMergeJoinOutputOrder(): Unit = {
     if (expectedJoinType == SortMergeJoin) {
-      tableConfig.getConfiguration.setInteger(TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 1)
+      tableConfig.set(TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, Int.box(1))
       env.getConfig.setParallelism(1)
 
       val bigData = Random.shuffle(
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/join/JoinITCaseHelper.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/join/JoinITCaseHelper.scala
index 32287be..fd738a3 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/join/JoinITCaseHelper.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/join/JoinITCaseHelper.scala
@@ -28,8 +28,8 @@ import org.apache.flink.table.planner.runtime.batch.sql.join.JoinType.{Broadcast
 object JoinITCaseHelper {
 
   def disableBroadcastHashJoin(tEnv: TableEnvironment): Unit = {
-    tEnv.getConfig.getConfiguration.setLong(
-      OptimizerConfigOptions.TABLE_OPTIMIZER_BROADCAST_JOIN_THRESHOLD, -1)
+    tEnv.getConfig
+      .set(OptimizerConfigOptions.TABLE_OPTIMIZER_BROADCAST_JOIN_THRESHOLD, Long.box(-1))
   }
 
   def disableOtherJoinOpForJoin(tEnv: TableEnvironment, expected: JoinType): Unit = {
@@ -37,8 +37,10 @@ object JoinITCaseHelper {
       case BroadcastHashJoin =>
        // set the broadcast join threshold to Long.MaxValue
         // so that the threshold constraints are always met.
-        tEnv.getConfig.getConfiguration.setLong(
-          OptimizerConfigOptions.TABLE_OPTIMIZER_BROADCAST_JOIN_THRESHOLD, Long.MaxValue)
+        tEnv.getConfig
+          .set(
+            OptimizerConfigOptions.TABLE_OPTIMIZER_BROADCAST_JOIN_THRESHOLD,
+            Long.box(Long.MaxValue))
         "ShuffleHashJoin, NestedLoopJoin, SortMergeJoin"
       case HashJoin =>
         disableBroadcastHashJoin(tEnv)
@@ -46,8 +48,7 @@ object JoinITCaseHelper {
       case SortMergeJoin => "HashJoin, NestedLoopJoin"
       case NestedLoopJoin => "HashJoin, SortMergeJoin"
     }
-    tEnv.getConfig.getConfiguration.setString(
-      ExecutionConfigOptions.TABLE_EXEC_DISABLED_OPERATORS, disabledOperators)
+    tEnv.getConfig.set(ExecutionConfigOptions.TABLE_EXEC_DISABLED_OPERATORS, disabledOperators)
   }
 
 }
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/table/DecimalITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/table/DecimalITCase.scala
index 5b22ac5..bc13f47 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/table/DecimalITCase.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/table/DecimalITCase.scala
@@ -182,8 +182,8 @@ class DecimalITCase extends BatchTestBase {
 
   @Test
   def testJoin1(): Unit = {
-    tEnv.getConfig.getConfiguration.setString(
-      ExecutionConfigOptions.TABLE_EXEC_DISABLED_OPERATORS, "HashJoin, NestedLoopJoin")
+    tEnv.getConfig
+      .set(ExecutionConfigOptions.TABLE_EXEC_DISABLED_OPERATORS, "HashJoin, NestedLoopJoin")
 
     checkQuery(
       Seq(DECIMAL(8, 2), DECIMAL(8, 4), INT, DOUBLE),
@@ -195,8 +195,8 @@ class DecimalITCase extends BatchTestBase {
 
   @Test
   def testJoin2(): Unit = {
-    tEnv.getConfig.getConfiguration.setString(
-      ExecutionConfigOptions.TABLE_EXEC_DISABLED_OPERATORS, "HashJoin, NestedLoopJoin")
+    tEnv.getConfig
+      .set(ExecutionConfigOptions.TABLE_EXEC_DISABLED_OPERATORS, "HashJoin, NestedLoopJoin")
 
     checkQuery(
       Seq(DECIMAL(8, 2), DECIMAL(8, 4), INT, DOUBLE),
@@ -208,8 +208,8 @@ class DecimalITCase extends BatchTestBase {
 
   @Test
   def testJoin3(): Unit = {
-    tEnv.getConfig.getConfiguration.setString(
-      ExecutionConfigOptions.TABLE_EXEC_DISABLED_OPERATORS, "HashJoin, NestedLoopJoin")
+    tEnv.getConfig
+      .set(ExecutionConfigOptions.TABLE_EXEC_DISABLED_OPERATORS, "HashJoin, NestedLoopJoin")
 
     checkQuery(
       Seq(DECIMAL(8, 2), DECIMAL(8, 4), INT, DOUBLE),
@@ -222,8 +222,8 @@ class DecimalITCase extends BatchTestBase {
 
   @Test
   def testJoin4(): Unit = {
-    tEnv.getConfig.getConfiguration.setString(
-      ExecutionConfigOptions.TABLE_EXEC_DISABLED_OPERATORS, "HashJoin, NestedLoopJoin")
+    tEnv.getConfig
+      .set(ExecutionConfigOptions.TABLE_EXEC_DISABLED_OPERATORS, "HashJoin, NestedLoopJoin")
 
     checkQuery(
       Seq(DECIMAL(8, 2), DECIMAL(8, 4), INT, DOUBLE),
@@ -235,8 +235,8 @@ class DecimalITCase extends BatchTestBase {
 
   @Test
   def testJoin5(): Unit = {
-    tEnv.getConfig.getConfiguration.setString(
-      ExecutionConfigOptions.TABLE_EXEC_DISABLED_OPERATORS, "HashJoin, NestedLoopJoin")
+    tEnv.getConfig
+      .set(ExecutionConfigOptions.TABLE_EXEC_DISABLED_OPERATORS, "HashJoin, NestedLoopJoin")
 
     checkQuery(
       Seq(DECIMAL(8, 2), DECIMAL(8, 4), INT, DOUBLE),
@@ -248,8 +248,8 @@ class DecimalITCase extends BatchTestBase {
 
   @Test
   def testJoin6(): Unit = {
-    tEnv.getConfig.getConfiguration.setString(
-      ExecutionConfigOptions.TABLE_EXEC_DISABLED_OPERATORS, "HashJoin, NestedLoopJoin")
+    tEnv.getConfig
+      .set(ExecutionConfigOptions.TABLE_EXEC_DISABLED_OPERATORS, "HashJoin, NestedLoopJoin")
 
     checkQuery(
       Seq(DECIMAL(8, 2), DECIMAL(8, 4), INT, DOUBLE),
@@ -261,8 +261,8 @@ class DecimalITCase extends BatchTestBase {
 
   @Test
   def testJoin7(): Unit = {
-    tEnv.getConfig.getConfiguration.setString(
-      ExecutionConfigOptions.TABLE_EXEC_DISABLED_OPERATORS, "HashJoin, NestedLoopJoin")
+    tEnv.getConfig
+      .set(ExecutionConfigOptions.TABLE_EXEC_DISABLED_OPERATORS, "HashJoin, NestedLoopJoin")
     checkQuery(
       Seq(DECIMAL(8, 2), DECIMAL(8, 4), INT, DOUBLE),
       s1r(d"1", d"1", 1, 1.0),
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/harness/GroupAggregateHarnessTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/harness/GroupAggregateHarnessTest.scala
index 969473a..39b73ef 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/harness/GroupAggregateHarnessTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/harness/GroupAggregateHarnessTest.scala
@@ -62,12 +62,12 @@ class GroupAggregateHarnessTest(mode: StateBackendMode, miniBatch: MiniBatchMode
     val tableConfig = tEnv.getConfig
     miniBatch match {
       case MiniBatchOn =>
-        tableConfig.getConfiguration.setBoolean(TABLE_EXEC_MINIBATCH_ENABLED, true)
-        tableConfig.getConfiguration.set(TABLE_EXEC_MINIBATCH_ALLOW_LATENCY, Duration.ofSeconds(1))
+        tableConfig.set(TABLE_EXEC_MINIBATCH_ENABLED, Boolean.box(true))
+        tableConfig.set(TABLE_EXEC_MINIBATCH_ALLOW_LATENCY, Duration.ofSeconds(1))
         // trigger every record for easier harness test
-        tableConfig.getConfiguration.setLong(TABLE_EXEC_MINIBATCH_SIZE, 1L)
+        tableConfig.set(TABLE_EXEC_MINIBATCH_SIZE, Long.box(1L))
         // disable local-global to only test the MiniBatchGroupAggFunction
-        tableConfig.getConfiguration.setString(TABLE_OPTIMIZER_AGG_PHASE_STRATEGY, "ONE_PHASE")
+        tableConfig.set(TABLE_OPTIMIZER_AGG_PHASE_STRATEGY, "ONE_PHASE")
       case MiniBatchOff =>
         tableConfig.getConfiguration.removeConfig(TABLE_EXEC_MINIBATCH_ALLOW_LATENCY)
     }
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/harness/WindowAggregateHarnessTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/harness/WindowAggregateHarnessTest.scala
index 6587af3..80b82d9 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/harness/WindowAggregateHarnessTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/harness/WindowAggregateHarnessTest.scala
@@ -310,8 +310,7 @@ class WindowAggregateHarnessTest(backend: StateBackendMode, shiftTimeZone: ZoneI
          |)
          |""".stripMargin)
 
-    tEnv.getConfig.getConfiguration.setString(
-      OptimizerConfigOptions.TABLE_OPTIMIZER_AGG_PHASE_STRATEGY, "TWO_PHASE")
+    tEnv.getConfig.set(OptimizerConfigOptions.TABLE_OPTIMIZER_AGG_PHASE_STRATEGY, "TWO_PHASE")
 
     val sql =
       """
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/CalcITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/CalcITCase.scala
index b5e55ae..2e8ee04 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/CalcITCase.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/CalcITCase.scala
@@ -493,7 +493,7 @@ class CalcITCase extends StreamingTestBase {
     val tableId = TestValuesTableFactory.registerData(rows)
 
     // We need a fixed timezone to make sure this test can run on machines across the world
-    tEnv.getConfig.getConfiguration.setString("table.local-time-zone", "Europe/Berlin")
+    tEnv.getConfig.set("table.local-time-zone", "Europe/Berlin")
 
     tEnv.executeSql(s"""
                        |CREATE TABLE T (
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/ChangelogSourceITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/ChangelogSourceITCase.scala
index a5b3e88..7aea2ed 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/ChangelogSourceITCase.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/ChangelogSourceITCase.scala
@@ -298,8 +298,8 @@ class ChangelogSourceITCase(
   }
 
   private def registerChangelogSourceWithEventsDuplicate(): Unit = {
-    tEnv.getConfig.getConfiguration.setBoolean(
-      ExecutionConfigOptions.TABLE_EXEC_SOURCE_CDC_EVENTS_DUPLICATE, true)
+    tEnv.getConfig
+      .set(ExecutionConfigOptions.TABLE_EXEC_SOURCE_CDC_EVENTS_DUPLICATE, Boolean.box(true))
     val userChangelog: Seq[Row] = Seq(
       changelogRow("+I", "user1", "Tom", "tom@gmail.com", new JBigDecimal("10.02")),
       changelogRow("+I", "user2", "Jack", "jack@hotmail.com", new JBigDecimal("71.2")),
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/DeduplicateITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/DeduplicateITCase.scala
index 39dcd4e..479b1c2 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/DeduplicateITCase.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/DeduplicateITCase.scala
@@ -193,8 +193,9 @@ class DeduplicateITCase(miniBatch: MiniBatchMode, mode: StateBackendMode)
   @Test
   def testFirstRowWithoutAllChangelogOnRowtime(): Unit = {
     Assume.assumeTrue("Without all change log only for minibatch.", miniBatch == MiniBatchOn)
-    tEnv.getConfig.getConfiguration.setBoolean(
-      ExecutionConfigOptions.TABLE_EXEC_DEDUPLICATE_MINIBATCH_COMPACT_CHANGES_ENABLED, true)
+    tEnv.getConfig.set(
+      ExecutionConfigOptions.TABLE_EXEC_DEDUPLICATE_MINIBATCH_COMPACT_CHANGES_ENABLED,
+      Boolean.box(true))
     val t = env.fromCollection(rowtimeTestData)
       .assignTimestampsAndWatermarks(new RowtimeExtractor)
       .toTable(tEnv, 'a, 'b, 'c, 'rowtime.rowtime())
@@ -300,8 +301,9 @@ class DeduplicateITCase(miniBatch: MiniBatchMode, mode: StateBackendMode)
   @Test
   def testLastRowWithoutAllChangelogOnRowtime(): Unit = {
     Assume.assumeTrue("Without all change log only for minibatch.", miniBatch == MiniBatchOn)
-    tEnv.getConfig.getConfiguration.setBoolean(
-      ExecutionConfigOptions.TABLE_EXEC_DEDUPLICATE_MINIBATCH_COMPACT_CHANGES_ENABLED, true)
+    tEnv.getConfig.set(
+      ExecutionConfigOptions.TABLE_EXEC_DEDUPLICATE_MINIBATCH_COMPACT_CHANGES_ENABLED,
+      Boolean.box(true))
     val t = env.fromCollection(rowtimeTestData)
       .assignTimestampsAndWatermarks(new RowtimeExtractor)
       .toTable(tEnv, 'a, 'b, 'c, 'rowtime.rowtime())
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/GroupWindowITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/GroupWindowITCase.scala
index 36b2d42..2df538e 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/GroupWindowITCase.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/GroupWindowITCase.scala
@@ -165,8 +165,8 @@ class GroupWindowITCase(mode: StateBackendMode, useTimestampLtz: Boolean)
 
   @Test
   def testMinMaxWithTumblingWindow(): Unit = {
-    tEnv.getConfig.getConfiguration.setBoolean("table.exec.emit.early-fire.enabled", true)
-    tEnv.getConfig.getConfiguration.setString("table.exec.emit.early-fire.delay", "1000 ms")
+    tEnv.getConfig.set("table.exec.emit.early-fire.enabled", "true")
+    tEnv.getConfig.set("table.exec.emit.early-fire.delay", "1000 ms")
 
     val sql =
       """
@@ -318,8 +318,7 @@ class GroupWindowITCase(mode: StateBackendMode, useTimestampLtz: Boolean)
       return
     }
     // wait 10 milliseconds for late elements
-    tEnv.getConfig.getConfiguration.set(
-      TABLE_EXEC_EMIT_ALLOW_LATENESS, Duration.ofMillis(10))
+    tEnv.getConfig.set(TABLE_EXEC_EMIT_ALLOW_LATENESS, Duration.ofMillis(10))
     // emit result without delay after watermark
     withLateFireDelay(tEnv.getConfig, Time.of(0, TimeUnit.NANOSECONDS))
     val data = List(
@@ -426,8 +425,7 @@ class GroupWindowITCase(mode: StateBackendMode, useTimestampLtz: Boolean)
   @Test
   def testWindowAggregateOnUpsertSourceWithAllowLateness(): Unit = {
     // wait 15 seconds for late elements
-    tEnv.getConfig.getConfiguration.set(
-      TABLE_EXEC_EMIT_ALLOW_LATENESS, Duration.ofSeconds(15))
+    tEnv.getConfig.set(TABLE_EXEC_EMIT_ALLOW_LATENESS, Duration.ofSeconds(15))
     // emit result without delay after watermark
     withLateFireDelay(tEnv.getConfig, Time.of(0, TimeUnit.NANOSECONDS))
     val upsertSourceDataId = registerData(upsertSourceCurrencyData)
@@ -593,9 +591,8 @@ class GroupWindowITCase(mode: StateBackendMode, useTimestampLtz: Boolean)
       throw new RuntimeException(
         "Different lateFireInterval configs in one job are currently not supported")
     }
-    tableConfig.getConfiguration.setBoolean(TABLE_EXEC_EMIT_LATE_FIRE_ENABLED, true)
-    tableConfig.getConfiguration.set(
-      TABLE_EXEC_EMIT_LATE_FIRE_DELAY, Duration.ofMillis(intervalInMillis))
+    tableConfig.set(TABLE_EXEC_EMIT_LATE_FIRE_ENABLED, Boolean.box(true))
+    tableConfig.set(TABLE_EXEC_EMIT_LATE_FIRE_DELAY, Duration.ofMillis(intervalInMillis))
   }
 
   private def localDateTime(epochSecond: Long): LocalDateTime = {
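
Note: the early-fire lines in this file use the string-keyed overload,
set(String, String); the value is kept as a string and parsed against the
option's declared type when it is read. A typed equivalent is sketched below,
assuming java.time.Duration is imported and that TABLE_EXEC_EMIT_EARLY_FIRE_*
are the early-fire counterparts of the TABLE_EXEC_EMIT_LATE_FIRE_* constants
used just above (the constant names are an assumption, not part of this
commit):

    // string-keyed form, as committed
    tEnv.getConfig.set("table.exec.emit.early-fire.enabled", "true")
    tEnv.getConfig.set("table.exec.emit.early-fire.delay", "1000 ms")

    // typed form (sketch, under the naming assumption above)
    tEnv.getConfig.set(TABLE_EXEC_EMIT_EARLY_FIRE_ENABLED, Boolean.box(true))
    tEnv.getConfig.set(TABLE_EXEC_EMIT_EARLY_FIRE_DELAY, Duration.ofMillis(1000))
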
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/SortITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/SortITCase.scala
index fcc8f2b..e18486a 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/SortITCase.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/SortITCase.scala
@@ -67,8 +67,8 @@ class SortITCase(mode: StateBackendMode) extends StreamingWithStateTestBase(mode
     tEnv.registerTable("a", da)
 
     val sink = new TestingRetractSink
-    tEnv.getConfig.getConfiguration.setBoolean(
-      InternalConfigOptions.TABLE_EXEC_NON_TEMPORAL_SORT_ENABLED, true)
+    tEnv.getConfig
+      .set(InternalConfigOptions.TABLE_EXEC_NON_TEMPORAL_SORT_ENABLED, Boolean.box(true))
     val results = tEnv.sqlQuery(sqlQuery).toRetractStream[Row]
     results.addSink(sink).setParallelism(1)
     env.execute()
@@ -96,8 +96,9 @@ class SortITCase(mode: StateBackendMode) extends StreamingWithStateTestBase(mode
     tEnv.registerTable("a", da)
 
     val sink = new TestingRetractSink
-    tEnv.getConfig.getConfiguration.setBoolean(
-      InternalConfigOptions.TABLE_EXEC_NON_TEMPORAL_SORT_ENABLED, true)
+    tEnv.getConfig.set(
+      InternalConfigOptions.TABLE_EXEC_NON_TEMPORAL_SORT_ENABLED,
+      Boolean.box(true))
     val results = tEnv.sqlQuery(sqlQuery).toRetractStream[Row]
     results.addSink(sink).setParallelism(1)
     env.execute()
@@ -125,8 +126,8 @@ class SortITCase(mode: StateBackendMode) extends StreamingWithStateTestBase(mode
     tEnv.registerTable("a", da)
 
     val sink = new TestingRetractSink
-    tEnv.getConfig.getConfiguration.setBoolean(
-      InternalConfigOptions.TABLE_EXEC_NON_TEMPORAL_SORT_ENABLED, true)
+    tEnv.getConfig
+      .set(InternalConfigOptions.TABLE_EXEC_NON_TEMPORAL_SORT_ENABLED, Boolean.box(true))
     val results = tEnv.sqlQuery(sqlQuery).toRetractStream[Row]
     results.addSink(sink).setParallelism(1)
     env.execute()
@@ -154,8 +155,8 @@ class SortITCase(mode: StateBackendMode) extends StreamingWithStateTestBase(mode
     tEnv.registerTable("a", da)
 
     val sink = new TestingRetractSink
-    tEnv.getConfig.getConfiguration.setBoolean(
-      InternalConfigOptions.TABLE_EXEC_NON_TEMPORAL_SORT_ENABLED, true)
+    tEnv.getConfig
+      .set(InternalConfigOptions.TABLE_EXEC_NON_TEMPORAL_SORT_ENABLED, Boolean.box(true))
     val results = tEnv.sqlQuery(sqlQuery).toRetractStream[Row]
     results.addSink(sink).setParallelism(1)
     env.execute()
@@ -189,8 +190,8 @@ class SortITCase(mode: StateBackendMode) extends StreamingWithStateTestBase(mode
     tEnv.registerTable("a", da)
 
     val sink = new TestingRetractSink
-    tEnv.getConfig.getConfiguration.setBoolean(
-      InternalConfigOptions.TABLE_EXEC_NON_TEMPORAL_SORT_ENABLED, true)
+    tEnv.getConfig
+      .set(InternalConfigOptions.TABLE_EXEC_NON_TEMPORAL_SORT_ENABLED, Boolean.box(true))
     val results = tEnv.sqlQuery(sqlQuery).toRetractStream[Row]
     results.addSink(sink).setParallelism(1)
     env.execute()
@@ -222,8 +223,8 @@ class SortITCase(mode: StateBackendMode) extends StreamingWithStateTestBase(mode
     tEnv.registerTable("a", da)
 
     val sink = new TestingRetractSink
-    tEnv.getConfig.getConfiguration.setBoolean(
-      InternalConfigOptions.TABLE_EXEC_NON_TEMPORAL_SORT_ENABLED, true)
+    tEnv.getConfig
+      .set(InternalConfigOptions.TABLE_EXEC_NON_TEMPORAL_SORT_ENABLED, Boolean.box(true))
     val results = tEnv.sqlQuery(sqlQuery).toRetractStream[Row]
     results.addSink(sink).setParallelism(1)
     env.execute()
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/SplitAggregateITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/SplitAggregateITCase.scala
index e0042ba..1a5dba2 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/SplitAggregateITCase.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/SplitAggregateITCase.scala
@@ -56,11 +56,11 @@ class SplitAggregateITCase(
     super.before()
 
     if (partialAggMode.isPartialAggEnabled) {
-      tEnv.getConfig.getConfiguration.setBoolean(
-        OptimizerConfigOptions.TABLE_OPTIMIZER_DISTINCT_AGG_SPLIT_ENABLED, true)
+      tEnv.getConfig
+        .set(OptimizerConfigOptions.TABLE_OPTIMIZER_DISTINCT_AGG_SPLIT_ENABLED, Boolean.box(true))
     } else {
-      tEnv.getConfig.getConfiguration.setBoolean(
-        OptimizerConfigOptions.TABLE_OPTIMIZER_DISTINCT_AGG_SPLIT_ENABLED, false)
+      tEnv.getConfig
+        .set(OptimizerConfigOptions.TABLE_OPTIMIZER_DISTINCT_AGG_SPLIT_ENABLED, Boolean.box(false))
     }
 
     val data = List(
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/TemporalJoinITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/TemporalJoinITCase.scala
index 72a4b16..8e6d148 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/TemporalJoinITCase.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/TemporalJoinITCase.scala
@@ -702,12 +702,12 @@ class TemporalJoinITCase(state: StateBackendMode)
 
   @Test
   def testMiniBatchEventTimeViewTemporalJoin(): Unit = {
-    tEnv.getConfig.getConfiguration.setBoolean(
-      ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ENABLED, true)
-    tEnv.getConfig.getConfiguration.setString(
-      ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ALLOW_LATENCY.key(), "10 s")
-    tEnv.getConfig.getConfiguration.setLong(
-      ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_SIZE, 4L)
+    tEnv.getConfig
+      .set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ENABLED, Boolean.box(true))
+    tEnv.getConfig
+      .set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ALLOW_LATENCY.key(), "10 s")
+    tEnv.getConfig
+      .set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_SIZE, Long.box(4L))
 
     val sql = "INSERT INTO rowtime_default_sink " +
       " SELECT o.order_id, o.currency, o.amount, o.order_time, r.rate, r.currency_time " +
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/WindowAggregateITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/WindowAggregateITCase.scala
index d0597d6..6b299f4 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/WindowAggregateITCase.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/WindowAggregateITCase.scala
@@ -168,7 +168,7 @@ class WindowAggregateITCase(
     tEnv.createFunction("concat_distinct_agg", classOf[ConcatDistinctAggFunction])
 
     tEnv.getConfig.setLocalTimeZone(SHANGHAI_ZONE)
-    tEnv.getConfig.getConfiguration.setString(
+    tEnv.getConfig.set(
       OptimizerConfigOptions.TABLE_OPTIMIZER_AGG_PHASE_STRATEGY,
       aggPhase.toString)
   }
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/WindowDistinctAggregateITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/WindowDistinctAggregateITCase.scala
index f7f1dc6..9e41638 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/WindowDistinctAggregateITCase.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/WindowDistinctAggregateITCase.scala
@@ -172,8 +172,9 @@ class WindowDistinctAggregateITCase(
          |)
          |""".stripMargin)
 
-    tEnv.getConfig.getConfiguration.setBoolean(
-      OptimizerConfigOptions.TABLE_OPTIMIZER_DISTINCT_AGG_SPLIT_ENABLED, splitDistinct)
+    tEnv.getConfig.set(
+      OptimizerConfigOptions.TABLE_OPTIMIZER_DISTINCT_AGG_SPLIT_ENABLED,
+      Boolean.box(splitDistinct))
   }
 
   @Test
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/table/TableSinkITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/table/TableSinkITCase.scala
index fe73d24..b52b2e0 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/table/TableSinkITCase.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/table/TableSinkITCase.scala
@@ -1253,7 +1253,8 @@ class TableSinkITCase extends StreamingTestBase {
     val t = env.fromCollection(tupleData3)
         .assignAscendingTimestamps(_._1.toLong)
         .toTable(tEnv, 'id, 'num, 'text, 'rowtime.rowtime)
-    tEnv.getConfig.getConfiguration.set(ExecutionConfigOptions.TABLE_EXEC_SINK_KEYED_SHUFFLE,
+    tEnv.getConfig.set(
+      ExecutionConfigOptions.TABLE_EXEC_SINK_KEYED_SHUFFLE,
       ExecutionConfigOptions.SinkKeyedShuffle.FORCE)
 
     tEnv.executeSql(
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/utils/BatchTestBase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/utils/BatchTestBase.scala
index 3146565..56e16cd 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/utils/BatchTestBase.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/utils/BatchTestBase.scala
@@ -518,9 +518,7 @@ object BatchTestBase {
   }
 
   def configForMiniCluster(tableConfig: TableConfig): Unit = {
-    tableConfig.
-      getConfiguration.setInteger(TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, DEFAULT_PARALLELISM)
-    tableConfig.getConfiguration
-      .set(ExecutionOptions.BATCH_SHUFFLE_MODE, BatchShuffleMode.ALL_EXCHANGES_PIPELINED)
+    tableConfig.set(TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, Int.box(DEFAULT_PARALLELISM))
+    tableConfig.set(ExecutionOptions.BATCH_SHUFFLE_MODE, BatchShuffleMode.ALL_EXCHANGES_PIPELINED)
   }
 }
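
Note: boxing is only needed for options whose type parameter is a Java
primitive wrapper; options typed to reference types such as enums or Duration
take their values as-is, as configForMiniCluster above shows:

    // Integer-typed option: the Scala Int is boxed explicitly
    tableConfig.set(TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, Int.box(DEFAULT_PARALLELISM))
    // enum-typed option: passed directly, no boxing required
    tableConfig.set(ExecutionOptions.BATCH_SHUFFLE_MODE, BatchShuffleMode.ALL_EXCHANGES_PIPELINED)
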
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/utils/StreamingWithAggTestBase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/utils/StreamingWithAggTestBase.scala
index 02b42ac..ac8110c 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/utils/StreamingWithAggTestBase.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/utils/StreamingWithAggTestBase.scala
@@ -42,11 +42,11 @@ class StreamingWithAggTestBase(
     // in order to cover more code paths
     tEnv.getConfig.setIdleStateRetentionTime(Time.hours(1), Time.hours(2))
     if (aggMode.isLocalAggEnabled) {
-      tEnv.getConfig.getConfiguration.setString(
+      tEnv.getConfig.set(
         OptimizerConfigOptions.TABLE_OPTIMIZER_AGG_PHASE_STRATEGY,
         AggregatePhaseStrategy.TWO_PHASE.toString)
     } else {
-      tEnv.getConfig.getConfiguration.setString(
+      tEnv.getConfig.set(
         OptimizerConfigOptions.TABLE_OPTIMIZER_AGG_PHASE_STRATEGY,
         AggregatePhaseStrategy.ONE_PHASE.toString)
     }
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/utils/TableTestBase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/utils/TableTestBase.scala
index c75f571..6cb12cc 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/utils/TableTestBase.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/utils/TableTestBase.scala
@@ -1052,7 +1052,6 @@ abstract class TableTestUtil(
     TestingTableEnvironment.create(setting, catalogManager, tableConfig)
   val tableEnv: TableEnvironment = testingTableEnv
   tableEnv.getConfig
-    .getConfiguration
     .set(ExecutionOptions.BATCH_SHUFFLE_MODE, BatchShuffleMode.ALL_EXCHANGES_PIPELINED)
 
   private val env: StreamExecutionEnvironment = getPlanner.getExecEnv
@@ -1291,12 +1290,12 @@ case class StreamTableTestUtil(
   }
 
   def enableMiniBatch(): Unit = {
-    tableEnv.getConfig.getConfiguration.setBoolean(
-      ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ENABLED, true)
-    tableEnv.getConfig.getConfiguration.set(
+    tableEnv.getConfig.set(
+      ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ENABLED, Boolean.box(true))
+    tableEnv.getConfig.set(
       ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ALLOW_LATENCY, Duration.ofSeconds(1))
-    tableEnv.getConfig.getConfiguration.setLong(
-      ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_SIZE, 3L)
+    tableEnv.getConfig.set(
+      ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_SIZE, Long.box(3L))
   }
 
   def createAppendTableSink(