You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tw...@apache.org on 2022/02/25 14:51:32 UTC

[flink] branch master updated (44da40f -> 66af14b)

This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


    from 44da40f  [hotfix][runtime] CodeStyle correction for NetworkBufferPoolTest
     new d2e9473  [FLINK-26287][table] Add convenience method TableConfig.set
     new 66af14b  [FLINK-26287][table] Update some test classes for the new TableConfig.set

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../docs/dev/python/dependency_management.md       |  2 +-
 docs/content.zh/docs/dev/table/config.md           | 24 ++++----
 .../docs/dev/table/sql/queries/overview.md         | 12 ++--
 docs/content.zh/docs/dev/table/tuning.md           | 41 ++++++-------
 .../docs/dev/python/dependency_management.md       |  2 +-
 docs/content/docs/dev/table/config.md              | 24 ++++----
 .../content/docs/dev/table/sql/queries/overview.md |  8 +--
 docs/content/docs/dev/table/tuning.md              | 69 ++++++++++-----------
 flink-python/pyflink/table/table_config.py         |  9 +++
 .../org/apache/flink/table/api/TableConfig.java    | 49 +++++++++++++--
 .../file/table/FileSystemTableSinkTest.java        |  8 +--
 .../apache/flink/table/api/CompiledPlanITCase.java |  2 +-
 .../flink/table/planner/codegen/CodeSplitTest.java | 16 ++---
 .../plan/batch/sql/ForwardHashExchangeTest.java    | 70 +++++++---------------
 .../nodes/exec/common/CommonExecSinkITCase.java    | 18 ++----
 .../nodes/exec/operator/BatchOperatorNameTest.java | 10 +---
 .../nodes/exec/operator/OperatorNameTestBase.java  |  1 -
 .../exec/operator/StreamOperatorNameTest.java      |  2 -
 .../exec/serde/ContextResolvedTableSerdeTest.java  | 13 ++--
 .../exec/stream/ChangelogSourceJsonPlanTest.java   |  4 +-
 .../plan/nodes/exec/stream/ExpandJsonPlanTest.java |  9 +--
 .../exec/stream/GroupAggregateJsonPlanTest.java    | 13 +---
 .../stream/IncrementalAggregateJsonPlanTest.java   | 21 ++-----
 .../exec/stream/WindowAggregateJsonPlanTest.java   |  4 +-
 .../PushWatermarkIntoTableSourceScanRuleTest.java  |  5 +-
 .../PushLocalAggIntoTableSourceScanRuleTest.java   | 25 +++-----
 .../batch/sql/ForwardHashExchangeITCase.java       |  9 +--
 .../sql/agg/LocalAggregatePushDownITCase.java      | 14 ++---
 .../jsonplan/DeduplicationJsonPlanITCase.java      |  4 +-
 .../stream/jsonplan/ExpandJsonPlanITCase.java      |  9 +--
 .../jsonplan/GroupAggregateJsonPlanITCase.java     | 13 +---
 .../IncrementalAggregateJsonPlanITCase.java        | 21 ++-----
 .../stream/jsonplan/WindowAggregateJsonITCase.java |  7 +--
 .../flink/table/api/TableEnvironmentITCase.scala   |  8 +--
 .../apache/flink/table/api/batch/ExplainTest.scala |  8 +--
 .../flink/table/api/stream/ExplainTest.scala       | 14 +++--
 .../table/planner/catalog/CatalogTableITCase.scala |  3 +-
 .../table/planner/catalog/CatalogViewITCase.scala  |  3 +-
 .../expressions/NonDeterministicTests.scala        | 11 ++--
 .../expressions/utils/ExpressionTestBase.scala     |  2 +-
 .../plan/batch/sql/DagOptimizationTest.scala       | 53 ++++++++--------
 .../RemoveRedundantLocalSortAggRuleTest.scala      | 10 ++--
 .../table/PythonGroupWindowAggregateTest.scala     |  4 +-
 .../plan/stream/table/TwoStageAggregateTest.scala  | 14 ++---
 .../GroupWindowTableAggregateValidationTest.scala  |  4 +-
 .../planner/runtime/FileSystemITCaseBase.scala     |  4 +-
 .../planner/runtime/batch/sql/CalcITCase.scala     |  6 +-
 .../runtime/batch/sql/CodeSplitITCase.scala        |  6 +-
 .../planner/runtime/batch/sql/DecimalITCase.scala  | 28 ++++-----
 .../planner/runtime/batch/sql/MiscITCase.scala     |  2 +-
 .../runtime/batch/sql/MultipleInputITCase.scala    | 10 ++--
 .../batch/sql/PartitionableSinkITCase.scala        |  4 +-
 .../runtime/batch/sql/SortLimitITCase.scala        |  3 +-
 .../runtime/batch/sql/TableSinkITCase.scala        |  6 +-
 .../runtime/batch/sql/TableSourceITCase.scala      |  5 +-
 .../planner/runtime/batch/sql/UnionITCase.scala    | 10 ++--
 .../sql/agg/AggregateJoinTransposeITCase.scala     |  2 +-
 .../sql/agg/AggregateReduceGroupingITCase.scala    | 17 +++---
 .../runtime/batch/sql/agg/HashAggITCase.scala      |  2 +-
 .../sql/agg/HashDistinctAggregateITCase.scala      |  2 +-
 .../runtime/batch/sql/agg/SortAggITCase.scala      |  6 +-
 .../sql/agg/SortDistinctAggregateITCase.scala      |  2 +-
 .../runtime/batch/sql/join/InnerJoinITCase.scala   |  5 +-
 .../runtime/batch/sql/join/JoinITCaseHelper.scala  | 13 ++--
 .../runtime/batch/table/DecimalITCase.scala        | 28 ++++-----
 .../harness/GroupAggregateHarnessTest.scala        |  8 +--
 .../harness/WindowAggregateHarnessTest.scala       |  3 +-
 .../planner/runtime/stream/sql/CalcITCase.scala    |  2 +-
 .../runtime/stream/sql/ChangelogSourceITCase.scala |  4 +-
 .../runtime/stream/sql/DeduplicateITCase.scala     | 10 ++--
 .../runtime/stream/sql/GroupWindowITCase.scala     | 15 ++---
 .../planner/runtime/stream/sql/SortITCase.scala    | 25 ++++----
 .../runtime/stream/sql/SplitAggregateITCase.scala  |  8 +--
 .../runtime/stream/sql/TemporalJoinITCase.scala    | 12 ++--
 .../runtime/stream/sql/WindowAggregateITCase.scala |  2 +-
 .../stream/sql/WindowDistinctAggregateITCase.scala |  5 +-
 .../runtime/stream/table/TableSinkITCase.scala     |  3 +-
 .../planner/runtime/utils/BatchTestBase.scala      |  6 +-
 .../runtime/utils/StreamingWithAggTestBase.scala   |  4 +-
 .../flink/table/planner/utils/TableTestBase.scala  | 11 ++--
 80 files changed, 427 insertions(+), 524 deletions(-)

[flink] 02/02: [FLINK-26287][table] Update some test classes for the new TableConfig.set

Posted by tw...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 66af14beb296797a3e35a2d338e86d846ac130df
Author: Timo Walther <tw...@apache.org>
AuthorDate: Tue Feb 22 17:30:54 2022 +0100

    [FLINK-26287][table] Update some test classes for the new TableConfig.set
    
    This closes #18895.
---
 .../file/table/FileSystemTableSinkTest.java        |  8 +--
 .../apache/flink/table/api/CompiledPlanITCase.java |  2 +-
 .../flink/table/planner/codegen/CodeSplitTest.java | 16 ++---
 .../plan/batch/sql/ForwardHashExchangeTest.java    | 70 +++++++---------------
 .../nodes/exec/common/CommonExecSinkITCase.java    | 18 ++----
 .../nodes/exec/operator/BatchOperatorNameTest.java | 10 +---
 .../nodes/exec/operator/OperatorNameTestBase.java  |  1 -
 .../exec/operator/StreamOperatorNameTest.java      |  2 -
 .../exec/serde/ContextResolvedTableSerdeTest.java  | 13 ++--
 .../exec/stream/ChangelogSourceJsonPlanTest.java   |  4 +-
 .../plan/nodes/exec/stream/ExpandJsonPlanTest.java |  9 +--
 .../exec/stream/GroupAggregateJsonPlanTest.java    | 13 +---
 .../stream/IncrementalAggregateJsonPlanTest.java   | 21 ++-----
 .../exec/stream/WindowAggregateJsonPlanTest.java   |  4 +-
 .../PushWatermarkIntoTableSourceScanRuleTest.java  |  5 +-
 .../PushLocalAggIntoTableSourceScanRuleTest.java   | 25 +++-----
 .../batch/sql/ForwardHashExchangeITCase.java       |  9 +--
 .../sql/agg/LocalAggregatePushDownITCase.java      | 14 ++---
 .../jsonplan/DeduplicationJsonPlanITCase.java      |  4 +-
 .../stream/jsonplan/ExpandJsonPlanITCase.java      |  9 +--
 .../jsonplan/GroupAggregateJsonPlanITCase.java     | 13 +---
 .../IncrementalAggregateJsonPlanITCase.java        | 21 ++-----
 .../stream/jsonplan/WindowAggregateJsonITCase.java |  7 +--
 .../flink/table/api/TableEnvironmentITCase.scala   |  8 +--
 .../apache/flink/table/api/batch/ExplainTest.scala |  8 +--
 .../flink/table/api/stream/ExplainTest.scala       | 14 +++--
 .../table/planner/catalog/CatalogTableITCase.scala |  3 +-
 .../table/planner/catalog/CatalogViewITCase.scala  |  3 +-
 .../expressions/NonDeterministicTests.scala        | 11 ++--
 .../expressions/utils/ExpressionTestBase.scala     |  2 +-
 .../plan/batch/sql/DagOptimizationTest.scala       | 53 ++++++++--------
 .../RemoveRedundantLocalSortAggRuleTest.scala      | 10 ++--
 .../table/PythonGroupWindowAggregateTest.scala     |  4 +-
 .../plan/stream/table/TwoStageAggregateTest.scala  | 14 ++---
 .../GroupWindowTableAggregateValidationTest.scala  |  4 +-
 .../planner/runtime/FileSystemITCaseBase.scala     |  4 +-
 .../planner/runtime/batch/sql/CalcITCase.scala     |  6 +-
 .../runtime/batch/sql/CodeSplitITCase.scala        |  6 +-
 .../planner/runtime/batch/sql/DecimalITCase.scala  | 28 ++++-----
 .../planner/runtime/batch/sql/MiscITCase.scala     |  2 +-
 .../runtime/batch/sql/MultipleInputITCase.scala    | 10 ++--
 .../batch/sql/PartitionableSinkITCase.scala        |  4 +-
 .../runtime/batch/sql/SortLimitITCase.scala        |  3 +-
 .../runtime/batch/sql/TableSinkITCase.scala        |  6 +-
 .../runtime/batch/sql/TableSourceITCase.scala      |  5 +-
 .../planner/runtime/batch/sql/UnionITCase.scala    | 10 ++--
 .../sql/agg/AggregateJoinTransposeITCase.scala     |  2 +-
 .../sql/agg/AggregateReduceGroupingITCase.scala    | 17 +++---
 .../runtime/batch/sql/agg/HashAggITCase.scala      |  2 +-
 .../sql/agg/HashDistinctAggregateITCase.scala      |  2 +-
 .../runtime/batch/sql/agg/SortAggITCase.scala      |  6 +-
 .../sql/agg/SortDistinctAggregateITCase.scala      |  2 +-
 .../runtime/batch/sql/join/InnerJoinITCase.scala   |  5 +-
 .../runtime/batch/sql/join/JoinITCaseHelper.scala  | 13 ++--
 .../runtime/batch/table/DecimalITCase.scala        | 28 ++++-----
 .../harness/GroupAggregateHarnessTest.scala        |  8 +--
 .../harness/WindowAggregateHarnessTest.scala       |  3 +-
 .../planner/runtime/stream/sql/CalcITCase.scala    |  2 +-
 .../runtime/stream/sql/ChangelogSourceITCase.scala |  4 +-
 .../runtime/stream/sql/DeduplicateITCase.scala     | 10 ++--
 .../runtime/stream/sql/GroupWindowITCase.scala     | 15 ++---
 .../planner/runtime/stream/sql/SortITCase.scala    | 25 ++++----
 .../runtime/stream/sql/SplitAggregateITCase.scala  |  8 +--
 .../runtime/stream/sql/TemporalJoinITCase.scala    | 12 ++--
 .../runtime/stream/sql/WindowAggregateITCase.scala |  2 +-
 .../stream/sql/WindowDistinctAggregateITCase.scala |  5 +-
 .../runtime/stream/table/TableSinkITCase.scala     |  3 +-
 .../planner/runtime/utils/BatchTestBase.scala      |  6 +-
 .../runtime/utils/StreamingWithAggTestBase.scala   |  4 +-
 .../flink/table/planner/utils/TableTestBase.scala  | 11 ++--
 70 files changed, 286 insertions(+), 425 deletions(-)

diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/connector/file/table/FileSystemTableSinkTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/connector/file/table/FileSystemTableSinkTest.java
index 9c9d28b..d329688 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/connector/file/table/FileSystemTableSinkTest.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/connector/file/table/FileSystemTableSinkTest.java
@@ -62,9 +62,7 @@ public class FileSystemTableSinkTest {
         final int parallelism = 5;
         final TableEnvironment tEnv =
                 TableEnvironment.create(EnvironmentSettings.inStreamingMode());
-        tEnv.getConfig()
-                .getConfiguration()
-                .set(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 8);
+        tEnv.getConfig().set(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 8);
 
         final String testSourceTableName = "test_source_table";
         tEnv.executeSql(buildSourceTableSql(testSourceTableName, false));
@@ -98,9 +96,7 @@ public class FileSystemTableSinkTest {
     public void testFileSystemTableSinkWithParallelismInBatch() {
         final int parallelism = 5;
         final TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inBatchMode());
-        tEnv.getConfig()
-                .getConfiguration()
-                .set(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 8);
+        tEnv.getConfig().set(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 8);
 
         final String testSourceTableName = "test_source_table";
         final String testSinkTableName = "test_sink_table";
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/api/CompiledPlanITCase.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/api/CompiledPlanITCase.java
index c9c8b31..b43b6a6 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/api/CompiledPlanITCase.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/api/CompiledPlanITCase.java
@@ -218,7 +218,7 @@ public class CompiledPlanITCase extends JsonPlanTestBase {
 
     @Test
     public void testCompilePlanOverwrite() throws Exception {
-        tableEnv.getConfig().getConfiguration().set(TableConfigOptions.PLAN_FORCE_RECOMPILE, true);
+        tableEnv.getConfig().set(TableConfigOptions.PLAN_FORCE_RECOMPILE, true);
 
         Path planPath =
                 Paths.get(URI.create(getTempDirPath("plan")).getPath(), "plan.json")
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/codegen/CodeSplitTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/codegen/CodeSplitTest.java
index 1a1d7de..2d166c6 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/codegen/CodeSplitTest.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/codegen/CodeSplitTest.java
@@ -220,21 +220,13 @@ public class CodeSplitTest {
 
     private void runTest(Consumer<TableConfig> consumer) {
         TableConfig splitTableConfig = new TableConfig();
-        splitTableConfig
-                .getConfiguration()
-                .setInteger(TableConfigOptions.MAX_LENGTH_GENERATED_CODE, 4000);
-        splitTableConfig
-                .getConfiguration()
-                .setInteger(TableConfigOptions.MAX_MEMBERS_GENERATED_CODE, 10000);
+        splitTableConfig.set(TableConfigOptions.MAX_LENGTH_GENERATED_CODE, 4000);
+        splitTableConfig.set(TableConfigOptions.MAX_MEMBERS_GENERATED_CODE, 10000);
         consumer.accept(splitTableConfig);
 
         TableConfig noSplitTableConfig = new TableConfig();
-        noSplitTableConfig
-                .getConfiguration()
-                .setInteger(TableConfigOptions.MAX_LENGTH_GENERATED_CODE, Integer.MAX_VALUE);
-        noSplitTableConfig
-                .getConfiguration()
-                .setInteger(TableConfigOptions.MAX_MEMBERS_GENERATED_CODE, Integer.MAX_VALUE);
+        noSplitTableConfig.set(TableConfigOptions.MAX_LENGTH_GENERATED_CODE, Integer.MAX_VALUE);
+        noSplitTableConfig.set(TableConfigOptions.MAX_MEMBERS_GENERATED_CODE, Integer.MAX_VALUE);
         PrintStream originalStdOut = System.out;
         try {
             // redirect stdout to a null output stream to silence compile error in CompileUtils
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/batch/sql/ForwardHashExchangeTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/batch/sql/ForwardHashExchangeTest.java
index 3297879..fd68753 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/batch/sql/ForwardHashExchangeTest.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/batch/sql/ForwardHashExchangeTest.java
@@ -75,8 +75,7 @@ public class ForwardHashExchangeTest extends TableTestBase {
     public void testOverAggOnHashAggWithHashShuffle() {
         util.tableEnv()
                 .getConfig()
-                .getConfiguration()
-                .setString(ExecutionConfigOptions.TABLE_EXEC_DISABLED_OPERATORS, "SortAgg");
+                .set(ExecutionConfigOptions.TABLE_EXEC_DISABLED_OPERATORS, "SortAgg");
         util.verifyExecPlan(
                 " SELECT\n"
                         + "   SUM(b) sum_b,\n"
@@ -91,8 +90,7 @@ public class ForwardHashExchangeTest extends TableTestBase {
     public void testOverAggOnHashAggWithGlobalShuffle() {
         util.tableEnv()
                 .getConfig()
-                .getConfiguration()
-                .setString(ExecutionConfigOptions.TABLE_EXEC_DISABLED_OPERATORS, "SortAgg");
+                .set(ExecutionConfigOptions.TABLE_EXEC_DISABLED_OPERATORS, "SortAgg");
         util.verifyExecPlan("SELECT b, RANK() OVER (ORDER BY b) FROM (SELECT SUM(b) AS b FROM T)");
     }
 
@@ -100,8 +98,7 @@ public class ForwardHashExchangeTest extends TableTestBase {
     public void testOverAggOnSortAggWithHashShuffle() {
         util.tableEnv()
                 .getConfig()
-                .getConfiguration()
-                .setString(ExecutionConfigOptions.TABLE_EXEC_DISABLED_OPERATORS, "HashAgg");
+                .set(ExecutionConfigOptions.TABLE_EXEC_DISABLED_OPERATORS, "HashAgg");
         util.verifyExecPlan(
                 " SELECT\n"
                         + "   SUM(b) sum_b,\n"
@@ -116,8 +113,7 @@ public class ForwardHashExchangeTest extends TableTestBase {
     public void testOverAggOnSortAggWithGlobalShuffle() {
         util.tableEnv()
                 .getConfig()
-                .getConfiguration()
-                .setString(ExecutionConfigOptions.TABLE_EXEC_DISABLED_OPERATORS, "HashAgg");
+                .set(ExecutionConfigOptions.TABLE_EXEC_DISABLED_OPERATORS, "HashAgg");
         util.verifyExecPlan("SELECT b, RANK() OVER (ORDER BY b) FROM (SELECT SUM(b) AS b FROM T)");
     }
 
@@ -125,8 +121,7 @@ public class ForwardHashExchangeTest extends TableTestBase {
     public void testHashAggOnHashJoinWithHashShuffle() {
         util.tableEnv()
                 .getConfig()
-                .getConfiguration()
-                .setString(
+                .set(
                         ExecutionConfigOptions.TABLE_EXEC_DISABLED_OPERATORS,
                         "SortMergeJoin,NestedLoopJoin,SortAgg");
         util.verifyExecPlan(
@@ -138,8 +133,7 @@ public class ForwardHashExchangeTest extends TableTestBase {
     public void testSortAggOnSortMergeJoinWithHashShuffle() {
         util.tableEnv()
                 .getConfig()
-                .getConfiguration()
-                .setString(
+                .set(
                         ExecutionConfigOptions.TABLE_EXEC_DISABLED_OPERATORS,
                         "HashJoin,NestedLoopJoin,HashAgg");
         util.verifyExecPlan(
@@ -151,12 +145,10 @@ public class ForwardHashExchangeTest extends TableTestBase {
     public void testHashAggOnNestedLoopJoinWithGlobalShuffle() {
         util.tableEnv()
                 .getConfig()
-                .getConfiguration()
-                .setString(ExecutionConfigOptions.TABLE_EXEC_DISABLED_OPERATORS, "SortAgg");
+                .set(ExecutionConfigOptions.TABLE_EXEC_DISABLED_OPERATORS, "SortAgg");
         util.tableEnv()
                 .getConfig()
-                .getConfiguration()
-                .setString(OptimizerConfigOptions.TABLE_OPTIMIZER_AGG_PHASE_STRATEGY, "ONE_PHASE");
+                .set(OptimizerConfigOptions.TABLE_OPTIMIZER_AGG_PHASE_STRATEGY, "ONE_PHASE");
         // TODO the shuffle between join and agg can be removed
         util.verifyExecPlan(
                 "WITH r AS (SELECT * FROM T1 FULL OUTER JOIN T2 ON true)\n"
@@ -167,12 +159,10 @@ public class ForwardHashExchangeTest extends TableTestBase {
     public void testSortAggOnNestedLoopJoinWithGlobalShuffle() {
         util.tableEnv()
                 .getConfig()
-                .getConfiguration()
-                .setString(ExecutionConfigOptions.TABLE_EXEC_DISABLED_OPERATORS, "HashAgg");
+                .set(ExecutionConfigOptions.TABLE_EXEC_DISABLED_OPERATORS, "HashAgg");
         util.tableEnv()
                 .getConfig()
-                .getConfiguration()
-                .setString(OptimizerConfigOptions.TABLE_OPTIMIZER_AGG_PHASE_STRATEGY, "ONE_PHASE");
+                .set(OptimizerConfigOptions.TABLE_OPTIMIZER_AGG_PHASE_STRATEGY, "ONE_PHASE");
         // TODO the shuffle between join and agg can be removed
         util.verifyExecPlan(
                 "WITH r AS (SELECT * FROM T1 FULL OUTER JOIN T2 ON true)\n"
@@ -183,8 +173,7 @@ public class ForwardHashExchangeTest extends TableTestBase {
     public void testRankOnHashAggWithHashShuffle() {
         util.tableEnv()
                 .getConfig()
-                .getConfiguration()
-                .setString(ExecutionConfigOptions.TABLE_EXEC_DISABLED_OPERATORS, "SortAgg");
+                .set(ExecutionConfigOptions.TABLE_EXEC_DISABLED_OPERATORS, "SortAgg");
         util.verifyExecPlan(
                 "SELECT * FROM (\n"
                         + "                SELECT a, b, RANK() OVER(PARTITION BY a ORDER BY b) rk FROM (\n"
@@ -197,8 +186,7 @@ public class ForwardHashExchangeTest extends TableTestBase {
     public void testRankOnHashAggWithGlobalShuffle() {
         util.tableEnv()
                 .getConfig()
-                .getConfiguration()
-                .setString(ExecutionConfigOptions.TABLE_EXEC_DISABLED_OPERATORS, "SortAgg");
+                .set(ExecutionConfigOptions.TABLE_EXEC_DISABLED_OPERATORS, "SortAgg");
         util.verifyExecPlan(
                 "SELECT * FROM (\n"
                         + "                SELECT b, RANK() OVER(ORDER BY b) rk FROM (\n"
@@ -211,8 +199,7 @@ public class ForwardHashExchangeTest extends TableTestBase {
     public void testRankOnSortAggWithHashShuffle() {
         util.tableEnv()
                 .getConfig()
-                .getConfiguration()
-                .setString(ExecutionConfigOptions.TABLE_EXEC_DISABLED_OPERATORS, "SortAgg");
+                .set(ExecutionConfigOptions.TABLE_EXEC_DISABLED_OPERATORS, "SortAgg");
         util.verifyExecPlan(
                 "SELECT * FROM (\n"
                         + "                SELECT a, b, RANK() OVER(PARTITION BY a ORDER BY b) rk FROM (\n"
@@ -225,8 +212,7 @@ public class ForwardHashExchangeTest extends TableTestBase {
     public void testRankOnSortAggWithGlobalShuffle() {
         util.tableEnv()
                 .getConfig()
-                .getConfiguration()
-                .setString(ExecutionConfigOptions.TABLE_EXEC_DISABLED_OPERATORS, "SortAgg");
+                .set(ExecutionConfigOptions.TABLE_EXEC_DISABLED_OPERATORS, "SortAgg");
         util.verifyExecPlan(
                 "SELECT * FROM (\n"
                         + "                SELECT b, RANK() OVER(ORDER BY b) rk FROM (\n"
@@ -239,14 +225,10 @@ public class ForwardHashExchangeTest extends TableTestBase {
     public void testHashJoinWithMultipleInputDisabled() {
         util.tableEnv()
                 .getConfig()
-                .getConfiguration()
-                .setString(
+                .set(
                         ExecutionConfigOptions.TABLE_EXEC_DISABLED_OPERATORS,
-                        "SortMergeJoin,NestedLoopJoin");
-        util.tableEnv()
-                .getConfig()
-                .getConfiguration()
-                .setBoolean(OptimizerConfigOptions.TABLE_OPTIMIZER_MULTIPLE_INPUT_ENABLED, false);
+                        "SortMergeJoin,NestedLoopJoin")
+                .set(OptimizerConfigOptions.TABLE_OPTIMIZER_MULTIPLE_INPUT_ENABLED, false);
         util.verifyExecPlan(
                 "SELECT * FROM\n"
                         + "  (SELECT a FROM T1 JOIN T ON a = a1) t1\n"
@@ -259,14 +241,10 @@ public class ForwardHashExchangeTest extends TableTestBase {
     public void testSortJoinWithMultipleInputDisabled() {
         util.tableEnv()
                 .getConfig()
-                .getConfiguration()
-                .setString(
+                .set(
                         ExecutionConfigOptions.TABLE_EXEC_DISABLED_OPERATORS,
-                        "HashJoin,NestedLoopJoin");
-        util.tableEnv()
-                .getConfig()
-                .getConfiguration()
-                .setBoolean(OptimizerConfigOptions.TABLE_OPTIMIZER_MULTIPLE_INPUT_ENABLED, false);
+                        "HashJoin,NestedLoopJoin")
+                .set(OptimizerConfigOptions.TABLE_OPTIMIZER_MULTIPLE_INPUT_ENABLED, false);
         util.verifyExecPlan(
                 "SELECT * FROM\n"
                         + "  (SELECT a FROM T1 JOIN T ON a = a1) t1\n"
@@ -279,12 +257,8 @@ public class ForwardHashExchangeTest extends TableTestBase {
     public void testMultipleInputs() {
         util.getTableEnv()
                 .getConfig()
-                .getConfiguration()
-                .setBoolean(OptimizerConfigOptions.TABLE_OPTIMIZER_JOIN_REORDER_ENABLED, false);
-        util.getTableEnv()
-                .getConfig()
-                .getConfiguration()
-                .setString(
+                .set(OptimizerConfigOptions.TABLE_OPTIMIZER_JOIN_REORDER_ENABLED, false)
+                .set(
                         ExecutionConfigOptions.TABLE_EXEC_DISABLED_OPERATORS,
                         "NestedLoopJoin,SortMergeJoin,SortAgg");
 
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSinkITCase.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSinkITCase.java
index 56d90b3..44f36a2 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSinkITCase.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSinkITCase.java
@@ -235,8 +235,7 @@ public class CommonExecSinkITCase extends AbstractTestBase {
         // accordingly, based on their type length
         try {
             tableEnv.getConfig()
-                    .getConfiguration()
-                    .setString(
+                    .set(
                             TABLE_EXEC_SINK_TYPE_LENGTH_ENFORCER.key(),
                             ExecutionConfigOptions.TypeLengthEnforcer.TRIM_PAD.name());
 
@@ -255,8 +254,7 @@ public class CommonExecSinkITCase extends AbstractTestBase {
 
         } finally {
             tableEnv.getConfig()
-                    .getConfiguration()
-                    .setString(
+                    .set(
                             TABLE_EXEC_SINK_TYPE_LENGTH_ENFORCER.key(),
                             ExecutionConfigOptions.TypeLengthEnforcer.IGNORE.name());
         }
@@ -315,8 +313,7 @@ public class CommonExecSinkITCase extends AbstractTestBase {
         // accordingly, based on their type length
         try {
             tableEnv.getConfig()
-                    .getConfiguration()
-                    .setString(
+                    .set(
                             TABLE_EXEC_SINK_TYPE_LENGTH_ENFORCER.key(),
                             ExecutionConfigOptions.TypeLengthEnforcer.TRIM_PAD.name());
 
@@ -359,8 +356,7 @@ public class CommonExecSinkITCase extends AbstractTestBase {
 
         } finally {
             tableEnv.getConfig()
-                    .getConfiguration()
-                    .setString(
+                    .set(
                             TABLE_EXEC_SINK_TYPE_LENGTH_ENFORCER.key(),
                             ExecutionConfigOptions.TypeLengthEnforcer.IGNORE.name());
         }
@@ -415,8 +411,7 @@ public class CommonExecSinkITCase extends AbstractTestBase {
         // Change config option to "drop", to drop the columns instead of throwing errors
         try {
             tableEnv.getConfig()
-                    .getConfiguration()
-                    .setString(
+                    .set(
                             TABLE_EXEC_SINK_NOT_NULL_ENFORCER.key(),
                             ExecutionConfigOptions.NotNullEnforcer.DROP.name());
 
@@ -431,8 +426,7 @@ public class CommonExecSinkITCase extends AbstractTestBase {
             assertThat(results.get().get(1).getInt(2)).isEqualTo(33);
         } finally {
             tableEnv.getConfig()
-                    .getConfiguration()
-                    .setString(
+                    .set(
                             TABLE_EXEC_SINK_NOT_NULL_ENFORCER.key(),
                             ExecutionConfigOptions.NotNullEnforcer.ERROR.name());
         }
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/operator/BatchOperatorNameTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/operator/BatchOperatorNameTest.java
index 1d55870..a336a42 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/operator/BatchOperatorNameTest.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/operator/BatchOperatorNameTest.java
@@ -65,9 +65,7 @@ public class BatchOperatorNameTest extends OperatorNameTestBase {
     /** Verify Sort, SortAggregate. */
     @Test
     public void testSortAggregate() {
-        tEnv.getConfig()
-                .getConfiguration()
-                .setString(ExecutionConfigOptions.TABLE_EXEC_DISABLED_OPERATORS, "HashAgg");
+        tEnv.getConfig().set(ExecutionConfigOptions.TABLE_EXEC_DISABLED_OPERATORS, "HashAgg");
         createTestSource();
         verifyQuery("SELECT a, " + "count(distinct b) as b " + "FROM MyTable GROUP BY a");
     }
@@ -95,8 +93,7 @@ public class BatchOperatorNameTest extends OperatorNameTestBase {
     @Test
     public void testNestedLoopJoin() {
         tEnv.getConfig()
-                .getConfiguration()
-                .setString(
+                .set(
                         ExecutionConfigOptions.TABLE_EXEC_DISABLED_OPERATORS,
                         "HashJoin, SortMergeJoin");
         testJoinInternal();
@@ -106,8 +103,7 @@ public class BatchOperatorNameTest extends OperatorNameTestBase {
     @Test
     public void testSortMergeJoin() {
         tEnv.getConfig()
-                .getConfiguration()
-                .setString(
+                .set(
                         ExecutionConfigOptions.TABLE_EXEC_DISABLED_OPERATORS,
                         "HashJoin, NestedLoopJoin");
         testJoinInternal();
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/operator/OperatorNameTestBase.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/operator/OperatorNameTestBase.java
index 82f8519..82c3fbf 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/operator/OperatorNameTestBase.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/operator/OperatorNameTestBase.java
@@ -53,7 +53,6 @@ public abstract class OperatorNameTestBase extends TableTestBase {
         util.getStreamEnv().setParallelism(2);
         tEnv = util.getTableEnv();
         tEnv.getConfig()
-                .getConfiguration()
                 .set(
                         ExecutionConfigOptions.TABLE_EXEC_SIMPLIFY_OPERATOR_NAME_ENABLED,
                         isNameSimplifyEnabled);
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/operator/StreamOperatorNameTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/operator/StreamOperatorNameTest.java
index fb4fad5..a6c8eac 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/operator/StreamOperatorNameTest.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/operator/StreamOperatorNameTest.java
@@ -146,7 +146,6 @@ public class StreamOperatorNameTest extends OperatorNameTestBase {
     public void testIncrementalAggregate() {
         util.enableMiniBatch();
         tEnv.getConfig()
-                .getConfiguration()
                 .set(OptimizerConfigOptions.TABLE_OPTIMIZER_DISTINCT_AGG_SPLIT_ENABLED, true);
         createTestSource();
         verifyQuery("SELECT a, " + "count(distinct b) as b " + "FROM MyTable GROUP BY a");
@@ -274,7 +273,6 @@ public class StreamOperatorNameTest extends OperatorNameTestBase {
     @Test
     public void testWindowAggregate() {
         tEnv.getConfig()
-                .getConfiguration()
                 .set(OptimizerConfigOptions.TABLE_OPTIMIZER_AGG_PHASE_STRATEGY, "ONE_PHASE");
         createSourceWithTimeAttribute();
         verifyQuery(
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/ContextResolvedTableSerdeTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/ContextResolvedTableSerdeTest.java
index ba059dd..414c954 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/ContextResolvedTableSerdeTest.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/ContextResolvedTableSerdeTest.java
@@ -811,13 +811,12 @@ public class ContextResolvedTableSerdeTest {
             TableConfigOptions.CatalogPlanCompilation planCompilationOption,
             TableConfigOptions.CatalogPlanRestore planRestoreOption) {
         // Create table options
-        TableConfig tableConfig = TableConfig.getDefault();
-        tableConfig
-                .getConfiguration()
-                .set(TableConfigOptions.PLAN_RESTORE_CATALOG_OBJECTS, planRestoreOption);
-        tableConfig
-                .getConfiguration()
-                .set(TableConfigOptions.PLAN_COMPILE_CATALOG_OBJECTS, planCompilationOption);
+        TableConfig tableConfig =
+                TableConfig.getDefault()
+                        .set(TableConfigOptions.PLAN_RESTORE_CATALOG_OBJECTS, planRestoreOption)
+                        .set(
+                                TableConfigOptions.PLAN_COMPILE_CATALOG_OBJECTS,
+                                planCompilationOption);
 
         return JsonSerdeTestUtil.configuredSerdeContext(CATALOG_MANAGER, tableConfig);
     }
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ChangelogSourceJsonPlanTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ChangelogSourceJsonPlanTest.java
index cc9791a..1ec1567 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ChangelogSourceJsonPlanTest.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ChangelogSourceJsonPlanTest.java
@@ -37,9 +37,7 @@ public class ChangelogSourceJsonPlanTest extends TableTestBase {
     public void setup() {
         util = streamTestUtil(TableConfig.getDefault());
         tEnv = util.getTableEnv();
-        tEnv.getConfig()
-                .getConfiguration()
-                .setBoolean(ExecutionConfigOptions.TABLE_EXEC_SOURCE_CDC_EVENTS_DUPLICATE, true);
+        tEnv.getConfig().set(ExecutionConfigOptions.TABLE_EXEC_SOURCE_CDC_EVENTS_DUPLICATE, true);
     }
 
     @Test
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ExpandJsonPlanTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ExpandJsonPlanTest.java
index 33c3293..b55cf03 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ExpandJsonPlanTest.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ExpandJsonPlanTest.java
@@ -40,15 +40,10 @@ public class ExpandJsonPlanTest extends TableTestBase {
         util = streamTestUtil(TableConfig.getDefault());
         tEnv = util.getTableEnv();
         tEnv.getConfig()
-                .getConfiguration()
                 .set(
                         OptimizerConfigOptions.TABLE_OPTIMIZER_AGG_PHASE_STRATEGY,
-                        AggregatePhaseStrategy.ONE_PHASE.name());
-        tEnv.getConfig()
-                .getConfiguration()
-                .set(OptimizerConfigOptions.TABLE_OPTIMIZER_DISTINCT_AGG_SPLIT_ENABLED, true);
-        tEnv.getConfig()
-                .getConfiguration()
+                        AggregatePhaseStrategy.ONE_PHASE.name())
+                .set(OptimizerConfigOptions.TABLE_OPTIMIZER_DISTINCT_AGG_SPLIT_ENABLED, true)
                 .set(IncrementalAggregateRule.TABLE_OPTIMIZER_INCREMENTAL_AGG_ENABLED(), false);
 
         String srcTableDdl =
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/GroupAggregateJsonPlanTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/GroupAggregateJsonPlanTest.java
index 86238e0..27b7296 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/GroupAggregateJsonPlanTest.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/GroupAggregateJsonPlanTest.java
@@ -57,20 +57,13 @@ public class GroupAggregateJsonPlanTest extends TableTestBase {
         tEnv = util.getTableEnv();
         if (isMiniBatchEnabled) {
             tEnv.getConfig()
-                    .getConfiguration()
-                    .set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ENABLED, true);
-            tEnv.getConfig()
-                    .getConfiguration()
+                    .set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ENABLED, true)
                     .set(
                             ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ALLOW_LATENCY,
-                            Duration.ofSeconds(10));
-            tEnv.getConfig()
-                    .getConfiguration()
+                            Duration.ofSeconds(10))
                     .set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_SIZE, 5L);
         } else {
-            tEnv.getConfig()
-                    .getConfiguration()
-                    .set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ENABLED, false);
+            tEnv.getConfig().set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ENABLED, false);
         }
 
         String srcTableDdl =
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/IncrementalAggregateJsonPlanTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/IncrementalAggregateJsonPlanTest.java
index 5eecb92..cc210a6 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/IncrementalAggregateJsonPlanTest.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/IncrementalAggregateJsonPlanTest.java
@@ -43,26 +43,15 @@ public class IncrementalAggregateJsonPlanTest extends TableTestBase {
         util = streamTestUtil(TableConfig.getDefault());
         tEnv = util.getTableEnv();
         tEnv.getConfig()
-                .getConfiguration()
                 .set(
                         OptimizerConfigOptions.TABLE_OPTIMIZER_AGG_PHASE_STRATEGY,
-                        AggregatePhaseStrategy.TWO_PHASE.name());
-        tEnv.getConfig()
-                .getConfiguration()
-                .set(OptimizerConfigOptions.TABLE_OPTIMIZER_DISTINCT_AGG_SPLIT_ENABLED, true);
-        tEnv.getConfig()
-                .getConfiguration()
-                .set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ENABLED, true);
-        tEnv.getConfig()
-                .getConfiguration()
+                        AggregatePhaseStrategy.TWO_PHASE.name())
+                .set(OptimizerConfigOptions.TABLE_OPTIMIZER_DISTINCT_AGG_SPLIT_ENABLED, true)
+                .set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ENABLED, true)
                 .set(
                         ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ALLOW_LATENCY,
-                        Duration.ofSeconds(10));
-        tEnv.getConfig()
-                .getConfiguration()
-                .set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_SIZE, 5L);
-        tEnv.getConfig()
-                .getConfiguration()
+                        Duration.ofSeconds(10))
+                .set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_SIZE, 5L)
                 .set(IncrementalAggregateRule.TABLE_OPTIMIZER_INCREMENTAL_AGG_ENABLED(), true);
 
         String srcTableDdl =
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/WindowAggregateJsonPlanTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/WindowAggregateJsonPlanTest.java
index 1ac216b..7f6672f 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/WindowAggregateJsonPlanTest.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/WindowAggregateJsonPlanTest.java
@@ -275,9 +275,7 @@ public class WindowAggregateJsonPlanTest extends TableTestBase {
     @Test
     public void testDistinctSplitEnabled() {
         tEnv.getConfig()
-                .getConfiguration()
-                .setBoolean(
-                        OptimizerConfigOptions.TABLE_OPTIMIZER_DISTINCT_AGG_SPLIT_ENABLED, true);
+                .set(OptimizerConfigOptions.TABLE_OPTIMIZER_DISTINCT_AGG_SPLIT_ENABLED, true);
         String sinkTableDdl =
                 "CREATE TABLE MySink (\n"
                         + "  a bigint,\n"
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/rules/logical/PushWatermarkIntoTableSourceScanRuleTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/rules/logical/PushWatermarkIntoTableSourceScanRuleTest.java
index e47a110..58215cb 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/rules/logical/PushWatermarkIntoTableSourceScanRuleTest.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/rules/logical/PushWatermarkIntoTableSourceScanRuleTest.java
@@ -237,10 +237,7 @@ public class PushWatermarkIntoTableSourceScanRuleTest extends TableTestBase {
 
     @Test
     public void testWatermarkWithIdleSource() {
-        util.tableEnv()
-                .getConfig()
-                .getConfiguration()
-                .set(TABLE_EXEC_SOURCE_IDLE_TIMEOUT, Duration.ofMillis(1000));
+        util.tableEnv().getConfig().set(TABLE_EXEC_SOURCE_IDLE_TIMEOUT, Duration.ofMillis(1000));
         String ddl =
                 "CREATE TABLE MyTable("
                         + "  a INT,\n"
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/rules/physical/batch/PushLocalAggIntoTableSourceScanRuleTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/rules/physical/batch/PushLocalAggIntoTableSourceScanRuleTest.java
index a3f91ab..51aee49 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/rules/physical/batch/PushLocalAggIntoTableSourceScanRuleTest.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/rules/physical/batch/PushLocalAggIntoTableSourceScanRuleTest.java
@@ -39,11 +39,8 @@ public class PushLocalAggIntoTableSourceScanRuleTest extends TableTestBase {
     @Before
     public void setup() {
         TableConfig tableConfig = util.tableEnv().getConfig();
-        tableConfig
-                .getConfiguration()
-                .setBoolean(
-                        OptimizerConfigOptions.TABLE_OPTIMIZER_SOURCE_AGGREGATE_PUSHDOWN_ENABLED,
-                        true);
+        tableConfig.set(
+                OptimizerConfigOptions.TABLE_OPTIMIZER_SOURCE_AGGREGATE_PUSHDOWN_ENABLED, true);
         String ddl =
                 "CREATE TABLE inventory (\n"
                         + "  id BIGINT,\n"
@@ -143,8 +140,7 @@ public class PushLocalAggIntoTableSourceScanRuleTest extends TableTestBase {
         // reset config
         util.getTableEnv()
                 .getConfig()
-                .getConfiguration()
-                .setBoolean(
+                .set(
                         OptimizerConfigOptions.TABLE_OPTIMIZER_SOURCE_AGGREGATE_PUSHDOWN_ENABLED,
                         true);
     }
@@ -166,8 +162,7 @@ public class PushLocalAggIntoTableSourceScanRuleTest extends TableTestBase {
         // enable sort agg
         util.getTableEnv()
                 .getConfig()
-                .getConfiguration()
-                .setString(ExecutionConfigOptions.TABLE_EXEC_DISABLED_OPERATORS, "HashAgg");
+                .set(ExecutionConfigOptions.TABLE_EXEC_DISABLED_OPERATORS, "HashAgg");
 
         util.verifyRelPlan(
                 "SELECT\n"
@@ -181,8 +176,7 @@ public class PushLocalAggIntoTableSourceScanRuleTest extends TableTestBase {
         // reset config
         util.getTableEnv()
                 .getConfig()
-                .getConfiguration()
-                .setString(ExecutionConfigOptions.TABLE_EXEC_DISABLED_OPERATORS, "");
+                .set(ExecutionConfigOptions.TABLE_EXEC_DISABLED_OPERATORS, "");
     }
 
     @Test
@@ -190,8 +184,7 @@ public class PushLocalAggIntoTableSourceScanRuleTest extends TableTestBase {
         // enable sort agg
         util.getTableEnv()
                 .getConfig()
-                .getConfiguration()
-                .setString(ExecutionConfigOptions.TABLE_EXEC_DISABLED_OPERATORS, "HashAgg");
+                .set(ExecutionConfigOptions.TABLE_EXEC_DISABLED_OPERATORS, "HashAgg");
 
         util.verifyRelPlan(
                 "SELECT\n"
@@ -204,8 +197,7 @@ public class PushLocalAggIntoTableSourceScanRuleTest extends TableTestBase {
         // reset config
         util.getTableEnv()
                 .getConfig()
-                .getConfiguration()
-                .setString(ExecutionConfigOptions.TABLE_EXEC_DISABLED_OPERATORS, "");
+                .set(ExecutionConfigOptions.TABLE_EXEC_DISABLED_OPERATORS, "");
     }
 
     @Test
@@ -263,8 +255,7 @@ public class PushLocalAggIntoTableSourceScanRuleTest extends TableTestBase {
         // enable two-phase aggregate, otherwise there is no local aggregate
         util.getTableEnv()
                 .getConfig()
-                .getConfiguration()
-                .setString(OptimizerConfigOptions.TABLE_OPTIMIZER_AGG_PHASE_STRATEGY, "TWO_PHASE");
+                .set(OptimizerConfigOptions.TABLE_OPTIMIZER_AGG_PHASE_STRATEGY, "TWO_PHASE");
 
         util.verifyRelPlan(
                 "SELECT\n"
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/batch/sql/ForwardHashExchangeITCase.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/batch/sql/ForwardHashExchangeITCase.java
index cda6090..d7737b6 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/batch/sql/ForwardHashExchangeITCase.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/batch/sql/ForwardHashExchangeITCase.java
@@ -40,7 +40,6 @@ public class ForwardHashExchangeITCase extends BatchTestBase {
         env().getConfig().setDynamicGraph(true);
         env().disableOperatorChaining();
         tEnv().getConfig()
-                .getConfiguration()
                 .set(ExecutionOptions.BATCH_SHUFFLE_MODE, BatchShuffleMode.ALL_EXCHANGES_BLOCKING);
 
         String testDataId = TestValuesTableFactory.registerData(TestData.data3());
@@ -61,9 +60,7 @@ public class ForwardHashExchangeITCase extends BatchTestBase {
 
     @Test
     public void testOverAggWithHashAgg() {
-        tEnv().getConfig()
-                .getConfiguration()
-                .setString(ExecutionConfigOptions.TABLE_EXEC_DISABLED_OPERATORS, "SortAgg");
+        tEnv().getConfig().set(ExecutionConfigOptions.TABLE_EXEC_DISABLED_OPERATORS, "SortAgg");
         checkResult(
                 "SELECT\n"
                         + "   b,\n"
@@ -85,9 +82,7 @@ public class ForwardHashExchangeITCase extends BatchTestBase {
 
     @Test
     public void testOverAggWithSortAgg() {
-        tEnv().getConfig()
-                .getConfiguration()
-                .setString(ExecutionConfigOptions.TABLE_EXEC_DISABLED_OPERATORS, "HashAgg");
+        tEnv().getConfig().set(ExecutionConfigOptions.TABLE_EXEC_DISABLED_OPERATORS, "HashAgg");
         checkResult(
                 "SELECT\n"
                         + "   b,\n"
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/batch/sql/agg/LocalAggregatePushDownITCase.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/batch/sql/agg/LocalAggregatePushDownITCase.java
index b6522c0..778bb09 100755
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/batch/sql/agg/LocalAggregatePushDownITCase.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/batch/sql/agg/LocalAggregatePushDownITCase.java
@@ -134,8 +134,7 @@ public class LocalAggregatePushDownITCase extends BatchTestBase {
     public void testDisablePushDownLocalAgg() {
         // disable push down local agg
         tEnv().getConfig()
-                .getConfiguration()
-                .setBoolean(
+                .set(
                         OptimizerConfigOptions.TABLE_OPTIMIZER_SOURCE_AGGREGATE_PUSHDOWN_ENABLED,
                         false);
 
@@ -170,9 +169,7 @@ public class LocalAggregatePushDownITCase extends BatchTestBase {
     @Test
     public void testPushDownLocalSortAggWithoutSort() {
         // enable sort agg
-        tEnv().getConfig()
-                .getConfiguration()
-                .setString(ExecutionConfigOptions.TABLE_EXEC_DISABLED_OPERATORS, "HashAgg");
+        tEnv().getConfig().set(ExecutionConfigOptions.TABLE_EXEC_DISABLED_OPERATORS, "HashAgg");
 
         checkResult(
                 "SELECT\n"
@@ -188,9 +185,7 @@ public class LocalAggregatePushDownITCase extends BatchTestBase {
     @Test
     public void testPushDownLocalSortAggWithSort() {
         // enable sort agg
-        tEnv().getConfig()
-                .getConfiguration()
-                .setString(ExecutionConfigOptions.TABLE_EXEC_DISABLED_OPERATORS, "HashAgg");
+        tEnv().getConfig().set(ExecutionConfigOptions.TABLE_EXEC_DISABLED_OPERATORS, "HashAgg");
 
         checkResult(
                 "SELECT\n"
@@ -296,8 +291,7 @@ public class LocalAggregatePushDownITCase extends BatchTestBase {
     public void testPushDownLocalAggWithoutAuxGrouping() {
         // enable two-phase aggregate, otherwise there is no local aggregate
         tEnv().getConfig()
-                .getConfiguration()
-                .setString(OptimizerConfigOptions.TABLE_OPTIMIZER_AGG_PHASE_STRATEGY, "TWO_PHASE");
+                .set(OptimizerConfigOptions.TABLE_OPTIMIZER_AGG_PHASE_STRATEGY, "TWO_PHASE");
 
         checkResult(
                 "SELECT\n"
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/DeduplicationJsonPlanITCase.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/DeduplicationJsonPlanITCase.java
index d9673f9..68bf98a 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/DeduplicationJsonPlanITCase.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/DeduplicationJsonPlanITCase.java
@@ -68,9 +68,7 @@ public class DeduplicationJsonPlanITCase extends JsonPlanTestBase {
                                 + "    ROW_NUMBER() OVER (PARTITION BY product ORDER BY event_time ASC) AS row_num\n"
                                 + "  FROM MyTable)\n"
                                 + "WHERE row_num = 1 \n");
-        tableEnv.getConfig()
-                .getConfiguration()
-                .setInteger(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 1);
+        tableEnv.getConfig().set(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 1);
         checkTransformationUids(compiledPlan);
         tableEnv.executePlan(compiledPlan).await();
 
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/ExpandJsonPlanITCase.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/ExpandJsonPlanITCase.java
index 46188c8..4ebf82e 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/ExpandJsonPlanITCase.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/ExpandJsonPlanITCase.java
@@ -37,15 +37,10 @@ public class ExpandJsonPlanITCase extends JsonPlanTestBase {
     @Test
     public void testExpand() throws Exception {
         tableEnv.getConfig()
-                .getConfiguration()
                 .set(
                         OptimizerConfigOptions.TABLE_OPTIMIZER_AGG_PHASE_STRATEGY,
-                        AggregatePhaseStrategy.ONE_PHASE.name());
-        tableEnv.getConfig()
-                .getConfiguration()
-                .set(OptimizerConfigOptions.TABLE_OPTIMIZER_DISTINCT_AGG_SPLIT_ENABLED, true);
-        tableEnv.getConfig()
-                .getConfiguration()
+                        AggregatePhaseStrategy.ONE_PHASE.name())
+                .set(OptimizerConfigOptions.TABLE_OPTIMIZER_DISTINCT_AGG_SPLIT_ENABLED, true)
                 .set(IncrementalAggregateRule.TABLE_OPTIMIZER_INCREMENTAL_AGG_ENABLED(), false);
 
         createTestValuesSourceTable(
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/GroupAggregateJsonPlanITCase.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/GroupAggregateJsonPlanITCase.java
index 924735a..3450478 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/GroupAggregateJsonPlanITCase.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/GroupAggregateJsonPlanITCase.java
@@ -53,20 +53,13 @@ public class GroupAggregateJsonPlanITCase extends JsonPlanTestBase {
         super.setup();
         if (isMiniBatchEnabled) {
             tableEnv.getConfig()
-                    .getConfiguration()
-                    .set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ENABLED, true);
-            tableEnv.getConfig()
-                    .getConfiguration()
+                    .set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ENABLED, true)
                     .set(
                             ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ALLOW_LATENCY,
-                            Duration.ofSeconds(10));
-            tableEnv.getConfig()
-                    .getConfiguration()
+                            Duration.ofSeconds(10))
                     .set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_SIZE, 5L);
         } else {
-            tableEnv.getConfig()
-                    .getConfiguration()
-                    .set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ENABLED, false);
+            tableEnv.getConfig().set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ENABLED, false);
         }
     }
 
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/IncrementalAggregateJsonPlanITCase.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/IncrementalAggregateJsonPlanITCase.java
index d6b73a9..ea76839 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/IncrementalAggregateJsonPlanITCase.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/IncrementalAggregateJsonPlanITCase.java
@@ -43,26 +43,15 @@ public class IncrementalAggregateJsonPlanITCase extends JsonPlanTestBase {
     public void setup() throws Exception {
         super.setup();
         tableEnv.getConfig()
-                .getConfiguration()
                 .set(
                         OptimizerConfigOptions.TABLE_OPTIMIZER_AGG_PHASE_STRATEGY,
-                        AggregatePhaseStrategy.TWO_PHASE.name());
-        tableEnv.getConfig()
-                .getConfiguration()
-                .set(OptimizerConfigOptions.TABLE_OPTIMIZER_DISTINCT_AGG_SPLIT_ENABLED, true);
-        tableEnv.getConfig()
-                .getConfiguration()
-                .set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ENABLED, true);
-        tableEnv.getConfig()
-                .getConfiguration()
+                        AggregatePhaseStrategy.TWO_PHASE.name())
+                .set(OptimizerConfigOptions.TABLE_OPTIMIZER_DISTINCT_AGG_SPLIT_ENABLED, true)
+                .set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ENABLED, true)
                 .set(
                         ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ALLOW_LATENCY,
-                        Duration.ofSeconds(10));
-        tableEnv.getConfig()
-                .getConfiguration()
-                .set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_SIZE, 5L);
-        tableEnv.getConfig()
-                .getConfiguration()
+                        Duration.ofSeconds(10))
+                .set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_SIZE, 5L)
                 .set(IncrementalAggregateRule.TABLE_OPTIMIZER_INCREMENTAL_AGG_ENABLED(), true);
     }
 
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/WindowAggregateJsonITCase.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/WindowAggregateJsonITCase.java
index f9b3f26..6e7e848 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/WindowAggregateJsonITCase.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/WindowAggregateJsonITCase.java
@@ -72,8 +72,7 @@ public class WindowAggregateJsonITCase extends JsonPlanTestBase {
                     }
                 });
         tableEnv.getConfig()
-                .getConfiguration()
-                .setString(
+                .set(
                         OptimizerConfigOptions.TABLE_OPTIMIZER_AGG_PHASE_STRATEGY,
                         aggPhase.toString());
     }
@@ -181,9 +180,7 @@ public class WindowAggregateJsonITCase extends JsonPlanTestBase {
     @Test
     public void testDistinctSplitEnabled() throws Exception {
         tableEnv.getConfig()
-                .getConfiguration()
-                .setBoolean(
-                        OptimizerConfigOptions.TABLE_OPTIMIZER_DISTINCT_AGG_SPLIT_ENABLED, true);
+                .set(OptimizerConfigOptions.TABLE_OPTIMIZER_DISTINCT_AGG_SPLIT_ENABLED, true);
         createTestValuesSinkTable(
                 "MySink", "name STRING", "max_double DOUBLE", "cnt_distinct_int BIGINT");
 
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/TableEnvironmentITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/TableEnvironmentITCase.scala
index 9d5cb95..25048d8 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/TableEnvironmentITCase.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/TableEnvironmentITCase.scala
@@ -86,7 +86,7 @@ class TableEnvironmentITCase(tableEnvName: String, isStreaming: Boolean) extends
 
   @Test
   def testSetPlannerType: Unit = {
-    tEnv.getConfig.getConfiguration.set(TableConfigOptions.TABLE_PLANNER, PlannerType.OLD)
+    tEnv.getConfig.set(TableConfigOptions.TABLE_PLANNER, PlannerType.OLD)
 
     TestTableSourceSinks.createCsvTemporarySinkTable(
       tEnv, new TableSchema(Array("first"), Array(STRING)), "MySink1")
@@ -106,9 +106,9 @@ class TableEnvironmentITCase(tableEnvName: String, isStreaming: Boolean) extends
   @Test
   def testSetExecutionMode(): Unit = {
     if (isStreaming) {
-      tEnv.getConfig.getConfiguration.set(ExecutionOptions.RUNTIME_MODE, RuntimeExecutionMode.BATCH)
+      tEnv.getConfig.set(ExecutionOptions.RUNTIME_MODE, RuntimeExecutionMode.BATCH)
     } else {
-      tEnv.getConfig.getConfiguration.set(ExecutionOptions.RUNTIME_MODE,
+      tEnv.getConfig.set(ExecutionOptions.RUNTIME_MODE,
         RuntimeExecutionMode.STREAMING)
     }
 
@@ -377,7 +377,7 @@ class TableEnvironmentITCase(tableEnvName: String, isStreaming: Boolean) extends
 
   @Test
   def testTableDMLSync(): Unit = {
-    tEnv.getConfig.getConfiguration.set(TableConfigOptions.TABLE_DML_SYNC, Boolean.box(true))
+    tEnv.getConfig.set(TableConfigOptions.TABLE_DML_SYNC, Boolean.box(true))
     val sink1Path = _tempFolder.newFolder().toString
     tEnv.executeSql(
       s"""
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/batch/ExplainTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/batch/ExplainTest.scala
index 8314f67..f5ecaf2 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/batch/ExplainTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/batch/ExplainTest.scala
@@ -49,8 +49,8 @@ class ExplainTest(extended: Boolean) extends TableTestBase {
 
   @Before
   def before(): Unit = {
-    util.tableEnv.getConfig.getConfiguration.setInteger(
-      ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 4)
+    util.tableEnv.getConfig
+      .set(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, Int.box(4))
   }
 
   @Test
@@ -76,7 +76,7 @@ class ExplainTest(extended: Boolean) extends TableTestBase {
   @Test
   def testExplainWithJoin(): Unit = {
     // TODO support other join operators when them are supported
-    util.tableEnv.getConfig.getConfiguration.setString(
+    util.tableEnv.getConfig.set(
       ExecutionConfigOptions.TABLE_EXEC_DISABLED_OPERATORS, "HashJoin, NestedLoopJoin")
     util.verifyExplain("SELECT a, b, c, e, f FROM MyTable1, MyTable2 WHERE a = d", extraDetails: _*)
   }
@@ -119,7 +119,7 @@ class ExplainTest(extended: Boolean) extends TableTestBase {
 
   @Test
   def testExplainMultipleInput(): Unit = {
-    util.tableEnv.getConfig.getConfiguration.setString(
+    util.tableEnv.getConfig.set(
       ExecutionConfigOptions.TABLE_EXEC_DISABLED_OPERATORS, "NestedLoopJoin,SortMergeJoin")
     val sql =
       """
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/ExplainTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/ExplainTest.scala
index a66db23..88197ed 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/ExplainTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/ExplainTest.scala
@@ -52,8 +52,8 @@ class ExplainTest(extended: Boolean) extends TableTestBase {
 
   @Before
   def before(): Unit = {
-    util.tableEnv.getConfig.getConfiguration.setInteger(
-      ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 4)
+    util.tableEnv.getConfig
+      .set(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, Int.box(4))
   }
 
   @Test
@@ -128,10 +128,12 @@ class ExplainTest(extended: Boolean) extends TableTestBase {
       "T2", 'id2, 'cnt, 'name, 'goods, 'rowtime.rowtime)
     util.addTableWithWatermark("T3", util.tableEnv.from("T1"), "rowtime", 0)
     util.addTableWithWatermark("T4", util.tableEnv.from("T2"), "rowtime", 0)
-    util.tableEnv.getConfig.getConfiguration.setBoolean(
-      ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ENABLED, true)
-    util.tableEnv.getConfig.getConfiguration.set(
-      ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ALLOW_LATENCY, Duration.ofSeconds(3))
+    util.tableEnv.getConfig
+      .set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ENABLED, Boolean.box(true))
+    util.tableEnv.getConfig
+      .set(
+        ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ALLOW_LATENCY,
+        Duration.ofSeconds(3))
     val table = util.tableEnv.sqlQuery(
       """
         |SELECT id1, T3.rowtime AS ts, text
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/catalog/CatalogTableITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/catalog/CatalogTableITCase.scala
index 25ed7ca..049931d 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/catalog/CatalogTableITCase.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/catalog/CatalogTableITCase.scala
@@ -62,8 +62,7 @@ class CatalogTableITCase(isStreamingMode: Boolean) extends AbstractTestBase {
   @Before
   def before(): Unit = {
     tableEnv.getConfig
-      .getConfiguration
-      .setInteger(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 1)
+      .set(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, Int.box(1))
     TestCollectionTableFactory.reset()
 
     val func = new CatalogFunctionImpl(
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/catalog/CatalogViewITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/catalog/CatalogViewITCase.scala
index 5bfd016..634fac1 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/catalog/CatalogViewITCase.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/catalog/CatalogViewITCase.scala
@@ -58,8 +58,7 @@ class CatalogViewITCase(isStreamingMode: Boolean) extends AbstractTestBase {
   @Before
   def before(): Unit = {
     tableEnv.getConfig
-      .getConfiguration
-      .setInteger(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 1)
+      .set(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, Int.box(1))
     TestCollectionTableFactory.reset()
   }
 
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/expressions/NonDeterministicTests.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/expressions/NonDeterministicTests.scala
index b76822d..b779f72 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/expressions/NonDeterministicTests.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/expressions/NonDeterministicTests.scala
@@ -80,11 +80,12 @@ class NonDeterministicTests extends ExpressionTestBase {
   def testTemporalFunctionsInBatchMode(): Unit = {
     val zoneId = ZoneId.of("Asia/Shanghai")
     config.setLocalTimeZone(zoneId)
-    config.getConfiguration.set(ExecutionOptions.RUNTIME_MODE, RuntimeExecutionMode.BATCH)
+    config.set(ExecutionOptions.RUNTIME_MODE, RuntimeExecutionMode.BATCH)
 
-    config.getConfiguration.setLong(InternalConfigOptions.TABLE_QUERY_START_EPOCH_TIME, 1123L)
-    config.getConfiguration.setLong(InternalConfigOptions.TABLE_QUERY_START_LOCAL_TIME,
-      1123L + TimeZone.getTimeZone(zoneId).getOffset(1123L))
+    config.set(InternalConfigOptions.TABLE_QUERY_START_EPOCH_TIME, Long.box(1123L))
+    config.set(
+      InternalConfigOptions.TABLE_QUERY_START_LOCAL_TIME,
+      Long.box(1123L + TimeZone.getTimeZone(zoneId).getOffset(1123L)))
 
     val temporalFunctions = getCodeGenFunctions(List(
       "CURRENT_DATE",
@@ -109,7 +110,7 @@ class NonDeterministicTests extends ExpressionTestBase {
 
   @Test
   def testCurrentRowTimestampFunctionsInBatchMode(): Unit = {
-    config.getConfiguration.set(ExecutionOptions.RUNTIME_MODE, RuntimeExecutionMode.BATCH)
+    config.set(ExecutionOptions.RUNTIME_MODE, RuntimeExecutionMode.BATCH)
     val temporalFunctions = getCodeGenFunctions(List("CURRENT_ROW_TIMESTAMP()"))
 
     val round1 = evaluateFunctionResult(temporalFunctions)
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/expressions/utils/ExpressionTestBase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/expressions/utils/ExpressionTestBase.scala
index 3f9b558..07dff77 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/expressions/utils/ExpressionTestBase.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/expressions/utils/ExpressionTestBase.scala
@@ -100,7 +100,7 @@ abstract class ExpressionTestBase {
 
   @Before
   def prepare(): Unit = {
-    config.getConfiguration.set(
+    config.set(
       ExecutionConfigOptions.TABLE_EXEC_LEGACY_CAST_BEHAVIOUR,
       ExecutionConfigOptions.LegacyCastBehaviour.DISABLED
     )
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/DagOptimizationTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/DagOptimizationTest.scala
index b6fc21e..46e824a 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/DagOptimizationTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/DagOptimizationTest.scala
@@ -129,8 +129,9 @@ class DagOptimizationTest extends TableTestBase {
   @Test
   def testMultiSinks1(): Unit = {
     val stmtSet = util.tableEnv.createStatementSet()
-    util.tableEnv.getConfig.getConfiguration.setBoolean(
-      RelNodeBlockPlanBuilder.TABLE_OPTIMIZER_REUSE_OPTIMIZE_BLOCK_WITH_DIGEST_ENABLED, true)
+    util.tableEnv.getConfig.set(
+      RelNodeBlockPlanBuilder.TABLE_OPTIMIZER_REUSE_OPTIMIZE_BLOCK_WITH_DIGEST_ENABLED,
+      Boolean.box(true))
     val table1 = util.tableEnv.sqlQuery("SELECT SUM(a) AS sum_a, c FROM MyTable GROUP BY c")
     util.tableEnv.registerTable("table1", table1)
     val table2 = util.tableEnv.sqlQuery("SELECT SUM(sum_a) AS total_sum FROM table1")
@@ -150,8 +151,8 @@ class DagOptimizationTest extends TableTestBase {
   @Test
   def testMultiSinks2(): Unit = {
     val stmtSet = util.tableEnv.createStatementSet()
-    util.tableEnv.getConfig.getConfiguration.setBoolean(
-      RelNodeBlockPlanBuilder.TABLE_OPTIMIZER_UNIONALL_AS_BREAKPOINT_ENABLED, true)
+    util.tableEnv.getConfig.set(
+      RelNodeBlockPlanBuilder.TABLE_OPTIMIZER_UNIONALL_AS_BREAKPOINT_ENABLED, Boolean.box(true))
     util.addTableSource[(Int, Long, String, Double, Boolean)]("MyTable2", 'a, 'b, 'c, 'd, 'e)
 
     val table1 = util.tableEnv.sqlQuery("SELECT a as a1, b as b1 FROM MyTable WHERE a <= 10")
@@ -174,8 +175,8 @@ class DagOptimizationTest extends TableTestBase {
   @Test
   def testMultiSinks3(): Unit = {
     val stmtSet = util.tableEnv.createStatementSet()
-    util.tableEnv.getConfig.getConfiguration.setBoolean(
-      RelNodeBlockPlanBuilder.TABLE_OPTIMIZER_UNIONALL_AS_BREAKPOINT_ENABLED, false)
+    util.tableEnv.getConfig.set(
+      RelNodeBlockPlanBuilder.TABLE_OPTIMIZER_UNIONALL_AS_BREAKPOINT_ENABLED, Boolean.box(false))
     util.addTableSource[(Int, Long, String, Double, Boolean)]("MyTable2", 'a, 'b, 'c, 'd, 'e)
 
     val table1 = util.tableEnv.sqlQuery("SELECT a AS a1, b AS b1 FROM MyTable WHERE a <= 10")
@@ -198,8 +199,9 @@ class DagOptimizationTest extends TableTestBase {
   @Test
   def testMultiSinks4(): Unit = {
     val stmtSet = util.tableEnv.createStatementSet()
-    util.tableEnv.getConfig.getConfiguration.setBoolean(
-      RelNodeBlockPlanBuilder.TABLE_OPTIMIZER_REUSE_OPTIMIZE_BLOCK_WITH_DIGEST_ENABLED, true)
+    util.tableEnv.getConfig.set(
+      RelNodeBlockPlanBuilder.TABLE_OPTIMIZER_REUSE_OPTIMIZE_BLOCK_WITH_DIGEST_ENABLED,
+      Boolean.box(true))
 
     val table1 = util.tableEnv.sqlQuery("SELECT a as a1, b FROM MyTable WHERE a <= 10")
     util.tableEnv.registerTable("table1", table1)
@@ -227,8 +229,9 @@ class DagOptimizationTest extends TableTestBase {
   @Test
   def testMultiSinks5(): Unit = {
     val stmtSet = util.tableEnv.createStatementSet()
-    util.tableEnv.getConfig.getConfiguration.setBoolean(
-      RelNodeBlockPlanBuilder.TABLE_OPTIMIZER_REUSE_OPTIMIZE_BLOCK_WITH_DIGEST_ENABLED, true)
+    util.tableEnv.getConfig.set(
+      RelNodeBlockPlanBuilder.TABLE_OPTIMIZER_REUSE_OPTIMIZE_BLOCK_WITH_DIGEST_ENABLED,
+      Boolean.box(true))
     // test with non-deterministic udf
     util.tableEnv.registerFunction("random_udf", new NonDeterministicUdf())
     val table1 = util.tableEnv.sqlQuery("SELECT random_udf(a) AS a, c FROM MyTable")
@@ -250,8 +253,8 @@ class DagOptimizationTest extends TableTestBase {
   @Test
   def testMultiLevelViews(): Unit = {
     val stmtSet = util.tableEnv.createStatementSet()
-    util.tableEnv.getConfig.getConfiguration.setBoolean(
-      RelNodeBlockPlanBuilder.TABLE_OPTIMIZER_UNIONALL_AS_BREAKPOINT_ENABLED, false)
+    util.tableEnv.getConfig.set(
+      RelNodeBlockPlanBuilder.TABLE_OPTIMIZER_UNIONALL_AS_BREAKPOINT_ENABLED, Boolean.box(false))
 
     val table1 = util.tableEnv.sqlQuery("SELECT a, b, c FROM MyTable WHERE c LIKE '%hello%'")
     util.tableEnv.registerTable("TempTable1", table1)
@@ -290,8 +293,8 @@ class DagOptimizationTest extends TableTestBase {
   @Test
   def testMultiSinksWithUDTF(): Unit = {
     val stmtSet = util.tableEnv.createStatementSet()
-    util.tableEnv.getConfig.getConfiguration.setBoolean(
-      RelNodeBlockPlanBuilder.TABLE_OPTIMIZER_UNIONALL_AS_BREAKPOINT_ENABLED, false)
+    util.tableEnv.getConfig.set(
+      RelNodeBlockPlanBuilder.TABLE_OPTIMIZER_UNIONALL_AS_BREAKPOINT_ENABLED, Boolean.box(false))
     util.addFunction("split", new TableFunc1)
     val sqlQuery1 =
       """
@@ -332,8 +335,8 @@ class DagOptimizationTest extends TableTestBase {
   @Test
   def testMultiSinksSplitOnUnion1(): Unit = {
     val stmtSet = util.tableEnv.createStatementSet()
-    util.tableEnv.getConfig.getConfiguration.setBoolean(
-      RelNodeBlockPlanBuilder.TABLE_OPTIMIZER_UNIONALL_AS_BREAKPOINT_ENABLED, false)
+    util.tableEnv.getConfig.set(
+      RelNodeBlockPlanBuilder.TABLE_OPTIMIZER_UNIONALL_AS_BREAKPOINT_ENABLED, Boolean.box(false))
 
     val table = util.tableEnv.sqlQuery(
       "SELECT a, c FROM MyTable UNION ALL SELECT d, f FROM MyTable1")
@@ -355,10 +358,12 @@ class DagOptimizationTest extends TableTestBase {
   @Test
   def testMultiSinksSplitOnUnion2(): Unit = {
     val stmtSet = util.tableEnv.createStatementSet()
-    util.tableEnv.getConfig.getConfiguration.setBoolean(
-      RelNodeBlockPlanBuilder.TABLE_OPTIMIZER_REUSE_OPTIMIZE_BLOCK_WITH_DIGEST_ENABLED, true)
-    util.tableEnv.getConfig.getConfiguration.setBoolean(
-      RelNodeBlockPlanBuilder.TABLE_OPTIMIZER_UNIONALL_AS_BREAKPOINT_ENABLED, false)
+    util.tableEnv.getConfig.set(
+      RelNodeBlockPlanBuilder.TABLE_OPTIMIZER_REUSE_OPTIMIZE_BLOCK_WITH_DIGEST_ENABLED,
+      Boolean.box(true))
+    util.tableEnv.getConfig.set(
+      RelNodeBlockPlanBuilder.TABLE_OPTIMIZER_UNIONALL_AS_BREAKPOINT_ENABLED,
+      Boolean.box(false))
     util.addTableSource[(Int, Long, String)]("MyTable2", 'a, 'b, 'c)
 
     val sqlQuery1 =
@@ -394,8 +399,8 @@ class DagOptimizationTest extends TableTestBase {
   @Test
   def testMultiSinksSplitOnUnion3(): Unit = {
     val stmtSet = util.tableEnv.createStatementSet()
-    util.tableEnv.getConfig.getConfiguration.setBoolean(
-      RelNodeBlockPlanBuilder.TABLE_OPTIMIZER_UNIONALL_AS_BREAKPOINT_ENABLED, false)
+    util.tableEnv.getConfig.set(
+      RelNodeBlockPlanBuilder.TABLE_OPTIMIZER_UNIONALL_AS_BREAKPOINT_ENABLED, Boolean.box(false))
     util.addTableSource[(Int, Long, String)]("MyTable2", 'a, 'b, 'c)
 
     val sqlQuery1 = "SELECT a, c FROM MyTable UNION ALL SELECT d, f FROM MyTable1"
@@ -426,8 +431,8 @@ class DagOptimizationTest extends TableTestBase {
   @Test
   def testMultiSinksSplitOnUnion4(): Unit = {
     val stmtSet = util.tableEnv.createStatementSet()
-    util.tableEnv.getConfig.getConfiguration.setBoolean(
-      RelNodeBlockPlanBuilder.TABLE_OPTIMIZER_UNIONALL_AS_BREAKPOINT_ENABLED, false)
+    util.tableEnv.getConfig.set(
+      RelNodeBlockPlanBuilder.TABLE_OPTIMIZER_UNIONALL_AS_BREAKPOINT_ENABLED, Boolean.box(false))
     util.addTableSource[(Int, Long, String)]("MyTable2", 'a, 'b, 'c)
 
     val sqlQuery =
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/rules/physical/batch/RemoveRedundantLocalSortAggRuleTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/rules/physical/batch/RemoveRedundantLocalSortAggRuleTest.scala
index 8e3e999..5420ae9 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/rules/physical/batch/RemoveRedundantLocalSortAggRuleTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/rules/physical/batch/RemoveRedundantLocalSortAggRuleTest.scala
@@ -40,11 +40,11 @@ class RemoveRedundantLocalSortAggRuleTest extends TableTestBase {
 
   @Test
   def testRemoveRedundantLocalSortAggWithSort(): Unit = {
-    util.tableEnv.getConfig.getConfiguration.setString(
+    util.tableEnv.getConfig.set(
       ExecutionConfigOptions.TABLE_EXEC_DISABLED_OPERATORS, "SortMergeJoin,NestedLoopJoin,HashAgg")
     // disable BroadcastHashJoin
-    util.tableEnv.getConfig.getConfiguration.setLong(
-      OptimizerConfigOptions.TABLE_OPTIMIZER_BROADCAST_JOIN_THRESHOLD, -1)
+    util.tableEnv.getConfig.set(
+      OptimizerConfigOptions.TABLE_OPTIMIZER_BROADCAST_JOIN_THRESHOLD, Long.box(-1))
     val sqlQuery =
       """
         |WITH r AS (SELECT * FROM x, y WHERE a = d AND c LIKE 'He%')
@@ -55,7 +55,7 @@ class RemoveRedundantLocalSortAggRuleTest extends TableTestBase {
 
   @Test
   def testRemoveRedundantLocalSortAggWithoutSort(): Unit = {
-    util.tableEnv.getConfig.getConfiguration.setString(
+    util.tableEnv.getConfig.set(
       ExecutionConfigOptions.TABLE_EXEC_DISABLED_OPERATORS, "HashJoin,NestedLoopJoin,HashAgg")
     val sqlQuery =
       """
@@ -67,7 +67,7 @@ class RemoveRedundantLocalSortAggRuleTest extends TableTestBase {
 
   @Test
   def testUsingLocalAggCallFilters(): Unit = {
-    util.tableEnv.getConfig.getConfiguration.setString(
+    util.tableEnv.getConfig.set(
       ExecutionConfigOptions.TABLE_EXEC_DISABLED_OPERATORS, "HashAgg")
     val sqlQuery = "SELECT d, MAX(e), MAX(e) FILTER (WHERE a < 10), COUNT(DISTINCT c),\n" +
       "COUNT(DISTINCT c) FILTER (WHERE a > 5), COUNT(DISTINCT b) FILTER (WHERE b > 3)\n" +
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/table/PythonGroupWindowAggregateTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/table/PythonGroupWindowAggregateTest.scala
index 15bd134..99d6431 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/table/PythonGroupWindowAggregateTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/table/PythonGroupWindowAggregateTest.scala
@@ -182,8 +182,8 @@ class PythonGroupWindowAggregateTest extends TableTestBase {
   def testEmitStrategyNotSupported(): Unit = {
     val util = streamTestUtil()
     val tableConf = util.getTableEnv.getConfig
-    tableConf.getConfiguration.setBoolean(TABLE_EXEC_EMIT_EARLY_FIRE_ENABLED, true)
-    tableConf.getConfiguration.set(TABLE_EXEC_EMIT_EARLY_FIRE_DELAY, Duration.ofMillis(10))
+    tableConf.set(TABLE_EXEC_EMIT_EARLY_FIRE_ENABLED, Boolean.box(true))
+    tableConf.set(TABLE_EXEC_EMIT_EARLY_FIRE_DELAY, Duration.ofMillis(10))
 
     val sourceTable = util.addTableSource[(Int, Long, Int, Long)](
       "MyTable", 'a, 'b, 'c, 'rowtime.rowtime)
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/table/TwoStageAggregateTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/table/TwoStageAggregateTest.scala
index 7d34a64..92cbea6 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/table/TwoStageAggregateTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/table/TwoStageAggregateTest.scala
@@ -36,15 +36,13 @@ class TwoStageAggregateTest extends TableTestBase {
     util = streamTestUtil()
     util.tableEnv.getConfig
       .setIdleStateRetentionTime(Time.hours(1), Time.hours(2))
-    util.tableEnv.getConfig.getConfiguration
+    util.tableEnv.getConfig
       .set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ALLOW_LATENCY, Duration.ofSeconds(1))
-    util.tableEnv.getConfig.getConfiguration
-        .setBoolean(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ENABLED, true)
-    util.tableEnv.getConfig.getConfiguration
-      .setLong(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_SIZE, 3)
-    util.tableEnv.getConfig.getConfiguration.setString(
-      OptimizerConfigOptions.TABLE_OPTIMIZER_AGG_PHASE_STRATEGY,
-      AggregatePhaseStrategy.TWO_PHASE.toString)
+      .set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ENABLED, Boolean.box(true))
+      .set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_SIZE, Long.box(3))
+      .set(
+        OptimizerConfigOptions.TABLE_OPTIMIZER_AGG_PHASE_STRATEGY,
+        AggregatePhaseStrategy.TWO_PHASE.toString)
   }
 
   @Test
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/table/validation/GroupWindowTableAggregateValidationTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/table/validation/GroupWindowTableAggregateValidationTest.scala
index 9a7894c..9708cda 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/table/validation/GroupWindowTableAggregateValidationTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/table/validation/GroupWindowTableAggregateValidationTest.scala
@@ -72,8 +72,8 @@ class GroupWindowTableAggregateValidationTest extends TableTestBase {
     val table = util.addTableSource[(Long, Int, String)]('long, 'int, 'string, 'proctime.proctime)
 
     val tableConf = util.getTableEnv.getConfig
-    tableConf.getConfiguration.setBoolean(TABLE_EXEC_EMIT_EARLY_FIRE_ENABLED, true)
-    tableConf.getConfiguration.set(TABLE_EXEC_EMIT_EARLY_FIRE_DELAY, Duration.ofMillis(10))
+    tableConf.set(TABLE_EXEC_EMIT_EARLY_FIRE_ENABLED, Boolean.box(true))
+    tableConf.set(TABLE_EXEC_EMIT_EARLY_FIRE_DELAY, Duration.ofMillis(10))
 
     val result = table
       .window(Tumble over 2.hours on 'proctime as 'w)
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/FileSystemITCaseBase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/FileSystemITCaseBase.scala
index bded741..3f1743d 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/FileSystemITCaseBase.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/FileSystemITCaseBase.scala
@@ -432,8 +432,8 @@ trait FileSystemITCaseBase {
 
   @Test
   def testLimitPushDown(): Unit = {
-    tableEnv.getConfig.getConfiguration.setInteger(
-      ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 1)
+    tableEnv.getConfig
+      .set(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, Int.box(1))
     tableEnv.executeSql("insert into nonPartitionedTable select x, y, a, b from originalT").await()
 
     check(
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/CalcITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/CalcITCase.scala
index 9ba2247..e591f10 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/CalcITCase.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/CalcITCase.scala
@@ -1333,10 +1333,8 @@ class CalcITCase extends BatchTestBase {
       new RowTypeInfo(INT_TYPE_INFO, LONG_TYPE_INFO, BYTE_PRIMITIVE_ARRAY_TYPE_INFO),
       "a, b, c",
       nullablesOfNullData3)
-    tableConfig.getConfiguration.setInteger(
-      ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 1)
-    tableConfig.getConfiguration.setBoolean(
-      BatchPhysicalSortRule.TABLE_EXEC_RANGE_SORT_ENABLED, true)
+    tableConfig.set(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, Int.box(1))
+    tableConfig.set(BatchPhysicalSortRule.TABLE_EXEC_RANGE_SORT_ENABLED, Boolean.box(true))
     checkResult(
       "select * from BinaryT order by c",
       nullData3.sortBy((x : Row) =>
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/CodeSplitITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/CodeSplitITCase.scala
index caad536..3a3534c 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/CodeSplitITCase.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/CodeSplitITCase.scala
@@ -136,10 +136,8 @@ class CodeSplitITCase extends BatchTestBase {
   }
 
   private[flink] def runTest(sql: String, results: Seq[Row]): Unit = {
-    tEnv.getConfig.getConfiguration.setInteger(
-      TableConfigOptions.MAX_LENGTH_GENERATED_CODE, 4000)
-    tEnv.getConfig.getConfiguration.setInteger(
-      TableConfigOptions.MAX_MEMBERS_GENERATED_CODE, 10000)
+    tEnv.getConfig.set(TableConfigOptions.MAX_LENGTH_GENERATED_CODE, Int.box(4000))
+    tEnv.getConfig.set(TableConfigOptions.MAX_MEMBERS_GENERATED_CODE, Int.box(10000))
     checkResult(sql.mkString, results)
   }
 }
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/DecimalITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/DecimalITCase.scala
index c1c49be..459f0aa 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/DecimalITCase.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/DecimalITCase.scala
@@ -248,8 +248,8 @@ class DecimalITCase extends BatchTestBase {
 
   @Test
   def testJoin1(): Unit = {
-    tEnv.getConfig.getConfiguration.setString(
-      ExecutionConfigOptions.TABLE_EXEC_DISABLED_OPERATORS, "HashJoin, NestedLoopJoin")
+    tEnv.getConfig
+      .set(ExecutionConfigOptions.TABLE_EXEC_DISABLED_OPERATORS, "HashJoin, NestedLoopJoin")
 
     checkQuery1(
       Seq(DECIMAL(8, 2), DECIMAL(8, 4), INT, DOUBLE),
@@ -261,8 +261,8 @@ class DecimalITCase extends BatchTestBase {
 
   @Test
   def testJoin2(): Unit = {
-    tEnv.getConfig.getConfiguration.setString(
-      ExecutionConfigOptions.TABLE_EXEC_DISABLED_OPERATORS, "HashJoin, NestedLoopJoin")
+    tEnv.getConfig
+      .set(ExecutionConfigOptions.TABLE_EXEC_DISABLED_OPERATORS, "HashJoin, NestedLoopJoin")
 
     checkQuery1(
       Seq(DECIMAL(8, 2), DECIMAL(8, 4), INT, DOUBLE),
@@ -274,8 +274,8 @@ class DecimalITCase extends BatchTestBase {
 
   @Test
   def testJoin3(): Unit = {
-    tEnv.getConfig.getConfiguration.setString(
-      ExecutionConfigOptions.TABLE_EXEC_DISABLED_OPERATORS, "HashJoin, NestedLoopJoin")
+    tEnv.getConfig
+      .set(ExecutionConfigOptions.TABLE_EXEC_DISABLED_OPERATORS, "HashJoin, NestedLoopJoin")
 
     checkQuery1(
       Seq(DECIMAL(8, 2), DECIMAL(8, 4), INT, DOUBLE),
@@ -288,8 +288,8 @@ class DecimalITCase extends BatchTestBase {
 
   @Test
   def testJoin4(): Unit = {
-    tEnv.getConfig.getConfiguration.setString(
-      ExecutionConfigOptions.TABLE_EXEC_DISABLED_OPERATORS, "HashJoin, NestedLoopJoin")
+    tEnv.getConfig
+      .set(ExecutionConfigOptions.TABLE_EXEC_DISABLED_OPERATORS, "HashJoin, NestedLoopJoin")
 
     checkQuery1(
       Seq(DECIMAL(8, 2), DECIMAL(8, 4), INT, DOUBLE),
@@ -301,8 +301,8 @@ class DecimalITCase extends BatchTestBase {
 
   @Test
   def testJoin5(): Unit = {
-    tEnv.getConfig.getConfiguration.setString(
-      ExecutionConfigOptions.TABLE_EXEC_DISABLED_OPERATORS, "HashJoin, NestedLoopJoin")
+    tEnv.getConfig
+      .set(ExecutionConfigOptions.TABLE_EXEC_DISABLED_OPERATORS, "HashJoin, NestedLoopJoin")
 
     checkQuery1(
       Seq(DECIMAL(8, 2), DECIMAL(8, 4), INT, DOUBLE),
@@ -314,8 +314,8 @@ class DecimalITCase extends BatchTestBase {
 
   @Test
   def testJoin6(): Unit = {
-    tEnv.getConfig.getConfiguration.setString(
-      ExecutionConfigOptions.TABLE_EXEC_DISABLED_OPERATORS, "HashJoin, NestedLoopJoin")
+    tEnv.getConfig
+      .set(ExecutionConfigOptions.TABLE_EXEC_DISABLED_OPERATORS, "HashJoin, NestedLoopJoin")
 
     checkQuery1(
       Seq(DECIMAL(8, 2), DECIMAL(8, 4), INT, DOUBLE),
@@ -327,8 +327,8 @@ class DecimalITCase extends BatchTestBase {
 
   @Test
   def testJoin7(): Unit = {
-    tEnv.getConfig.getConfiguration.setString(
-      ExecutionConfigOptions.TABLE_EXEC_DISABLED_OPERATORS, "HashJoin, NestedLoopJoin")
+    tEnv.getConfig
+      .set(ExecutionConfigOptions.TABLE_EXEC_DISABLED_OPERATORS, "HashJoin, NestedLoopJoin")
     checkQuery1(
       Seq(DECIMAL(8, 2), DECIMAL(8, 4), INT, DOUBLE),
       s1r(d"1", d"1", 1, 1.0),
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/MiscITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/MiscITCase.scala
index 9cbca51..16f29d4 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/MiscITCase.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/MiscITCase.scala
@@ -415,7 +415,7 @@ class MiscITCase extends BatchTestBase {
 
   @Test
   def testOrderByAgg(): Unit = {
-    tEnv.getConfig.getConfiguration.setInteger(TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 1)
+    tEnv.getConfig.set(TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, Int.box(1))
     env.setParallelism(1)
     checkQuery(
       Seq((1, 10), (1, 20), (10, 1), (10, 2)),
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/MultipleInputITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/MultipleInputITCase.scala
index bc115d5..99e1985 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/MultipleInputITCase.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/MultipleInputITCase.scala
@@ -74,7 +74,7 @@ class MultipleInputITCase(shuffleMode: BatchShuffleMode) extends BatchTestBase {
       "a, b, c, nt",
       MultipleInputITCase.nullables)
 
-    tEnv.getConfig.getConfiguration.set(ExecutionOptions.BATCH_SHUFFLE_MODE, shuffleMode)
+    tEnv.getConfig.set(ExecutionOptions.BATCH_SHUFFLE_MODE, shuffleMode)
   }
 
   @Test
@@ -206,11 +206,11 @@ class MultipleInputITCase(shuffleMode: BatchShuffleMode) extends BatchTestBase {
   }
 
   def checkMultipleInputResult(sql: String): Unit = {
-    tEnv.getConfig.getConfiguration.setBoolean(
-      OptimizerConfigOptions.TABLE_OPTIMIZER_MULTIPLE_INPUT_ENABLED, false)
+    tEnv.getConfig
+      .set(OptimizerConfigOptions.TABLE_OPTIMIZER_MULTIPLE_INPUT_ENABLED, Boolean.box(false))
     val expected = executeQuery(sql)
-    tEnv.getConfig.getConfiguration.setBoolean(
-      OptimizerConfigOptions.TABLE_OPTIMIZER_MULTIPLE_INPUT_ENABLED, true)
+    tEnv.getConfig
+      .set(OptimizerConfigOptions.TABLE_OPTIMIZER_MULTIPLE_INPUT_ENABLED, Boolean.box(true))
     checkResult(sql, expected)
   }
 }
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/PartitionableSinkITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/PartitionableSinkITCase.scala
index 13bced6..0c3f3f1 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/PartitionableSinkITCase.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/PartitionableSinkITCase.scala
@@ -67,9 +67,7 @@ class PartitionableSinkITCase extends BatchTestBase {
   override def before(): Unit = {
     super.before()
     env.setParallelism(3)
-    tEnv.getConfig
-      .getConfiguration
-      .setInteger(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 3)
+    tEnv.getConfig.set(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, Int.box(3))
     registerCollection("nonSortTable", testData, type3, "a, b, c", dataNullables)
     registerCollection("sortTable", testData1, type3, "a, b, c", dataNullables)
     registerCollection("starTable", testData2, type_int_string, "b, c", Array(true, true))
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/SortLimitITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/SortLimitITCase.scala
index 5817fdb..c7445b6 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/SortLimitITCase.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/SortLimitITCase.scala
@@ -109,8 +109,7 @@ class SortLimitITCase extends BatchTestBase {
 
   @Test
   def testOrderBehindField(): Unit = {
-    tableConfig.getConfiguration.setInteger(
-      ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 1)
+    tableConfig.set(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, Int.box(1))
     val expected = data3.sortBy((x : Row) => x.getField(2).asInstanceOf[String])
 
     checkResult(
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/TableSinkITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/TableSinkITCase.scala
index 0e6b735..97e6534 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/TableSinkITCase.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/TableSinkITCase.scala
@@ -81,8 +81,7 @@ class TableSinkITCase extends BatchTestBase {
 
   @Test
   def testCollectSinkConfiguration(): Unit = {
-    tEnv.getConfig.getConfiguration.set(
-      CollectSinkOperatorFactory.MAX_BATCH_SIZE, MemorySize.parse("1b"))
+    tEnv.getConfig.set(CollectSinkOperatorFactory.MAX_BATCH_SIZE, MemorySize.parse("1b"))
     try {
       checkResult("SELECT 1", Seq(row(1)))
       Assert.fail("Expecting exception thrown from collect sink")
@@ -95,8 +94,7 @@ class TableSinkITCase extends BatchTestBase {
               "by setting collect-sink.batch-size.max"))
     }
 
-    tEnv.getConfig.getConfiguration.set(
-      CollectSinkOperatorFactory.MAX_BATCH_SIZE, MemorySize.parse("1kb"))
+    tEnv.getConfig.set(CollectSinkOperatorFactory.MAX_BATCH_SIZE, MemorySize.parse("1kb"))
     checkResult("SELECT 1", Seq(row(1)))
   }
 }
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/TableSourceITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/TableSourceITCase.scala
index 59620b1..4631dad 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/TableSourceITCase.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/TableSourceITCase.scala
@@ -342,8 +342,9 @@ class TableSourceITCase extends BatchTestBase {
 
   @Test
   def testTableHintWithLogicalTableScanReuse(): Unit = {
-    tEnv.getConfig.getConfiguration.setBoolean(
-      RelNodeBlockPlanBuilder.TABLE_OPTIMIZER_REUSE_OPTIMIZE_BLOCK_WITH_DIGEST_ENABLED, true)
+    tEnv.getConfig.set(
+      RelNodeBlockPlanBuilder.TABLE_OPTIMIZER_REUSE_OPTIMIZE_BLOCK_WITH_DIGEST_ENABLED,
+      Boolean.box(true))
     val resultPath = TEMPORARY_FOLDER.newFolder().getAbsolutePath
     tEnv.executeSql(
       s"""
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/UnionITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/UnionITCase.scala
index 27451d4..5f7a5d8 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/UnionITCase.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/UnionITCase.scala
@@ -48,8 +48,7 @@ class UnionITCase extends BatchTestBase {
     registerCollection("Table3", smallData3, type3, "a, b, c", nullablesOfSmallData3)
     registerCollection("Table5", data5, type5, "d, e, f, g, h", nullablesOfData5)
     registerCollection("Table6", data6, type6, "a, b, c", Array(false, false, false))
-    tEnv.getConfig.getConfiguration.setString(
-      ExecutionConfigOptions.TABLE_EXEC_DISABLED_OPERATORS, "HashAgg")
+    tEnv.getConfig.set(ExecutionConfigOptions.TABLE_EXEC_DISABLED_OPERATORS, "HashAgg")
   }
 
   @Test
@@ -113,10 +112,9 @@ class UnionITCase extends BatchTestBase {
     */
   @Test
   def testJoinAfterDifferentTypeUnionAll(): Unit = {
-    tEnv.getConfig.getConfiguration.setLong(
-      OptimizerConfigOptions.TABLE_OPTIMIZER_BROADCAST_JOIN_THRESHOLD, -1)
-    tEnv.getConfig.getConfiguration.setString(
-      ExecutionConfigOptions.TABLE_EXEC_DISABLED_OPERATORS, "HashJoin, NestedLoopJoin")
+    tEnv.getConfig
+      .set(OptimizerConfigOptions.TABLE_OPTIMIZER_BROADCAST_JOIN_THRESHOLD, Long.box(-1))
+      .set(ExecutionConfigOptions.TABLE_EXEC_DISABLED_OPERATORS, "HashJoin, NestedLoopJoin")
     checkResult(
       "SELECT a, c, g FROM (SELECT t1.a, t1.b, t1.c FROM Table3 t1 UNION ALL" +
           "(SELECT a, b, c FROM Table3 ORDER BY a, b, c)), Table5 WHERE b = e",
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/agg/AggregateJoinTransposeITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/agg/AggregateJoinTransposeITCase.scala
index 52ce45c..c125235 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/agg/AggregateJoinTransposeITCase.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/agg/AggregateJoinTransposeITCase.scala
@@ -79,7 +79,7 @@ class AggregateJoinTransposeITCase extends BatchTestBase {
     tEnv.getConfig.setPlannerConfig(calciteConfig)
 
     // HashJoin is disabled due to translateToPlanInternal method is not implemented yet
-    tEnv.getConfig.getConfiguration.setString(TABLE_EXEC_DISABLED_OPERATORS, "HashJoin")
+    tEnv.getConfig.set(TABLE_EXEC_DISABLED_OPERATORS, "HashJoin")
     registerCollection("T3", data3, type3, "a, b, c", nullablesOfData3)
 
     registerCollection("MyTable",
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/agg/AggregateReduceGroupingITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/agg/AggregateReduceGroupingITCase.scala
index 88813e6..7bcbf40 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/agg/AggregateReduceGroupingITCase.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/agg/AggregateReduceGroupingITCase.scala
@@ -113,12 +113,12 @@ class AggregateReduceGroupingITCase extends BatchTestBase {
       FlinkStatistic.builder().uniqueKeys(Set(Set("a6").asJava).asJava).build()
     )
     // HashJoin is disabled due to translateToPlanInternal method is not implemented yet
-    tEnv.getConfig.getConfiguration.setString(TABLE_EXEC_DISABLED_OPERATORS, "HashJoin")
+    tEnv.getConfig.set(TABLE_EXEC_DISABLED_OPERATORS, "HashJoin")
   }
 
   @Test
   def testSingleAggOnTable_SortAgg(): Unit = {
-    tEnv.getConfig.getConfiguration.setString(TABLE_EXEC_DISABLED_OPERATORS, "HashAgg")
+    tEnv.getConfig.set(TABLE_EXEC_DISABLED_OPERATORS, "HashAgg")
     testSingleAggOnTable()
     checkResult("SELECT a6, b6, max(c6), count(d6), sum(e6) FROM T6 GROUP BY a6, b6",
       (0 until 50000).map(i => row(i, 1L, if (i % 500 == 0) null else s"Hello$i", 1L, 10))
@@ -127,19 +127,18 @@ class AggregateReduceGroupingITCase extends BatchTestBase {
 
   @Test
   def testSingleAggOnTable_HashAgg_WithLocalAgg(): Unit = {
-    tEnv.getConfig.getConfiguration.setString(TABLE_EXEC_DISABLED_OPERATORS, "SortAgg")
-    tEnv.getConfig.getConfiguration.setString(
-      OptimizerConfigOptions.TABLE_OPTIMIZER_AGG_PHASE_STRATEGY, "TWO_PHASE")
+    tEnv.getConfig.set(TABLE_EXEC_DISABLED_OPERATORS, "SortAgg")
+    tEnv.getConfig.set(OptimizerConfigOptions.TABLE_OPTIMIZER_AGG_PHASE_STRATEGY, "TWO_PHASE")
     // set smaller parallelism to avoid MemoryAllocationException
-    tEnv.getConfig.getConfiguration.setInteger(TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 2)
+    tEnv.getConfig.set(TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, Int.box(2))
     testSingleAggOnTable()
   }
 
   @Test
   def testSingleAggOnTable_HashAgg_WithoutLocalAgg(): Unit = {
-    tEnv.getConfig.getConfiguration.setString(TABLE_EXEC_DISABLED_OPERATORS, "SortAgg")
-    tEnv.getConfig.getConfiguration.setString(
-      OptimizerConfigOptions.TABLE_OPTIMIZER_AGG_PHASE_STRATEGY, "ONE_PHASE")
+    tEnv.getConfig
+      .set(TABLE_EXEC_DISABLED_OPERATORS, "SortAgg")
+      .set(OptimizerConfigOptions.TABLE_OPTIMIZER_AGG_PHASE_STRATEGY, "ONE_PHASE")
     testSingleAggOnTable()
   }
 
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/agg/HashAggITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/agg/HashAggITCase.scala
index 3cc1358..a3b013a 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/agg/HashAggITCase.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/agg/HashAggITCase.scala
@@ -27,7 +27,7 @@ class HashAggITCase
     extends AggregateITCaseBase("HashAggregate") {
 
   override def prepareAggOp(): Unit = {
-    tEnv.getConfig.getConfiguration.setString(
+    tEnv.getConfig.set(
       ExecutionConfigOptions.TABLE_EXEC_DISABLED_OPERATORS, "SortAgg")
   }
 }
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/agg/HashDistinctAggregateITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/agg/HashDistinctAggregateITCase.scala
index 34c592a..2cfd255 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/agg/HashDistinctAggregateITCase.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/agg/HashDistinctAggregateITCase.scala
@@ -27,7 +27,7 @@ import org.apache.flink.table.planner.plan.utils.OperatorType
 class HashDistinctAggregateITCase extends DistinctAggregateITCaseBase {
 
   override def prepareAggOp(): Unit = {
-    tEnv.getConfig.getConfiguration.setString(
+    tEnv.getConfig.set(
       ExecutionConfigOptions.TABLE_EXEC_DISABLED_OPERATORS, OperatorType.SortAgg.toString)
   }
 }
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/agg/SortAggITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/agg/SortAggITCase.scala
index 4c082fc..3aaa1e6 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/agg/SortAggITCase.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/agg/SortAggITCase.scala
@@ -45,7 +45,7 @@ import scala.collection.Seq
 class SortAggITCase
     extends AggregateITCaseBase("SortAggregate") {
   override def prepareAggOp(): Unit = {
-    tEnv.getConfig.getConfiguration.setString(
+    tEnv.getConfig.set(
       TABLE_EXEC_DISABLED_OPERATORS, "HashAgg")
 
     registerFunction("countFun", new CountAggFunction())
@@ -64,7 +64,7 @@ class SortAggITCase
 
   @Test
   def testBigDataSimpleArrayUDAF(): Unit = {
-    tEnv.getConfig.getConfiguration.setInteger(TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 1)
+    tEnv.getConfig.set(TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, Int.box(1))
     registerFunction("simplePrimitiveArrayUdaf", new SimplePrimitiveArrayUdaf())
     registerRange("RangeT", 1000000)
     env.setParallelism(1)
@@ -265,7 +265,7 @@ class SortAggITCase
 
   @Test
   def testArrayUdaf(): Unit = {
-    tEnv.getConfig.getConfiguration.setInteger(TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 1)
+    tEnv.getConfig.set(TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, Int.box(1))
     env.setParallelism(1)
     checkResult(
       "SELECT myPrimitiveArrayUdaf(a, b) FROM Table3",
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/agg/SortDistinctAggregateITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/agg/SortDistinctAggregateITCase.scala
index d9ccb89..fd8b445 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/agg/SortDistinctAggregateITCase.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/agg/SortDistinctAggregateITCase.scala
@@ -35,7 +35,7 @@ import scala.collection.Seq
 class SortDistinctAggregateITCase extends DistinctAggregateITCaseBase {
 
   override def prepareAggOp(): Unit = {
-    tEnv.getConfig.getConfiguration.setString(
+    tEnv.getConfig.set(
       ExecutionConfigOptions.TABLE_EXEC_DISABLED_OPERATORS,  OperatorType.HashAgg.toString)
 
     registerFunction("countFun", new CountAggFunction())
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/join/InnerJoinITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/join/InnerJoinITCase.scala
index b16d4e7..2c3c7f2 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/join/InnerJoinITCase.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/join/InnerJoinITCase.scala
@@ -153,8 +153,7 @@ class InnerJoinITCase(expectedJoinType: JoinType) extends BatchTestBase {
 
   @Test
   def testBigForSpill(): Unit = {
-
-    tableConfig.getConfiguration.setInteger(TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 1)
+    tableConfig.set(TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, Int.box(1))
 
     val bigData = Random.shuffle(
       bigIntStringData.union(bigIntStringData).union(bigIntStringData).union(bigIntStringData))
@@ -169,7 +168,7 @@ class InnerJoinITCase(expectedJoinType: JoinType) extends BatchTestBase {
   @Test
   def testSortMergeJoinOutputOrder(): Unit = {
     if (expectedJoinType == SortMergeJoin) {
-      tableConfig.getConfiguration.setInteger(TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 1)
+      tableConfig.set(TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, Int.box(1))
       env.getConfig.setParallelism(1)
 
       val bigData = Random.shuffle(
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/join/JoinITCaseHelper.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/join/JoinITCaseHelper.scala
index 32287be..fd738a3 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/join/JoinITCaseHelper.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/join/JoinITCaseHelper.scala
@@ -28,8 +28,8 @@ import org.apache.flink.table.planner.runtime.batch.sql.join.JoinType.{Broadcast
 object JoinITCaseHelper {
 
   def disableBroadcastHashJoin(tEnv: TableEnvironment): Unit = {
-    tEnv.getConfig.getConfiguration.setLong(
-      OptimizerConfigOptions.TABLE_OPTIMIZER_BROADCAST_JOIN_THRESHOLD, -1)
+    tEnv.getConfig
+      .set(OptimizerConfigOptions.TABLE_OPTIMIZER_BROADCAST_JOIN_THRESHOLD, Long.box(-1))
   }
 
   def disableOtherJoinOpForJoin(tEnv: TableEnvironment, expected: JoinType): Unit = {
@@ -37,8 +37,10 @@ object JoinITCaseHelper {
       case BroadcastHashJoin =>
         // set up the broadcast join threshold to Long.MaxValue
         // so that the threshold constraints are always met.
-        tEnv.getConfig.getConfiguration.setLong(
-          OptimizerConfigOptions.TABLE_OPTIMIZER_BROADCAST_JOIN_THRESHOLD, Long.MaxValue)
+        tEnv.getConfig
+          .set(
+            OptimizerConfigOptions.TABLE_OPTIMIZER_BROADCAST_JOIN_THRESHOLD,
+            Long.box(Long.MaxValue))
         "ShuffleHashJoin, NestedLoopJoin, SortMergeJoin"
       case HashJoin =>
         disableBroadcastHashJoin(tEnv)
@@ -46,8 +48,7 @@ object JoinITCaseHelper {
       case SortMergeJoin => "HashJoin, NestedLoopJoin"
       case NestedLoopJoin => "HashJoin, SortMergeJoin"
     }
-    tEnv.getConfig.getConfiguration.setString(
-      ExecutionConfigOptions.TABLE_EXEC_DISABLED_OPERATORS, disabledOperators)
+    tEnv.getConfig.set(ExecutionConfigOptions.TABLE_EXEC_DISABLED_OPERATORS, disabledOperators)
   }
 
 }
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/table/DecimalITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/table/DecimalITCase.scala
index 5b22ac5..bc13f47 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/table/DecimalITCase.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/table/DecimalITCase.scala
@@ -182,8 +182,8 @@ class DecimalITCase extends BatchTestBase {
 
   @Test
   def testJoin1(): Unit = {
-    tEnv.getConfig.getConfiguration.setString(
-      ExecutionConfigOptions.TABLE_EXEC_DISABLED_OPERATORS, "HashJoin, NestedLoopJoin")
+    tEnv.getConfig
+      .set(ExecutionConfigOptions.TABLE_EXEC_DISABLED_OPERATORS, "HashJoin, NestedLoopJoin")
 
     checkQuery(
       Seq(DECIMAL(8, 2), DECIMAL(8, 4), INT, DOUBLE),
@@ -195,8 +195,8 @@ class DecimalITCase extends BatchTestBase {
 
   @Test
   def testJoin2(): Unit = {
-    tEnv.getConfig.getConfiguration.setString(
-      ExecutionConfigOptions.TABLE_EXEC_DISABLED_OPERATORS, "HashJoin, NestedLoopJoin")
+    tEnv.getConfig
+      .set(ExecutionConfigOptions.TABLE_EXEC_DISABLED_OPERATORS, "HashJoin, NestedLoopJoin")
 
     checkQuery(
       Seq(DECIMAL(8, 2), DECIMAL(8, 4), INT, DOUBLE),
@@ -208,8 +208,8 @@ class DecimalITCase extends BatchTestBase {
 
   @Test
   def testJoin3(): Unit = {
-    tEnv.getConfig.getConfiguration.setString(
-      ExecutionConfigOptions.TABLE_EXEC_DISABLED_OPERATORS, "HashJoin, NestedLoopJoin")
+    tEnv.getConfig
+      .set(ExecutionConfigOptions.TABLE_EXEC_DISABLED_OPERATORS, "HashJoin, NestedLoopJoin")
 
     checkQuery(
       Seq(DECIMAL(8, 2), DECIMAL(8, 4), INT, DOUBLE),
@@ -222,8 +222,8 @@ class DecimalITCase extends BatchTestBase {
 
   @Test
   def testJoin4(): Unit = {
-    tEnv.getConfig.getConfiguration.setString(
-      ExecutionConfigOptions.TABLE_EXEC_DISABLED_OPERATORS, "HashJoin, NestedLoopJoin")
+    tEnv.getConfig
+      .set(ExecutionConfigOptions.TABLE_EXEC_DISABLED_OPERATORS, "HashJoin, NestedLoopJoin")
 
     checkQuery(
       Seq(DECIMAL(8, 2), DECIMAL(8, 4), INT, DOUBLE),
@@ -235,8 +235,8 @@ class DecimalITCase extends BatchTestBase {
 
   @Test
   def testJoin5(): Unit = {
-    tEnv.getConfig.getConfiguration.setString(
-      ExecutionConfigOptions.TABLE_EXEC_DISABLED_OPERATORS, "HashJoin, NestedLoopJoin")
+    tEnv.getConfig
+      .set(ExecutionConfigOptions.TABLE_EXEC_DISABLED_OPERATORS, "HashJoin, NestedLoopJoin")
 
     checkQuery(
       Seq(DECIMAL(8, 2), DECIMAL(8, 4), INT, DOUBLE),
@@ -248,8 +248,8 @@ class DecimalITCase extends BatchTestBase {
 
   @Test
   def testJoin6(): Unit = {
-    tEnv.getConfig.getConfiguration.setString(
-      ExecutionConfigOptions.TABLE_EXEC_DISABLED_OPERATORS, "HashJoin, NestedLoopJoin")
+    tEnv.getConfig
+      .set(ExecutionConfigOptions.TABLE_EXEC_DISABLED_OPERATORS, "HashJoin, NestedLoopJoin")
 
     checkQuery(
       Seq(DECIMAL(8, 2), DECIMAL(8, 4), INT, DOUBLE),
@@ -261,8 +261,8 @@ class DecimalITCase extends BatchTestBase {
 
   @Test
   def testJoin7(): Unit = {
-    tEnv.getConfig.getConfiguration.setString(
-      ExecutionConfigOptions.TABLE_EXEC_DISABLED_OPERATORS, "HashJoin, NestedLoopJoin")
+    tEnv.getConfig
+      .set(ExecutionConfigOptions.TABLE_EXEC_DISABLED_OPERATORS, "HashJoin, NestedLoopJoin")
     checkQuery(
       Seq(DECIMAL(8, 2), DECIMAL(8, 4), INT, DOUBLE),
       s1r(d"1", d"1", 1, 1.0),
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/harness/GroupAggregateHarnessTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/harness/GroupAggregateHarnessTest.scala
index 969473a..39b73ef 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/harness/GroupAggregateHarnessTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/harness/GroupAggregateHarnessTest.scala
@@ -62,12 +62,12 @@ class GroupAggregateHarnessTest(mode: StateBackendMode, miniBatch: MiniBatchMode
     val tableConfig = tEnv.getConfig
     miniBatch match {
       case MiniBatchOn =>
-        tableConfig.getConfiguration.setBoolean(TABLE_EXEC_MINIBATCH_ENABLED, true)
-        tableConfig.getConfiguration.set(TABLE_EXEC_MINIBATCH_ALLOW_LATENCY, Duration.ofSeconds(1))
+        tableConfig.set(TABLE_EXEC_MINIBATCH_ENABLED, Boolean.box(true))
+        tableConfig.set(TABLE_EXEC_MINIBATCH_ALLOW_LATENCY, Duration.ofSeconds(1))
         // trigger every record for easier harness test
-        tableConfig.getConfiguration.setLong(TABLE_EXEC_MINIBATCH_SIZE, 1L)
+        tableConfig.set(TABLE_EXEC_MINIBATCH_SIZE, Long.box(1L))
         // disable local-global to only test the MiniBatchGroupAggFunction
-        tableConfig.getConfiguration.setString(TABLE_OPTIMIZER_AGG_PHASE_STRATEGY, "ONE_PHASE")
+        tableConfig.set(TABLE_OPTIMIZER_AGG_PHASE_STRATEGY, "ONE_PHASE")
       case MiniBatchOff =>
         tableConfig.getConfiguration.removeConfig(TABLE_EXEC_MINIBATCH_ALLOW_LATENCY)
     }
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/harness/WindowAggregateHarnessTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/harness/WindowAggregateHarnessTest.scala
index 6587af3..80b82d9 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/harness/WindowAggregateHarnessTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/harness/WindowAggregateHarnessTest.scala
@@ -310,8 +310,7 @@ class WindowAggregateHarnessTest(backend: StateBackendMode, shiftTimeZone: ZoneI
          |)
          |""".stripMargin)
 
-    tEnv.getConfig.getConfiguration.setString(
-      OptimizerConfigOptions.TABLE_OPTIMIZER_AGG_PHASE_STRATEGY, "TWO_PHASE")
+    tEnv.getConfig.set(OptimizerConfigOptions.TABLE_OPTIMIZER_AGG_PHASE_STRATEGY, "TWO_PHASE")
 
     val sql =
       """
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/CalcITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/CalcITCase.scala
index b5e55ae..2e8ee04 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/CalcITCase.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/CalcITCase.scala
@@ -493,7 +493,7 @@ class CalcITCase extends StreamingTestBase {
     val tableId = TestValuesTableFactory.registerData(rows)
 
     // We need a fixed timezone to make sure this test can run on machines across the world
-    tEnv.getConfig.getConfiguration.setString("table.local-time-zone", "Europe/Berlin")
+    tEnv.getConfig.set("table.local-time-zone", "Europe/Berlin")
 
     tEnv.executeSql(s"""
                        |CREATE TABLE T (
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/ChangelogSourceITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/ChangelogSourceITCase.scala
index a5b3e88..7aea2ed 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/ChangelogSourceITCase.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/ChangelogSourceITCase.scala
@@ -298,8 +298,8 @@ class ChangelogSourceITCase(
   }
 
   private def registerChangelogSourceWithEventsDuplicate(): Unit = {
-    tEnv.getConfig.getConfiguration.setBoolean(
-      ExecutionConfigOptions.TABLE_EXEC_SOURCE_CDC_EVENTS_DUPLICATE, true)
+    tEnv.getConfig
+      .set(ExecutionConfigOptions.TABLE_EXEC_SOURCE_CDC_EVENTS_DUPLICATE, Boolean.box(true))
     val userChangelog: Seq[Row] = Seq(
       changelogRow("+I", "user1", "Tom", "tom@gmail.com", new JBigDecimal("10.02")),
       changelogRow("+I", "user2", "Jack", "jack@hotmail.com", new JBigDecimal("71.2")),
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/DeduplicateITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/DeduplicateITCase.scala
index 39dcd4e..479b1c2 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/DeduplicateITCase.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/DeduplicateITCase.scala
@@ -193,8 +193,9 @@ class DeduplicateITCase(miniBatch: MiniBatchMode, mode: StateBackendMode)
   @Test
   def testFirstRowWithoutAllChangelogOnRowtime(): Unit = {
     Assume.assumeTrue("Without all change log only for minibatch.", miniBatch == MiniBatchOn)
-    tEnv.getConfig.getConfiguration.setBoolean(
-      ExecutionConfigOptions.TABLE_EXEC_DEDUPLICATE_MINIBATCH_COMPACT_CHANGES_ENABLED, true)
+    tEnv.getConfig.set(
+      ExecutionConfigOptions.TABLE_EXEC_DEDUPLICATE_MINIBATCH_COMPACT_CHANGES_ENABLED,
+      Boolean.box(true))
     val t = env.fromCollection(rowtimeTestData)
       .assignTimestampsAndWatermarks(new RowtimeExtractor)
       .toTable(tEnv, 'a, 'b, 'c, 'rowtime.rowtime())
@@ -300,8 +301,9 @@ class DeduplicateITCase(miniBatch: MiniBatchMode, mode: StateBackendMode)
   @Test
   def testLastRowWithoutAllChangelogOnRowtime(): Unit = {
     Assume.assumeTrue("Without all change log only for minibatch.", miniBatch == MiniBatchOn)
-    tEnv.getConfig.getConfiguration.setBoolean(
-      ExecutionConfigOptions.TABLE_EXEC_DEDUPLICATE_MINIBATCH_COMPACT_CHANGES_ENABLED, true)
+    tEnv.getConfig.set(
+      ExecutionConfigOptions.TABLE_EXEC_DEDUPLICATE_MINIBATCH_COMPACT_CHANGES_ENABLED,
+      Boolean.box(true))
     val t = env.fromCollection(rowtimeTestData)
       .assignTimestampsAndWatermarks(new RowtimeExtractor)
       .toTable(tEnv, 'a, 'b, 'c, 'rowtime.rowtime())
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/GroupWindowITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/GroupWindowITCase.scala
index 36b2d42..2df538e 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/GroupWindowITCase.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/GroupWindowITCase.scala
@@ -165,8 +165,8 @@ class GroupWindowITCase(mode: StateBackendMode, useTimestampLtz: Boolean)
 
   @Test
   def testMinMaxWithTumblingWindow(): Unit = {
-    tEnv.getConfig.getConfiguration.setBoolean("table.exec.emit.early-fire.enabled", true)
-    tEnv.getConfig.getConfiguration.setString("table.exec.emit.early-fire.delay", "1000 ms")
+    tEnv.getConfig.set("table.exec.emit.early-fire.enabled", "true")
+    tEnv.getConfig.set("table.exec.emit.early-fire.delay", "1000 ms")
 
     val sql =
       """
@@ -318,8 +318,7 @@ class GroupWindowITCase(mode: StateBackendMode, useTimestampLtz: Boolean)
       return
     }
     // wait 10 millisecond for late elements
-    tEnv.getConfig.getConfiguration.set(
-      TABLE_EXEC_EMIT_ALLOW_LATENESS, Duration.ofMillis(10))
+    tEnv.getConfig.set(TABLE_EXEC_EMIT_ALLOW_LATENESS, Duration.ofMillis(10))
     // emit result without delay after watermark
     withLateFireDelay(tEnv.getConfig, Time.of(0, TimeUnit.NANOSECONDS))
     val data = List(
@@ -426,8 +425,7 @@ class GroupWindowITCase(mode: StateBackendMode, useTimestampLtz: Boolean)
   @Test
   def testWindowAggregateOnUpsertSourceWithAllowLateness(): Unit = {
     // wait 15 second for late elements
-    tEnv.getConfig.getConfiguration.set(
-      TABLE_EXEC_EMIT_ALLOW_LATENESS, Duration.ofSeconds(15))
+    tEnv.getConfig.set(TABLE_EXEC_EMIT_ALLOW_LATENESS, Duration.ofSeconds(15))
     // emit result without delay after watermark
     withLateFireDelay(tEnv.getConfig, Time.of(0, TimeUnit.NANOSECONDS))
     val upsertSourceDataId = registerData(upsertSourceCurrencyData)
@@ -593,9 +591,8 @@ class GroupWindowITCase(mode: StateBackendMode, useTimestampLtz: Boolean)
       throw new RuntimeException(
         "Currently not support different lateFireInterval configs in one job")
     }
-    tableConfig.getConfiguration.setBoolean(TABLE_EXEC_EMIT_LATE_FIRE_ENABLED, true)
-    tableConfig.getConfiguration.set(
-      TABLE_EXEC_EMIT_LATE_FIRE_DELAY, Duration.ofMillis(intervalInMillis))
+    tableConfig.set(TABLE_EXEC_EMIT_LATE_FIRE_ENABLED, Boolean.box(true))
+    tableConfig.set(TABLE_EXEC_EMIT_LATE_FIRE_DELAY, Duration.ofMillis(intervalInMillis))
   }
 
   private def localDateTime(epochSecond: Long): LocalDateTime = {
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/SortITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/SortITCase.scala
index fcc8f2b..e18486a 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/SortITCase.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/SortITCase.scala
@@ -67,8 +67,8 @@ class SortITCase(mode: StateBackendMode) extends StreamingWithStateTestBase(mode
     tEnv.registerTable("a", da)
 
     val sink = new TestingRetractSink
-    tEnv.getConfig.getConfiguration.setBoolean(
-      InternalConfigOptions.TABLE_EXEC_NON_TEMPORAL_SORT_ENABLED, true)
+    tEnv.getConfig
+      .set(InternalConfigOptions.TABLE_EXEC_NON_TEMPORAL_SORT_ENABLED, Boolean.box(true))
     val results = tEnv.sqlQuery(sqlQuery).toRetractStream[Row]
     results.addSink(sink).setParallelism(1)
     env.execute()
@@ -96,8 +96,9 @@ class SortITCase(mode: StateBackendMode) extends StreamingWithStateTestBase(mode
     tEnv.registerTable("a", da)
 
     val sink = new TestingRetractSink
-    tEnv.getConfig.getConfiguration.setBoolean(
-      InternalConfigOptions.TABLE_EXEC_NON_TEMPORAL_SORT_ENABLED, true)
+    tEnv.getConfig.set(
+      InternalConfigOptions.TABLE_EXEC_NON_TEMPORAL_SORT_ENABLED,
+      Boolean.box(true))
     val results = tEnv.sqlQuery(sqlQuery).toRetractStream[Row]
     results.addSink(sink).setParallelism(1)
     env.execute()
@@ -125,8 +126,8 @@ class SortITCase(mode: StateBackendMode) extends StreamingWithStateTestBase(mode
     tEnv.registerTable("a", da)
 
     val sink = new TestingRetractSink
-    tEnv.getConfig.getConfiguration.setBoolean(
-      InternalConfigOptions.TABLE_EXEC_NON_TEMPORAL_SORT_ENABLED, true)
+    tEnv.getConfig
+      .set(InternalConfigOptions.TABLE_EXEC_NON_TEMPORAL_SORT_ENABLED, Boolean.box(true))
     val results = tEnv.sqlQuery(sqlQuery).toRetractStream[Row]
     results.addSink(sink).setParallelism(1)
     env.execute()
@@ -154,8 +155,8 @@ class SortITCase(mode: StateBackendMode) extends StreamingWithStateTestBase(mode
     tEnv.registerTable("a", da)
 
     val sink = new TestingRetractSink
-    tEnv.getConfig.getConfiguration.setBoolean(
-      InternalConfigOptions.TABLE_EXEC_NON_TEMPORAL_SORT_ENABLED, true)
+    tEnv.getConfig
+      .set(InternalConfigOptions.TABLE_EXEC_NON_TEMPORAL_SORT_ENABLED, Boolean.box(true))
     val results = tEnv.sqlQuery(sqlQuery).toRetractStream[Row]
     results.addSink(sink).setParallelism(1)
     env.execute()
@@ -189,8 +190,8 @@ class SortITCase(mode: StateBackendMode) extends StreamingWithStateTestBase(mode
     tEnv.registerTable("a", da)
 
     val sink = new TestingRetractSink
-    tEnv.getConfig.getConfiguration.setBoolean(
-      InternalConfigOptions.TABLE_EXEC_NON_TEMPORAL_SORT_ENABLED, true)
+    tEnv.getConfig
+      .set(InternalConfigOptions.TABLE_EXEC_NON_TEMPORAL_SORT_ENABLED, Boolean.box(true))
     val results = tEnv.sqlQuery(sqlQuery).toRetractStream[Row]
     results.addSink(sink).setParallelism(1)
     env.execute()
@@ -222,8 +223,8 @@ class SortITCase(mode: StateBackendMode) extends StreamingWithStateTestBase(mode
     tEnv.registerTable("a", da)
 
     val sink = new TestingRetractSink
-    tEnv.getConfig.getConfiguration.setBoolean(
-      InternalConfigOptions.TABLE_EXEC_NON_TEMPORAL_SORT_ENABLED, true)
+    tEnv.getConfig
+      .set(InternalConfigOptions.TABLE_EXEC_NON_TEMPORAL_SORT_ENABLED, Boolean.box(true))
     val results = tEnv.sqlQuery(sqlQuery).toRetractStream[Row]
     results.addSink(sink).setParallelism(1)
     env.execute()
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/SplitAggregateITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/SplitAggregateITCase.scala
index e0042ba..1a5dba2 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/SplitAggregateITCase.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/SplitAggregateITCase.scala
@@ -56,11 +56,11 @@ class SplitAggregateITCase(
     super.before()
 
     if (partialAggMode.isPartialAggEnabled) {
-      tEnv.getConfig.getConfiguration.setBoolean(
-        OptimizerConfigOptions.TABLE_OPTIMIZER_DISTINCT_AGG_SPLIT_ENABLED, true)
+      tEnv.getConfig
+        .set(OptimizerConfigOptions.TABLE_OPTIMIZER_DISTINCT_AGG_SPLIT_ENABLED, Boolean.box(true))
     } else {
-      tEnv.getConfig.getConfiguration.setBoolean(
-        OptimizerConfigOptions.TABLE_OPTIMIZER_DISTINCT_AGG_SPLIT_ENABLED, false)
+      tEnv.getConfig
+        .set(OptimizerConfigOptions.TABLE_OPTIMIZER_DISTINCT_AGG_SPLIT_ENABLED, Boolean.box(false))
     }
 
     val data = List(
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/TemporalJoinITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/TemporalJoinITCase.scala
index 72a4b16..8e6d148 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/TemporalJoinITCase.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/TemporalJoinITCase.scala
@@ -702,12 +702,12 @@ class TemporalJoinITCase(state: StateBackendMode)
 
   @Test
   def testMiniBatchEventTimeViewTemporalJoin(): Unit = {
-    tEnv.getConfig.getConfiguration.setBoolean(
-      ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ENABLED, true)
-    tEnv.getConfig.getConfiguration.setString(
-      ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ALLOW_LATENCY.key(), "10 s")
-    tEnv.getConfig.getConfiguration.setLong(
-      ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_SIZE, 4L)
+    tEnv.getConfig
+      .set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ENABLED, Boolean.box(true))
+    tEnv.getConfig
+      .set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ALLOW_LATENCY.key(), "10 s")
+    tEnv.getConfig
+      .set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_SIZE, Long.box(4L))
 
     val sql = "INSERT INTO rowtime_default_sink " +
       " SELECT o.order_id, o.currency, o.amount, o.order_time, r.rate, r.currency_time " +
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/WindowAggregateITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/WindowAggregateITCase.scala
index d0597d6..6b299f4 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/WindowAggregateITCase.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/WindowAggregateITCase.scala
@@ -168,7 +168,7 @@ class WindowAggregateITCase(
     tEnv.createFunction("concat_distinct_agg", classOf[ConcatDistinctAggFunction])
 
     tEnv.getConfig.setLocalTimeZone(SHANGHAI_ZONE)
-    tEnv.getConfig.getConfiguration.setString(
+    tEnv.getConfig.set(
       OptimizerConfigOptions.TABLE_OPTIMIZER_AGG_PHASE_STRATEGY,
       aggPhase.toString)
   }
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/WindowDistinctAggregateITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/WindowDistinctAggregateITCase.scala
index f7f1dc6..9e41638 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/WindowDistinctAggregateITCase.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/WindowDistinctAggregateITCase.scala
@@ -172,8 +172,9 @@ class WindowDistinctAggregateITCase(
          |)
          |""".stripMargin)
 
-    tEnv.getConfig.getConfiguration.setBoolean(
-      OptimizerConfigOptions.TABLE_OPTIMIZER_DISTINCT_AGG_SPLIT_ENABLED, splitDistinct)
+    tEnv.getConfig.set(
+      OptimizerConfigOptions.TABLE_OPTIMIZER_DISTINCT_AGG_SPLIT_ENABLED,
+      Boolean.box(splitDistinct))
   }
 
   @Test
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/table/TableSinkITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/table/TableSinkITCase.scala
index fe73d24..b52b2e0 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/table/TableSinkITCase.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/table/TableSinkITCase.scala
@@ -1253,7 +1253,8 @@ class TableSinkITCase extends StreamingTestBase {
     val t = env.fromCollection(tupleData3)
         .assignAscendingTimestamps(_._1.toLong)
         .toTable(tEnv, 'id, 'num, 'text, 'rowtime.rowtime)
-    tEnv.getConfig.getConfiguration.set(ExecutionConfigOptions.TABLE_EXEC_SINK_KEYED_SHUFFLE,
+    tEnv.getConfig.set(
+      ExecutionConfigOptions.TABLE_EXEC_SINK_KEYED_SHUFFLE,
       ExecutionConfigOptions.SinkKeyedShuffle.FORCE)
 
     tEnv.executeSql(
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/utils/BatchTestBase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/utils/BatchTestBase.scala
index 3146565..56e16cd 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/utils/BatchTestBase.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/utils/BatchTestBase.scala
@@ -518,9 +518,7 @@ object BatchTestBase {
   }
 
   def configForMiniCluster(tableConfig: TableConfig): Unit = {
-    tableConfig.
-      getConfiguration.setInteger(TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, DEFAULT_PARALLELISM)
-    tableConfig.getConfiguration
-      .set(ExecutionOptions.BATCH_SHUFFLE_MODE, BatchShuffleMode.ALL_EXCHANGES_PIPELINED)
+    tableConfig.set(TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, Int.box(DEFAULT_PARALLELISM))
+    tableConfig.set(ExecutionOptions.BATCH_SHUFFLE_MODE, BatchShuffleMode.ALL_EXCHANGES_PIPELINED)
   }
 }
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/utils/StreamingWithAggTestBase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/utils/StreamingWithAggTestBase.scala
index 02b42ac..ac8110c 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/utils/StreamingWithAggTestBase.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/utils/StreamingWithAggTestBase.scala
@@ -42,11 +42,11 @@ class StreamingWithAggTestBase(
     // in order to cover more code paths
     tEnv.getConfig.setIdleStateRetentionTime(Time.hours(1), Time.hours(2))
     if (aggMode.isLocalAggEnabled) {
-      tEnv.getConfig.getConfiguration.setString(
+      tEnv.getConfig.set(
         OptimizerConfigOptions.TABLE_OPTIMIZER_AGG_PHASE_STRATEGY,
         AggregatePhaseStrategy.TWO_PHASE.toString)
     } else {
-      tEnv.getConfig.getConfiguration.setString(
+      tEnv.getConfig.set(
         OptimizerConfigOptions.TABLE_OPTIMIZER_AGG_PHASE_STRATEGY,
         AggregatePhaseStrategy.ONE_PHASE.toString)
     }
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/utils/TableTestBase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/utils/TableTestBase.scala
index c75f571..6cb12cc 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/utils/TableTestBase.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/utils/TableTestBase.scala
@@ -1052,7 +1052,6 @@ abstract class TableTestUtil(
     TestingTableEnvironment.create(setting, catalogManager, tableConfig)
   val tableEnv: TableEnvironment = testingTableEnv
   tableEnv.getConfig
-    .getConfiguration
     .set(ExecutionOptions.BATCH_SHUFFLE_MODE, BatchShuffleMode.ALL_EXCHANGES_PIPELINED)
 
   private val env: StreamExecutionEnvironment = getPlanner.getExecEnv
@@ -1291,12 +1290,12 @@ case class StreamTableTestUtil(
   }
 
   def enableMiniBatch(): Unit = {
-    tableEnv.getConfig.getConfiguration.setBoolean(
-      ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ENABLED, true)
-    tableEnv.getConfig.getConfiguration.set(
+    tableEnv.getConfig.set(
+      ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ENABLED, Boolean.box(true))
+    tableEnv.getConfig.set(
       ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ALLOW_LATENCY, Duration.ofSeconds(1))
-    tableEnv.getConfig.getConfiguration.setLong(
-      ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_SIZE, 3L)
+    tableEnv.getConfig.set(
+      ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_SIZE, Long.box(3L))
   }
 
   def createAppendTableSink(

[flink] 01/02: [FLINK-26287][table] Add convenience method TableConfig.set

Posted by tw...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit d2e947361d6e413f28b2cccebeec2e1761bc253b
Author: Timo Walther <tw...@apache.org>
AuthorDate: Tue Feb 22 17:30:45 2022 +0100

    [FLINK-26287][table] Add convenience method TableConfig.set
---
 .../docs/dev/python/dependency_management.md       |  2 +-
 docs/content.zh/docs/dev/table/config.md           | 24 ++++----
 .../docs/dev/table/sql/queries/overview.md         | 12 ++--
 docs/content.zh/docs/dev/table/tuning.md           | 41 ++++++-------
 .../docs/dev/python/dependency_management.md       |  2 +-
 docs/content/docs/dev/table/config.md              | 24 ++++----
 .../content/docs/dev/table/sql/queries/overview.md |  8 +--
 docs/content/docs/dev/table/tuning.md              | 69 +++++++++++-----------
 flink-python/pyflink/table/table_config.py         |  9 +++
 .../org/apache/flink/table/api/TableConfig.java    | 49 +++++++++++++--
 10 files changed, 141 insertions(+), 99 deletions(-)

diff --git a/docs/content.zh/docs/dev/python/dependency_management.md b/docs/content.zh/docs/dev/python/dependency_management.md
index 14ca3cd..c818686 100644
--- a/docs/content.zh/docs/dev/python/dependency_management.md
+++ b/docs/content.zh/docs/dev/python/dependency_management.md
@@ -302,7 +302,7 @@ import org.apache.flink.table.api.TableEnvironment;
 
 TableEnvironment tEnv = TableEnvironment.create(
     EnvironmentSettings.inBatchMode());
-tEnv.getConfig().getConfiguration().set(CoreOptions.DEFAULT_PARALLELISM, 1);
+tEnv.getConfig().set(CoreOptions.DEFAULT_PARALLELISM, 1);
 
 // register the Python UDF
 tEnv.executeSql("create temporary system function add_one as 'add_one.add_one' language python");
diff --git a/docs/content.zh/docs/dev/table/config.md b/docs/content.zh/docs/dev/table/config.md
index 2a4ac8f..70939a9 100644
--- a/docs/content.zh/docs/dev/table/config.md
+++ b/docs/content.zh/docs/dev/table/config.md
@@ -49,11 +49,11 @@ Table 和 SQL API 的默认配置能够确保结果准确,同时也提供可
 TableEnvironment tEnv = ...
 
 // access flink configuration
-Configuration configuration = tEnv.getConfig().getConfiguration();
+TableConfig configuration = tEnv.getConfig();
 // set low-level key-value options
-configuration.setString("table.exec.mini-batch.enabled", "true");
-configuration.setString("table.exec.mini-batch.allow-latency", "5 s");
-configuration.setString("table.exec.mini-batch.size", "5000");
+configuration.set("table.exec.mini-batch.enabled", "true");
+configuration.set("table.exec.mini-batch.allow-latency", "5 s");
+configuration.set("table.exec.mini-batch.size", "5000");
 ```
 {{< /tab >}}
 {{< tab "Scala" >}}
@@ -62,11 +62,11 @@ configuration.setString("table.exec.mini-batch.size", "5000");
 val tEnv: TableEnvironment = ...
 
 // access flink configuration
-val configuration = tEnv.getConfig().getConfiguration()
+val configuration = tEnv.getConfig()
 // set low-level key-value options
-configuration.setString("table.exec.mini-batch.enabled", "true")
-configuration.setString("table.exec.mini-batch.allow-latency", "5 s")
-configuration.setString("table.exec.mini-batch.size", "5000")
+configuration.set("table.exec.mini-batch.enabled", "true")
+configuration.set("table.exec.mini-batch.allow-latency", "5 s")
+configuration.set("table.exec.mini-batch.size", "5000")
 ```
 {{< /tab >}}
 {{< tab "Python" >}}
@@ -75,11 +75,11 @@ configuration.setString("table.exec.mini-batch.size", "5000")
 t_env = ...
 
 # access flink configuration
-configuration = t_env.get_config().get_configuration()
+configuration = t_env.get_config()
 # set low-level key-value options
-configuration.set_string("table.exec.mini-batch.enabled", "true")
-configuration.set_string("table.exec.mini-batch.allow-latency", "5 s")
-configuration.set_string("table.exec.mini-batch.size", "5000")
+configuration.set("table.exec.mini-batch.enabled", "true")
+configuration.set("table.exec.mini-batch.allow-latency", "5 s")
+configuration.set("table.exec.mini-batch.size", "5000")
 ```
 {{< /tab >}}
 {{< tab "SQL CLI" >}}
diff --git a/docs/content.zh/docs/dev/table/sql/queries/overview.md b/docs/content.zh/docs/dev/table/sql/queries/overview.md
index 4d54444..a1074a8 100644
--- a/docs/content.zh/docs/dev/table/sql/queries/overview.md
+++ b/docs/content.zh/docs/dev/table/sql/queries/overview.md
@@ -205,10 +205,10 @@ tableResult2.print();
 val env = StreamExecutionEnvironment.getExecutionEnvironment()
 val tableEnv = StreamTableEnvironment.create(env, settings)
 // enable checkpointing
-tableEnv.getConfig.getConfiguration.set(
-  ExecutionCheckpointingOptions.CHECKPOINTING_MODE, CheckpointingMode.EXACTLY_ONCE)
-tableEnv.getConfig.getConfiguration.set(
-  ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL, Duration.ofSeconds(10))
+tableEnv.getConfig
+  .set(ExecutionCheckpointingOptions.CHECKPOINTING_MODE, CheckpointingMode.EXACTLY_ONCE)
+tableEnv.getConfig
+  .set(ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL, Duration.ofSeconds(10))
 
 tableEnv.executeSql("CREATE TABLE Orders (`user` BIGINT, product STRING, amount INT) WITH (...)")
 
@@ -232,8 +232,8 @@ tableResult2.print()
 env = StreamExecutionEnvironment.get_execution_environment()
 table_env = StreamTableEnvironment.create(env, settings)
 # enable checkpointing
-table_env.get_config().get_configuration().set_string("execution.checkpointing.mode", "EXACTLY_ONCE")
-table_env.get_config().get_configuration().set_string("execution.checkpointing.interval", "10s")
+table_env.get_config().set("execution.checkpointing.mode", "EXACTLY_ONCE")
+table_env.get_config().set("execution.checkpointing.interval", "10s")
 
 table_env.execute_sql("CREATE TABLE Orders (`user` BIGINT, product STRING, amount INT) WITH (...)")
 
diff --git a/docs/content.zh/docs/dev/table/tuning.md b/docs/content.zh/docs/dev/table/tuning.md
index 0031f5b..5d3111a 100644
--- a/docs/content.zh/docs/dev/table/tuning.md
+++ b/docs/content.zh/docs/dev/table/tuning.md
@@ -62,11 +62,11 @@ Window TVF aggregation buffer records in [managed memory]({{< ref "docs/deployme
 TableEnvironment tEnv = ...
 
 // access flink configuration
-Configuration configuration = tEnv.getConfig().getConfiguration();
+TableConfig configuration = tEnv.getConfig();
 // set low-level key-value options
-configuration.setString("table.exec.mini-batch.enabled", "true"); // enable mini-batch optimization
-configuration.setString("table.exec.mini-batch.allow-latency", "5 s"); // use 5 seconds to buffer input records
-configuration.setString("table.exec.mini-batch.size", "5000"); // the maximum number of records can be buffered by each aggregate operator task
+configuration.set("table.exec.mini-batch.enabled", "true"); // enable mini-batch optimization
+configuration.set("table.exec.mini-batch.allow-latency", "5 s"); // use 5 seconds to buffer input records
+configuration.set("table.exec.mini-batch.size", "5000"); // the maximum number of records can be buffered by each aggregate operator task
 ```
 {{< /tab >}}
 {{< tab "Scala" >}}
@@ -75,11 +75,11 @@ configuration.setString("table.exec.mini-batch.size", "5000"); // the maximum nu
 val tEnv: TableEnvironment = ...
 
 // access flink configuration
-val configuration = tEnv.getConfig().getConfiguration()
+val configuration = tEnv.getConfig()
 // set low-level key-value options
-configuration.setString("table.exec.mini-batch.enabled", "true") // enable mini-batch optimization
-configuration.setString("table.exec.mini-batch.allow-latency", "5 s") // use 5 seconds to buffer input records
-configuration.setString("table.exec.mini-batch.size", "5000") // the maximum number of records can be buffered by each aggregate operator task
+configuration.set("table.exec.mini-batch.enabled", "true") // enable mini-batch optimization
+configuration.set("table.exec.mini-batch.allow-latency", "5 s") // use 5 seconds to buffer input records
+configuration.set("table.exec.mini-batch.size", "5000") // the maximum number of records can be buffered by each aggregate operator task
 ```
 {{< /tab >}}
 {{< tab "Python" >}}
@@ -138,12 +138,12 @@ configuration.setString("table.optimizer.agg-phase-strategy", "TWO_PHASE"); // e
 val tEnv: TableEnvironment = ...
 
 // access flink configuration
-val configuration = tEnv.getConfig().getConfiguration()
+val configuration = tEnv.getConfig()
 // set low-level key-value options
-configuration.setString("table.exec.mini-batch.enabled", "true") // local-global aggregation depends on mini-batch is enabled
-configuration.setString("table.exec.mini-batch.allow-latency", "5 s")
-configuration.setString("table.exec.mini-batch.size", "5000")
-configuration.setString("table.optimizer.agg-phase-strategy", "TWO_PHASE") // enable two-phase, i.e. local-global aggregation
+configuration.set("table.exec.mini-batch.enabled", "true") // local-global aggregation depends on mini-batch is enabled
+configuration.set("table.exec.mini-batch.allow-latency", "5 s")
+configuration.set("table.exec.mini-batch.size", "5000")
+configuration.set("table.optimizer.agg-phase-strategy", "TWO_PHASE") // enable two-phase, i.e. local-global aggregation
 ```
 {{< /tab >}}
 {{< tab "Python" >}}
@@ -208,9 +208,8 @@ GROUP BY day
 // instantiate table environment
 TableEnvironment tEnv = ...
 
-tEnv.getConfig()        // access high-level configuration
-  .getConfiguration()   // set low-level key-value options
-  .setString("table.optimizer.distinct-agg.split.enabled", "true");  // enable distinct agg split
+tEnv.getConfig()
+  .set("table.optimizer.distinct-agg.split.enabled", "true");  // enable distinct agg split
 ```
 {{< /tab >}}
 {{< tab "Scala" >}}
@@ -218,9 +217,8 @@ tEnv.getConfig()        // access high-level configuration
 // instantiate table environment
 val tEnv: TableEnvironment = ...
 
-tEnv.getConfig         // access high-level configuration
-  .getConfiguration    // set low-level key-value options
-  .setString("table.optimizer.distinct-agg.split.enabled", "true")  // enable distinct agg split
+tEnv.getConfig
+  .set("table.optimizer.distinct-agg.split.enabled", "true")  // enable distinct agg split
 ```
 {{< /tab >}}
 {{< tab "Python" >}}
@@ -228,9 +226,8 @@ tEnv.getConfig         // access high-level configuration
 # instantiate table environment
 t_env = ...
 
-t_env.get_config()        # access high-level configuration
-  .get_configuration()    # set low-level key-value options
-  .set_string("table.optimizer.distinct-agg.split.enabled", "true"); # enable distinct agg split
+t_env.get_config()
+  .set("table.optimizer.distinct-agg.split.enabled", "true"); # enable distinct agg split
 ```
 {{< /tab >}}
 {{< /tabs >}}
diff --git a/docs/content/docs/dev/python/dependency_management.md b/docs/content/docs/dev/python/dependency_management.md
index 7ed3c1f..e27ec80 100644
--- a/docs/content/docs/dev/python/dependency_management.md
+++ b/docs/content/docs/dev/python/dependency_management.md
@@ -302,7 +302,7 @@ import org.apache.flink.table.api.TableEnvironment;
 
 TableEnvironment tEnv = TableEnvironment.create(
     EnvironmentSettings.inBatchMode());
-tEnv.getConfig().getConfiguration().set(CoreOptions.DEFAULT_PARALLELISM, 1);
+tEnv.getConfig().set(CoreOptions.DEFAULT_PARALLELISM, 1);
 
 // register the Python UDF
 tEnv.executeSql("create temporary system function add_one as 'add_one.add_one' language python");
diff --git a/docs/content/docs/dev/table/config.md b/docs/content/docs/dev/table/config.md
index e6ebdd3..9cc928d 100644
--- a/docs/content/docs/dev/table/config.md
+++ b/docs/content/docs/dev/table/config.md
@@ -54,11 +54,11 @@ table environment.
 TableEnvironment tEnv = ...
 
 // access flink configuration
-Configuration configuration = tEnv.getConfig().getConfiguration();
+TableConfig configuration = tEnv.getConfig();
 // set low-level key-value options
-configuration.setString("table.exec.mini-batch.enabled", "true");
-configuration.setString("table.exec.mini-batch.allow-latency", "5 s");
-configuration.setString("table.exec.mini-batch.size", "5000");
+configuration.set("table.exec.mini-batch.enabled", "true");
+configuration.set("table.exec.mini-batch.allow-latency", "5 s");
+configuration.set("table.exec.mini-batch.size", "5000");
 ```
 {{< /tab >}}
 {{< tab "Scala" >}}
@@ -67,11 +67,11 @@ configuration.setString("table.exec.mini-batch.size", "5000");
 val tEnv: TableEnvironment = ...
 
 // access flink configuration
-val configuration = tEnv.getConfig().getConfiguration()
+val configuration = tEnv.getConfig()
 // set low-level key-value options
-configuration.setString("table.exec.mini-batch.enabled", "true")
-configuration.setString("table.exec.mini-batch.allow-latency", "5 s")
-configuration.setString("table.exec.mini-batch.size", "5000")
+configuration.set("table.exec.mini-batch.enabled", "true")
+configuration.set("table.exec.mini-batch.allow-latency", "5 s")
+configuration.set("table.exec.mini-batch.size", "5000")
 ```
 {{< /tab >}}
 {{< tab "Python" >}}
@@ -80,11 +80,11 @@ configuration.setString("table.exec.mini-batch.size", "5000")
 t_env = ...
 
 # access flink configuration
-configuration = t_env.get_config().get_configuration()
+configuration = t_env.get_config()
 # set low-level key-value options
-configuration.set_string("table.exec.mini-batch.enabled", "true")
-configuration.set_string("table.exec.mini-batch.allow-latency", "5 s")
-configuration.set_string("table.exec.mini-batch.size", "5000")
+configuration.set("table.exec.mini-batch.enabled", "true")
+configuration.set("table.exec.mini-batch.allow-latency", "5 s")
+configuration.set("table.exec.mini-batch.size", "5000")
 ```
 {{< /tab >}}
 {{< tab "SQL CLI" >}}
diff --git a/docs/content/docs/dev/table/sql/queries/overview.md b/docs/content/docs/dev/table/sql/queries/overview.md
index 70cada6..d59fe90 100644
--- a/docs/content/docs/dev/table/sql/queries/overview.md
+++ b/docs/content/docs/dev/table/sql/queries/overview.md
@@ -205,9 +205,9 @@ tableResult2.print();
 val env = StreamExecutionEnvironment.getExecutionEnvironment()
 val tableEnv = StreamTableEnvironment.create(env, settings)
 // enable checkpointing
-tableEnv.getConfig.getConfiguration.set(
+tableEnv.getConfig.set(
   ExecutionCheckpointingOptions.CHECKPOINTING_MODE, CheckpointingMode.EXACTLY_ONCE)
-tableEnv.getConfig.getConfiguration.set(
+tableEnv.getConfig.set(
   ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL, Duration.ofSeconds(10))
 
 tableEnv.executeSql("CREATE TABLE Orders (`user` BIGINT, product STRING, amount INT) WITH (...)")
@@ -232,8 +232,8 @@ tableResult2.print()
 env = StreamExecutionEnvironment.get_execution_environment()
 table_env = StreamTableEnvironment.create(env, settings)
 # enable checkpointing
-table_env.get_config().get_configuration().set_string("execution.checkpointing.mode", "EXACTLY_ONCE")
-table_env.get_config().get_configuration().set_string("execution.checkpointing.interval", "10s")
+table_env.get_config().set("execution.checkpointing.mode", "EXACTLY_ONCE")
+table_env.get_config().set("execution.checkpointing.interval", "10s")
 
 table_env.execute_sql("CREATE TABLE Orders (`user` BIGINT, product STRING, amount INT) WITH (...)")
 
diff --git a/docs/content/docs/dev/table/tuning.md b/docs/content/docs/dev/table/tuning.md
index b93a868..804e75f 100644
--- a/docs/content/docs/dev/table/tuning.md
+++ b/docs/content/docs/dev/table/tuning.md
@@ -62,11 +62,11 @@ The following examples show how to enable these options.
 TableEnvironment tEnv = ...
 
 // access flink configuration
-Configuration configuration = tEnv.getConfig().getConfiguration();
+TableConfig configuration = tEnv.getConfig();
 // set low-level key-value options
-configuration.setString("table.exec.mini-batch.enabled", "true"); // enable mini-batch optimization
-configuration.setString("table.exec.mini-batch.allow-latency", "5 s"); // use 5 seconds to buffer input records
-configuration.setString("table.exec.mini-batch.size", "5000"); // the maximum number of records can be buffered by each aggregate operator task
+configuration.set("table.exec.mini-batch.enabled", "true"); // enable mini-batch optimization
+configuration.set("table.exec.mini-batch.allow-latency", "5 s"); // use 5 seconds to buffer input records
+configuration.set("table.exec.mini-batch.size", "5000"); // the maximum number of records can be buffered by each aggregate operator task
 ```
 {{< /tab >}}
 {{< tab "Scala" >}}
@@ -75,11 +75,11 @@ configuration.setString("table.exec.mini-batch.size", "5000"); // the maximum nu
 val tEnv: TableEnvironment = ...
 
 // access flink configuration
-val configuration = tEnv.getConfig().getConfiguration()
+val configuration = tEnv.getConfig()
 // set low-level key-value options
-configuration.setString("table.exec.mini-batch.enabled", "true") // enable mini-batch optimization
-configuration.setString("table.exec.mini-batch.allow-latency", "5 s") // use 5 seconds to buffer input records
-configuration.setString("table.exec.mini-batch.size", "5000") // the maximum number of records can be buffered by each aggregate operator task
+configuration.set("table.exec.mini-batch.enabled", "true") // enable mini-batch optimization
+configuration.set("table.exec.mini-batch.allow-latency", "5 s") // use 5 seconds to buffer input records
+configuration.set("table.exec.mini-batch.size", "5000") // the maximum number of records can be buffered by each aggregate operator task
 ```
 {{< /tab >}}
 {{< tab "Python" >}}
@@ -88,11 +88,11 @@ configuration.setString("table.exec.mini-batch.size", "5000") // the maximum num
 t_env = ...
 
 # access flink configuration
-configuration = t_env.get_config().get_configuration();
+configuration = t_env.get_config();
 # set low-level key-value options
-configuration.set_string("table.exec.mini-batch.enabled", "true"); # enable mini-batch optimization
-configuration.set_string("table.exec.mini-batch.allow-latency", "5 s"); # use 5 seconds to buffer input records
-configuration.set_string("table.exec.mini-batch.size", "5000"); # the maximum number of records can be buffered by each aggregate operator task
+configuration.set("table.exec.mini-batch.enabled", "true"); # enable mini-batch optimization
+configuration.set("table.exec.mini-batch.allow-latency", "5 s"); # use 5 seconds to buffer input records
+configuration.set("table.exec.mini-batch.size", "5000"); # the maximum number of records can be buffered by each aggregate operator task
 ```
 {{< /tab >}}
 {{< /tabs >}}
@@ -125,12 +125,12 @@ The following examples show how to enable the local-global aggregation.
 TableEnvironment tEnv = ...
 
 // access flink configuration
-Configuration configuration = tEnv.getConfig().getConfiguration();
+TableConfig configuration = tEnv.getConfig();
 // set low-level key-value options
-configuration.setString("table.exec.mini-batch.enabled", "true"); // local-global aggregation depends on mini-batch is enabled
-configuration.setString("table.exec.mini-batch.allow-latency", "5 s");
-configuration.setString("table.exec.mini-batch.size", "5000");
-configuration.setString("table.optimizer.agg-phase-strategy", "TWO_PHASE"); // enable two-phase, i.e. local-global aggregation
+configuration.set("table.exec.mini-batch.enabled", "true"); // local-global aggregation depends on mini-batch is enabled
+configuration.set("table.exec.mini-batch.allow-latency", "5 s");
+configuration.set("table.exec.mini-batch.size", "5000");
+configuration.set("table.optimizer.agg-phase-strategy", "TWO_PHASE"); // enable two-phase, i.e. local-global aggregation
 ```
 {{< /tab >}}
 {{< tab "Scala" >}}
@@ -139,12 +139,12 @@ configuration.setString("table.optimizer.agg-phase-strategy", "TWO_PHASE"); // e
 val tEnv: TableEnvironment = ...
 
 // access flink configuration
-val configuration = tEnv.getConfig().getConfiguration()
+val configuration = tEnv.getConfig()
 // set low-level key-value options
-configuration.setString("table.exec.mini-batch.enabled", "true") // local-global aggregation depends on mini-batch is enabled
-configuration.setString("table.exec.mini-batch.allow-latency", "5 s")
-configuration.setString("table.exec.mini-batch.size", "5000")
-configuration.setString("table.optimizer.agg-phase-strategy", "TWO_PHASE") // enable two-phase, i.e. local-global aggregation
+configuration.set("table.exec.mini-batch.enabled", "true") // local-global aggregation depends on mini-batch is enabled
+configuration.set("table.exec.mini-batch.allow-latency", "5 s")
+configuration.set("table.exec.mini-batch.size", "5000")
+configuration.set("table.optimizer.agg-phase-strategy", "TWO_PHASE") // enable two-phase, i.e. local-global aggregation
 ```
 {{< /tab >}}
 {{< tab "Python" >}}
@@ -153,12 +153,12 @@ configuration.setString("table.optimizer.agg-phase-strategy", "TWO_PHASE") // en
 t_env = ...
 
 # access flink configuration
-configuration = t_env.get_config().get_configuration();
+configuration = t_env.get_config();
 # set low-level key-value options
-configuration.set_string("table.exec.mini-batch.enabled", "true"); # local-global aggregation depends on mini-batch is enabled
-configuration.set_string("table.exec.mini-batch.allow-latency", "5 s");
-configuration.set_string("table.exec.mini-batch.size", "5000");
-configuration.set_string("table.optimizer.agg-phase-strategy", "TWO_PHASE"); # enable two-phase, i.e. local-global aggregation
+configuration.set("table.exec.mini-batch.enabled", "true"); # local-global aggregation depends on mini-batch is enabled
+configuration.set("table.exec.mini-batch.allow-latency", "5 s");
+configuration.set("table.exec.mini-batch.size", "5000");
+configuration.set("table.optimizer.agg-phase-strategy", "TWO_PHASE"); # enable two-phase, i.e. local-global aggregation
 ```
 {{< /tab >}}
 {{< /tabs >}}
@@ -212,9 +212,8 @@ The following examples show how to enable the split distinct aggregation optimiz
 // instantiate table environment
 TableEnvironment tEnv = ...
 
-tEnv.getConfig()        // access high-level configuration
-  .getConfiguration()   // set low-level key-value options
-  .setString("table.optimizer.distinct-agg.split.enabled", "true");  // enable distinct agg split
+tEnv.getConfig()
+  .set("table.optimizer.distinct-agg.split.enabled", "true");  // enable distinct agg split
 ```
 {{< /tab >}}
 {{< tab "Scala" >}}
@@ -222,9 +221,8 @@ tEnv.getConfig()        // access high-level configuration
 // instantiate table environment
 val tEnv: TableEnvironment = ...
 
-tEnv.getConfig         // access high-level configuration
-  .getConfiguration    // set low-level key-value options
-  .setString("table.optimizer.distinct-agg.split.enabled", "true")  // enable distinct agg split
+tEnv.getConfig
+  .set("table.optimizer.distinct-agg.split.enabled", "true")  // enable distinct agg split
 ```
 {{< /tab >}}
 {{< tab "Python" >}}
@@ -232,9 +230,8 @@ tEnv.getConfig         // access high-level configuration
 # instantiate table environment
 t_env = ...
 
-t_env.get_config()        # access high-level configuration
-  .get_configuration()    # set low-level key-value options
-  .set_string("table.optimizer.distinct-agg.split.enabled", "true"); # enable distinct agg split
+t_env.get_config()
+  .set("table.optimizer.distinct-agg.split.enabled", "true"); # enable distinct agg split
 ```
 {{< /tab >}}
 {{< /tabs >}}
diff --git a/flink-python/pyflink/table/table_config.py b/flink-python/pyflink/table/table_config.py
index f4ebe6c..eacd635 100644
--- a/flink-python/pyflink/table/table_config.py
+++ b/flink-python/pyflink/table/table_config.py
@@ -51,6 +51,15 @@ class TableConfig(object):
         else:
             self._j_table_config = j_table_config
 
+    def set(self, key: str, value: str) -> 'TableConfig':
+        """
+        Sets a string-based value for the given string-based key.
+
+        The value will be parsed by the framework on access.
+        """
+        self._j_table_config.set(key, value)
+        return self
+
     def get_local_timezone(self) -> str:
         """
         Returns the local timezone id for timestamp with local time zone, either an abbreviation
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableConfig.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableConfig.java
index ba0a2ac..2565bb4 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableConfig.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableConfig.java
@@ -21,8 +21,10 @@ package org.apache.flink.table.api;
 import org.apache.flink.annotation.Experimental;
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.ConfigOption;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.PipelineOptions;
+import org.apache.flink.configuration.WritableConfig;
 import org.apache.flink.table.api.config.ExecutionConfigOptions;
 import org.apache.flink.table.api.config.OptimizerConfigOptions;
 import org.apache.flink.table.api.config.TableConfigOptions;
@@ -61,11 +63,12 @@ import static java.time.ZoneId.SHORT_IDS;
  * <p>Note: Because options are read at different point in time when performing operations, it is
  * recommended to set configuration options early after instantiating a table environment.
  *
+ * @see TableConfigOptions
  * @see ExecutionConfigOptions
  * @see OptimizerConfigOptions
  */
 @PublicEvolving
-public class TableConfig {
+public class TableConfig implements WritableConfig {
     /** Defines if all fields need to be checked for NULL first. */
     private Boolean nullCheck = true;
 
@@ -81,6 +84,43 @@ public class TableConfig {
     /** A configuration object to hold all key/value configuration. */
     private final Configuration configuration = new Configuration();
 
+    /**
+     * Sets a value for the given {@link ConfigOption}.
+     *
+     * <p>This method should be preferred over {@link #set(String, String)} as it is type-safe,
+     * avoids unnecessary parsing of the value, and provides inline documentation.
+     *
+     * <p>Note: Scala users might need to convert the value into a boxed type. E.g. by using {@code
+     * Int.box(1)} or {@code Boolean.box(false)}.
+     *
+     * @see TableConfigOptions
+     * @see ExecutionConfigOptions
+     * @see OptimizerConfigOptions
+     */
+    @Override
+    public <T> TableConfig set(ConfigOption<T> option, T value) {
+        configuration.set(option, value);
+        return this;
+    }
+
+    /**
+     * Sets a string-based value for the given string-based key.
+     *
+     * <p>The value will be parsed by the framework on access.
+     *
+     * <p>This method exists for convenience when configuring a session with string-based
+     * properties. Use {@link #set(ConfigOption, Object)} for more type-safety and inline
+     * documentation.
+     *
+     * @see TableConfigOptions
+     * @see ExecutionConfigOptions
+     * @see OptimizerConfigOptions
+     */
+    public TableConfig set(String key, String value) {
+        configuration.setString(key, value);
+        return this;
+    }
+
     /** Gives direct access to the underlying key-value map for advanced configuration. */
     public Configuration getConfiguration() {
         return configuration;
@@ -105,8 +145,7 @@ public class TableConfig {
 
     /** Sets the current SQL dialect to parse a SQL query. Flink's SQL behavior by default. */
     public void setSqlDialect(SqlDialect sqlDialect) {
-        getConfiguration()
-                .setString(TableConfigOptions.TABLE_SQL_DIALECT, sqlDialect.name().toLowerCase());
+        set(TableConfigOptions.TABLE_SQL_DIALECT, sqlDialect.name().toLowerCase());
     }
 
     /**
@@ -353,7 +392,7 @@ public class TableConfig {
      * <pre>{@code
      * Map<String, String> params = ...
      * TableConfig tableConfig = tEnv.getConfig;
-     * tableConfig.getConfiguration().set(PipelineOptions.GLOBAL_JOB_PARAMETERS, params);
+     * config.set(PipelineOptions.GLOBAL_JOB_PARAMETERS, params);
      * }</pre>
      */
     @Experimental
@@ -364,7 +403,7 @@ public class TableConfig {
                         .map(HashMap::new)
                         .orElseGet(HashMap::new);
         params.put(key, value);
-        getConfiguration().set(PipelineOptions.GLOBAL_JOB_PARAMETERS, params);
+        set(PipelineOptions.GLOBAL_JOB_PARAMETERS, params);
     }
 
     public static TableConfig getDefault() {