You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tw...@apache.org on 2022/03/18 06:46:14 UTC

[flink] branch master updated (b8d8e07 -> 7fba24a)

This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


    from b8d8e07  [hotfix][runtime] Makes use of static variable
     new 9fa08be  [hotfix][table-api-scala-bridge] Fix imports of StreamTableEnvironmentImpl
     new a924c67  [hotfix][table] Replace TableConfig constructor with getDefault()
     new 2eacc73  [FLINK-26688][table-planner] Remove usages of TableConfig.nullCheck
     new 7fba24a  [FLINK-26689][table] Replace `TableConfig` with `ReadableConfig`

The 4 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../flink/connector/hbase1/HBaseTablePlanTest.java |   2 +-
 .../flink/connector/hbase2/HBaseTablePlanTest.java |   2 +-
 .../connector/jdbc/table/JdbcTablePlanTest.java    |   2 +-
 ...ghPythonStreamGroupWindowAggregateOperator.java |   3 +-
 ...owPythonGroupAggregateFunctionOperatorTest.java |   7 +-
 ...onGroupWindowAggregateFunctionOperatorTest.java |   7 +-
 ...honOverWindowAggregateFunctionOperatorTest.java |   7 +-
 ...onGroupWindowAggregateFunctionOperatorTest.java |   3 +-
 ...rrowPythonProcTimeBoundedRangeOperatorTest.java |   3 +-
 ...ArrowPythonProcTimeBoundedRowsOperatorTest.java |   3 +-
 ...ArrowPythonRowTimeBoundedRangeOperatorTest.java |   3 +-
 ...mArrowPythonRowTimeBoundedRowsOperatorTest.java |   3 +-
 .../scalar/PythonScalarFunctionOperatorTest.java   |   5 +-
 .../ArrowPythonScalarFunctionOperatorTest.java     |   5 +-
 .../table/PythonTableFunctionOperatorTest.java     |   3 +-
 .../internal/StreamTableEnvironmentImplTest.java   |   2 +-
 .../table/api/internal/TableEnvironmentImpl.java   |   2 +-
 .../apache/flink/table/api/TableConfigTest.java    |   4 +-
 .../resolver/ExpressionResolverTest.java           |   2 +-
 .../utils/ValuesOperationTreeBuilderTest.java      |   2 +-
 .../flink/table/utils/ExpressionResolverMocks.java |   4 +-
 .../internal/StreamTableEnvironmentImpl.scala      |   2 +-
 .../internal/StreamTableEnvironmentImplTest.scala  |   2 +-
 .../abilities/source/WatermarkPushDownSpec.java    |   2 +-
 .../planner/plan/nodes/exec/ExecNodeConfig.java    |   2 -
 .../exec/batch/BatchExecBoundedStreamScan.java     |   2 +-
 .../plan/nodes/exec/batch/BatchExecExchange.java   |   5 +-
 .../nodes/exec/batch/BatchExecHashAggregate.java   |   2 +-
 .../plan/nodes/exec/batch/BatchExecHashJoin.java   |  11 +-
 .../exec/batch/BatchExecHashWindowAggregate.java   |   2 +-
 .../exec/batch/BatchExecLegacyTableSourceScan.java |   2 +-
 .../nodes/exec/batch/BatchExecNestedLoopJoin.java  |   2 +-
 .../nodes/exec/batch/BatchExecOverAggregate.java   |  12 +-
 .../exec/batch/BatchExecPythonGroupAggregate.java  |   6 +-
 .../batch/BatchExecPythonGroupWindowAggregate.java |   6 +-
 .../exec/batch/BatchExecPythonOverAggregate.java   |   6 +-
 .../plan/nodes/exec/batch/BatchExecRank.java       |   4 +-
 .../plan/nodes/exec/batch/BatchExecSort.java       |   3 +-
 .../nodes/exec/batch/BatchExecSortAggregate.java   |   2 +-
 .../plan/nodes/exec/batch/BatchExecSortLimit.java  |   3 +-
 .../nodes/exec/batch/BatchExecSortMergeJoin.java   |   9 +-
 .../exec/batch/BatchExecSortWindowAggregate.java   |   2 +-
 .../plan/nodes/exec/common/CommonExecCalc.java     |   3 +-
 .../nodes/exec/common/CommonExecCorrelate.java     |   5 +-
 .../plan/nodes/exec/common/CommonExecExpand.java   |   2 +-
 .../nodes/exec/common/CommonExecLegacySink.java    |   2 +-
 .../nodes/exec/common/CommonExecLookupJoin.java    |  12 +-
 .../nodes/exec/common/CommonExecPythonCalc.java    |   6 +-
 .../exec/common/CommonExecPythonCorrelate.java     |   2 +-
 .../plan/nodes/exec/common/CommonExecValues.java   |   5 +-
 .../exec/stream/StreamExecDataStreamScan.java      |   2 +-
 .../stream/StreamExecGlobalGroupAggregate.java     |   2 +-
 .../stream/StreamExecGlobalWindowAggregate.java    |   2 +-
 .../exec/stream/StreamExecGroupAggregate.java      |   2 +-
 .../exec/stream/StreamExecGroupTableAggregate.java |   2 +-
 .../stream/StreamExecGroupWindowAggregate.java     |   2 +-
 .../StreamExecIncrementalGroupAggregate.java       |   2 +-
 .../nodes/exec/stream/StreamExecIntervalJoin.java  |   2 +-
 .../plan/nodes/exec/stream/StreamExecJoin.java     |   3 +-
 .../stream/StreamExecLegacyTableSourceScan.java    |   2 +-
 .../exec/stream/StreamExecLocalGroupAggregate.java |   2 +-
 .../stream/StreamExecLocalWindowAggregate.java     |   2 +-
 .../plan/nodes/exec/stream/StreamExecMatch.java    |  20 ++--
 .../nodes/exec/stream/StreamExecOverAggregate.java |   2 +-
 .../StreamExecPythonGroupWindowAggregate.java      |   2 +-
 .../exec/stream/StreamExecPythonOverAggregate.java |   2 +-
 .../plan/nodes/exec/stream/StreamExecRank.java     |   2 +-
 .../plan/nodes/exec/stream/StreamExecSort.java     |   2 +-
 .../nodes/exec/stream/StreamExecTemporalJoin.java  |   2 +-
 .../nodes/exec/stream/StreamExecTemporalSort.java  |  10 +-
 .../exec/stream/StreamExecWatermarkAssigner.java   |   2 +-
 .../exec/stream/StreamExecWindowAggregate.java     |   2 +-
 .../nodes/exec/stream/StreamExecWindowJoin.java    |   3 +-
 .../nodes/exec/stream/StreamExecWindowRank.java    |   2 +-
 .../table/planner/plan/utils/KeySelectorUtil.java  |   4 +-
 .../table/planner/utils/TableConfigUtils.java      |  33 ++++++
 .../table/planner/codegen/CalcCodeGenerator.scala  |   5 +-
 .../flink/table/planner/codegen/CodeGenUtils.scala |  49 +++-----
 .../planner/codegen/CodeGeneratorContext.scala     |  37 ++----
 .../planner/codegen/CollectorCodeGenerator.scala   |   4 +-
 .../planner/codegen/CorrelateCodeGenerator.scala   |   9 +-
 .../planner/codegen/EqualiserCodeGenerator.scala   |   6 +-
 .../table/planner/codegen/ExprCodeGenerator.scala  |  52 ++++-----
 .../table/planner/codegen/ExpressionReducer.scala  |   7 +-
 .../planner/codegen/FunctionCodeGenerator.scala    |   4 +-
 .../table/planner/codegen/GenerateUtils.scala      |  42 ++-----
 .../table/planner/codegen/HashCodeGenerator.scala  |   2 +-
 .../planner/codegen/InputFormatCodeGenerator.scala |   2 +-
 .../planner/codegen/LongHashJoinGenerator.scala    |   9 +-
 .../planner/codegen/LookupJoinCodeGenerator.scala  |  20 ++--
 .../table/planner/codegen/MatchCodeGenerator.scala |  17 +--
 .../planner/codegen/OperatorCodeGenerator.scala    |   4 +-
 .../planner/codegen/ProjectionCodeGenerator.scala  |   2 +-
 .../planner/codegen/ValuesCodeGenerator.scala      |   4 +-
 .../codegen/WatermarkGeneratorCodeGenerator.scala  |  12 +-
 .../codegen/agg/AggsHandlerCodeGenerator.scala     |   8 +-
 .../codegen/agg/batch/HashAggCodeGenHelper.scala   |   5 +-
 .../codegen/calls/BridgingSqlFunctionCallGen.scala |   2 +-
 .../planner/codegen/calls/FunctionGenerator.scala  |  13 ++-
 .../planner/codegen/calls/ScalarOperatorGens.scala | 126 +++++----------------
 .../planner/codegen/calls/SearchOperatorGen.scala  |  21 +---
 ...ltiFieldRangeBoundComparatorCodeGenerator.scala |   6 +-
 .../over/RangeBoundComparatorCodeGenerator.scala   |   6 +-
 .../codegen/sort/ComparatorCodeGenerator.scala     |   6 +-
 .../planner/codegen/sort/SortCodeGenerator.scala   |   6 +-
 .../physical/batch/BatchPhysicalJoinBase.scala     |   2 +-
 ...ushPartitionIntoLegacyTableSourceScanRule.scala |   8 +-
 .../flink/table/planner/plan/utils/JoinUtil.scala  |  13 ++-
 .../table/planner/plan/utils/PartitionPruner.scala |   4 +-
 .../planner/catalog/JavaCatalogTableTest.java      |   4 +-
 .../flink/table/planner/codegen/CodeSplitTest.java |  17 +--
 .../planner/codegen/LongHashJoinGeneratorTest.java |   4 +-
 .../planner/codegen/SortCodeGeneratorTest.java     |   5 +-
 .../table/planner/delegation/ParserImplTest.java   |   2 +-
 .../converter/ExpressionConverterTest.java         |   2 +-
 .../operations/SqlToOperationConverterTest.java    |   2 +-
 .../MultipleInputNodeCreationProcessorTest.java    |   4 +-
 .../serde/TemporalTableSourceSpecSerdeTest.java    |   2 +-
 .../ProjectWatermarkAssignerTransposeRuleTest.java |   2 +-
 .../PushWatermarkIntoTableSourceScanRuleTest.java  |   2 +-
 .../PushLocalAggIntoTableSourceScanRuleTest.java   |   2 +-
 .../flink/table/planner/utils/PlannerMocks.java    |   4 +-
 .../planner/codegen/HashCodeGeneratorTest.scala    |   6 +-
 .../codegen/ProjectionCodeGeneratorTest.scala      |  12 +-
 .../codegen/WatermarkGeneratorCodeGenTest.scala    |  26 ++---
 .../table/planner/codegen/agg/AggTestBase.scala    |   4 +-
 .../expressions/utils/ExpressionTestBase.scala     |   2 +-
 .../planner/match/PatternTranslatorTestBase.scala  |   6 +-
 .../metadata/AggCallSelectivityEstimatorTest.scala |   2 +-
 .../plan/metadata/FlinkRelMdHandlerTestBase.scala  |   2 +-
 .../plan/nodes/calcite/RelNodeTestBase.scala       |   2 +-
 .../planner/plan/utils/PartitionPrunerTest.scala   |  12 +-
 .../planner/runtime/batch/table/JoinITCase.scala   |   2 -
 .../planner/runtime/utils/BatchTestBase.scala      |   2 +-
 .../flink/table/planner/utils/TableTestBase.scala  |   8 +-
 .../generated/GeneratedAggsHandleFunction.java     |   3 +-
 .../runtime/generated/GeneratedCollector.java      |   3 +-
 .../table/runtime/generated/GeneratedFunction.java |   3 +-
 .../runtime/generated/GeneratedHashFunction.java   |   4 +-
 .../table/runtime/generated/GeneratedInput.java    |   4 +-
 .../runtime/generated/GeneratedJoinCondition.java  |   3 +-
 .../GeneratedNamespaceAggsHandleFunction.java      |   3 +-
 .../GeneratedNamespaceTableAggsHandleFunction.java |   3 +-
 .../table/runtime/generated/GeneratedOperator.java |   4 +-
 .../runtime/generated/GeneratedProjection.java     |   3 +-
 .../generated/GeneratedRecordComparator.java       |   3 +-
 .../generated/GeneratedRecordEqualiser.java        |   3 +-
 .../runtime/generated/GeneratedResultFuture.java   |   3 +-
 .../GeneratedTableAggsHandleFunction.java          |   4 +-
 .../generated/GeneratedWatermarkGenerator.java     |   3 +-
 150 files changed, 435 insertions(+), 576 deletions(-)

[flink] 01/04: [hotfix][table-api-scala-bridge] Fix imports of StreamTableEnvironmentImpl

Posted by tw...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 9fa08beac5d381fd48df81f80170fef7db42d7ee
Author: Marios Trivyzas <ma...@gmail.com>
AuthorDate: Tue Mar 15 18:54:24 2022 +0200

    [hotfix][table-api-scala-bridge] Fix imports of StreamTableEnvironmentImpl
---
 .../table/api/bridge/scala/internal/StreamTableEnvironmentImpl.scala    | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/bridge/scala/internal/StreamTableEnvironmentImpl.scala b/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/bridge/scala/internal/StreamTableEnvironmentImpl.scala
index cbc4f38..d8bca7c 100644
--- a/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/bridge/scala/internal/StreamTableEnvironmentImpl.scala
+++ b/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/bridge/scala/internal/StreamTableEnvironmentImpl.scala
@@ -17,7 +17,7 @@
  */
 package org.apache.flink.table.api.bridge.scala.internal
 
-import org.apache.flink.annotation.{Internal, VisibleForTesting}
+import org.apache.flink.annotation.Internal
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.scala._
 import org.apache.flink.streaming.api.TimeCharacteristic

[flink] 02/04: [hotfix][table] Replace TableConfig constructor with getDefault()

Posted by tw...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit a924c6757240c155c5b0e6cf9ecb511d52d63e60
Author: Marios Trivyzas <ma...@gmail.com>
AuthorDate: Tue Mar 15 17:53:23 2022 +0200

    [hotfix][table] Replace TableConfig constructor with getDefault()
---
 .../org/apache/flink/connector/hbase1/HBaseTablePlanTest.java     | 2 +-
 .../org/apache/flink/connector/hbase2/HBaseTablePlanTest.java     | 2 +-
 .../org/apache/flink/connector/jdbc/table/JdbcTablePlanTest.java  | 2 +-
 .../api/bridge/java/internal/StreamTableEnvironmentImplTest.java  | 2 +-
 .../org/apache/flink/table/api/internal/TableEnvironmentImpl.java | 2 +-
 .../src/test/java/org/apache/flink/table/api/TableConfigTest.java | 4 ++--
 .../flink/table/expressions/resolver/ExpressionResolverTest.java  | 2 +-
 .../table/operations/utils/ValuesOperationTreeBuilderTest.java    | 2 +-
 .../org/apache/flink/table/utils/ExpressionResolverMocks.java     | 4 ++--
 .../bridge/scala/internal/StreamTableEnvironmentImplTest.scala    | 2 +-
 .../apache/flink/table/planner/catalog/JavaCatalogTableTest.java  | 4 ++--
 .../org/apache/flink/table/planner/delegation/ParserImplTest.java | 2 +-
 .../planner/expressions/converter/ExpressionConverterTest.java    | 2 +-
 .../table/planner/operations/SqlToOperationConverterTest.java     | 2 +-
 .../exec/processor/MultipleInputNodeCreationProcessorTest.java    | 4 ++--
 .../plan/nodes/exec/serde/TemporalTableSourceSpecSerdeTest.java   | 2 +-
 .../rules/logical/ProjectWatermarkAssignerTransposeRuleTest.java  | 2 +-
 .../rules/logical/PushWatermarkIntoTableSourceScanRuleTest.java   | 2 +-
 .../physical/batch/PushLocalAggIntoTableSourceScanRuleTest.java   | 2 +-
 .../java/org/apache/flink/table/planner/utils/PlannerMocks.java   | 4 ++--
 .../table/planner/codegen/WatermarkGeneratorCodeGenTest.scala     | 2 +-
 .../planner/plan/metadata/AggCallSelectivityEstimatorTest.scala   | 2 +-
 .../table/planner/plan/metadata/FlinkRelMdHandlerTestBase.scala   | 2 +-
 .../flink/table/planner/plan/nodes/calcite/RelNodeTestBase.scala  | 2 +-
 .../flink/table/planner/plan/utils/PartitionPrunerTest.scala      | 4 ++--
 .../apache/flink/table/planner/runtime/utils/BatchTestBase.scala  | 2 +-
 .../org/apache/flink/table/planner/utils/TableTestBase.scala      | 8 ++++----
 27 files changed, 36 insertions(+), 36 deletions(-)

diff --git a/flink-connectors/flink-connector-hbase-1.4/src/test/java/org/apache/flink/connector/hbase1/HBaseTablePlanTest.java b/flink-connectors/flink-connector-hbase-1.4/src/test/java/org/apache/flink/connector/hbase1/HBaseTablePlanTest.java
index df44c50..f6e1ec1 100644
--- a/flink-connectors/flink-connector-hbase-1.4/src/test/java/org/apache/flink/connector/hbase1/HBaseTablePlanTest.java
+++ b/flink-connectors/flink-connector-hbase-1.4/src/test/java/org/apache/flink/connector/hbase1/HBaseTablePlanTest.java
@@ -29,7 +29,7 @@ import static org.apache.flink.core.testutils.FlinkMatchers.containsCause;
 /** Plan tests for HBase connector, for example, testing projection push down. */
 public class HBaseTablePlanTest extends TableTestBase {
 
-    private final StreamTableTestUtil util = streamTestUtil(new TableConfig());
+    private final StreamTableTestUtil util = streamTestUtil(TableConfig.getDefault());
 
     @Test
     public void testMultipleRowKey() {
diff --git a/flink-connectors/flink-connector-hbase-2.2/src/test/java/org/apache/flink/connector/hbase2/HBaseTablePlanTest.java b/flink-connectors/flink-connector-hbase-2.2/src/test/java/org/apache/flink/connector/hbase2/HBaseTablePlanTest.java
index 979a6d6..048e08d 100644
--- a/flink-connectors/flink-connector-hbase-2.2/src/test/java/org/apache/flink/connector/hbase2/HBaseTablePlanTest.java
+++ b/flink-connectors/flink-connector-hbase-2.2/src/test/java/org/apache/flink/connector/hbase2/HBaseTablePlanTest.java
@@ -29,7 +29,7 @@ import static org.apache.flink.core.testutils.FlinkMatchers.containsCause;
 /** Plan tests for HBase connector, for example, testing projection push down. */
 public class HBaseTablePlanTest extends TableTestBase {
 
-    private final StreamTableTestUtil util = streamTestUtil(new TableConfig());
+    private final StreamTableTestUtil util = streamTestUtil(TableConfig.getDefault());
 
     @Test
     public void testMultipleRowKey() {
diff --git a/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcTablePlanTest.java b/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcTablePlanTest.java
index 2e02f76..642fafe 100644
--- a/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcTablePlanTest.java
+++ b/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcTablePlanTest.java
@@ -28,7 +28,7 @@ import org.junit.Test;
 /** Plan tests for JDBC connector, for example, testing projection push down. */
 public class JdbcTablePlanTest extends TableTestBase {
 
-    private final StreamTableTestUtil util = streamTestUtil(new TableConfig());
+    private final StreamTableTestUtil util = streamTestUtil(TableConfig.getDefault());
 
     @Before
     public void setup() {
diff --git a/flink-table/flink-table-api-java-bridge/src/test/java/org/apache/flink/table/api/bridge/java/internal/StreamTableEnvironmentImplTest.java b/flink-table/flink-table-api-java-bridge/src/test/java/org/apache/flink/table/api/bridge/java/internal/StreamTableEnvironmentImplTest.java
index ce90c50..304843e 100644
--- a/flink-table/flink-table-api-java-bridge/src/test/java/org/apache/flink/table/api/bridge/java/internal/StreamTableEnvironmentImplTest.java
+++ b/flink-table/flink-table-api-java-bridge/src/test/java/org/apache/flink/table/api/bridge/java/internal/StreamTableEnvironmentImplTest.java
@@ -74,7 +74,7 @@ public class StreamTableEnvironmentImplTest {
 
     private StreamTableEnvironmentImpl getStreamTableEnvironment(
             StreamExecutionEnvironment env, DataStreamSource<Integer> elements) {
-        TableConfig tableConfig = new TableConfig();
+        TableConfig tableConfig = TableConfig.getDefault();
         CatalogManager catalogManager = CatalogManagerMocks.createEmptyCatalogManager();
         ModuleManager moduleManager = new ModuleManager();
         return new StreamTableEnvironmentImpl(
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
index a692e55..7f3ed73 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
@@ -278,7 +278,7 @@ public class TableEnvironmentImpl implements TableEnvironmentInternal {
         final Executor executor = executorFactory.create(settings.getConfiguration());
 
         // use configuration to init table config
-        final TableConfig tableConfig = new TableConfig();
+        final TableConfig tableConfig = TableConfig.getDefault();
         tableConfig.setRootConfiguration(executor.getConfiguration());
         tableConfig.addConfiguration(settings.getConfiguration());
 
diff --git a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/api/TableConfigTest.java b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/api/TableConfigTest.java
index c3a3086..5db7a75 100644
--- a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/api/TableConfigTest.java
+++ b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/api/TableConfigTest.java
@@ -34,8 +34,8 @@ public class TableConfigTest {
 
     @Rule public ExpectedException expectedException = ExpectedException.none();
 
-    private static TableConfig configByMethod = new TableConfig();
-    private static TableConfig configByConfiguration = new TableConfig();
+    private static TableConfig configByMethod = TableConfig.getDefault();
+    private static TableConfig configByConfiguration = TableConfig.getDefault();
     private static Configuration configuration = new Configuration();
 
     @Test
diff --git a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/expressions/resolver/ExpressionResolverTest.java b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/expressions/resolver/ExpressionResolverTest.java
index d2f5f0a..0acd39b 100644
--- a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/expressions/resolver/ExpressionResolverTest.java
+++ b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/expressions/resolver/ExpressionResolverTest.java
@@ -350,7 +350,7 @@ public class ExpressionResolverTest {
 
         public ExpressionResolver getResolver() {
             return ExpressionResolver.resolverFor(
-                            new TableConfig(),
+                            TableConfig.getDefault(),
                             name -> Optional.empty(),
                             new FunctionLookupMock(functions),
                             new DataTypeFactoryMock(),
diff --git a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/operations/utils/ValuesOperationTreeBuilderTest.java b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/operations/utils/ValuesOperationTreeBuilderTest.java
index 25e1aef..51b56d7 100644
--- a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/operations/utils/ValuesOperationTreeBuilderTest.java
+++ b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/operations/utils/ValuesOperationTreeBuilderTest.java
@@ -485,7 +485,7 @@ public class ValuesOperationTreeBuilderTest {
 
         public OperationTreeBuilder getTreeBuilder() {
             return OperationTreeBuilder.create(
-                    new TableConfig(),
+                    TableConfig.getDefault(),
                     new FunctionLookupMock(Collections.emptyMap()),
                     new DataTypeFactoryMock(),
                     name -> Optional.empty(), // do not support
diff --git a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/utils/ExpressionResolverMocks.java b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/utils/ExpressionResolverMocks.java
index 68970fe..3e4ecb4 100644
--- a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/utils/ExpressionResolverMocks.java
+++ b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/utils/ExpressionResolverMocks.java
@@ -38,7 +38,7 @@ public final class ExpressionResolverMocks {
 
     public static ExpressionResolverBuilder forSqlExpression(SqlExpressionResolver resolver) {
         return ExpressionResolver.resolverFor(
-                new TableConfig(),
+                TableConfig.getDefault(),
                 name -> Optional.empty(),
                 new FunctionLookupMock(Collections.emptyMap()),
                 new DataTypeFactoryMock(),
@@ -55,7 +55,7 @@ public final class ExpressionResolverMocks {
     public static ExpressionResolverBuilder basicResolver(
             CatalogManager catalogManager, FunctionCatalog functionCatalog, Parser parser) {
         return ExpressionResolver.resolverFor(
-                new TableConfig(),
+                TableConfig.getDefault(),
                 name -> Optional.empty(),
                 functionCatalog.asLookup(parser::parseIdentifier),
                 catalogManager.getDataTypeFactory(),
diff --git a/flink-table/flink-table-api-scala-bridge/src/test/scala/org/apache/flink/table/api/bridge/scala/internal/StreamTableEnvironmentImplTest.scala b/flink-table/flink-table-api-scala-bridge/src/test/scala/org/apache/flink/table/api/bridge/scala/internal/StreamTableEnvironmentImplTest.scala
index b33c977..0a0b848 100644
--- a/flink-table/flink-table-api-scala-bridge/src/test/scala/org/apache/flink/table/api/bridge/scala/internal/StreamTableEnvironmentImplTest.scala
+++ b/flink-table/flink-table-api-scala-bridge/src/test/scala/org/apache/flink/table/api/bridge/scala/internal/StreamTableEnvironmentImplTest.scala
@@ -78,7 +78,7 @@ class StreamTableEnvironmentImplTest {
   private def getStreamTableEnvironment(
       env: StreamExecutionEnvironment,
       elements: DataStream[Int]) = {
-    val config = new TableConfig
+    val config = TableConfig.getDefault
     val catalogManager = CatalogManagerMocks.createEmptyCatalogManager()
     val moduleManager = new ModuleManager
     new StreamTableEnvironmentImpl(
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/catalog/JavaCatalogTableTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/catalog/JavaCatalogTableTest.java
index 875a9b9..1bec0f7 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/catalog/JavaCatalogTableTest.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/catalog/JavaCatalogTableTest.java
@@ -60,9 +60,9 @@ public class JavaCatalogTableTest extends TableTestBase {
 
     private TableTestUtil getTestUtil() {
         if (isStreamingMode) {
-            return streamTestUtil(new TableConfig());
+            return streamTestUtil(TableConfig.getDefault());
         } else {
-            return batchTestUtil(new TableConfig());
+            return batchTestUtil(TableConfig.getDefault());
         }
     }
 
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/delegation/ParserImplTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/delegation/ParserImplTest.java
index e43f96d..70ca666 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/delegation/ParserImplTest.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/delegation/ParserImplTest.java
@@ -50,7 +50,7 @@ import static org.assertj.core.api.Assertions.assertThatThrownBy;
 public class ParserImplTest {
 
     private final boolean isStreamingMode = false;
-    private final TableConfig tableConfig = new TableConfig();
+    private final TableConfig tableConfig = TableConfig.getDefault();
     private final Catalog catalog = new GenericInMemoryCatalog("MockCatalog", "default");
     private final CatalogManager catalogManager =
             CatalogManagerMocks.preparedCatalogManager().defaultCatalog("builtin", catalog).build();
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/expressions/converter/ExpressionConverterTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/expressions/converter/ExpressionConverterTest.java
index 968c578..5bd0082 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/expressions/converter/ExpressionConverterTest.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/expressions/converter/ExpressionConverterTest.java
@@ -56,7 +56,7 @@ import static org.assertj.core.api.Assertions.assertThat;
 /** Test for {@link ExpressionConverter}. */
 public class ExpressionConverterTest {
 
-    private final TableConfig tableConfig = new TableConfig();
+    private final TableConfig tableConfig = TableConfig.getDefault();
     private final CatalogManager catalogManager = CatalogManagerMocks.createEmptyCatalogManager();
     private final ModuleManager moduleManager = new ModuleManager();
     private final PlannerContext plannerContext =
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlToOperationConverterTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlToOperationConverterTest.java
index aa23774..f313ba3 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlToOperationConverterTest.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlToOperationConverterTest.java
@@ -135,7 +135,7 @@ import static org.assertj.core.api.InstanceOfAssertFactories.type;
 /** Test cases for {@link SqlToOperationConverter}. */
 public class SqlToOperationConverterTest {
     private final boolean isStreamingMode = false;
-    private final TableConfig tableConfig = new TableConfig();
+    private final TableConfig tableConfig = TableConfig.getDefault();
     private final Catalog catalog = new GenericInMemoryCatalog("MockCatalog", "default");
     private final CatalogManager catalogManager =
             CatalogManagerMocks.preparedCatalogManager()
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/processor/MultipleInputNodeCreationProcessorTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/processor/MultipleInputNodeCreationProcessorTest.java
index 18f3a95..078a7f5 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/processor/MultipleInputNodeCreationProcessorTest.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/processor/MultipleInputNodeCreationProcessorTest.java
@@ -49,8 +49,8 @@ import static org.assertj.core.api.Assertions.assertThat;
 /** Tests for {@link MultipleInputNodeCreationProcessor}. */
 public class MultipleInputNodeCreationProcessorTest extends TableTestBase {
 
-    private final BatchTableTestUtil batchUtil = batchTestUtil(new TableConfig());
-    private final StreamTableTestUtil streamUtil = streamTestUtil(new TableConfig());
+    private final BatchTableTestUtil batchUtil = batchTestUtil(TableConfig.getDefault());
+    private final StreamTableTestUtil streamUtil = streamTestUtil(TableConfig.getDefault());
 
     @Test
     public void testIsChainableDataStreamSource() {
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/TemporalTableSourceSpecSerdeTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/TemporalTableSourceSpecSerdeTest.java
index 929826d..95f7abc 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/TemporalTableSourceSpecSerdeTest.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/TemporalTableSourceSpecSerdeTest.java
@@ -121,7 +121,7 @@ public class TemporalTableSourceSpecSerdeTest {
                 false);
 
         SerdeContext serdeCtx =
-                JsonSerdeTestUtil.configuredSerdeContext(catalogManager, new TableConfig());
+                JsonSerdeTestUtil.configuredSerdeContext(catalogManager, TableConfig.getDefault());
 
         String json = JsonSerdeTestUtil.toJson(serdeCtx, spec);
         TemporalTableSourceSpec actual =
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/rules/logical/ProjectWatermarkAssignerTransposeRuleTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/rules/logical/ProjectWatermarkAssignerTransposeRuleTest.java
index 69d52dc..4da8871 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/rules/logical/ProjectWatermarkAssignerTransposeRuleTest.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/rules/logical/ProjectWatermarkAssignerTransposeRuleTest.java
@@ -34,7 +34,7 @@ import org.junit.Test;
 
 /** Test for {@link ProjectWatermarkAssignerTransposeRule}. */
 public class ProjectWatermarkAssignerTransposeRuleTest extends TableTestBase {
-    private final StreamTableTestUtil util = streamTestUtil(new TableConfig());
+    private final StreamTableTestUtil util = streamTestUtil(TableConfig.getDefault());
 
     @Before
     public void setup() {
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/rules/logical/PushWatermarkIntoTableSourceScanRuleTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/rules/logical/PushWatermarkIntoTableSourceScanRuleTest.java
index 58215cb..cdef530 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/rules/logical/PushWatermarkIntoTableSourceScanRuleTest.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/rules/logical/PushWatermarkIntoTableSourceScanRuleTest.java
@@ -48,7 +48,7 @@ import static org.apache.flink.table.api.config.ExecutionConfigOptions.TABLE_EXE
  * PushWatermarkIntoTableSourceScanRule}.
  */
 public class PushWatermarkIntoTableSourceScanRuleTest extends TableTestBase {
-    private final StreamTableTestUtil util = streamTestUtil(new TableConfig());
+    private final StreamTableTestUtil util = streamTestUtil(TableConfig.getDefault());
 
     @Before
     public void setup() {
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/rules/physical/batch/PushLocalAggIntoTableSourceScanRuleTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/rules/physical/batch/PushLocalAggIntoTableSourceScanRuleTest.java
index 51aee49..5b72b50 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/rules/physical/batch/PushLocalAggIntoTableSourceScanRuleTest.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/rules/physical/batch/PushLocalAggIntoTableSourceScanRuleTest.java
@@ -34,7 +34,7 @@ import org.junit.Test;
  * into table source.
  */
 public class PushLocalAggIntoTableSourceScanRuleTest extends TableTestBase {
-    protected BatchTableTestUtil util = batchTestUtil(new TableConfig());
+    protected BatchTableTestUtil util = batchTestUtil(TableConfig.getDefault());
 
     @Before
     public void setup() {
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/utils/PlannerMocks.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/utils/PlannerMocks.java
index 5e26728..dccda76 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/utils/PlannerMocks.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/utils/PlannerMocks.java
@@ -128,11 +128,11 @@ public class PlannerMocks {
     }
 
     public static PlannerMocks create() {
-        return new PlannerMocks(new TableConfig());
+        return new PlannerMocks(TableConfig.getDefault());
     }
 
     public static PlannerMocks create(Configuration configuration) {
-        TableConfig tableConfig = new TableConfig();
+        TableConfig tableConfig = TableConfig.getDefault();
         tableConfig.addConfiguration(configuration);
         return new PlannerMocks(tableConfig);
     }
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/codegen/WatermarkGeneratorCodeGenTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/codegen/WatermarkGeneratorCodeGenTest.scala
index 3dbb790..6609ca6 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/codegen/WatermarkGeneratorCodeGenTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/codegen/WatermarkGeneratorCodeGenTest.scala
@@ -57,7 +57,7 @@ import org.junit.Test
 class WatermarkGeneratorCodeGenTest(useDefinedConstructor: Boolean) {
 
   // mock FlinkPlannerImpl to avoid discovering TableEnvironment and Executor.
-  val config = new TableConfig
+  val config = TableConfig.getDefault
   val moduleManager = new ModuleManager
   val catalogManager: CatalogManager = CatalogManagerMocks.createEmptyCatalogManager()
   val functionCatalog = new FunctionCatalog(config, catalogManager, moduleManager)
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/metadata/AggCallSelectivityEstimatorTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/metadata/AggCallSelectivityEstimatorTest.scala
index 93f3db5..1dda598 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/metadata/AggCallSelectivityEstimatorTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/metadata/AggCallSelectivityEstimatorTest.scala
@@ -77,7 +77,7 @@ class AggCallSelectivityEstimatorTest {
 
   private def mockScan(
       statistic: FlinkStatistic = FlinkStatistic.UNKNOWN): TableScan = {
-    val tableConfig = new TableConfig
+    val tableConfig = TableConfig.getDefault
     val moduleManager = new ModuleManager
     val catalogManager = CatalogManagerMocks.createEmptyCatalogManager()
     val rootSchema = CalciteSchema.createRootSchema(true, false).plus()
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdHandlerTestBase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdHandlerTestBase.scala
index ef9db28..b0f5e25 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdHandlerTestBase.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdHandlerTestBase.scala
@@ -80,7 +80,7 @@ import scala.collection.JavaConversions._
 
 class FlinkRelMdHandlerTestBase {
 
-  val tableConfig = new TableConfig()
+  val tableConfig = TableConfig.getDefault()
   val rootSchema: SchemaPlus = MetadataTestUtil.initRootSchema()
 
   val catalogManager: CatalogManager = CatalogManagerMocks.createEmptyCatalogManager()
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/nodes/calcite/RelNodeTestBase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/nodes/calcite/RelNodeTestBase.scala
index 743ed16..63ca040 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/nodes/calcite/RelNodeTestBase.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/nodes/calcite/RelNodeTestBase.scala
@@ -45,7 +45,7 @@ import java.util
  * TODO refactor the metadata test to extract the common logic for all related tests.
  */
 class RelNodeTestBase {
-  val tableConfig = new TableConfig()
+  val tableConfig = TableConfig.getDefault()
   val rootSchema: SchemaPlus = CalciteSchema.createRootSchema(true, false).plus()
   val catalogManager: CatalogManager = CatalogManagerMocks.createEmptyCatalogManager()
   val moduleManager = new ModuleManager
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/utils/PartitionPrunerTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/utils/PartitionPrunerTest.scala
index fee5f0c..1fbdf3e 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/utils/PartitionPrunerTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/utils/PartitionPrunerTest.scala
@@ -106,7 +106,7 @@ class PartitionPrunerTest extends RexNodeTestBase {
       Map("amount" -> "200", "name" -> "Test3").asJava
     ).asJava
 
-    val config = new TableConfig
+    val config = TableConfig.getDefault
     val prunedPartitions = PartitionPruner.prunePartitions(
       config,
       partitionFieldNames,
@@ -176,7 +176,7 @@ class PartitionPrunerTest extends RexNodeTestBase {
         "f3" -> "2018-08-06 12:08:06.124").asJava
     ).asJava
 
-    val config = new TableConfig
+    val config = TableConfig.getDefault
     config.setLocalTimeZone(ZoneOffset.ofHours(0))
     val prunedPartitions = PartitionPruner.prunePartitions(
       config,
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/utils/BatchTestBase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/utils/BatchTestBase.scala
index 56e16cd..e2f38e2 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/utils/BatchTestBase.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/utils/BatchTestBase.scala
@@ -62,7 +62,7 @@ class BatchTestBase extends BatchAbstractTestBase {
 
   private val settings = EnvironmentSettings.newInstance().inBatchMode().build()
   private val testingTableEnv: TestingTableEnvironment = TestingTableEnvironment
-    .create(settings, catalogManager = None, new TableConfig)
+    .create(settings, catalogManager = None, TableConfig.getDefault)
   val tEnv: TableEnvironment = testingTableEnv
   private val planner = tEnv.asInstanceOf[TableEnvironmentImpl].getPlanner.asInstanceOf[PlannerBase]
   val env: StreamExecutionEnvironment = planner.getExecEnv
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/utils/TableTestBase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/utils/TableTestBase.scala
index cb56447..5cc75d9 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/utils/TableTestBase.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/utils/TableTestBase.scala
@@ -105,14 +105,14 @@ abstract class TableTestBase {
   @Rule
   def name: TestName = testName
 
-  def streamTestUtil(conf: TableConfig = new TableConfig): StreamTableTestUtil =
+  def streamTestUtil(conf: TableConfig = TableConfig.getDefault): StreamTableTestUtil =
     StreamTableTestUtil(this, conf = conf)
 
   def scalaStreamTestUtil(): ScalaStreamTableTestUtil = ScalaStreamTableTestUtil(this)
 
   def javaStreamTestUtil(): JavaStreamTableTestUtil = JavaStreamTableTestUtil(this)
 
-  def batchTestUtil(conf: TableConfig = new TableConfig): BatchTableTestUtil =
+  def batchTestUtil(conf: TableConfig = TableConfig.getDefault): BatchTableTestUtil =
     BatchTableTestUtil(this, conf = conf)
 
   def scalaBatchTestUtil(): ScalaBatchTableTestUtil = ScalaBatchTableTestUtil(this)
@@ -1222,7 +1222,7 @@ abstract class JavaTableTestUtil(
 case class StreamTableTestUtil(
     test: TableTestBase,
     catalogManager: Option[CatalogManager] = None,
-    conf: TableConfig = new TableConfig)
+    conf: TableConfig = TableConfig.getDefault)
   extends TableTestUtil(test, isStreamingMode = true, catalogManager, conf) {
 
   /**
@@ -1342,7 +1342,7 @@ case class JavaStreamTableTestUtil(test: TableTestBase) extends JavaTableTestUti
 case class BatchTableTestUtil(
     test: TableTestBase,
     catalogManager: Option[CatalogManager] = None,
-    conf: TableConfig = new TableConfig)
+    conf: TableConfig = TableConfig.getDefault)
   extends TableTestUtil(test, isStreamingMode = false, catalogManager, conf) {
 
   def buildBatchProgram(firstProgramNameToRemove: String): Unit = {

[flink] 03/04: [FLINK-26688][table-planner] Remove usages of TableConfig.nullCheck

Posted by tw...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 2eacc7373aa0fca2f2c59a72b0affb0ebda49ddf
Author: Marios Trivyzas <ma...@gmail.com>
AuthorDate: Wed Mar 16 12:52:46 2022 +0200

    [FLINK-26688][table-planner] Remove usages of TableConfig.nullCheck
    
    Remove usages of `TableConfig`'s `nullCheck`, which is used in code
    generation. Since it was not actually used, the option itself has
    been deprecated and is planned to be removed in upcoming releases.
    
    With this change, we can remove the necessity to pass around
    `TableConfig` instead of `ReadableConfig` in various places.
---
 .../planner/plan/nodes/exec/ExecNodeConfig.java    |   2 -
 .../flink/table/planner/codegen/CodeGenUtils.scala |  49 ++++-----
 .../planner/codegen/CodeGeneratorContext.scala     |  19 +---
 .../table/planner/codegen/ExprCodeGenerator.scala  |  48 ++++-----
 .../table/planner/codegen/GenerateUtils.scala      |  42 ++------
 .../table/planner/codegen/MatchCodeGenerator.scala |  11 +-
 .../codegen/agg/batch/HashAggCodeGenHelper.scala   |   5 +-
 .../planner/codegen/calls/ScalarOperatorGens.scala | 118 +++++----------------
 .../planner/codegen/calls/SearchOperatorGen.scala  |  21 +---
 .../planner/runtime/batch/table/JoinITCase.scala   |   2 -
 10 files changed, 82 insertions(+), 235 deletions(-)

diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/ExecNodeConfig.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/ExecNodeConfig.java
index 183ce14..2d238e1 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/ExecNodeConfig.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/ExecNodeConfig.java
@@ -56,8 +56,6 @@ public final class ExecNodeConfig implements ReadableConfig {
         this.nodeConfig = nodeConfig;
         this.originalTableConfig = tableConfig;
         this.tableConfig = TableConfig.getDefault();
-        this.tableConfig.setNullCheck(tableConfig.getNullCheck());
-        this.tableConfig.setDecimalContext(tableConfig.getDecimalContext());
         this.tableConfig.addConfiguration(tableConfig.getConfiguration());
         this.tableConfig.addConfiguration((Configuration) nodeConfig);
     }
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/CodeGenUtils.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/CodeGenUtils.scala
index 683b231..21ead75 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/CodeGenUtils.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/CodeGenUtils.scala
@@ -491,40 +491,27 @@ object CodeGenUtils {
         case Some(writer) =>
           // use writer to set field
           val writeField = binaryWriterWriteField(ctx, indexTerm, fieldTerm, writer, fieldType)
-          if (ctx.nullCheck) {
-            s"""
-               |${fieldExpr.code}
-               |if (${fieldExpr.nullTerm}) {
-               |  ${binaryWriterWriteNull(indexTerm, writer, fieldType)};
-               |} else {
-               |  $writeField;
-               |}
-             """.stripMargin
-          } else {
-            s"""
-               |${fieldExpr.code}
-               |$writeField;
-             """.stripMargin
-          }
+          s"""
+             |${fieldExpr.code}
+             |if (${fieldExpr.nullTerm}) {
+             |  ${binaryWriterWriteNull(indexTerm, writer, fieldType)};
+             |} else {
+             |  $writeField;
+             |}
+           """.stripMargin
 
         case None =>
           // directly set field to BinaryRowData, this depends on all the fields are fixed length
           val writeField = binaryRowFieldSetAccess(indexTerm, rowTerm, fieldType, fieldTerm)
-          if (ctx.nullCheck) {
-            s"""
-               |${fieldExpr.code}
-               |if (${fieldExpr.nullTerm}) {
-               |  ${binaryRowSetNull(indexTerm, rowTerm, fieldType)};
-               |} else {
-               |  $writeField;
-               |}
-             """.stripMargin
-          } else {
-            s"""
-               |${fieldExpr.code}
-               |$writeField;
-             """.stripMargin
-          }
+
+          s"""
+             |${fieldExpr.code}
+             |if (${fieldExpr.nullTerm}) {
+             |  ${binaryRowSetNull(indexTerm, rowTerm, fieldType)};
+             |} else {
+             |  $writeField;
+             |}
+           """.stripMargin
       }
     } else if (rowClass == classOf[GenericRowData] || rowClass == classOf[BoxedWrapperRowData]) {
       val writeField = if (rowClass == classOf[GenericRowData]) {
@@ -538,7 +525,7 @@ object CodeGenUtils {
         s"$rowTerm.setNullAt($indexTerm)"
       }
 
-      if (ctx.nullCheck) {
+      if (fieldType.isNullable) {
         s"""
            |${fieldExpr.code}
            |if (${fieldExpr.nullTerm}) {
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/CodeGeneratorContext.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/CodeGeneratorContext.scala
index f552958..c5f6f14 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/CodeGeneratorContext.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/CodeGeneratorContext.scala
@@ -145,9 +145,6 @@ class CodeGeneratorContext(val tableConfig: TableConfig) {
   def getReusableInputUnboxingExprs(inputTerm: String, index: Int): Option[GeneratedExpression] =
     reusableInputUnboxingExprs.get((inputTerm, index))
 
-  def nullCheck: Boolean = tableConfig.getNullCheck
-
-
   /**
     * Add a line comment to [[reusableHeaderComments]] list which will be concatenated
     * into a single class header comment.
@@ -677,7 +674,7 @@ class CodeGeneratorContext(val tableConfig: TableConfig) {
          |""".stripMargin
 
     val fieldInit = seedExpr match {
-      case Some(s) if nullCheck =>
+      case Some(s) =>
         s"""
            |${s.code}
            |if (!${s.nullTerm}) {
@@ -687,11 +684,6 @@ class CodeGeneratorContext(val tableConfig: TableConfig) {
            |  $fieldTerm = new java.util.Random();
            |}
            |""".stripMargin
-      case Some(s) =>
-        s"""
-           |${s.code}
-           |$fieldTerm = new java.util.Random(${s.resultTerm});
-           |""".stripMargin
       case _ =>
         s"""
            |$fieldTerm = new java.util.Random();
@@ -968,7 +960,7 @@ class CodeGeneratorContext(val tableConfig: TableConfig) {
          |  throw new RuntimeException("Unsupported algorithm.");
          |}
          |""".stripMargin
-    val nullableInit = if (nullCheck) {
+    val nullableInit =
       s"""
          |${constant.code}
          |if (${constant.nullTerm}) {
@@ -977,12 +969,7 @@ class CodeGeneratorContext(val tableConfig: TableConfig) {
          |  $init
          |}
          |""".stripMargin
-    } else {
-      s"""
-         |${constant.code}
-         |$init
-         |""".stripMargin
-    }
+
     reusableInitStatements.add(nullableInit)
 
     fieldTerm
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/ExprCodeGenerator.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/ExprCodeGenerator.scala
index 3c3f858..38386e9 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/ExprCodeGenerator.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/ExprCodeGenerator.scala
@@ -58,11 +58,6 @@ import scala.collection.JavaConversions._
 class ExprCodeGenerator(ctx: CodeGeneratorContext, nullableInput: Boolean)
   extends RexVisitor[GeneratedExpression] {
 
-  // check if nullCheck is enabled when inputs can be null
-  if (nullableInput && !ctx.nullCheck) {
-    throw new CodeGenException("Null check must be enabled if entire rows can be null.")
-  }
-
   /**
     * term of the [[ProcessFunction]]'s context, can be changed when needed
     */
@@ -191,8 +186,7 @@ class ExprCodeGenerator(ctx: CodeGeneratorContext, nullableInput: Boolean)
         // attribute is proctime indicator.
         // we use a null literal and generate a timestamp when we need it.
         generateNullLiteral(
-          new LocalZonedTimestampType(true, TimestampKind.PROCTIME, 3),
-          ctx.nullCheck)
+          new LocalZonedTimestampType(true, TimestampKind.PROCTIME, 3))
       case TimeIndicatorTypeInfo.PROCTIME_BATCH_MARKER =>
         // attribute is proctime field in a batch query.
         // it is initialized with the current time.
@@ -216,7 +210,7 @@ class ExprCodeGenerator(ctx: CodeGeneratorContext, nullableInput: Boolean)
           input2Term.get,
           idx,
           nullableInput,
-          ctx.nullCheck)
+          true)
         ).toSeq
       case None => Seq() // add nothing
     }
@@ -330,7 +324,7 @@ class ExprCodeGenerator(ctx: CodeGeneratorContext, nullableInput: Boolean)
 
     val code = if (returnTypeClazz == classOf[BinaryRowData] && outRowWriter.isDefined) {
       val writer = outRowWriter.get
-      val resetWriter = if (ctx.nullCheck) s"$writer.reset();" else s"$writer.resetCursor();"
+      val resetWriter = s"$writer.reset();"
       val completeWriter: String = s"$writer.complete();"
       s"""
          |$outRowInitCode
@@ -366,7 +360,7 @@ class ExprCodeGenerator(ctx: CodeGeneratorContext, nullableInput: Boolean)
       inputRef.getIndex - input1Arity
     }
 
-    generateInputAccess(ctx, input._1, input._2, index, nullableInput, ctx.nullCheck)
+    generateInputAccess(ctx, input._1, input._2, index, nullableInput, true)
   }
 
   override def visitTableInputRef(rexTableInputRef: RexTableInputRef): GeneratedExpression =
@@ -381,13 +375,15 @@ class ExprCodeGenerator(ctx: CodeGeneratorContext, nullableInput: Boolean)
       refExpr.resultTerm,
       index)
 
-    val resultTypeTerm = primitiveTypeTermForType(fieldAccessExpr.resultType)
-    val defaultValue = primitiveDefaultValue(fieldAccessExpr.resultType)
+    val resultType = fieldAccessExpr.resultType
+
+    val resultTypeTerm = primitiveTypeTermForType(resultType)
+    val defaultValue = primitiveDefaultValue(resultType)
     val Seq(resultTerm, nullTerm) = ctx.addReusableLocalVariables(
       (resultTypeTerm, "result"),
       ("boolean", "isNull"))
 
-    val resultCode = if (ctx.nullCheck) {
+    val resultCode =
       s"""
          |${refExpr.code}
          |if (${refExpr.nullTerm}) {
@@ -400,13 +396,6 @@ class ExprCodeGenerator(ctx: CodeGeneratorContext, nullableInput: Boolean)
          |  $nullTerm = ${fieldAccessExpr.nullTerm};
          |}
          |""".stripMargin
-    } else {
-      s"""
-         |${refExpr.code}
-         |${fieldAccessExpr.code}
-         |$resultTerm = ${fieldAccessExpr.resultTerm};
-         |""".stripMargin
-    }
 
     GeneratedExpression(resultTerm, nullTerm, resultCode, fieldAccessExpr.resultType)
   }
@@ -479,7 +468,7 @@ class ExprCodeGenerator(ctx: CodeGeneratorContext, nullableInput: Boolean)
       case (operandLiteral: RexLiteral, 0) if
       operandLiteral.getType.getSqlTypeName == SqlTypeName.NULL &&
         call.getOperator.getReturnTypeInference == ReturnTypes.ARG0 =>
-        generateNullLiteral(resultType, ctx.nullCheck)
+        generateNullLiteral(resultType)
 
       case (o@_, _) => o.accept(this)
     }
@@ -627,25 +616,25 @@ class ExprCodeGenerator(ctx: CodeGeneratorContext, nullableInput: Boolean)
 
       case IS_NULL =>
         val operand = operands.head
-        generateIsNull(ctx, operand)
+        generateIsNull(operand)
 
       case IS_NOT_NULL =>
         val operand = operands.head
-        generateIsNotNull(ctx, operand)
+        generateIsNotNull(operand)
 
       // logic
       case AND =>
         operands.reduceLeft { (left: GeneratedExpression, right: GeneratedExpression) =>
           requireBoolean(left)
           requireBoolean(right)
-          generateAnd(ctx, left, right)
+          generateAnd(left, right)
         }
 
       case OR =>
         operands.reduceLeft { (left: GeneratedExpression, right: GeneratedExpression) =>
           requireBoolean(left)
           requireBoolean(right)
-          generateOr(ctx, left, right)
+          generateOr(left, right)
         }
 
       case NOT =>
@@ -711,7 +700,7 @@ class ExprCodeGenerator(ctx: CodeGeneratorContext, nullableInput: Boolean)
             val array = operands.head
             val index = operands(1)
             requireInteger(index)
-            generateArrayElementAt(ctx, array, index)
+            generateArrayElementAt(array, index)
 
           case LogicalTypeRoot.MAP =>
             val key = operands(1)
@@ -739,7 +728,7 @@ class ExprCodeGenerator(ctx: CodeGeneratorContext, nullableInput: Boolean)
       case ELEMENT =>
         val array = operands.head
         requireArray(array)
-        generateArrayElement(ctx, array)
+        generateArrayElement(array)
 
       case DOT =>
         generateDot(ctx, operands)
@@ -748,8 +737,7 @@ class ExprCodeGenerator(ctx: CodeGeneratorContext, nullableInput: Boolean)
         // attribute is proctime indicator.
         // We use a null literal and generate a timestamp when we need it.
         generateNullLiteral(
-          new LocalZonedTimestampType(true, TimestampKind.PROCTIME, 3),
-          ctx.nullCheck)
+          new LocalZonedTimestampType(true, TimestampKind.PROCTIME, 3))
 
       case PROCTIME_MATERIALIZE =>
         generateProctimeTimestamp(ctx, contextTerm)
@@ -764,7 +752,7 @@ class ExprCodeGenerator(ctx: CodeGeneratorContext, nullableInput: Boolean)
       case JSON_ARRAY => new JsonArrayCallGen(call).generate(ctx, operands, resultType)
 
       case _: SqlThrowExceptionFunction =>
-        val nullValue = generateNullLiteral(resultType, nullCheck = true)
+        val nullValue = generateNullLiteral(resultType)
         val code =
           s"""
              |${operands.map(_.code).mkString("\n")}
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/GenerateUtils.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/GenerateUtils.scala
index 5c6018d..dbc8d9a 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/GenerateUtils.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/GenerateUtils.scala
@@ -82,7 +82,7 @@ object GenerateUtils {
     val resultTerm = ctx.addReusableLocalVariable(resultTypeTerm, "result")
     val defaultValue = primitiveDefaultValue(returnType)
     val isResultNullable = resultNullable || (isReference(returnType) && !isTemporal(returnType))
-    val nullTermCode = if (ctx.nullCheck && isResultNullable) {
+    val nullTermCode = if (isResultNullable) {
       s"$nullTerm = ($resultTerm == null);"
     } else {
       ""
@@ -107,7 +107,7 @@ object GenerateUtils {
          |""".stripMargin
     }
 
-    val resultCode = if (ctx.nullCheck && operands.nonEmpty) {
+    val resultCode = if (operands.nonEmpty) {
       s"""
          |${operands.map(_.code).mkString("\n")}
          |$nullTerm = ${operands.map(_.nullTerm).mkString(" || ")};
@@ -117,19 +117,13 @@ object GenerateUtils {
          |  $nullTermCode
          |}
          |""".stripMargin
-    } else if (ctx.nullCheck && operands.isEmpty) {
+    } else {
       s"""
          |${operands.map(_.code).mkString("\n")}
          |$nullTerm = false;
          |$wrappedResultAssignment
          |$nullTermCode
          |""".stripMargin
-    } else {
-      s"""
-         |$nullTerm = false;
-         |${operands.map(_.code).mkString("\n")}
-         |$wrappedResultAssignment
-         |""".stripMargin
     }
 
     GeneratedExpression(resultTerm, nullTerm, resultCode, returnType)
@@ -169,7 +163,7 @@ object GenerateUtils {
     val nullTerm = ctx.addReusableLocalVariable("boolean", "isNull")
     val resultTerm = ctx.addReusableLocalVariable(resultTypeTerm, "result")
     val isResultNullable = resultNullable || (isReference(returnType) && !isTemporal(returnType))
-    val nullTermCode = if (ctx.nullCheck && isResultNullable) {
+    val nullTermCode = if (isResultNullable) {
       s"$nullTerm = ($resultTerm == null);"
     } else {
       s"$nullTerm = false;"
@@ -199,7 +193,7 @@ object GenerateUtils {
          |""".stripMargin
     }
 
-    val resultCode = if (ctx.nullCheck) {
+    val resultCode = if (resultNullable) {
       s"""
          |${operands.map(_.code).mkString("\n")}
          |$wrappedResultAssignment
@@ -216,7 +210,6 @@ object GenerateUtils {
        """.stripMargin
     }
 
-
     GeneratedExpression(resultTerm, nullTerm, resultCode, returnType)
   }
 
@@ -277,12 +270,7 @@ object GenerateUtils {
       s"$recordTerm = new $typeTerm();"
   }
 
-  def generateNullLiteral(
-      resultType: LogicalType,
-      nullCheck: Boolean): GeneratedExpression = {
-    if (!nullCheck) {
-      throw new CodeGenException("Null literals are not allowed if nullCheck is disabled.")
-    }
+  def generateNullLiteral(resultType: LogicalType): GeneratedExpression = {
     val defaultValue = primitiveDefaultValue(resultType)
     val resultTypeTerm = primitiveTypeTermForType(resultType)
     GeneratedExpression(
@@ -318,7 +306,7 @@ object GenerateUtils {
       literalValue: Any,
       literalType: LogicalType): GeneratedExpression = {
     if (literalValue == null) {
-      return generateNullLiteral(literalType, ctx.nullCheck)
+      return generateNullLiteral(literalType)
     }
     literalType.getTypeRoot match {
       // For strings, binary and decimal, we add the literal as reusable field,
@@ -557,7 +545,7 @@ object GenerateUtils {
       (resultTypeTerm, "result"),
       ("boolean", "isNull"))
 
-    val wrappedCode = if (ctx.nullCheck) {
+    val wrappedCode =
       s"""
          |$nullTerm = $inputTerm == null;
          |$resultTerm = $defaultValue;
@@ -565,11 +553,6 @@ object GenerateUtils {
          |  $resultTerm = $inputUnboxingTerm;
          |}
          |""".stripMargin.trim
-    } else {
-      s"""
-         |$resultTerm = $inputUnboxingTerm;
-         |""".stripMargin.trim
-    }
 
     GeneratedExpression(resultTerm, nullTerm, wrappedCode, inputType)
   }
@@ -613,7 +596,7 @@ object GenerateUtils {
           (resultTypeTerm, "field"),
           ("boolean", "isNull"))
 
-        val inputCode = if (ctx.nullCheck) {
+        val inputCode =
           s"""
              |$nullTerm = $inputTerm.isNullAt($index);
              |$fieldTerm = $defaultValue;
@@ -621,12 +604,7 @@ object GenerateUtils {
              |  $fieldTerm = $readCode;
              |}
            """.stripMargin.trim
-        } else {
-          s"""
-             |$nullTerm = false;
-             |$fieldTerm = $readCode;
-           """.stripMargin
-        }
+
         GeneratedExpression(fieldTerm, nullTerm, inputCode, fieldType)
 
       case DISTINCT_TYPE =>
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/MatchCodeGenerator.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/MatchCodeGenerator.scala
index 8c14843..71a5493 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/MatchCodeGenerator.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/MatchCodeGenerator.scala
@@ -372,8 +372,7 @@ class MatchCodeGenerator(
         // attribute is proctime indicator.
         // We use a null literal and generate a timestamp when we need it.
         generateNullLiteral(
-          new LocalZonedTimestampType(true, TimestampKind.PROCTIME, 3),
-          ctx.nullCheck)
+          new LocalZonedTimestampType(true, TimestampKind.PROCTIME, 3))
 
       case MATCH_ROWTIME =>
         generateRowtimeAccess(
@@ -640,7 +639,7 @@ class MatchCodeGenerator(
       ctx.addReusablePerRecordStatement(codeForAgg)
 
       val defaultValue = primitiveDefaultValue(singleAggResultType)
-      val codeForSingleAgg = if (ctx.nullCheck) {
+      val codeForSingleAgg =
         j"""
            |boolean $singleAggNullTerm;
            |$primitiveSingleAggResultTypeTerm $singleAggResultTerm;
@@ -653,12 +652,6 @@ class MatchCodeGenerator(
            |  $singleAggResultTerm = $defaultValue;
            |}
            |""".stripMargin
-      } else {
-        j"""
-           |$primitiveSingleAggResultTypeTerm $singleAggResultTerm =
-           |    ($boxedSingleAggResultTypeTerm) $allAggRowTerm.getField(${aggregates.size});
-           |""".stripMargin
-      }
 
       ctx.addReusablePerRecordStatement(codeForSingleAgg)
 
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/agg/batch/HashAggCodeGenHelper.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/agg/batch/HashAggCodeGenHelper.scala
index 60059e3..50bb157 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/agg/batch/HashAggCodeGenHelper.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/agg/batch/HashAggCodeGenHelper.scala
@@ -492,10 +492,7 @@ object HashAggCodeGenHelper {
              |""".stripMargin.trim
 
         if (filterArg >= 0) {
-          var filterTerm = s"$inputTerm.getBoolean($filterArg)"
-          if (ctx.nullCheck) {
-            filterTerm = s"!$inputTerm.isNullAt($filterArg) && " + filterTerm
-          }
+          val filterTerm = s"!$inputTerm.isNullAt($filterArg) && $inputTerm.getBoolean($filterArg)"
           s"""
              |if ($filterTerm) {
              | $innerCode
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/ScalarOperatorGens.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/ScalarOperatorGens.scala
index 6288b96..020708d 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/ScalarOperatorGens.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/ScalarOperatorGens.scala
@@ -43,7 +43,7 @@ import org.apache.flink.table.utils.DateTimeUtils.MILLIS_PER_DAY
 import org.apache.flink.util.Preconditions.checkArgument
 
 import java.time.ZoneId
-import java.util.Arrays.asList
+
 import scala.collection.JavaConversions._
 
 /**
@@ -449,8 +449,8 @@ object ScalarOperatorGens {
       left: GeneratedExpression,
       right: GeneratedExpression)
     : GeneratedExpression = {
-    generateOr(ctx,
-      generateAnd(ctx, generateIsNull(ctx, left), generateIsNull(ctx, right)),
+    generateOr(
+      generateAnd(generateIsNull(left), generateIsNull(right)),
       generateEquals(ctx, left, right))
   }
 
@@ -579,13 +579,11 @@ object ScalarOperatorGens {
     }
   }
 
-  def generateIsNull(
-      ctx: CodeGeneratorContext,
-      operand: GeneratedExpression): GeneratedExpression = {
-    if (ctx.nullCheck) {
+  def generateIsNull(operand: GeneratedExpression): GeneratedExpression = {
+    if (operand.resultType.isNullable) {
       GeneratedExpression(operand.nullTerm, NEVER_NULL, operand.code, new BooleanType(false))
     }
-    else if (!ctx.nullCheck && isReference(operand.resultType)) {
+    else if (isReference(operand.resultType)) {
       val resultTerm = newName("isNull")
       val operatorCode =
         s"""
@@ -599,10 +597,8 @@ object ScalarOperatorGens {
     }
   }
 
-  def generateIsNotNull(
-      ctx: CodeGeneratorContext,
-      operand: GeneratedExpression): GeneratedExpression = {
-    if (ctx.nullCheck) {
+  def generateIsNotNull(operand: GeneratedExpression): GeneratedExpression = {
+    if (operand.resultType.isNullable) {
       val resultTerm = newName("result")
       val operatorCode =
         s"""
@@ -611,7 +607,7 @@ object ScalarOperatorGens {
            |""".stripMargin.trim
       GeneratedExpression(resultTerm, NEVER_NULL, operatorCode, new BooleanType(false))
     }
-    else if (!ctx.nullCheck && isReference(operand.resultType)) {
+    else if (isReference(operand.resultType)) {
       val resultTerm = newName("result")
       val operatorCode =
         s"""
@@ -625,13 +621,10 @@ object ScalarOperatorGens {
     }
   }
 
-  def generateAnd(
-      ctx: CodeGeneratorContext,
-      left: GeneratedExpression,
-      right: GeneratedExpression): GeneratedExpression = {
+  def generateAnd(left: GeneratedExpression, right: GeneratedExpression): GeneratedExpression = {
     val Seq(resultTerm, nullTerm) = newNames("result", "isNull")
 
-    val operatorCode = if (ctx.nullCheck) {
+    val operatorCode =
       // Three-valued logic:
       // no Unknown -> Two-valued logic
       // True && Unknown -> Unknown
@@ -675,28 +668,14 @@ object ScalarOperatorGens {
          |  }
          |}
        """.stripMargin.trim
-    }
-    else {
-      s"""
-         |${left.code}
-         |boolean $resultTerm = false;
-         |if (${left.resultTerm}) {
-         |  ${right.code}
-         |  $resultTerm = ${right.resultTerm};
-         |}
-         |""".stripMargin.trim
-    }
 
     GeneratedExpression(resultTerm, nullTerm, operatorCode, new BooleanType())
   }
 
-  def generateOr(
-      ctx: CodeGeneratorContext,
-      left: GeneratedExpression,
-      right: GeneratedExpression): GeneratedExpression = {
+  def generateOr(left: GeneratedExpression, right: GeneratedExpression): GeneratedExpression = {
     val Seq(resultTerm, nullTerm) = newNames("result", "isNull")
 
-    val operatorCode = if (ctx.nullCheck) {
+    val operatorCode =
       // Three-valued logic:
       // no Unknown -> Two-valued logic
       // True || Unknown -> True
@@ -707,14 +686,14 @@ object ScalarOperatorGens {
       s"""
          |${left.code}
          |
-        |boolean $resultTerm = true;
+         |boolean $resultTerm = true;
          |boolean $nullTerm = false;
          |if (!${left.nullTerm} && ${left.resultTerm}) {
          |  // left expr is true, skip right expr
          |} else {
          |  ${right.code}
          |
-        |  if (!${left.nullTerm} && !${right.nullTerm}) {
+         |  if (!${left.nullTerm} && !${right.nullTerm}) {
          |    $resultTerm = ${left.resultTerm} || ${right.resultTerm};
          |    $nullTerm = false;
          |  }
@@ -740,17 +719,6 @@ object ScalarOperatorGens {
          |  }
          |}
          |""".stripMargin.trim
-    }
-    else {
-      s"""
-         |${left.code}
-         |boolean $resultTerm = true;
-         |if (!${left.resultTerm}) {
-         |  ${right.code}
-         |  $resultTerm = ${right.resultTerm};
-         |}
-         |""".stripMargin.trim
-    }
 
     GeneratedExpression(resultTerm, nullTerm, operatorCode, new BooleanType())
   }
@@ -869,12 +837,7 @@ object ScalarOperatorGens {
     val rule = CastRuleProvider.resolve(operand.resultType, targetType)
     rule match {
       case codeGeneratorCastRule: CodeGeneratorCastRule[_, _] =>
-        // Make sure to force nullability checks in case ctx.nullCheck is enabled
-        val inputType = if (ctx.nullCheck) {
-          operand.resultType.copy(true)
-        } else {
-          operand.resultType
-        }
+        val inputType = operand.resultType.copy(true)
 
         // Generate the code block
         val castCodeBlock = codeGeneratorCastRule.generateCodeBlock(
@@ -969,7 +932,7 @@ object ScalarOperatorGens {
       val resultTypeTerm = primitiveTypeTermForType(resultType)
       val defaultValue = primitiveDefaultValue(resultType)
 
-      val operatorCode = if (ctx.nullCheck) {
+      val operatorCode =
         s"""
            |${condition.code}
            |$resultTypeTerm $resultTerm = $defaultValue;
@@ -989,21 +952,6 @@ object ScalarOperatorGens {
            |  }
            |}
            |""".stripMargin.trim
-      }
-      else {
-        s"""
-           |${condition.code}
-           |$resultTypeTerm $resultTerm;
-           |if (${condition.resultTerm}) {
-           |  ${trueAction.code}
-           |  $resultTerm = ${trueAction.resultTerm};
-           |}
-           |else {
-           |  ${falseAction.code}
-           |  $resultTerm = ${falseAction.resultTerm};
-           |}
-           |""".stripMargin.trim
-      }
 
       GeneratedExpression(resultTerm, nullTerm, operatorCode, resultType)
     }
@@ -1041,7 +989,7 @@ object ScalarOperatorGens {
     val resultTypeTerm = primitiveTypeTermForType(access.resultType)
     val defaultValue = primitiveDefaultValue(access.resultType)
 
-    val resultCode = if (ctx.nullCheck) {
+    val resultCode =
       s"""
          |${operands.map(_.code).mkString("\n")}
          |$resultTypeTerm $resultTerm;
@@ -1056,14 +1004,6 @@ object ScalarOperatorGens {
          |  $nullTerm = ${access.nullTerm};
          |}
          |""".stripMargin
-    } else {
-      s"""
-         |${operands.map(_.code).mkString("\n")}
-         |${access.code}
-         |$resultTypeTerm $resultTerm = ${access.resultTerm};
-         |""".stripMargin
-    }
-
 
     GeneratedExpression(
       resultTerm,
@@ -1105,7 +1045,7 @@ object ScalarOperatorGens {
           val tpe = fieldTypes(idx)
           if (element.literal) {
             ""
-          } else if(ctx.nullCheck) {
+          } else if (tpe.isNullable) {
             s"""
                |${element.code}
                |if (${element.nullTerm}) {
@@ -1152,7 +1092,7 @@ object ScalarOperatorGens {
     val writeCode = elements.zipWithIndex.map {
       case (element, idx) =>
         val tpe = fieldTypes(idx)
-        if (ctx.nullCheck) {
+        if (tpe.isNullable) {
           s"""
              |${element.code}
              |if (${element.nullTerm}) {
@@ -1207,7 +1147,7 @@ object ScalarOperatorGens {
           }
         }
         val array = generateLiteralArray(ctx, arrayType, mapped)
-        val code = generatePrimitiveArrayUpdateCode(ctx, array.resultTerm, elementType, elements)
+        val code = generatePrimitiveArrayUpdateCode(array.resultTerm, elementType, elements)
         GeneratedExpression(array.resultTerm, GeneratedExpression.NEVER_NULL, code, arrayType)
       } else {
         // generate general array
@@ -1217,14 +1157,13 @@ object ScalarOperatorGens {
   }
 
   private def generatePrimitiveArrayUpdateCode(
-      ctx: CodeGeneratorContext,
       arrayTerm: String,
       elementType: LogicalType,
       elements: Seq[GeneratedExpression]): String = {
     elements.zipWithIndex.map { case (element, idx) =>
       if (element.literal) {
         ""
-      } else if (ctx.nullCheck) {
+      } else if (elementType.isNullable) {
         s"""
            |${element.code}
            |if (${element.nullTerm}) {
@@ -1299,7 +1238,6 @@ object ScalarOperatorGens {
    * @see [[org.apache.calcite.sql.fun.SqlStdOperatorTable.ITEM]]
    */
   def generateArrayElementAt(
-      ctx: CodeGeneratorContext,
       array: GeneratedExpression,
       index: GeneratedExpression): GeneratedExpression = {
     val Seq(resultTerm, nullTerm) = newNames("result", "isNull")
@@ -1329,9 +1267,7 @@ object ScalarOperatorGens {
     GeneratedExpression(resultTerm, nullTerm, arrayAccessCode, componentInfo)
   }
 
-  def generateArrayElement(
-      ctx: CodeGeneratorContext,
-      array: GeneratedExpression): GeneratedExpression = {
+  def generateArrayElement(array: GeneratedExpression): GeneratedExpression = {
     val Seq(resultTerm, nullTerm) = newNames("result", "isNull")
     val resultType = array.resultType.asInstanceOf[ArrayType].getElementType
     val resultTypeTerm = primitiveTypeTermForType(resultType)
@@ -1457,10 +1393,8 @@ object ScalarOperatorGens {
       // there are some non-literal primitive fields need to update
       val keyArrayTerm = newName("keyArray")
       val valueArrayTerm = newName("valueArray")
-      val keyUpdate = generatePrimitiveArrayUpdateCode(
-        ctx, keyArrayTerm, keyType, keyElements)
-      val valueUpdate = generatePrimitiveArrayUpdateCode(
-        ctx, valueArrayTerm, valueType, valueElements)
+      val keyUpdate = generatePrimitiveArrayUpdateCode(keyArrayTerm, keyType, keyElements)
+      val valueUpdate = generatePrimitiveArrayUpdateCode(valueArrayTerm, valueType, valueElements)
       s"""
          |$BINARY_ARRAY $keyArrayTerm = $binaryMap.keyArray();
          |$keyUpdate
@@ -1597,7 +1531,7 @@ object ScalarOperatorGens {
      resultType: LogicalType): GeneratedExpression = {
     checkArgument(literalExpr.literal)
     if (java.lang.Boolean.valueOf(literalExpr.nullTerm)) {
-      return generateNullLiteral(resultType, nullCheck = true)
+      return generateNullLiteral(resultType)
     }
 
     val castExecutor = CastRuleProvider.create(
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/SearchOperatorGen.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/SearchOperatorGen.scala
index 42c68d2..8b388b7 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/SearchOperatorGen.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/SearchOperatorGen.scala
@@ -82,14 +82,14 @@ object SearchOperatorGen {
         .map(CastRuleProvider.cast(toCastContext(ctx), sargType, commonType, _))
         .map(generateLiteral(ctx, _, commonType))
       if (sarg.containsNull) {
-        haystack += generateNullLiteral(commonType, ctx.nullCheck)
+        haystack += generateNullLiteral(commonType)
       }
       val setTerm = ctx.addReusableHashSet(haystack.toSeq, commonType)
       val negation = if (sarg.isComplementedPoints) "!" else ""
 
       val Seq(resultTerm, nullTerm) = newNames("result", "isNull")
 
-      val operatorCode = if (ctx.nullCheck) {
+      val operatorCode =
         s"""
            |${needle.code}
            |// --- Begin SEARCH ${target.resultTerm}
@@ -101,15 +101,6 @@ object SearchOperatorGen {
            |}
            |// --- End SEARCH ${target.resultTerm}
            |""".stripMargin.trim
-      }
-      else {
-        s"""
-           |${needle.code}
-           |// --- Begin SEARCH ${target.resultTerm}
-           |boolean $resultTerm = $negation$setTerm.contains(${needle.resultTerm});
-           |// --- End SEARCH ${target.resultTerm}
-           |""".stripMargin.trim
-      }
 
       GeneratedExpression(resultTerm, nullTerm, operatorCode, new BooleanType())
     } else {
@@ -127,11 +118,11 @@ object SearchOperatorGen {
         .map(RangeSets.map(_, rangeToExpression))
 
       if (sarg.containsNull) {
-        rangeChecks = Seq(generateIsNull(ctx, target)) ++ rangeChecks
+        rangeChecks = Seq(generateIsNull(target)) ++ rangeChecks
       }
 
       val generatedRangeChecks = rangeChecks
-        .reduce((left, right) => generateOr(ctx, left, right))
+        .reduce((left, right) => generateOr(left, right))
 
       // Add the target expression code
       val finalCode =
@@ -194,7 +185,6 @@ object SearchOperatorGen {
      */
     override def closed(lower: C, upper: C): GeneratedExpression = {
       generateAnd(
-        ctx,
         generateComparison(ctx, "<=", lit(lower), target),
         generateComparison(ctx, "<=", target, lit(upper))
       )
@@ -205,7 +195,6 @@ object SearchOperatorGen {
      */
     override def closedOpen(lower: C, upper: C): GeneratedExpression = {
       generateAnd(
-        ctx,
         generateComparison(ctx, "<=", lit(lower), target),
         generateComparison(ctx, "<", target, lit(upper))
       )
@@ -216,7 +205,6 @@ object SearchOperatorGen {
      */
     override def openClosed(lower: C, upper: C): GeneratedExpression = {
       generateAnd(
-        ctx,
         generateComparison(ctx, "<", lit(lower), target),
         generateComparison(ctx, "<=", target, lit(upper))
       )
@@ -227,7 +215,6 @@ object SearchOperatorGen {
      */
     override def open(lower: C, upper: C): GeneratedExpression = {
       generateAnd(
-        ctx,
         generateComparison(ctx, "<", lit(lower), target),
         generateComparison(ctx, "<", target, lit(upper))
       )
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/table/JoinITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/table/JoinITCase.scala
index a7b7add..70b753d 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/table/JoinITCase.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/table/JoinITCase.scala
@@ -326,8 +326,6 @@ class JoinITCase extends BatchTestBase {
 
   @Test
   def testFullJoinWithNonEquiJoinPred(): Unit = {
-    tEnv.getConfig.setNullCheck(true)
-
     val ds1 = CollectionBatchExecTable.get3TupleDataSet(tEnv, "a, b, c")
     val ds2 = CollectionBatchExecTable.get5TupleDataSet(tEnv, "d, e, f, g, h")
 

[flink] 04/04: [FLINK-26689][table] Replace `TableConfig` with `ReadableConfig`

Posted by tw...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 7fba24aa7d27f1a991c8d98c4557f68a87d90f6b
Author: Marios Trivyzas <ma...@gmail.com>
AuthorDate: Wed Mar 16 13:07:04 2022 +0200

    [FLINK-26689][table] Replace `TableConfig` with `ReadableConfig`
    
    Replace concrete `TableConfig` with `ReadableConfig` wherever possible
    to have a transparent reading of configuration options without the need
    of calling specific methods on the `TableConfig` object.
    
    Add `getLocalTimeZone()` to `TableConfigUtils` to be able to retrieve
    the timezone from a `ReadableConfig` without the need of passing around
    `TableConfig` as method args.
    
    This closes #19045.
---
 ...ghPythonStreamGroupWindowAggregateOperator.java |  3 +-
 ...owPythonGroupAggregateFunctionOperatorTest.java |  7 ++---
 ...onGroupWindowAggregateFunctionOperatorTest.java |  7 ++---
 ...honOverWindowAggregateFunctionOperatorTest.java |  7 ++---
 ...onGroupWindowAggregateFunctionOperatorTest.java |  3 +-
 ...rrowPythonProcTimeBoundedRangeOperatorTest.java |  3 +-
 ...ArrowPythonProcTimeBoundedRowsOperatorTest.java |  3 +-
 ...ArrowPythonRowTimeBoundedRangeOperatorTest.java |  3 +-
 ...mArrowPythonRowTimeBoundedRowsOperatorTest.java |  3 +-
 .../scalar/PythonScalarFunctionOperatorTest.java   |  5 ++--
 .../ArrowPythonScalarFunctionOperatorTest.java     |  5 ++--
 .../table/PythonTableFunctionOperatorTest.java     |  3 +-
 .../abilities/source/WatermarkPushDownSpec.java    |  2 +-
 .../exec/batch/BatchExecBoundedStreamScan.java     |  2 +-
 .../plan/nodes/exec/batch/BatchExecExchange.java   |  5 +---
 .../nodes/exec/batch/BatchExecHashAggregate.java   |  2 +-
 .../plan/nodes/exec/batch/BatchExecHashJoin.java   | 11 +++-----
 .../exec/batch/BatchExecHashWindowAggregate.java   |  2 +-
 .../exec/batch/BatchExecLegacyTableSourceScan.java |  2 +-
 .../nodes/exec/batch/BatchExecNestedLoopJoin.java  |  2 +-
 .../nodes/exec/batch/BatchExecOverAggregate.java   | 12 ++++----
 .../exec/batch/BatchExecPythonGroupAggregate.java  |  6 ++--
 .../batch/BatchExecPythonGroupWindowAggregate.java |  6 ++--
 .../exec/batch/BatchExecPythonOverAggregate.java   |  6 ++--
 .../plan/nodes/exec/batch/BatchExecRank.java       |  4 +--
 .../plan/nodes/exec/batch/BatchExecSort.java       |  3 +-
 .../nodes/exec/batch/BatchExecSortAggregate.java   |  2 +-
 .../plan/nodes/exec/batch/BatchExecSortLimit.java  |  3 +-
 .../nodes/exec/batch/BatchExecSortMergeJoin.java   |  9 +++---
 .../exec/batch/BatchExecSortWindowAggregate.java   |  2 +-
 .../plan/nodes/exec/common/CommonExecCalc.java     |  3 +-
 .../nodes/exec/common/CommonExecCorrelate.java     |  5 ++--
 .../plan/nodes/exec/common/CommonExecExpand.java   |  2 +-
 .../nodes/exec/common/CommonExecLegacySink.java    |  2 +-
 .../nodes/exec/common/CommonExecLookupJoin.java    | 12 ++++----
 .../nodes/exec/common/CommonExecPythonCalc.java    |  6 ++--
 .../exec/common/CommonExecPythonCorrelate.java     |  2 +-
 .../plan/nodes/exec/common/CommonExecValues.java   |  5 +---
 .../exec/stream/StreamExecDataStreamScan.java      |  2 +-
 .../stream/StreamExecGlobalGroupAggregate.java     |  2 +-
 .../stream/StreamExecGlobalWindowAggregate.java    |  2 +-
 .../exec/stream/StreamExecGroupAggregate.java      |  2 +-
 .../exec/stream/StreamExecGroupTableAggregate.java |  2 +-
 .../stream/StreamExecGroupWindowAggregate.java     |  2 +-
 .../StreamExecIncrementalGroupAggregate.java       |  2 +-
 .../nodes/exec/stream/StreamExecIntervalJoin.java  |  2 +-
 .../plan/nodes/exec/stream/StreamExecJoin.java     |  3 +-
 .../stream/StreamExecLegacyTableSourceScan.java    |  2 +-
 .../exec/stream/StreamExecLocalGroupAggregate.java |  2 +-
 .../stream/StreamExecLocalWindowAggregate.java     |  2 +-
 .../plan/nodes/exec/stream/StreamExecMatch.java    | 20 ++++++-------
 .../nodes/exec/stream/StreamExecOverAggregate.java |  2 +-
 .../StreamExecPythonGroupWindowAggregate.java      |  2 +-
 .../exec/stream/StreamExecPythonOverAggregate.java |  2 +-
 .../plan/nodes/exec/stream/StreamExecRank.java     |  2 +-
 .../plan/nodes/exec/stream/StreamExecSort.java     |  2 +-
 .../nodes/exec/stream/StreamExecTemporalJoin.java  |  2 +-
 .../nodes/exec/stream/StreamExecTemporalSort.java  | 10 ++-----
 .../exec/stream/StreamExecWatermarkAssigner.java   |  2 +-
 .../exec/stream/StreamExecWindowAggregate.java     |  2 +-
 .../nodes/exec/stream/StreamExecWindowJoin.java    |  3 +-
 .../nodes/exec/stream/StreamExecWindowRank.java    |  2 +-
 .../table/planner/plan/utils/KeySelectorUtil.java  |  4 +--
 .../table/planner/utils/TableConfigUtils.java      | 33 ++++++++++++++++++++++
 .../table/planner/codegen/CalcCodeGenerator.scala  |  5 ++--
 .../planner/codegen/CodeGeneratorContext.scala     | 18 ++++++------
 .../planner/codegen/CollectorCodeGenerator.scala   |  4 +--
 .../planner/codegen/CorrelateCodeGenerator.scala   |  9 +++---
 .../planner/codegen/EqualiserCodeGenerator.scala   |  6 ++--
 .../table/planner/codegen/ExprCodeGenerator.scala  |  4 +--
 .../table/planner/codegen/ExpressionReducer.scala  |  7 +++--
 .../planner/codegen/FunctionCodeGenerator.scala    |  4 +--
 .../table/planner/codegen/HashCodeGenerator.scala  |  2 +-
 .../planner/codegen/InputFormatCodeGenerator.scala |  2 +-
 .../planner/codegen/LongHashJoinGenerator.scala    |  9 +++---
 .../planner/codegen/LookupJoinCodeGenerator.scala  | 20 ++++++-------
 .../table/planner/codegen/MatchCodeGenerator.scala |  6 ++--
 .../planner/codegen/OperatorCodeGenerator.scala    |  4 +--
 .../planner/codegen/ProjectionCodeGenerator.scala  |  2 +-
 .../planner/codegen/ValuesCodeGenerator.scala      |  4 +--
 .../codegen/WatermarkGeneratorCodeGenerator.scala  | 12 ++++----
 .../codegen/agg/AggsHandlerCodeGenerator.scala     |  8 +++---
 .../codegen/calls/BridgingSqlFunctionCallGen.scala |  2 +-
 .../planner/codegen/calls/FunctionGenerator.scala  | 13 +++++----
 .../planner/codegen/calls/ScalarOperatorGens.scala |  8 +++---
 ...ltiFieldRangeBoundComparatorCodeGenerator.scala |  6 ++--
 .../over/RangeBoundComparatorCodeGenerator.scala   |  6 ++--
 .../codegen/sort/ComparatorCodeGenerator.scala     |  6 ++--
 .../planner/codegen/sort/SortCodeGenerator.scala   |  6 ++--
 .../physical/batch/BatchPhysicalJoinBase.scala     |  2 +-
 ...ushPartitionIntoLegacyTableSourceScanRule.scala |  8 +++---
 .../flink/table/planner/plan/utils/JoinUtil.scala  | 13 +++++----
 .../table/planner/plan/utils/PartitionPruner.scala |  4 +--
 .../flink/table/planner/codegen/CodeSplitTest.java | 17 +++++------
 .../planner/codegen/LongHashJoinGeneratorTest.java |  4 +--
 .../planner/codegen/SortCodeGeneratorTest.java     |  5 ++--
 .../planner/codegen/HashCodeGeneratorTest.scala    |  6 ++--
 .../codegen/ProjectionCodeGeneratorTest.scala      | 12 ++++----
 .../codegen/WatermarkGeneratorCodeGenTest.scala    | 24 +++++++---------
 .../table/planner/codegen/agg/AggTestBase.scala    |  4 +--
 .../expressions/utils/ExpressionTestBase.scala     |  2 +-
 .../planner/match/PatternTranslatorTestBase.scala  |  6 ++--
 .../planner/plan/utils/PartitionPrunerTest.scala   |  8 +++---
 .../generated/GeneratedAggsHandleFunction.java     |  3 +-
 .../runtime/generated/GeneratedCollector.java      |  3 +-
 .../table/runtime/generated/GeneratedFunction.java |  3 +-
 .../runtime/generated/GeneratedHashFunction.java   |  4 +--
 .../table/runtime/generated/GeneratedInput.java    |  4 +--
 .../runtime/generated/GeneratedJoinCondition.java  |  3 +-
 .../GeneratedNamespaceAggsHandleFunction.java      |  3 +-
 .../GeneratedNamespaceTableAggsHandleFunction.java |  3 +-
 .../table/runtime/generated/GeneratedOperator.java |  4 +--
 .../runtime/generated/GeneratedProjection.java     |  3 +-
 .../generated/GeneratedRecordComparator.java       |  3 +-
 .../generated/GeneratedRecordEqualiser.java        |  3 +-
 .../runtime/generated/GeneratedResultFuture.java   |  3 +-
 .../GeneratedTableAggsHandleFunction.java          |  4 +--
 .../generated/GeneratedWatermarkGenerator.java     |  3 +-
 118 files changed, 316 insertions(+), 304 deletions(-)

diff --git a/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/PassThroughPythonStreamGroupWindowAggregateOperator.java b/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/PassThroughPythonStreamGroupWindowAggregateOperator.java
index a5bf90a..a26574d 100644
--- a/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/PassThroughPythonStreamGroupWindowAggregateOperator.java
+++ b/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/PassThroughPythonStreamGroupWindowAggregateOperator.java
@@ -28,7 +28,6 @@ import org.apache.flink.python.PythonFunctionRunner;
 import org.apache.flink.streaming.api.operators.InternalTimer;
 import org.apache.flink.streaming.api.operators.InternalTimerServiceImpl;
 import org.apache.flink.streaming.api.operators.Triggerable;
-import org.apache.flink.table.api.TableConfig;
 import org.apache.flink.table.data.GenericRowData;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.data.StringData;
@@ -376,7 +375,7 @@ public class PassThroughPythonStreamGroupWindowAggregateOperator<K>
                                 .collect(Collectors.toList()));
         final GeneratedProjection generatedProjection =
                 ProjectionCodeGenerator.generateProjection(
-                        CodeGeneratorContext.apply(new TableConfig()),
+                        CodeGeneratorContext.apply(new Configuration()),
                         name,
                         inputType,
                         forwardedFieldType,
diff --git a/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/batch/BatchArrowPythonGroupAggregateFunctionOperatorTest.java b/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/batch/BatchArrowPythonGroupAggregateFunctionOperatorTest.java
index e758119..ea80b4c 100644
--- a/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/batch/BatchArrowPythonGroupAggregateFunctionOperatorTest.java
+++ b/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/batch/BatchArrowPythonGroupAggregateFunctionOperatorTest.java
@@ -24,7 +24,6 @@ import org.apache.flink.python.PythonOptions;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
 import org.apache.flink.table.api.DataTypes;
-import org.apache.flink.table.api.TableConfig;
 import org.apache.flink.table.connector.Projection;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.functions.python.PythonFunctionInfo;
@@ -187,19 +186,19 @@ public class BatchArrowPythonGroupAggregateFunctionOperatorTest
                 udfInputType,
                 udfOutputType,
                 ProjectionCodeGenerator.generateProjection(
-                        CodeGeneratorContext.apply(new TableConfig()),
+                        CodeGeneratorContext.apply(new Configuration()),
                         "UdafInputProjection",
                         inputType,
                         udfInputType,
                         udafInputOffsets),
                 ProjectionCodeGenerator.generateProjection(
-                        CodeGeneratorContext.apply(new TableConfig()),
+                        CodeGeneratorContext.apply(new Configuration()),
                         "GroupKey",
                         inputType,
                         (RowType) Projection.of(groupingSet).project(inputType),
                         groupingSet),
                 ProjectionCodeGenerator.generateProjection(
-                        CodeGeneratorContext.apply(new TableConfig()),
+                        CodeGeneratorContext.apply(new Configuration()),
                         "GroupSet",
                         inputType,
                         (RowType) Projection.of(groupingSet).project(inputType),
diff --git a/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/batch/BatchArrowPythonGroupWindowAggregateFunctionOperatorTest.java b/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/batch/BatchArrowPythonGroupWindowAggregateFunctionOperatorTest.java
index a6ec6db..e762d4e 100644
--- a/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/batch/BatchArrowPythonGroupWindowAggregateFunctionOperatorTest.java
+++ b/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/batch/BatchArrowPythonGroupWindowAggregateFunctionOperatorTest.java
@@ -24,7 +24,6 @@ import org.apache.flink.python.PythonOptions;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
 import org.apache.flink.table.api.DataTypes;
-import org.apache.flink.table.api.TableConfig;
 import org.apache.flink.table.connector.Projection;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.data.TimestampData;
@@ -346,19 +345,19 @@ public class BatchArrowPythonGroupWindowAggregateFunctionOperatorTest
                 5000L,
                 new int[] {0, 1},
                 ProjectionCodeGenerator.generateProjection(
-                        CodeGeneratorContext.apply(new TableConfig()),
+                        CodeGeneratorContext.apply(new Configuration()),
                         "UdafInputProjection",
                         inputRowType,
                         udfInputType,
                         udafInputOffsets),
                 ProjectionCodeGenerator.generateProjection(
-                        CodeGeneratorContext.apply(new TableConfig()),
+                        CodeGeneratorContext.apply(new Configuration()),
                         "GroupKey",
                         inputRowType,
                         (RowType) Projection.of(groupingSet).project(inputRowType),
                         groupingSet),
                 ProjectionCodeGenerator.generateProjection(
-                        CodeGeneratorContext.apply(new TableConfig()),
+                        CodeGeneratorContext.apply(new Configuration()),
                         "GroupSet",
                         inputRowType,
                         (RowType) Projection.of(groupingSet).project(inputRowType),
diff --git a/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/batch/BatchArrowPythonOverWindowAggregateFunctionOperatorTest.java b/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/batch/BatchArrowPythonOverWindowAggregateFunctionOperatorTest.java
index 1fee476..991fcdd 100644
--- a/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/batch/BatchArrowPythonOverWindowAggregateFunctionOperatorTest.java
+++ b/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/batch/BatchArrowPythonOverWindowAggregateFunctionOperatorTest.java
@@ -25,7 +25,6 @@ import org.apache.flink.python.PythonOptions;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
 import org.apache.flink.table.api.DataTypes;
-import org.apache.flink.table.api.TableConfig;
 import org.apache.flink.table.connector.Projection;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.functions.python.PythonFunctionInfo;
@@ -245,19 +244,19 @@ public class BatchArrowPythonOverWindowAggregateFunctionOperatorTest
                 3,
                 true,
                 ProjectionCodeGenerator.generateProjection(
-                        CodeGeneratorContext.apply(new TableConfig()),
+                        CodeGeneratorContext.apply(new Configuration()),
                         "UdafInputProjection",
                         inputRowType,
                         udfInputType,
                         udafInputOffsets),
                 ProjectionCodeGenerator.generateProjection(
-                        CodeGeneratorContext.apply(new TableConfig()),
+                        CodeGeneratorContext.apply(new Configuration()),
                         "GroupKey",
                         inputRowType,
                         (RowType) Projection.of(groupingSet).project(inputRowType),
                         groupingSet),
                 ProjectionCodeGenerator.generateProjection(
-                        CodeGeneratorContext.apply(new TableConfig()),
+                        CodeGeneratorContext.apply(new Configuration()),
                         "GroupSet",
                         inputRowType,
                         (RowType) Projection.of(groupingSet).project(inputRowType),
diff --git a/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/stream/StreamArrowPythonGroupWindowAggregateFunctionOperatorTest.java b/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/stream/StreamArrowPythonGroupWindowAggregateFunctionOperatorTest.java
index e762222..d55c615 100644
--- a/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/stream/StreamArrowPythonGroupWindowAggregateFunctionOperatorTest.java
+++ b/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/stream/StreamArrowPythonGroupWindowAggregateFunctionOperatorTest.java
@@ -25,7 +25,6 @@ import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
 import org.apache.flink.table.api.DataTypes;
-import org.apache.flink.table.api.TableConfig;
 import org.apache.flink.table.connector.Projection;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.data.TimestampData;
@@ -475,7 +474,7 @@ public class StreamArrowPythonGroupWindowAggregateFunctionOperatorTest
                 },
                 UTC_ZONE_ID,
                 ProjectionCodeGenerator.generateProjection(
-                        CodeGeneratorContext.apply(new TableConfig()),
+                        CodeGeneratorContext.apply(new Configuration()),
                         "UdafInputProjection",
                         inputType,
                         udfInputType,
diff --git a/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/stream/StreamArrowPythonProcTimeBoundedRangeOperatorTest.java b/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/stream/StreamArrowPythonProcTimeBoundedRangeOperatorTest.java
index a95157c..f774cdf 100644
--- a/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/stream/StreamArrowPythonProcTimeBoundedRangeOperatorTest.java
+++ b/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/stream/StreamArrowPythonProcTimeBoundedRangeOperatorTest.java
@@ -23,7 +23,6 @@ import org.apache.flink.python.PythonFunctionRunner;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
 import org.apache.flink.table.api.DataTypes;
-import org.apache.flink.table.api.TableConfig;
 import org.apache.flink.table.connector.Projection;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.functions.python.PythonFunctionInfo;
@@ -129,7 +128,7 @@ public class StreamArrowPythonProcTimeBoundedRangeOperatorTest
                 -1,
                 100L,
                 ProjectionCodeGenerator.generateProjection(
-                        CodeGeneratorContext.apply(new TableConfig()),
+                        CodeGeneratorContext.apply(new Configuration()),
                         "UdafInputProjection",
                         inputType,
                         udfInputType,
diff --git a/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/stream/StreamArrowPythonProcTimeBoundedRowsOperatorTest.java b/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/stream/StreamArrowPythonProcTimeBoundedRowsOperatorTest.java
index b65a0fb..cb48b13 100644
--- a/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/stream/StreamArrowPythonProcTimeBoundedRowsOperatorTest.java
+++ b/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/stream/StreamArrowPythonProcTimeBoundedRowsOperatorTest.java
@@ -23,7 +23,6 @@ import org.apache.flink.python.PythonFunctionRunner;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
 import org.apache.flink.table.api.DataTypes;
-import org.apache.flink.table.api.TableConfig;
 import org.apache.flink.table.connector.Projection;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.functions.python.PythonFunctionInfo;
@@ -128,7 +127,7 @@ public class StreamArrowPythonProcTimeBoundedRowsOperatorTest
                 3,
                 1,
                 ProjectionCodeGenerator.generateProjection(
-                        CodeGeneratorContext.apply(new TableConfig()),
+                        CodeGeneratorContext.apply(new Configuration()),
                         "UdafInputProjection",
                         inputType,
                         udfInputType,
diff --git a/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/stream/StreamArrowPythonRowTimeBoundedRangeOperatorTest.java b/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/stream/StreamArrowPythonRowTimeBoundedRangeOperatorTest.java
index 849c6b0..3b6017a 100644
--- a/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/stream/StreamArrowPythonRowTimeBoundedRangeOperatorTest.java
+++ b/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/stream/StreamArrowPythonRowTimeBoundedRangeOperatorTest.java
@@ -27,7 +27,6 @@ import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
 import org.apache.flink.table.api.DataTypes;
-import org.apache.flink.table.api.TableConfig;
 import org.apache.flink.table.connector.Projection;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.functions.python.PythonFunctionInfo;
@@ -285,7 +284,7 @@ public class StreamArrowPythonRowTimeBoundedRangeOperatorTest
                 3,
                 3L,
                 ProjectionCodeGenerator.generateProjection(
-                        CodeGeneratorContext.apply(new TableConfig()),
+                        CodeGeneratorContext.apply(new Configuration()),
                         "UdafInputProjection",
                         inputType,
                         udfInputType,
diff --git a/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/stream/StreamArrowPythonRowTimeBoundedRowsOperatorTest.java b/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/stream/StreamArrowPythonRowTimeBoundedRowsOperatorTest.java
index 1ee0e7f..f45a17a 100644
--- a/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/stream/StreamArrowPythonRowTimeBoundedRowsOperatorTest.java
+++ b/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/stream/StreamArrowPythonRowTimeBoundedRowsOperatorTest.java
@@ -25,7 +25,6 @@ import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
 import org.apache.flink.table.api.DataTypes;
-import org.apache.flink.table.api.TableConfig;
 import org.apache.flink.table.connector.Projection;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.functions.python.PythonFunctionInfo;
@@ -253,7 +252,7 @@ public class StreamArrowPythonRowTimeBoundedRowsOperatorTest
                 3,
                 1,
                 ProjectionCodeGenerator.generateProjection(
-                        CodeGeneratorContext.apply(new TableConfig()),
+                        CodeGeneratorContext.apply(new Configuration()),
                         "UdafInputProjection",
                         inputType,
                         udfInputType,
diff --git a/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/scalar/PythonScalarFunctionOperatorTest.java b/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/scalar/PythonScalarFunctionOperatorTest.java
index 17ae33c..cce6a70 100644
--- a/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/scalar/PythonScalarFunctionOperatorTest.java
+++ b/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/scalar/PythonScalarFunctionOperatorTest.java
@@ -23,7 +23,6 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.python.PythonFunctionRunner;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.table.api.DataTypes;
-import org.apache.flink.table.api.TableConfig;
 import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
 import org.apache.flink.table.connector.Projection;
 import org.apache.flink.table.data.RowData;
@@ -80,13 +79,13 @@ public class PythonScalarFunctionOperatorTest
                 udfInputType,
                 udfOutputType,
                 ProjectionCodeGenerator.generateProjection(
-                        CodeGeneratorContext.apply(new TableConfig()),
+                        CodeGeneratorContext.apply(new Configuration()),
                         "UdfInputProjection",
                         inputType,
                         udfInputType,
                         udfInputOffsets),
                 ProjectionCodeGenerator.generateProjection(
-                        CodeGeneratorContext.apply(new TableConfig()),
+                        CodeGeneratorContext.apply(new Configuration()),
                         "ForwardedFieldProjection",
                         inputType,
                         forwardedFieldType,
diff --git a/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/scalar/arrow/ArrowPythonScalarFunctionOperatorTest.java b/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/scalar/arrow/ArrowPythonScalarFunctionOperatorTest.java
index 953afb0..2b41b2b 100644
--- a/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/scalar/arrow/ArrowPythonScalarFunctionOperatorTest.java
+++ b/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/scalar/arrow/ArrowPythonScalarFunctionOperatorTest.java
@@ -23,7 +23,6 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.python.PythonFunctionRunner;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.table.api.DataTypes;
-import org.apache.flink.table.api.TableConfig;
 import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
 import org.apache.flink.table.connector.Projection;
 import org.apache.flink.table.data.RowData;
@@ -81,13 +80,13 @@ public class ArrowPythonScalarFunctionOperatorTest
                 udfInputType,
                 udfOutputType,
                 ProjectionCodeGenerator.generateProjection(
-                        CodeGeneratorContext.apply(new TableConfig()),
+                        CodeGeneratorContext.apply(new Configuration()),
                         "UdfInputProjection",
                         inputType,
                         udfInputType,
                         udfInputOffsets),
                 ProjectionCodeGenerator.generateProjection(
-                        CodeGeneratorContext.apply(new TableConfig()),
+                        CodeGeneratorContext.apply(new Configuration()),
                         "ForwardedFieldProjection",
                         inputType,
                         forwardedFieldType,
diff --git a/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/table/PythonTableFunctionOperatorTest.java b/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/table/PythonTableFunctionOperatorTest.java
index 840503c..2997225 100644
--- a/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/table/PythonTableFunctionOperatorTest.java
+++ b/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/table/PythonTableFunctionOperatorTest.java
@@ -21,7 +21,6 @@ package org.apache.flink.table.runtime.operators.python.table;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.python.PythonFunctionRunner;
 import org.apache.flink.table.api.DataTypes;
-import org.apache.flink.table.api.TableConfig;
 import org.apache.flink.table.connector.Projection;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.functions.python.PythonFunctionInfo;
@@ -96,7 +95,7 @@ public class PythonTableFunctionOperatorTest
                 udfOutputType,
                 JoinTypeUtil.getFlinkJoinType(joinRelType),
                 ProjectionCodeGenerator.generateProjection(
-                        CodeGeneratorContext.apply(new TableConfig()),
+                        CodeGeneratorContext.apply(new Configuration()),
                         "UdtfInputProjection",
                         inputType,
                         udfInputType,
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/abilities/source/WatermarkPushDownSpec.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/abilities/source/WatermarkPushDownSpec.java
index 59ff267..797567d 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/abilities/source/WatermarkPushDownSpec.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/abilities/source/WatermarkPushDownSpec.java
@@ -75,7 +75,7 @@ public final class WatermarkPushDownSpec extends SourceAbilitySpecBase {
         if (tableSource instanceof SupportsWatermarkPushDown) {
             GeneratedWatermarkGenerator generatedWatermarkGenerator =
                     WatermarkGeneratorCodeGenerator.generateWatermarkGenerator(
-                            context.getTableConfig(),
+                            context.getTableConfig().getConfiguration(),
                             context.getSourceRowType(),
                             watermarkExpr,
                             Option.apply("context"));
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecBoundedStreamScan.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecBoundedStreamScan.java
index 167870e..7696bf6 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecBoundedStreamScan.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecBoundedStreamScan.java
@@ -73,7 +73,7 @@ public class BatchExecBoundedStreamScan extends ExecNodeBase<RowData>
         final Transformation<?> sourceTransform = dataStream.getTransformation();
         if (needInternalConversion()) {
             return ScanUtil.convertToInternalRow(
-                    new CodeGeneratorContext(config.getTableConfig()),
+                    new CodeGeneratorContext(config),
                     (Transformation<Object>) sourceTransform,
                     fieldIndexes,
                     sourceType,
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecExchange.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecExchange.java
index c07bd85..1f7f022 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecExchange.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecExchange.java
@@ -203,10 +203,7 @@ public class BatchExecExchange extends CommonExecExchange implements BatchExecNo
                         .toArray(String[]::new);
         return new BinaryHashPartitioner(
                 HashCodeGenerator.generateRowHash(
-                        new CodeGeneratorContext(config.getTableConfig()),
-                        inputType,
-                        "HashPartitioner",
-                        keys),
+                        new CodeGeneratorContext(config), inputType, "HashPartitioner", keys),
                 fieldNames);
     }
 
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecHashAggregate.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecHashAggregate.java
index 98511a2..4f9d01b 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecHashAggregate.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecHashAggregate.java
@@ -93,7 +93,7 @@ public class BatchExecHashAggregate extends ExecNodeBase<RowData>
         final RowType inputRowType = (RowType) inputEdge.getOutputType();
         final RowType outputRowType = (RowType) getOutputType();
 
-        final CodeGeneratorContext ctx = new CodeGeneratorContext(config.getTableConfig());
+        final CodeGeneratorContext ctx = new CodeGeneratorContext(config);
 
         final AggregateInfoList aggInfos =
                 AggregateUtil.transformToBatchAggregateInfoList(
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecHashJoin.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecHashJoin.java
index e003eb2..13ac38a 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecHashJoin.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecHashJoin.java
@@ -112,22 +112,19 @@ public class BatchExecHashJoin extends ExecNodeBase<RowData>
 
         GeneratedJoinCondition condFunc =
                 JoinUtil.generateConditionFunction(
-                        config.getTableConfig(),
-                        joinSpec.getNonEquiCondition().orElse(null),
-                        leftType,
-                        rightType);
+                        config, joinSpec.getNonEquiCondition().orElse(null), leftType, rightType);
 
         // projection for equals
         GeneratedProjection leftProj =
                 ProjectionCodeGenerator.generateProjection(
-                        new CodeGeneratorContext(config.getTableConfig()),
+                        new CodeGeneratorContext(config),
                         "HashJoinLeftProjection",
                         leftType,
                         keyType,
                         leftKeys);
         GeneratedProjection rightProj =
                 ProjectionCodeGenerator.generateProjection(
-                        new CodeGeneratorContext(config.getTableConfig()),
+                        new CodeGeneratorContext(config),
                         "HashJoinRightProjection",
                         rightType,
                         keyType,
@@ -186,7 +183,7 @@ public class BatchExecHashJoin extends ExecNodeBase<RowData>
         if (LongHashJoinGenerator.support(hashJoinType, keyType, joinSpec.getFilterNulls())) {
             operator =
                     LongHashJoinGenerator.gen(
-                            config.getTableConfig(),
+                            config,
                             hashJoinType,
                             keyType,
                             buildType,
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecHashWindowAggregate.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecHashWindowAggregate.java
index c16f386..10bd976 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecHashWindowAggregate.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecHashWindowAggregate.java
@@ -117,7 +117,7 @@ public class BatchExecHashWindowAggregate extends ExecNodeBase<RowData>
         final RowType inputRowType = (RowType) inputEdge.getOutputType();
         final HashWindowCodeGenerator hashWindowCodeGenerator =
                 new HashWindowCodeGenerator(
-                        new CodeGeneratorContext(config.getTableConfig()),
+                        new CodeGeneratorContext(config),
                         planner.getRelBuilder(),
                         window,
                         inputTimeFieldIndex,
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecLegacyTableSourceScan.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecLegacyTableSourceScan.java
index 69a728b..3f15872 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecLegacyTableSourceScan.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecLegacyTableSourceScan.java
@@ -95,7 +95,7 @@ public class BatchExecLegacyTableSourceScan extends CommonExecLegacyTableSourceS
                     TableSourceUtil.fixPrecisionForProducedDataType(
                             tableSource, (RowType) getOutputType());
             return ScanUtil.convertToInternalRow(
-                    new CodeGeneratorContext(config.getTableConfig()),
+                    new CodeGeneratorContext(config),
                     (Transformation<Object>) sourceTransform,
                     fieldIndexes,
                     fixedProducedDataType,
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecNestedLoopJoin.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecNestedLoopJoin.java
index 0d868ff..dce2f3c 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecNestedLoopJoin.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecNestedLoopJoin.java
@@ -89,7 +89,7 @@ public class BatchExecNestedLoopJoin extends ExecNodeBase<RowData>
 
         CodeGenOperatorFactory<RowData> operator =
                 new NestedLoopJoinCodeGenerator(
-                                new CodeGeneratorContext(config.getTableConfig()),
+                                new CodeGeneratorContext(config),
                                 singleRowJoin,
                                 leftIsBuild,
                                 leftType,
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecOverAggregate.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecOverAggregate.java
index 6e4805c..f26099f2 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecOverAggregate.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecOverAggregate.java
@@ -103,7 +103,7 @@ public class BatchExecOverAggregate extends BatchExecOverAggregateBase {
         final int[] partitionFields = overSpec.getPartition().getFieldIndices();
         final GeneratedRecordComparator genComparator =
                 ComparatorCodeGenerator.gen(
-                        config.getTableConfig(),
+                        config,
                         "SortComparator",
                         inputType,
                         SortUtil.getAscendingSortSpec(partitionFields));
@@ -136,7 +136,7 @@ public class BatchExecOverAggregate extends BatchExecOverAggregateBase {
                                 sortSpec.getFieldIndices());
                 AggsHandlerCodeGenerator generator =
                         new AggsHandlerCodeGenerator(
-                                new CodeGeneratorContext(config.getTableConfig()),
+                                new CodeGeneratorContext(config),
                                 planner.getRelBuilder(),
                                 JavaScalaConversionUtil.toScala(inputType.getChildren()),
                                 false); // copyInputField
@@ -207,7 +207,7 @@ public class BatchExecOverAggregate extends BatchExecOverAggregateBase {
 
                     AggsHandlerCodeGenerator generator =
                             new AggsHandlerCodeGenerator(
-                                    new CodeGeneratorContext(config.getTableConfig()),
+                                    new CodeGeneratorContext(config),
                                     relBuilder,
                                     JavaScalaConversionUtil.toScala(inputType.getChildren()),
                                     false); // copyInputField
@@ -273,7 +273,7 @@ public class BatchExecOverAggregate extends BatchExecOverAggregateBase {
                                 sortSpec.getFieldIndices());
                 AggsHandlerCodeGenerator generator =
                         new AggsHandlerCodeGenerator(
-                                new CodeGeneratorContext(config.getTableConfig()),
+                                new CodeGeneratorContext(config),
                                 relBuilder,
                                 JavaScalaConversionUtil.toScala(inputType.getChildren()),
                                 false); // copyInputField
@@ -397,7 +397,7 @@ public class BatchExecOverAggregate extends BatchExecOverAggregateBase {
             int sortKey = sortSpec.getFieldIndices()[0];
             return new RangeBoundComparatorCodeGenerator(
                             relBuilder,
-                            config.getTableConfig(),
+                            config,
                             inputType,
                             bound,
                             sortKey,
@@ -408,7 +408,7 @@ public class BatchExecOverAggregate extends BatchExecOverAggregateBase {
         } else {
             // if the bound is current row, then window support comparing based on multi fields.
             return new MultiFieldRangeBoundComparatorCodeGenerator(
-                            config.getTableConfig(), inputType, sortSpec, isLowerBound)
+                            config, inputType, sortSpec, isLowerBound)
                     .generateBoundComparator("MultiFieldRangeBoundComparator");
         }
     }
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecPythonGroupAggregate.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecPythonGroupAggregate.java
index 46ada1b..ca95b34 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecPythonGroupAggregate.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecPythonGroupAggregate.java
@@ -164,19 +164,19 @@ public class BatchExecPythonGroupAggregate extends ExecNodeBase<RowData>
                             udfInputType,
                             udfOutputType,
                             ProjectionCodeGenerator.generateProjection(
-                                    CodeGeneratorContext.apply(config.getTableConfig()),
+                                    CodeGeneratorContext.apply(config),
                                     "UdafInputProjection",
                                     inputRowType,
                                     udfInputType,
                                     udafInputOffsets),
                             ProjectionCodeGenerator.generateProjection(
-                                    CodeGeneratorContext.apply(config.getTableConfig()),
+                                    CodeGeneratorContext.apply(config),
                                     "GroupKey",
                                     inputRowType,
                                     (RowType) Projection.of(grouping).project(inputRowType),
                                     grouping),
                             ProjectionCodeGenerator.generateProjection(
-                                    CodeGeneratorContext.apply(config.getTableConfig()),
+                                    CodeGeneratorContext.apply(config),
                                     "GroupSet",
                                     inputRowType,
                                     (RowType) Projection.of(auxGrouping).project(inputRowType),
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecPythonGroupWindowAggregate.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecPythonGroupWindowAggregate.java
index 1833970..d1fbbcc 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecPythonGroupWindowAggregate.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecPythonGroupWindowAggregate.java
@@ -236,19 +236,19 @@ public class BatchExecPythonGroupWindowAggregate extends ExecNodeBase<RowData>
                             slideSize,
                             namePropertyTypeArray,
                             ProjectionCodeGenerator.generateProjection(
-                                    CodeGeneratorContext.apply(config.getTableConfig()),
+                                    CodeGeneratorContext.apply(config),
                                     "UdafInputProjection",
                                     inputRowType,
                                     udfInputType,
                                     udafInputOffsets),
                             ProjectionCodeGenerator.generateProjection(
-                                    CodeGeneratorContext.apply(config.getTableConfig()),
+                                    CodeGeneratorContext.apply(config),
                                     "GroupKey",
                                     inputRowType,
                                     (RowType) Projection.of(grouping).project(inputRowType),
                                     grouping),
                             ProjectionCodeGenerator.generateProjection(
-                                    CodeGeneratorContext.apply(config.getTableConfig()),
+                                    CodeGeneratorContext.apply(config),
                                     "GroupSet",
                                     inputRowType,
                                     (RowType) Projection.of(auxGrouping).project(inputRowType),
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecPythonOverAggregate.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecPythonOverAggregate.java
index c9529ad..0d65b07 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecPythonOverAggregate.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecPythonOverAggregate.java
@@ -250,13 +250,13 @@ public class BatchExecPythonOverAggregate extends BatchExecOverAggregateBase {
                             sortSpec.getFieldIndices()[0],
                             sortSpec.getAscendingOrders()[0],
                             ProjectionCodeGenerator.generateProjection(
-                                    CodeGeneratorContext.apply(config.getTableConfig()),
+                                    CodeGeneratorContext.apply(config),
                                     "UdafInputProjection",
                                     inputRowType,
                                     udfInputType,
                                     udafInputOffsets),
                             ProjectionCodeGenerator.generateProjection(
-                                    CodeGeneratorContext.apply(config.getTableConfig()),
+                                    CodeGeneratorContext.apply(config),
                                     "GroupKey",
                                     inputRowType,
                                     (RowType)
@@ -264,7 +264,7 @@ public class BatchExecPythonOverAggregate extends BatchExecOverAggregateBase {
                                                     .project(inputRowType),
                                     partitionSpec.getFieldIndices()),
                             ProjectionCodeGenerator.generateProjection(
-                                    CodeGeneratorContext.apply(config.getTableConfig()),
+                                    CodeGeneratorContext.apply(config),
                                     "GroupSet",
                                     inputRowType,
                                     (RowType)
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecRank.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecRank.java
index 0a86211..26f2a70 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecRank.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecRank.java
@@ -88,12 +88,12 @@ public class BatchExecRank extends ExecNodeBase<RowData> implements InputSortedE
         RankOperator operator =
                 new RankOperator(
                         ComparatorCodeGenerator.gen(
-                                config.getTableConfig(),
+                                config,
                                 "PartitionByComparator",
                                 inputType,
                                 SortUtil.getAscendingSortSpec(partitionFields)),
                         ComparatorCodeGenerator.gen(
-                                config.getTableConfig(),
+                                config,
                                 "OrderByComparator",
                                 inputType,
                                 SortUtil.getAscendingSortSpec(sortFields)),
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecSort.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecSort.java
index 8f8e420..9bfbc9a 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecSort.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecSort.java
@@ -69,8 +69,7 @@ public class BatchExecSort extends ExecNodeBase<RowData> implements BatchExecNod
                 (Transformation<RowData>) inputEdge.translateToPlan(planner);
 
         RowType inputType = (RowType) inputEdge.getOutputType();
-        SortCodeGenerator codeGen =
-                new SortCodeGenerator(config.getTableConfig(), inputType, sortSpec);
+        SortCodeGenerator codeGen = new SortCodeGenerator(config, inputType, sortSpec);
 
         SortOperator operator =
                 new SortOperator(
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecSortAggregate.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecSortAggregate.java
index e550fce..cffa32b 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecSortAggregate.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecSortAggregate.java
@@ -92,7 +92,7 @@ public class BatchExecSortAggregate extends ExecNodeBase<RowData>
         final RowType inputRowType = (RowType) inputEdge.getOutputType();
         final RowType outputRowType = (RowType) getOutputType();
 
-        final CodeGeneratorContext ctx = new CodeGeneratorContext(config.getTableConfig());
+        final CodeGeneratorContext ctx = new CodeGeneratorContext(config);
         final AggregateInfoList aggInfos =
                 AggregateUtil.transformToBatchAggregateInfoList(
                         aggInputRowType,
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecSortLimit.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecSortLimit.java
index 3cbaefb..78bba8b 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecSortLimit.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecSortLimit.java
@@ -87,8 +87,7 @@ public class BatchExecSortLimit extends ExecNodeBase<RowData>
         RowType inputType = (RowType) inputEdge.getOutputType();
         // generate comparator
         GeneratedRecordComparator genComparator =
-                ComparatorCodeGenerator.gen(
-                        config.getTableConfig(), "SortLimitComparator", inputType, sortSpec);
+                ComparatorCodeGenerator.gen(config, "SortLimitComparator", inputType, sortSpec);
 
         // TODO If input is ordered, there is no need to use the heap.
         SortLimitOperator operator =
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecSortMergeJoin.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecSortMergeJoin.java
index 9d8e7d7..7c89f5c 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecSortMergeJoin.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecSortMergeJoin.java
@@ -108,8 +108,7 @@ public class BatchExecSortMergeJoin extends ExecNodeBase<RowData>
         RowType keyType = RowType.of(keyFieldTypes);
 
         GeneratedJoinCondition condFunc =
-                JoinUtil.generateConditionFunction(
-                        config.getTableConfig(), nonEquiCondition, leftType, rightType);
+                JoinUtil.generateConditionFunction(config, nonEquiCondition, leftType, rightType);
 
         long externalBufferMemory =
                 config.get(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_EXTERNAL_BUFFER_MEMORY)
@@ -134,13 +133,13 @@ public class BatchExecSortMergeJoin extends ExecNodeBase<RowData>
                         leftIsSmaller,
                         condFunc,
                         ProjectionCodeGenerator.generateProjection(
-                                new CodeGeneratorContext(config.getTableConfig()),
+                                new CodeGeneratorContext(config),
                                 "SMJProjection",
                                 leftType,
                                 keyType,
                                 leftKeys),
                         ProjectionCodeGenerator.generateProjection(
-                                new CodeGeneratorContext(config.getTableConfig()),
+                                new CodeGeneratorContext(config),
                                 "SMJProjection",
                                 rightType,
                                 keyType,
@@ -171,6 +170,6 @@ public class BatchExecSortMergeJoin extends ExecNodeBase<RowData>
     private SortCodeGenerator newSortGen(
             ExecNodeConfig config, int[] originalKeys, RowType inputType) {
         SortSpec sortSpec = SortUtil.getAscendingSortSpec(originalKeys);
-        return new SortCodeGenerator(config.getTableConfig(), inputType, sortSpec);
+        return new SortCodeGenerator(config, inputType, sortSpec);
     }
 }
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecSortWindowAggregate.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecSortWindowAggregate.java
index d91a451..6b3b02a 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecSortWindowAggregate.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecSortWindowAggregate.java
@@ -121,7 +121,7 @@ public class BatchExecSortWindowAggregate extends ExecNodeBase<RowData>
         final Tuple2<Long, Long> windowSizeAndSlideSize = WindowCodeGenerator.getWindowDef(window);
         final SortWindowCodeGenerator windowCodeGenerator =
                 new SortWindowCodeGenerator(
-                        new CodeGeneratorContext(config.getTableConfig()),
+                        new CodeGeneratorContext(config),
                         planner.getRelBuilder(),
                         window,
                         inputTimeFieldIndex,
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecCalc.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecCalc.java
index 24024c6..9a7b9e7 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecCalc.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecCalc.java
@@ -91,8 +91,7 @@ public abstract class CommonExecCalc extends ExecNodeBase<RowData>
         final Transformation<RowData> inputTransform =
                 (Transformation<RowData>) inputEdge.translateToPlan(planner);
         final CodeGeneratorContext ctx =
-                new CodeGeneratorContext(config.getTableConfig())
-                        .setOperatorBaseClass(operatorBaseClass);
+                new CodeGeneratorContext(config).setOperatorBaseClass(operatorBaseClass);
 
         final CodeGenOperatorFactory<RowData> substituteStreamOperator =
                 CalcCodeGenerator.generateCalcOperator(
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecCorrelate.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecCorrelate.java
index bab655c..9cb5a03 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecCorrelate.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecCorrelate.java
@@ -97,10 +97,9 @@ public abstract class CommonExecCorrelate extends ExecNodeBase<RowData>
         final Transformation<RowData> inputTransform =
                 (Transformation<RowData>) inputEdge.translateToPlan(planner);
         final CodeGeneratorContext ctx =
-                new CodeGeneratorContext(config.getTableConfig())
-                        .setOperatorBaseClass(operatorBaseClass);
+                new CodeGeneratorContext(config).setOperatorBaseClass(operatorBaseClass);
         return CorrelateCodeGenerator.generateCorrelateTransformation(
-                config.getTableConfig(),
+                config,
                 ctx,
                 inputTransform,
                 (RowType) inputEdge.getOutputType(),
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecExpand.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecExpand.java
index ddb6040..361c9fc 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecExpand.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecExpand.java
@@ -85,7 +85,7 @@ public abstract class CommonExecExpand extends ExecNodeBase<RowData>
 
         final CodeGenOperatorFactory<RowData> operatorFactory =
                 ExpandCodeGenerator.generateExpandOperator(
-                        new CodeGeneratorContext(config.getTableConfig()),
+                        new CodeGeneratorContext(config),
                         (RowType) inputEdge.getOutputType(),
                         (RowType) getOutputType(),
                         projects,
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecLegacySink.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecLegacySink.java
index ea72482..65dadfa 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecLegacySink.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecLegacySink.java
@@ -193,7 +193,7 @@ public abstract class CommonExecLegacySink<T> extends ExecNodeBase<T>
 
             final CodeGenOperatorFactory<T> converterOperator =
                     SinkCodeGenerator.generateRowConverterOperator(
-                            new CodeGeneratorContext(config.getTableConfig()),
+                            new CodeGeneratorContext(config),
                             convertedInputRowType,
                             tableSink,
                             physicalOutputType,
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecLookupJoin.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecLookupJoin.java
index c50deb9..af5bccc 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecLookupJoin.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecLookupJoin.java
@@ -328,7 +328,7 @@ public abstract class CommonExecLookupJoin extends ExecNodeBase<RowData>
         LookupJoinCodeGenerator.GeneratedTableFunctionWithDataType<AsyncFunction<RowData, Object>>
                 generatedFuncWithType =
                         LookupJoinCodeGenerator.generateAsyncLookupFunction(
-                                config.getTableConfig(),
+                                config,
                                 dataTypeFactory,
                                 inputRowType,
                                 tableSourceRowType,
@@ -345,7 +345,7 @@ public abstract class CommonExecLookupJoin extends ExecNodeBase<RowData>
         // a projection or filter after table source scan
         GeneratedResultFuture<TableFunctionResultFuture<RowData>> generatedResultFuture =
                 LookupJoinCodeGenerator.generateTableAsyncCollector(
-                        config.getTableConfig(),
+                        config,
                         "TableFunctionResultFuture",
                         inputRowType,
                         rightRowType,
@@ -358,7 +358,7 @@ public abstract class CommonExecLookupJoin extends ExecNodeBase<RowData>
             // a projection or filter after table source scan
             GeneratedFunction<FlatMapFunction<RowData, RowData>> generatedCalc =
                     LookupJoinCodeGenerator.generateCalcMapFunction(
-                            config.getTableConfig(),
+                            config,
                             JavaScalaConversionUtil.toScala(projectionOnTemporalTable),
                             filterOnTemporalTable,
                             temporalTableOutputType,
@@ -409,7 +409,7 @@ public abstract class CommonExecLookupJoin extends ExecNodeBase<RowData>
 
         GeneratedFunction<FlatMapFunction<RowData, RowData>> generatedFetcher =
                 LookupJoinCodeGenerator.generateSyncLookupFunction(
-                        config.getTableConfig(),
+                        config,
                         dataTypeFactory,
                         inputRowType,
                         tableSourceRowType,
@@ -424,7 +424,7 @@ public abstract class CommonExecLookupJoin extends ExecNodeBase<RowData>
                 Optional.ofNullable(temporalTableOutputType)
                         .map(FlinkTypeFactory::toLogicalRowType)
                         .orElse(tableSourceRowType);
-        CodeGeneratorContext ctx = new CodeGeneratorContext(config.getTableConfig());
+        CodeGeneratorContext ctx = new CodeGeneratorContext(config);
         GeneratedCollector<TableFunctionCollector<RowData>> generatedCollector =
                 LookupJoinCodeGenerator.generateCollector(
                         ctx,
@@ -439,7 +439,7 @@ public abstract class CommonExecLookupJoin extends ExecNodeBase<RowData>
             // a projection or filter after table source scan
             GeneratedFunction<FlatMapFunction<RowData, RowData>> generatedCalc =
                     LookupJoinCodeGenerator.generateCalcMapFunction(
-                            config.getTableConfig(),
+                            config,
                             JavaScalaConversionUtil.toScala(projectionOnTemporalTable),
                             filterOnTemporalTable,
                             temporalTableOutputType,
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecPythonCalc.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecPythonCalc.java
index 6d63f3b..30a39fb 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecPythonCalc.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecPythonCalc.java
@@ -248,13 +248,13 @@ public abstract class CommonExecPythonCalc extends ExecNodeBase<RowData>
                                 udfInputType,
                                 udfOutputType,
                                 ProjectionCodeGenerator.generateProjection(
-                                        CodeGeneratorContext.apply(config.getTableConfig()),
+                                        CodeGeneratorContext.apply(config),
                                         "UdfInputProjection",
                                         inputType,
                                         udfInputType,
                                         udfInputOffsets),
                                 ProjectionCodeGenerator.generateProjection(
-                                        CodeGeneratorContext.apply(config.getTableConfig()),
+                                        CodeGeneratorContext.apply(config),
                                         "ForwardedFieldProjection",
                                         inputType,
                                         forwardedFieldType,
@@ -279,7 +279,7 @@ public abstract class CommonExecPythonCalc extends ExecNodeBase<RowData>
                                     udfOutputType,
                                     udfInputOffsets,
                                     ProjectionCodeGenerator.generateProjection(
-                                            CodeGeneratorContext.apply(config.getTableConfig()),
+                                            CodeGeneratorContext.apply(config),
                                             "ForwardedFieldProjection",
                                             inputType,
                                             forwardedFieldType,
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecPythonCorrelate.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecPythonCorrelate.java
index 9af214c..927e361 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecPythonCorrelate.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecPythonCorrelate.java
@@ -175,7 +175,7 @@ public abstract class CommonExecPythonCorrelate extends ExecNodeBase<RowData>
                             udfOutputType,
                             joinType,
                             ProjectionCodeGenerator.generateProjection(
-                                    CodeGeneratorContext.apply(config.getTableConfig()),
+                                    CodeGeneratorContext.apply(config),
                                     "UdtfInputProjection",
                                     inputType,
                                     udfInputType,
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecValues.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecValues.java
index 21be27f..e9d5afd 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecValues.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecValues.java
@@ -62,10 +62,7 @@ public abstract class CommonExecValues extends ExecNodeBase<RowData>
             PlannerBase planner, ExecNodeConfig config) {
         final ValuesInputFormat inputFormat =
                 ValuesCodeGenerator.generatorInputFormat(
-                        config.getTableConfig(),
-                        (RowType) getOutputType(),
-                        tuples,
-                        getClass().getSimpleName());
+                        config, (RowType) getOutputType(), tuples, getClass().getSimpleName());
         final Transformation<RowData> transformation =
                 planner.getExecEnv()
                         .createInput(inputFormat, inputFormat.getProducedType())
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecDataStreamScan.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecDataStreamScan.java
index 7ea8db3..d6d353a 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecDataStreamScan.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecDataStreamScan.java
@@ -105,7 +105,7 @@ public class StreamExecDataStreamScan extends ExecNodeBase<RowData>
                 resetElement = "";
             }
             final CodeGeneratorContext ctx =
-                    new CodeGeneratorContext(config.getTableConfig())
+                    new CodeGeneratorContext(config)
                             .setOperatorBaseClass(TableStreamOperator.class);
             transformation =
                     ScanUtil.convertToInternalRow(
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecGlobalGroupAggregate.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecGlobalGroupAggregate.java
index 9e68aa5..acb32a2 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecGlobalGroupAggregate.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecGlobalGroupAggregate.java
@@ -291,7 +291,7 @@ public class StreamExecGlobalGroupAggregate extends StreamExecAggregateBase {
         // then multi-put to state, so copyInputField is true.
         AggsHandlerCodeGenerator generator =
                 new AggsHandlerCodeGenerator(
-                        new CodeGeneratorContext(config.getTableConfig()),
+                        new CodeGeneratorContext(config),
                         relBuilder,
                         JavaScalaConversionUtil.toScala(localAggInputRowType.getChildren()),
                         true);
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecGlobalWindowAggregate.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecGlobalWindowAggregate.java
index d3c2710..00b842d 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecGlobalWindowAggregate.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecGlobalWindowAggregate.java
@@ -256,7 +256,7 @@ public class StreamExecGlobalWindowAggregate extends StreamExecWindowAggregateBa
             ZoneId shifTimeZone) {
         final AggsHandlerCodeGenerator generator =
                 new AggsHandlerCodeGenerator(
-                                new CodeGeneratorContext(config.getTableConfig()),
+                                new CodeGeneratorContext(config),
                                 relBuilder,
                                 JavaScalaConversionUtil.toScala(localAggInputRowType.getChildren()),
                                 true) // copyInputField
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecGroupAggregate.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecGroupAggregate.java
index c6d11e6..ccf0c90 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecGroupAggregate.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecGroupAggregate.java
@@ -166,7 +166,7 @@ public class StreamExecGroupAggregate extends StreamExecAggregateBase {
 
         final AggsHandlerCodeGenerator generator =
                 new AggsHandlerCodeGenerator(
-                                new CodeGeneratorContext(config.getTableConfig()),
+                                new CodeGeneratorContext(config),
                                 planner.getRelBuilder(),
                                 JavaScalaConversionUtil.toScala(inputRowType.getChildren()),
                                 // TODO: heap state backend do not copy key currently,
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecGroupTableAggregate.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecGroupTableAggregate.java
index cd75944..95671ce 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecGroupTableAggregate.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecGroupTableAggregate.java
@@ -113,7 +113,7 @@ public class StreamExecGroupTableAggregate extends ExecNodeBase<RowData>
 
         final AggsHandlerCodeGenerator generator =
                 new AggsHandlerCodeGenerator(
-                                new CodeGeneratorContext(config.getTableConfig()),
+                                new CodeGeneratorContext(config),
                                 planner.getRelBuilder(),
                                 JavaScalaConversionUtil.toScala(inputRowType.getChildren()),
                                 // TODO: heap state backend do not copy key currently,
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecGroupWindowAggregate.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecGroupWindowAggregate.java
index 0e33097..b69f6ce 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecGroupWindowAggregate.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecGroupWindowAggregate.java
@@ -322,7 +322,7 @@ public class StreamExecGroupWindowAggregate extends StreamExecAggregateBase {
 
         final AggsHandlerCodeGenerator generator =
                 new AggsHandlerCodeGenerator(
-                                new CodeGeneratorContext(config.getTableConfig()),
+                                new CodeGeneratorContext(config),
                                 relBuilder,
                                 JavaScalaConversionUtil.toScala(fieldTypes),
                                 false) // copyInputField
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecIncrementalGroupAggregate.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecIncrementalGroupAggregate.java
index 9322899..cce6a77 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecIncrementalGroupAggregate.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecIncrementalGroupAggregate.java
@@ -248,7 +248,7 @@ public class StreamExecIncrementalGroupAggregate extends StreamExecAggregateBase
 
         AggsHandlerCodeGenerator generator =
                 new AggsHandlerCodeGenerator(
-                        new CodeGeneratorContext(config.getTableConfig()),
+                        new CodeGeneratorContext(config),
                         relBuilder,
                         JavaScalaConversionUtil.toScala(partialLocalAggInputType.getChildren()),
                         inputFieldCopy);
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecIntervalJoin.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecIntervalJoin.java
index f9794b1..62f5b9c 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecIntervalJoin.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecIntervalJoin.java
@@ -164,7 +164,7 @@ public class StreamExecIntervalJoin extends ExecNodeBase<RowData>
                 } else {
                     GeneratedJoinCondition joinCondition =
                             JoinUtil.generateConditionFunction(
-                                    config.getTableConfig(), joinSpec, leftRowType, rightRowType);
+                                    config, joinSpec, leftRowType, rightRowType);
                     IntervalJoinFunction joinFunction =
                             new IntervalJoinFunction(
                                     joinCondition, returnTypeInfo, joinSpec.getFilterNulls());
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecJoin.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecJoin.java
index 5fb1624..060f06b 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecJoin.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecJoin.java
@@ -148,8 +148,7 @@ public class StreamExecJoin extends ExecNodeBase<RowData>
                 JoinUtil.analyzeJoinInput(rightTypeInfo, rightJoinKey, rightUniqueKeys);
 
         GeneratedJoinCondition generatedCondition =
-                JoinUtil.generateConditionFunction(
-                        config.getTableConfig(), joinSpec, leftType, rightType);
+                JoinUtil.generateConditionFunction(config, joinSpec, leftType, rightType);
 
         long minRetentionTime = config.getStateRetentionTime();
 
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecLegacyTableSourceScan.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecLegacyTableSourceScan.java
index c5b2128..78bd2ac 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecLegacyTableSourceScan.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecLegacyTableSourceScan.java
@@ -99,7 +99,7 @@ public class StreamExecLegacyTableSourceScan extends CommonExecLegacyTableSource
             }
 
             final CodeGeneratorContext ctx =
-                    new CodeGeneratorContext(config.getTableConfig())
+                    new CodeGeneratorContext(config)
                             .setOperatorBaseClass(TableStreamOperator.class);
             // the produced type may not carry the correct precision user defined in DDL, because
             // it may be converted from legacy type. Fix precision using logical schema from DDL.
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecLocalGroupAggregate.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecLocalGroupAggregate.java
index 80a257a..cbb2ad7 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecLocalGroupAggregate.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecLocalGroupAggregate.java
@@ -134,7 +134,7 @@ public class StreamExecLocalGroupAggregate extends StreamExecAggregateBase {
 
         final AggsHandlerCodeGenerator generator =
                 new AggsHandlerCodeGenerator(
-                        new CodeGeneratorContext(config.getTableConfig()),
+                        new CodeGeneratorContext(config),
                         planner.getRelBuilder(),
                         JavaScalaConversionUtil.toScala(inputRowType.getChildren()),
                         // the local aggregate result will be buffered, so need copy
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecLocalWindowAggregate.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecLocalWindowAggregate.java
index a701962..29c82b8 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecLocalWindowAggregate.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecLocalWindowAggregate.java
@@ -189,7 +189,7 @@ public class StreamExecLocalWindowAggregate extends StreamExecWindowAggregateBas
             ZoneId shiftTimeZone) {
         final AggsHandlerCodeGenerator generator =
                 new AggsHandlerCodeGenerator(
-                                new CodeGeneratorContext(config.getTableConfig()),
+                                new CodeGeneratorContext(config),
                                 relBuilder,
                                 JavaScalaConversionUtil.toScala(fieldTypes),
                                 true) // copyInputField
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecMatch.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecMatch.java
index 58f6711..974cddd 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecMatch.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecMatch.java
@@ -34,7 +34,6 @@ import org.apache.flink.cep.pattern.conditions.IterativeCondition;
 import org.apache.flink.configuration.ReadableConfig;
 import org.apache.flink.streaming.api.transformations.OneInputTransformation;
 import org.apache.flink.streaming.api.windowing.time.Time;
-import org.apache.flink.table.api.TableConfig;
 import org.apache.flink.table.api.TableException;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.planner.codegen.CodeGenUtils;
@@ -153,8 +152,7 @@ public class StreamExecMatch extends ExecNodeBase<RowData>
                 translateOrder(inputTransform, inputRowType, config);
 
         final Tuple2<Pattern<RowData, RowData>, List<String>> cepPatternAndNames =
-                translatePattern(
-                        matchSpec, config.getTableConfig(), planner.getRelBuilder(), inputRowType);
+                translatePattern(matchSpec, config, planner.getRelBuilder(), inputRowType);
         final Pattern<RowData, RowData> cepPattern = cepPatternAndNames.f0;
 
         // TODO remove this once it is supported in CEP library
@@ -188,7 +186,7 @@ public class StreamExecMatch extends ExecNodeBase<RowData>
                 NFACompiler.compileFactory(cepPattern, false);
         final MatchCodeGenerator generator =
                 new MatchCodeGenerator(
-                        new CodeGeneratorContext(config.getTableConfig()),
+                        new CodeGeneratorContext(config),
                         planner.getRelBuilder(),
                         false, // nullableInput
                         JavaScalaConversionUtil.toScala(cepPatternAndNames.f1),
@@ -258,7 +256,7 @@ public class StreamExecMatch extends ExecNodeBase<RowData>
         if (orderKeys.getFieldIndices().length > 1) {
             GeneratedRecordComparator rowComparator =
                     ComparatorCodeGenerator.gen(
-                            config.getTableConfig(), "RowDataComparator", inputRowType, orderKeys);
+                            config, "RowDataComparator", inputRowType, orderKeys);
             return new RowDataEventComparator(rowComparator);
         } else {
             return null;
@@ -300,11 +298,11 @@ public class StreamExecMatch extends ExecNodeBase<RowData>
     @VisibleForTesting
     public static Tuple2<Pattern<RowData, RowData>, List<String>> translatePattern(
             MatchSpec matchSpec,
-            TableConfig tableConfig,
+            ReadableConfig config,
             RelBuilder relBuilder,
             RowType inputRowType) {
         final PatternVisitor patternVisitor =
-                new PatternVisitor(tableConfig, relBuilder, inputRowType, matchSpec);
+                new PatternVisitor(config, relBuilder, inputRowType, matchSpec);
 
         final Pattern<RowData, RowData> cepPattern;
         if (matchSpec.getInterval().isPresent()) {
@@ -329,7 +327,7 @@ public class StreamExecMatch extends ExecNodeBase<RowData>
 
     /** The visitor to traverse the pattern RexNode. */
     private static class PatternVisitor extends RexDefaultVisitor<Pattern<RowData, RowData>> {
-        private final TableConfig tableConfig;
+        private final ReadableConfig config;
         private final RelBuilder relBuilder;
         private final RowType inputRowType;
         private final MatchSpec matchSpec;
@@ -337,11 +335,11 @@ public class StreamExecMatch extends ExecNodeBase<RowData>
         private Pattern<RowData, RowData> pattern;
 
         public PatternVisitor(
-                TableConfig tableConfig,
+                ReadableConfig config,
                 RelBuilder relBuilder,
                 RowType inputRowType,
                 MatchSpec matchSpec) {
-            this.tableConfig = tableConfig;
+            this.config = config;
             this.relBuilder = relBuilder;
             this.inputRowType = inputRowType;
             this.matchSpec = matchSpec;
@@ -357,7 +355,7 @@ public class StreamExecMatch extends ExecNodeBase<RowData>
             if (patternDefinition != null) {
                 MatchCodeGenerator generator =
                         new MatchCodeGenerator(
-                                new CodeGeneratorContext(tableConfig),
+                                new CodeGeneratorContext(config),
                                 relBuilder,
                                 false, // nullableInput
                                 JavaScalaConversionUtil.toScala(new ArrayList<>(names)),
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecOverAggregate.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecOverAggregate.java
index 0a189f98..41f2b3f 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecOverAggregate.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecOverAggregate.java
@@ -184,7 +184,7 @@ public class StreamExecOverAggregate extends ExecNodeBase<RowData>
                 RowType.of(
                         fieldTypes.toArray(new LogicalType[0]), fieldNames.toArray(new String[0]));
 
-        final CodeGeneratorContext ctx = new CodeGeneratorContext(config.getTableConfig());
+        final CodeGeneratorContext ctx = new CodeGeneratorContext(config);
         final KeyedProcessFunction<RowData, RowData, RowData> overProcessFunction;
         if (group.getLowerBound().isPreceding()
                 && group.getLowerBound().isUnbounded()
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecPythonGroupWindowAggregate.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecPythonGroupWindowAggregate.java
index ea153dd..39c8412 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecPythonGroupWindowAggregate.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecPythonGroupWindowAggregate.java
@@ -484,7 +484,7 @@ public class StreamExecPythonGroupWindowAggregate extends StreamExecAggregateBas
                     namedWindowProperties,
                     shiftTimeZone,
                     ProjectionCodeGenerator.generateProjection(
-                            CodeGeneratorContext.apply(config.getTableConfig()),
+                            CodeGeneratorContext.apply(config),
                             "UdafInputProjection",
                             inputRowType,
                             userDefinedFunctionInputType,
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecPythonOverAggregate.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecPythonOverAggregate.java
index efc844b..eb27198 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecPythonOverAggregate.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecPythonOverAggregate.java
@@ -266,7 +266,7 @@ public class StreamExecPythonOverAggregate extends ExecNodeBase<RowData>
                                 .project(outputRowType);
         GeneratedProjection generatedProjection =
                 ProjectionCodeGenerator.generateProjection(
-                        CodeGeneratorContext.apply(config.getTableConfig()),
+                        CodeGeneratorContext.apply(config),
                         "UdafInputProjection",
                         inputRowType,
                         userDefinedFunctionInputType,
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecRank.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecRank.java
index bcb3f9c..d337435 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecRank.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecRank.java
@@ -204,7 +204,7 @@ public class StreamExecRank extends ExecNodeBase<RowData>
         SortSpec sortSpecInSortKey = builder.build();
         GeneratedRecordComparator sortKeyComparator =
                 ComparatorCodeGenerator.gen(
-                        config.getTableConfig(),
+                        config,
                         "StreamExecSortComparator",
                         RowType.of(sortSpec.getFieldTypes(inputType)),
                         sortSpecInSortKey);
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecSort.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecSort.java
index 6d5424c..53e2b8f 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecSort.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecSort.java
@@ -77,7 +77,7 @@ public class StreamExecSort extends ExecNodeBase<RowData> implements StreamExecN
         // sort code gen
         GeneratedRecordComparator rowComparator =
                 ComparatorCodeGenerator.gen(
-                        config.getTableConfig(), "StreamExecSortComparator", inputType, sortSpec);
+                        config, "StreamExecSortComparator", inputType, sortSpec);
         StreamSortOperator sortOperator =
                 new StreamSortOperator(InternalTypeInfo.of(inputType), rowComparator);
         Transformation<RowData> inputTransform =
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecTemporalJoin.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecTemporalJoin.java
index 11125be..c1aea35 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecTemporalJoin.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecTemporalJoin.java
@@ -210,7 +210,7 @@ public class StreamExecTemporalJoin extends ExecNodeBase<RowData>
 
         // input must not be nullable, because the runtime join function will make sure
         // the code-generated function won't process null inputs
-        final CodeGeneratorContext ctx = new CodeGeneratorContext(config.getTableConfig());
+        final CodeGeneratorContext ctx = new CodeGeneratorContext(config);
         final ExprCodeGenerator exprGenerator =
                 new ExprCodeGenerator(ctx, false)
                         .bindInput(
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecTemporalSort.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecTemporalSort.java
index ed5d49c..29d76cf 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecTemporalSort.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecTemporalSort.java
@@ -137,10 +137,7 @@ public class StreamExecTemporalSort extends ExecNodeBase<RowData>
 
             GeneratedRecordComparator rowComparator =
                     ComparatorCodeGenerator.gen(
-                            config.getTableConfig(),
-                            "ProcTimeSortComparator",
-                            inputType,
-                            specExcludeTime);
+                            config, "ProcTimeSortComparator", inputType, specExcludeTime);
             ProcTimeSortOperator sortOperator =
                     new ProcTimeSortOperator(InternalTypeInfo.of(inputType), rowComparator);
 
@@ -177,10 +174,7 @@ public class StreamExecTemporalSort extends ExecNodeBase<RowData>
             SortSpec specExcludeTime = sortSpec.createSubSortSpec(1);
             rowComparator =
                     ComparatorCodeGenerator.gen(
-                            config.getTableConfig(),
-                            "RowTimeSortComparator",
-                            inputType,
-                            specExcludeTime);
+                            config, "RowTimeSortComparator", inputType, specExcludeTime);
         }
         RowTimeSortOperator sortOperator =
                 new RowTimeSortOperator(
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecWatermarkAssigner.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecWatermarkAssigner.java
index a27c3ff..a210588 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecWatermarkAssigner.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecWatermarkAssigner.java
@@ -114,7 +114,7 @@ public class StreamExecWatermarkAssigner extends ExecNodeBase<RowData>
 
         final GeneratedWatermarkGenerator watermarkGenerator =
                 WatermarkGeneratorCodeGenerator.generateWatermarkGenerator(
-                        config.getTableConfig(),
+                        config,
                         (RowType) inputEdge.getOutputType(),
                         watermarkExpr,
                         JavaScalaConversionUtil.toScala(Optional.empty()));
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecWindowAggregate.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecWindowAggregate.java
index 094c70c..f8ede56 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecWindowAggregate.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecWindowAggregate.java
@@ -213,7 +213,7 @@ public class StreamExecWindowAggregate extends StreamExecWindowAggregateBase {
             ZoneId shiftTimeZone) {
         final AggsHandlerCodeGenerator generator =
                 new AggsHandlerCodeGenerator(
-                                new CodeGeneratorContext(config.getTableConfig()),
+                                new CodeGeneratorContext(config),
                                 relBuilder,
                                 JavaScalaConversionUtil.toScala(fieldTypes),
                                 false) // copyInputField
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecWindowJoin.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecWindowJoin.java
index e87cb3f..030a85d 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecWindowJoin.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecWindowJoin.java
@@ -156,8 +156,7 @@ public class StreamExecWindowJoin extends ExecNodeBase<RowData>
         final InternalTypeInfo<RowData> rightTypeInfo = InternalTypeInfo.of(rightType);
 
         GeneratedJoinCondition generatedCondition =
-                JoinUtil.generateConditionFunction(
-                        config.getTableConfig(), joinSpec, leftType, rightType);
+                JoinUtil.generateConditionFunction(config, joinSpec, leftType, rightType);
 
         ZoneId shiftTimeZone =
                 TimeWindowUtil.getShiftTimeZone(
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecWindowRank.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecWindowRank.java
index 73f752c..3e0aa81 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecWindowRank.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecWindowRank.java
@@ -221,7 +221,7 @@ public class StreamExecWindowRank extends ExecNodeBase<RowData>
                         windowing.getTimeAttributeType(), config.getLocalTimeZone());
         GeneratedRecordComparator sortKeyComparator =
                 ComparatorCodeGenerator.gen(
-                        config.getTableConfig(),
+                        config,
                         "StreamExecSortComparator",
                         RowType.of(sortSpec.getFieldTypes(inputType)),
                         sortSpecInSortKey);
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/utils/KeySelectorUtil.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/utils/KeySelectorUtil.java
index 712522c..11cf450 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/utils/KeySelectorUtil.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/utils/KeySelectorUtil.java
@@ -18,7 +18,7 @@
 
 package org.apache.flink.table.planner.plan.utils;
 
-import org.apache.flink.table.api.TableConfig;
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.planner.codegen.CodeGeneratorContext;
 import org.apache.flink.table.planner.codegen.ProjectionCodeGenerator;
@@ -56,7 +56,7 @@ public class KeySelectorUtil {
             RowType inputType = rowType.toRowType();
             GeneratedProjection generatedProjection =
                     ProjectionCodeGenerator.generateProjection(
-                            CodeGeneratorContext.apply(new TableConfig()),
+                            CodeGeneratorContext.apply(new Configuration()),
                             "KeyProjection",
                             inputType,
                             returnType,
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/TableConfigUtils.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/TableConfigUtils.java
index 592dcf8..ed5d476 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/TableConfigUtils.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/TableConfigUtils.java
@@ -18,14 +18,18 @@
 
 package org.apache.flink.table.planner.utils;
 
+import org.apache.flink.configuration.ReadableConfig;
 import org.apache.flink.table.api.TableConfig;
+import org.apache.flink.table.api.config.TableConfigOptions;
 import org.apache.flink.table.planner.calcite.CalciteConfig;
 import org.apache.flink.table.planner.calcite.CalciteConfig$;
 import org.apache.flink.table.planner.plan.utils.OperatorType;
 
+import java.time.ZoneId;
 import java.util.HashSet;
 import java.util.Set;
 
+import static java.time.ZoneId.SHORT_IDS;
 import static org.apache.flink.table.api.config.ExecutionConfigOptions.TABLE_EXEC_DISABLED_OPERATORS;
 import static org.apache.flink.table.api.config.OptimizerConfigOptions.TABLE_OPTIMIZER_AGG_PHASE_STRATEGY;
 
@@ -90,6 +94,35 @@ public class TableConfigUtils {
                 .orElse(CalciteConfig$.MODULE$.DEFAULT());
     }
 
+    /**
+     * Similar to {@link TableConfig#getLocalTimeZone()} but extracting it from a generic {@link
+     * ReadableConfig}.
+     *
+     * @see TableConfig#getLocalTimeZone()
+     */
+    public static ZoneId getLocalTimeZone(ReadableConfig tableConfig) {
+        String zone = tableConfig.get(TableConfigOptions.LOCAL_TIME_ZONE);
+        validateTimeZone(zone);
+        return TableConfigOptions.LOCAL_TIME_ZONE.defaultValue().equals(zone)
+                ? ZoneId.systemDefault()
+                : ZoneId.of(zone);
+    }
+
+    /** Validates user configured time zone. */
+    private static void validateTimeZone(String zone) {
+        final String zoneId = zone.toUpperCase();
+        if (zoneId.startsWith("UTC+")
+                || zoneId.startsWith("UTC-")
+                || SHORT_IDS.containsKey(zoneId)) {
+            throw new IllegalArgumentException(
+                    String.format(
+                            "The supported Zone ID is either a full name such as "
+                                    + "'America/Los_Angeles', or a custom timezone id such as "
+                                    + "'GMT-08:00', but configured Zone ID is '%s'.",
+                            zone));
+        }
+    }
+
     // Make sure that we cannot instantiate this class
     private TableConfigUtils() {}
 }
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/CalcCodeGenerator.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/CalcCodeGenerator.scala
index 2b5e9c5..cab7fd8 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/CalcCodeGenerator.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/CalcCodeGenerator.scala
@@ -19,7 +19,8 @@ package org.apache.flink.table.planner.codegen
 
 import org.apache.flink.api.common.functions.{FlatMapFunction, Function}
 import org.apache.flink.api.dag.Transformation
-import org.apache.flink.table.api.{TableConfig, TableException, ValidationException}
+import org.apache.flink.configuration.ReadableConfig
+import org.apache.flink.table.api.{TableException, ValidationException}
 import org.apache.flink.table.data.{BoxedWrapperRowData, RowData}
 import org.apache.flink.table.functions.FunctionKind
 import org.apache.flink.table.planner.functions.bridging.BridgingSqlFunction
@@ -74,7 +75,7 @@ object CalcCodeGenerator {
       outRowClass: Class[_ <: RowData],
       calcProjection: Seq[RexNode],
       calcCondition: Option[RexNode],
-      tableConfig: TableConfig): GeneratedFunction[FlatMapFunction[RowData, RowData]] = {
+      tableConfig: ReadableConfig): GeneratedFunction[FlatMapFunction[RowData, RowData]] = {
     val ctx = CodeGeneratorContext(tableConfig)
     val inputTerm = CodeGenUtils.DEFAULT_INPUT1_TERM
     val collectorTerm = CodeGenUtils.DEFAULT_COLLECTOR_TERM
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/CodeGeneratorContext.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/CodeGeneratorContext.scala
index c5f6f14..41180a7 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/CodeGeneratorContext.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/CodeGeneratorContext.scala
@@ -20,25 +20,25 @@ package org.apache.flink.table.planner.codegen
 
 import org.apache.flink.api.common.functions.{Function, RuntimeContext}
 import org.apache.flink.api.common.typeutils.TypeSerializer
-import org.apache.flink.table.api.TableConfig
+import org.apache.flink.configuration.ReadableConfig
 import org.apache.flink.table.data.GenericRowData
 import org.apache.flink.table.data.conversion.{DataStructureConverter, DataStructureConverters}
 import org.apache.flink.table.functions.{FunctionContext, UserDefinedFunction}
 import org.apache.flink.table.planner.codegen.CodeGenUtils._
 import org.apache.flink.table.planner.codegen.GenerateUtils.generateRecordStatement
-import org.apache.flink.table.planner.utils.InternalConfigOptions
-import org.apache.flink.table.utils.DateTimeUtils
+import org.apache.flink.table.planner.utils.{InternalConfigOptions, TableConfigUtils}
 import org.apache.flink.table.runtime.operators.TableStreamOperator
 import org.apache.flink.table.runtime.typeutils.{ExternalSerializer, InternalSerializers}
 import org.apache.flink.table.runtime.util.collections._
 import org.apache.flink.table.types.DataType
 import org.apache.flink.table.types.logical.LogicalTypeRoot._
 import org.apache.flink.table.types.logical._
+import org.apache.flink.table.utils.DateTimeUtils
 import org.apache.flink.util.InstantiationUtil
 
+import java.time.ZoneId
 import java.util.TimeZone
 import java.util.function.{Supplier => JSupplier}
-import java.time.ZoneId
 
 import scala.collection.mutable
 
@@ -46,7 +46,7 @@ import scala.collection.mutable
   * The context for code generator, maintaining various reusable statements that could be insert
   * into different code sections in the final generated class.
   */
-class CodeGeneratorContext(val tableConfig: TableConfig) {
+class CodeGeneratorContext(val tableConfig: ReadableConfig) {
 
   // holding a list of objects that could be used passed into generated class
   val references: mutable.ArrayBuffer[AnyRef] = new mutable.ArrayBuffer[AnyRef]()
@@ -486,7 +486,7 @@ class CodeGeneratorContext(val tableConfig: TableConfig) {
   def addReusableQueryLevelCurrentTimestamp(): String = {
     val fieldTerm = s"queryStartTimestamp"
 
-    val queryStartEpoch = tableConfig.getConfiguration
+    val queryStartEpoch = tableConfig
       .getOptional(InternalConfigOptions.TABLE_QUERY_START_EPOCH_TIME)
       .orElseThrow(
         new JSupplier[Throwable] {
@@ -541,7 +541,7 @@ class CodeGeneratorContext(val tableConfig: TableConfig) {
   def addReusableQueryLevelLocalDateTime(): String = {
     val fieldTerm = s"queryStartLocaltimestamp"
 
-    val queryStartLocalTimestamp = tableConfig.getConfiguration
+    val queryStartLocalTimestamp = tableConfig
       .getOptional(InternalConfigOptions.TABLE_QUERY_START_LOCAL_TIME)
       .orElseThrow(
         new JSupplier[Throwable] {
@@ -638,7 +638,7 @@ class CodeGeneratorContext(val tableConfig: TableConfig) {
     * Adds a reusable TimeZone to the member area of the generated class.
     */
   def addReusableSessionTimeZone(): String = {
-    val zoneID = TimeZone.getTimeZone(tableConfig.getLocalTimeZone).getID
+    val zoneID = TimeZone.getTimeZone(TableConfigUtils.getLocalTimeZone(tableConfig)).getID
     val stmt =
       s"""private static final java.util.TimeZone $DEFAULT_TIMEZONE_TERM =
          |                 java.util.TimeZone.getTimeZone("$zoneID");""".stripMargin
@@ -977,7 +977,7 @@ class CodeGeneratorContext(val tableConfig: TableConfig) {
 }
 
 object CodeGeneratorContext {
-  def apply(tableConfig: TableConfig): CodeGeneratorContext = {
+  def apply(tableConfig: ReadableConfig): CodeGeneratorContext = {
     new CodeGeneratorContext(tableConfig)
   }
 }
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/CollectorCodeGenerator.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/CollectorCodeGenerator.scala
index 03b5e6e..f04772b 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/CollectorCodeGenerator.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/CollectorCodeGenerator.scala
@@ -94,7 +94,7 @@ object CollectorCodeGenerator {
     """.stripMargin
 
     new GeneratedCollector(
-      funcName, funcCode, ctx.references.toArray, ctx.tableConfig.getConfiguration)
+      funcName, funcCode, ctx.references.toArray, ctx.tableConfig)
   }
 
   /**
@@ -157,7 +157,7 @@ object CollectorCodeGenerator {
     """.stripMargin
 
     new GeneratedCollector(
-      funcName, funcCode, ctx.references.toArray, ctx.tableConfig.getConfiguration)
+      funcName, funcCode, ctx.references.toArray, ctx.tableConfig)
   }
 
   def addToContext(
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/CorrelateCodeGenerator.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/CorrelateCodeGenerator.scala
index 9b3bcc9..de5c6a6 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/CorrelateCodeGenerator.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/CorrelateCodeGenerator.scala
@@ -20,7 +20,8 @@ package org.apache.flink.table.planner.codegen
 
 import org.apache.flink.api.common.functions.Function
 import org.apache.flink.api.dag.Transformation
-import org.apache.flink.table.api.{TableConfig, TableException, ValidationException}
+import org.apache.flink.configuration.ReadableConfig
+import org.apache.flink.table.api.{TableException, ValidationException}
 import org.apache.flink.table.data.RowData
 import org.apache.flink.table.data.utils.JoinedRowData
 import org.apache.flink.table.functions.FunctionKind
@@ -40,7 +41,7 @@ import org.apache.calcite.rex._
 object CorrelateCodeGenerator {
 
   def generateCorrelateTransformation(
-      tableConfig: TableConfig,
+      tableConfig: ReadableConfig,
       operatorCtx: CodeGeneratorContext,
       inputTransformation: Transformation[RowData],
       inputType: RowType,
@@ -97,7 +98,7 @@ object CorrelateCodeGenerator {
     */
   private[flink] def generateOperator[T <: Function](
       ctx: CodeGeneratorContext,
-      tableConfig: TableConfig,
+      tableConfig: ReadableConfig,
       inputType: RowType,
       condition: Option[RexNode],
       returnType: RowType,
@@ -181,7 +182,7 @@ object CorrelateCodeGenerator {
    */
   private def generateCorrelateCollector(
       ctx: CodeGeneratorContext,
-      tableConfig: TableConfig,
+      tableConfig: ReadableConfig,
       inputType: RowType,
       functionResultType: RowType,
       resultType: RowType,
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/EqualiserCodeGenerator.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/EqualiserCodeGenerator.scala
index c316c6a..78f2c53 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/EqualiserCodeGenerator.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/EqualiserCodeGenerator.scala
@@ -17,7 +17,7 @@
  */
 package org.apache.flink.table.planner.codegen
 
-import org.apache.flink.table.api.TableConfig
+import org.apache.flink.configuration.Configuration
 import org.apache.flink.table.planner.codegen.CodeGenUtils._
 import org.apache.flink.table.planner.codegen.Indenter.toISC
 import org.apache.flink.table.planner.codegen.calls.ScalarOperatorGens.generateEquals
@@ -42,7 +42,7 @@ class EqualiserCodeGenerator(fieldTypes: Array[LogicalType]) {
 
   def generateRecordEqualiser(name: String): GeneratedRecordEqualiser = {
     // ignore time zone
-    val ctx = CodeGeneratorContext(new TableConfig)
+    val ctx = CodeGeneratorContext(new Configuration)
     val className = newName(name)
 
     val equalsMethodCodes = for (idx <- fieldTypes.indices) yield generateEqualsMethod(ctx, idx)
@@ -80,7 +80,7 @@ class EqualiserCodeGenerator(fieldTypes: Array[LogicalType]) {
       """.stripMargin
 
     new GeneratedRecordEqualiser(
-      className, classCode, ctx.references.toArray, ctx.tableConfig.getConfiguration)
+      className, classCode, ctx.references.toArray, ctx.tableConfig)
   }
 
   private def getEqualsMethodName(idx: Int) = s"""equalsAtIndex$idx"""
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/ExprCodeGenerator.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/ExprCodeGenerator.scala
index 38386e9..bb9b852 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/ExprCodeGenerator.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/ExprCodeGenerator.scala
@@ -23,7 +23,7 @@ import org.apache.flink.table.api.TableException
 import org.apache.flink.table.api.config.ExecutionConfigOptions
 import org.apache.flink.table.data.RowData
 import org.apache.flink.table.data.binary.BinaryRowData
-import org.apache.flink.table.data.util.DataFormatConverters.{getConverterForDataType, DataFormatConverter}
+import org.apache.flink.table.data.util.DataFormatConverters.{DataFormatConverter, getConverterForDataType}
 import org.apache.flink.table.functions.BuiltInFunctionDefinitions
 import org.apache.flink.table.planner.calcite.{FlinkTypeFactory, RexDistinctKeyVariable, RexFieldVariable}
 import org.apache.flink.table.planner.codegen.CodeGenUtils._
@@ -668,7 +668,7 @@ class ExprCodeGenerator(ctx: CodeGeneratorContext, nullableInput: Boolean)
       // casting
       case CAST =>
         generateCast(ctx, operands.head, resultType, nullOnFailure = ctx.tableConfig
-          .getConfiguration.get(ExecutionConfigOptions.TABLE_EXEC_LEGACY_CAST_BEHAVIOUR).isEnabled)
+          .get(ExecutionConfigOptions.TABLE_EXEC_LEGACY_CAST_BEHAVIOUR).isEnabled)
 
       case TRY_CAST =>
         generateCast(ctx, operands.head, resultType, nullOnFailure = true)
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/ExpressionReducer.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/ExpressionReducer.scala
index 33fe3e3..f6dc9c3 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/ExpressionReducer.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/ExpressionReducer.scala
@@ -19,6 +19,7 @@
 package org.apache.flink.table.planner.codegen
 
 import org.apache.flink.api.common.functions.{MapFunction, RichMapFunction}
+import org.apache.flink.configuration.ReadableConfig
 import org.apache.flink.table.api.{TableConfig, TableException}
 import org.apache.flink.table.data.binary.{BinaryStringData, BinaryStringDataUtil}
 import org.apache.flink.table.data.{DecimalData, GenericRowData, TimestampData}
@@ -28,9 +29,9 @@ import org.apache.flink.table.planner.codegen.FunctionCodeGenerator.generateFunc
 import org.apache.flink.table.planner.functions.sql.FlinkSqlOperatorTable.{JSON_ARRAY, JSON_OBJECT}
 import org.apache.flink.table.planner.plan.utils.PythonUtil.containsPythonCall
 import org.apache.flink.table.planner.utils.Logging
+import org.apache.flink.table.planner.utils.TimestampStringUtils.fromLocalDateTime
 import org.apache.flink.table.types.DataType
 import org.apache.flink.table.types.logical.RowType
-import org.apache.flink.table.planner.utils.TimestampStringUtils.fromLocalDateTime
 
 import org.apache.calcite.avatica.util.ByteString
 import org.apache.calcite.rex.{RexBuilder, RexCall, RexExecutor, RexLiteral, RexNode, RexUtil}
@@ -71,7 +72,7 @@ class ExpressionReducer(
     val resultType = RowType.of(literalTypes: _*)
 
     // generate MapFunction
-    val ctx = new ConstantCodeGeneratorContext(tableConfig)
+    val ctx = new ConstantCodeGeneratorContext(tableConfig.getConfiguration)
 
     val exprGenerator = new ExprCodeGenerator(ctx, false)
       .bindInput(EMPTY_ROW_TYPE)
@@ -292,7 +293,7 @@ class ExpressionReducer(
 /**
   * Constant expression code generator context.
   */
-class ConstantCodeGeneratorContext(tableConfig: TableConfig)
+class ConstantCodeGeneratorContext(tableConfig: ReadableConfig)
   extends CodeGeneratorContext(tableConfig) {
   override def addReusableFunction(
       function: UserDefinedFunction,
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/FunctionCodeGenerator.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/FunctionCodeGenerator.scala
index 4038daa..134250b 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/FunctionCodeGenerator.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/FunctionCodeGenerator.scala
@@ -161,7 +161,7 @@ object FunctionCodeGenerator {
     """.stripMargin
 
     new GeneratedFunction(
-      funcName, funcCode, ctx.references.toArray, ctx.tableConfig.getConfiguration)
+      funcName, funcCode, ctx.references.toArray, ctx.tableConfig)
   }
 
   /**
@@ -218,6 +218,6 @@ object FunctionCodeGenerator {
      """.stripMargin
 
     new GeneratedJoinCondition(
-      funcName, funcCode, ctx.references.toArray, ctx.tableConfig.getConfiguration)
+      funcName, funcCode, ctx.references.toArray, ctx.tableConfig)
   }
 }
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/HashCodeGenerator.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/HashCodeGenerator.scala
index a925e02..433e2b3 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/HashCodeGenerator.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/HashCodeGenerator.scala
@@ -80,7 +80,7 @@ object HashCodeGenerator {
     """.stripMargin
 
     new GeneratedHashFunction(
-      className, code, ctx.references.toArray, ctx.tableConfig.getConfiguration)
+      className, code, ctx.references.toArray, ctx.tableConfig)
   }
 
   private def generateCodeBody(
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/InputFormatCodeGenerator.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/InputFormatCodeGenerator.scala
index 80f677c..9358c5b 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/InputFormatCodeGenerator.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/InputFormatCodeGenerator.scala
@@ -88,7 +88,7 @@ object InputFormatCodeGenerator {
     """.stripMargin
 
     new GeneratedInput(
-      funcName, funcCode, ctx.references.toArray, ctx.tableConfig.getConfiguration)
+      funcName, funcCode, ctx.references.toArray, ctx.tableConfig)
   }
 
 }
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/LongHashJoinGenerator.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/LongHashJoinGenerator.scala
index 637260e..be935a9 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/LongHashJoinGenerator.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/LongHashJoinGenerator.scala
@@ -18,9 +18,8 @@
 
 package org.apache.flink.table.planner.codegen
 
-import org.apache.flink.configuration.Configuration
+import org.apache.flink.configuration.{Configuration, ReadableConfig}
 import org.apache.flink.metrics.Gauge
-import org.apache.flink.table.api.TableConfig
 import org.apache.flink.table.data.utils.JoinedRowData
 import org.apache.flink.table.data.{RowData, TimestampData}
 import org.apache.flink.table.planner.codegen.CodeGenUtils._
@@ -30,7 +29,7 @@ import org.apache.flink.table.runtime.hashtable.{LongHashPartition, LongHybridHa
 import org.apache.flink.table.runtime.operators.CodeGenOperatorFactory
 import org.apache.flink.table.runtime.operators.join.HashJoinType
 import org.apache.flink.table.runtime.typeutils.BinaryRowDataSerializer
-import org.apache.flink.table.types.logical.LogicalTypeRoot.{TIMESTAMP_WITHOUT_TIME_ZONE, _}
+import org.apache.flink.table.types.logical.LogicalTypeRoot._
 import org.apache.flink.table.types.logical._
 
 /**
@@ -92,7 +91,7 @@ object LongHashJoinGenerator {
      """.stripMargin, anyNullTerm)
   }
 
-  def genProjection(tableConfig: TableConfig, types: Array[LogicalType]): GeneratedProjection = {
+  def genProjection(tableConfig: ReadableConfig, types: Array[LogicalType]): GeneratedProjection = {
     val rowType = RowType.of(types: _*)
     ProjectionCodeGenerator.generateProjection(
       CodeGeneratorContext.apply(tableConfig),
@@ -103,7 +102,7 @@ object LongHashJoinGenerator {
   }
 
   def gen(
-      tableConfig: TableConfig,
+      tableConfig: ReadableConfig,
       hashJoinType: HashJoinType,
       keyType: RowType,
       buildType: RowType,
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/LookupJoinCodeGenerator.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/LookupJoinCodeGenerator.scala
index 5268a4b..92b5490 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/LookupJoinCodeGenerator.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/LookupJoinCodeGenerator.scala
@@ -18,9 +18,9 @@
 package org.apache.flink.table.planner.codegen
 
 import org.apache.flink.api.common.functions.{FlatMapFunction, Function}
-import org.apache.flink.configuration.Configuration
+import org.apache.flink.configuration.{Configuration, ReadableConfig}
 import org.apache.flink.streaming.api.functions.async.AsyncFunction
-import org.apache.flink.table.api.{TableConfig, ValidationException}
+import org.apache.flink.table.api.ValidationException
 import org.apache.flink.table.catalog.DataTypeFactory
 import org.apache.flink.table.connector.source.{LookupTableSource, ScanTableSource}
 import org.apache.flink.table.data.utils.JoinedRowData
@@ -65,7 +65,7 @@ object LookupJoinCodeGenerator {
     * Generates a lookup function ([[TableFunction]])
     */
   def generateSyncLookupFunction(
-      tableConfig: TableConfig,
+      tableConfig: ReadableConfig,
       dataTypeFactory: DataTypeFactory,
       inputType: LogicalType,
       tableSourceType: LogicalType,
@@ -105,7 +105,7 @@ object LookupJoinCodeGenerator {
     * Generates a async lookup function ([[AsyncTableFunction]])
     */
   def generateAsyncLookupFunction(
-      tableConfig: TableConfig,
+      tableConfig: ReadableConfig,
       dataTypeFactory: DataTypeFactory,
       inputType: LogicalType,
       tableSourceType: LogicalType,
@@ -134,7 +134,7 @@ object LookupJoinCodeGenerator {
 
   private def generateLookupFunction[F <: Function](
       generatedClass: Class[F],
-      tableConfig: TableConfig,
+      tableConfig: ReadableConfig,
       dataTypeFactory: DataTypeFactory,
       inputType: LogicalType,
       tableSourceType: LogicalType,
@@ -162,7 +162,7 @@ object LookupJoinCodeGenerator {
       lookupFunction,
       callContext,
       classOf[PlannerBase].getClassLoader,
-      tableConfig.getConfiguration)
+      tableConfig)
 
     val inference = createLookupTypeInference(
       dataTypeFactory,
@@ -408,7 +408,7 @@ object LookupJoinCodeGenerator {
     """.stripMargin
 
     new GeneratedCollector(
-      funcName, funcCode, ctx.references.toArray, ctx.tableConfig.getConfiguration)
+      funcName, funcCode, ctx.references.toArray, ctx.tableConfig)
   }
 
   /**
@@ -423,7 +423,7 @@ object LookupJoinCodeGenerator {
     * @return instance of GeneratedCollector
     */
   def generateTableAsyncCollector(
-      tableConfig: TableConfig,
+      tableConfig: ReadableConfig,
       name: String,
       leftInputType: RowType,
       collectedType: RowType,
@@ -499,7 +499,7 @@ object LookupJoinCodeGenerator {
     """.stripMargin
 
     new GeneratedResultFuture(
-      funcName, funcCode, ctx.references.toArray, ctx.tableConfig.getConfiguration)
+      funcName, funcCode, ctx.references.toArray, ctx.tableConfig)
   }
 
   /**
@@ -507,7 +507,7 @@ object LookupJoinCodeGenerator {
     * to projection/filter the dimension table results
     */
   def generateCalcMapFunction(
-      tableConfig: TableConfig,
+      tableConfig: ReadableConfig,
       projection: Seq[RexNode],
       condition: RexNode,
       outputType: RelDataType,
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/MatchCodeGenerator.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/MatchCodeGenerator.scala
index 71a5493..30a48f1 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/MatchCodeGenerator.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/MatchCodeGenerator.scala
@@ -22,7 +22,7 @@ import org.apache.flink.api.common.functions.Function
 import org.apache.flink.cep.functions.PatternProcessFunction
 import org.apache.flink.cep.pattern.conditions.{IterativeCondition, RichIterativeCondition}
 import org.apache.flink.configuration.Configuration
-import org.apache.flink.table.api.{TableConfig, TableException}
+import org.apache.flink.table.api.TableException
 import org.apache.flink.table.data.{GenericRowData, RowData}
 import org.apache.flink.table.planner.calcite.FlinkTypeFactory
 import org.apache.flink.table.planner.codegen.CodeGenUtils._
@@ -293,7 +293,7 @@ class MatchCodeGenerator(
       """.stripMargin
 
     new GeneratedFunction[F](
-      funcName, funcCode, ctx.references.toArray, ctx.tableConfig.getConfiguration)
+      funcName, funcCode, ctx.references.toArray, ctx.tableConfig)
   }
 
   private def generateOneRowPerMatchExpression(
@@ -689,7 +689,7 @@ class MatchCodeGenerator(
         .map(expr => FlinkTypeFactory.toLogicalType(expr.getType))
 
       val aggsHandlerCodeGenerator = new AggsHandlerCodeGenerator(
-        CodeGeneratorContext(new TableConfig),
+        CodeGeneratorContext(new Configuration),
         relBuilder,
         inputFieldTypes,
         copyInputField = false).needAccumulate()
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/OperatorCodeGenerator.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/OperatorCodeGenerator.scala
index 81fa6a0..bc5e06a 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/OperatorCodeGenerator.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/OperatorCodeGenerator.scala
@@ -122,7 +122,7 @@ object OperatorCodeGenerator extends Logging {
     LOG.debug(s"Compiling OneInputStreamOperator Code:\n$name")
     LOG.trace(s"Code: \n$operatorCode")
     new GeneratedOperator(
-      operatorName, operatorCode, ctx.references.toArray, ctx.tableConfig.getConfiguration)
+      operatorName, operatorCode, ctx.references.toArray, ctx.tableConfig)
   }
 
   def generateTwoInputStreamOperator[IN1 <: Any, IN2 <: Any, OUT <: Any](
@@ -251,7 +251,7 @@ object OperatorCodeGenerator extends Logging {
     LOG.debug(s"Compiling TwoInputStreamOperator Code:\n$name")
     LOG.trace(s"Code: \n$operatorCode")
     new GeneratedOperator(
-      operatorName, operatorCode, ctx.references.toArray, ctx.tableConfig.getConfiguration)
+      operatorName, operatorCode, ctx.references.toArray, ctx.tableConfig)
   }
 
   private def generateInputTerm(inputTypeTerm: String): String = {
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/ProjectionCodeGenerator.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/ProjectionCodeGenerator.scala
index 5642705..f7b7abb 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/ProjectionCodeGenerator.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/ProjectionCodeGenerator.scala
@@ -118,7 +118,7 @@ object ProjectionCodeGenerator {
         """.stripMargin
 
     new GeneratedProjection(
-      className, code, ctx.references.toArray, ctx.tableConfig.getConfiguration)
+      className, code, ctx.references.toArray, ctx.tableConfig)
   }
 
   /**
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/ValuesCodeGenerator.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/ValuesCodeGenerator.scala
index aabb7fd..4436b2c 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/ValuesCodeGenerator.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/ValuesCodeGenerator.scala
@@ -18,7 +18,7 @@
 
 package org.apache.flink.table.planner.codegen
 
-import org.apache.flink.table.api.TableConfig
+import org.apache.flink.configuration.ReadableConfig
 import org.apache.flink.table.data.{GenericRowData, RowData}
 import org.apache.flink.table.runtime.operators.values.ValuesInputFormat
 import org.apache.flink.table.runtime.typeutils.InternalTypeInfo
@@ -33,7 +33,7 @@ import scala.collection.JavaConversions._
 object ValuesCodeGenerator {
 
   def generatorInputFormat(
-    tableConfig: TableConfig,
+    tableConfig: ReadableConfig,
     outputType: RowType,
     tuples: util.List[util.List[RexLiteral]],
     description: String): ValuesInputFormat = {
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/WatermarkGeneratorCodeGenerator.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/WatermarkGeneratorCodeGenerator.scala
index c1b83c6..b9d1f7f 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/WatermarkGeneratorCodeGenerator.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/WatermarkGeneratorCodeGenerator.scala
@@ -20,16 +20,16 @@ package org.apache.flink.table.planner.codegen
 
 import org.apache.flink.api.common.eventtime.WatermarkGeneratorSupplier
 import org.apache.flink.api.common.externalresource.ExternalResourceInfo
-import org.apache.flink.configuration.Configuration
+import org.apache.flink.configuration.{Configuration, ReadableConfig}
 import org.apache.flink.metrics.MetricGroup
-import org.apache.flink.table.api.{TableConfig, TableException}
+import org.apache.flink.table.api.TableException
 import org.apache.flink.table.functions.{FunctionContext, UserDefinedFunction}
 import org.apache.flink.table.planner.calcite.FlinkTypeFactory
 import org.apache.flink.table.planner.codegen.CodeGenUtils.{ROW_DATA, newName}
 import org.apache.flink.table.planner.codegen.Indenter.toISC
 import org.apache.flink.table.runtime.generated.{GeneratedWatermarkGenerator, WatermarkGenerator}
-import org.apache.flink.table.types.logical.{LogicalTypeRoot, RowType}
 import org.apache.flink.table.types.DataType
+import org.apache.flink.table.types.logical.{LogicalTypeRoot, RowType}
 
 import org.apache.calcite.rex.RexNode
 
@@ -42,7 +42,7 @@ import java.util
 object WatermarkGeneratorCodeGenerator {
 
   def generateWatermarkGenerator(
-      tableConfig: TableConfig,
+      tableConfig: ReadableConfig,
       inputType: RowType,
       watermarkExpr: RexNode,
       contextTerm: Option[String] = None): GeneratedWatermarkGenerator = {
@@ -119,12 +119,12 @@ object WatermarkGeneratorCodeGenerator {
     """.stripMargin
 
     new GeneratedWatermarkGenerator(
-      funcName, funcCode, ctx.references.toArray, ctx.tableConfig.getConfiguration)
+      funcName, funcCode, ctx.references.toArray, ctx.tableConfig)
   }
 }
 
 class WatermarkGeneratorFunctionContext(
-    tableConfig: TableConfig,
+    tableConfig: ReadableConfig,
     contextTerm: String = "parameters") extends CodeGeneratorContext(tableConfig) {
 
   override def addReusableFunction(
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/agg/AggsHandlerCodeGenerator.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/agg/AggsHandlerCodeGenerator.scala
index bbb7f4f..f58f725 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/agg/AggsHandlerCodeGenerator.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/agg/AggsHandlerCodeGenerator.scala
@@ -416,7 +416,7 @@ class AggsHandlerCodeGenerator(
       """.stripMargin
 
     new GeneratedAggsHandleFunction(
-      functionName, functionCode, ctx.references.toArray, ctx.tableConfig.getConfiguration)
+      functionName, functionCode, ctx.references.toArray, ctx.tableConfig)
   }
 
   /**
@@ -568,7 +568,7 @@ class AggsHandlerCodeGenerator(
       """.stripMargin
 
     new GeneratedTableAggsHandleFunction(
-      functionName, functionCode, ctx.references.toArray, ctx.tableConfig.getConfiguration)
+      functionName, functionCode, ctx.references.toArray, ctx.tableConfig)
   }
 
   /**
@@ -695,7 +695,7 @@ class AggsHandlerCodeGenerator(
       """.stripMargin
 
     new GeneratedNamespaceAggsHandleFunction[N](
-      functionName, functionCode, ctx.references.toArray, ctx.tableConfig.getConfiguration)
+      functionName, functionCode, ctx.references.toArray, ctx.tableConfig)
   }
 
   /**
@@ -848,7 +848,7 @@ class AggsHandlerCodeGenerator(
       """.stripMargin
 
     new GeneratedNamespaceTableAggsHandleFunction[N](
-      functionName, functionCode, ctx.references.toArray, ctx.tableConfig.getConfiguration)
+      functionName, functionCode, ctx.references.toArray, ctx.tableConfig)
   }
 
   private def genCreateAccumulators(): String = {
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/BridgingSqlFunctionCallGen.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/BridgingSqlFunctionCallGen.scala
index e15d655..83d3b7c 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/BridgingSqlFunctionCallGen.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/BridgingSqlFunctionCallGen.scala
@@ -67,7 +67,7 @@ class BridgingSqlFunctionCallGen(call: RexCall) extends CallGenerator {
       definition,
       callContext,
       classOf[PlannerBase].getClassLoader,
-      ctx.tableConfig.getConfiguration)
+      ctx.tableConfig)
     val inference = udf.getTypeInference(dataTypeFactory)
 
     generateFunctionAwareCall(
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/FunctionGenerator.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/FunctionGenerator.scala
index 44c0b28..4bf8432 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/FunctionGenerator.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/FunctionGenerator.scala
@@ -18,19 +18,20 @@
 
 package org.apache.flink.table.planner.codegen.calls
 
-import org.apache.calcite.sql.SqlOperator
 import org.apache.flink.api.common.RuntimeExecutionMode
-import org.apache.flink.configuration.ExecutionOptions
-import org.apache.flink.table.api.TableConfig
+import org.apache.flink.configuration.{ExecutionOptions, ReadableConfig}
 import org.apache.flink.table.planner.functions.sql.FlinkSqlOperatorTable._
 import org.apache.flink.table.runtime.types.PlannerTypeUtils.isPrimitive
 import org.apache.flink.table.types.logical.LogicalTypeRoot._
 import org.apache.flink.table.types.logical.{LogicalType, LogicalTypeRoot}
 
+import org.apache.calcite.sql.SqlOperator
+
 import java.lang.reflect.Method
+
 import scala.collection.mutable
 
-class FunctionGenerator private(tableConfig: TableConfig) {
+class FunctionGenerator private(tableConfig: ReadableConfig) {
 
   val INTEGRAL_TYPES = Array(
     TINYINT,
@@ -44,7 +45,7 @@ class FunctionGenerator private(tableConfig: TableConfig) {
     mutable.Map()
 
   val isStreamingMode = RuntimeExecutionMode.STREAMING.equals(
-    tableConfig.getConfiguration.get(ExecutionOptions.RUNTIME_MODE))
+    tableConfig.get(ExecutionOptions.RUNTIME_MODE))
   // ----------------------------------------------------------------------------------------------
   // Arithmetic functions
   // ----------------------------------------------------------------------------------------------
@@ -942,6 +943,6 @@ class FunctionGenerator private(tableConfig: TableConfig) {
 }
 
 object FunctionGenerator {
-    def getInstance(tableConfig: TableConfig): FunctionGenerator =
+    def getInstance(tableConfig: ReadableConfig): FunctionGenerator =
       new FunctionGenerator(tableConfig)
 }
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/ScalarOperatorGens.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/ScalarOperatorGens.scala
index 020708d..914127f 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/ScalarOperatorGens.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/ScalarOperatorGens.scala
@@ -30,6 +30,7 @@ import org.apache.flink.table.planner.codegen.GeneratedExpression.{ALWAYS_NULL,
 import org.apache.flink.table.planner.codegen.{CodeGenException, CodeGeneratorContext, GeneratedExpression}
 import org.apache.flink.table.planner.functions.casting.{CastRule, CastRuleProvider, CodeGeneratorCastRule, ExpressionCodeGeneratorCastRule}
 import org.apache.flink.table.planner.utils.JavaScalaConversionUtil.toScala
+import org.apache.flink.table.planner.utils.TableConfigUtils
 import org.apache.flink.table.runtime.functions.SqlFunctionUtils
 import org.apache.flink.table.runtime.types.PlannerTypeUtils
 import org.apache.flink.table.runtime.types.PlannerTypeUtils.{isInteroperable, isPrimitive}
@@ -831,7 +832,7 @@ object ScalarOperatorGens {
     ctx.addReusableHeaderComment(
       s"Using option '${ExecutionConfigOptions.TABLE_EXEC_LEGACY_CAST_BEHAVIOUR.key()}':" +
         s"'${isLegacyCastBehaviourEnabled(ctx)}'")
-    ctx.addReusableHeaderComment("Timezone: " + ctx.tableConfig.getLocalTimeZone)
+    ctx.addReusableHeaderComment("Timezone: " + TableConfigUtils.getLocalTimeZone(ctx.tableConfig))
 
     // Try to use the new cast rules
     val rule = CastRuleProvider.resolve(operand.resultType, targetType)
@@ -1778,14 +1779,13 @@ object ScalarOperatorGens {
     new CastRule.Context {
       override def legacyBehaviour(): Boolean = isLegacyCastBehaviourEnabled(ctx)
 
-      override def getSessionZoneId: ZoneId = ctx.tableConfig.getLocalTimeZone
+      override def getSessionZoneId: ZoneId = TableConfigUtils.getLocalTimeZone(ctx.tableConfig)
 
       override def getClassLoader: ClassLoader = Thread.currentThread().getContextClassLoader
     }
   }
 
   private def isLegacyCastBehaviourEnabled(ctx: CodeGeneratorContext) = {
-    ctx.tableConfig
-      .getConfiguration.get(ExecutionConfigOptions.TABLE_EXEC_LEGACY_CAST_BEHAVIOUR).isEnabled
+    ctx.tableConfig.get(ExecutionConfigOptions.TABLE_EXEC_LEGACY_CAST_BEHAVIOUR).isEnabled
   }
 }
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/over/MultiFieldRangeBoundComparatorCodeGenerator.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/over/MultiFieldRangeBoundComparatorCodeGenerator.scala
index 35b31ec..a00daaf 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/over/MultiFieldRangeBoundComparatorCodeGenerator.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/over/MultiFieldRangeBoundComparatorCodeGenerator.scala
@@ -18,7 +18,7 @@
 
 package org.apache.flink.table.planner.codegen.over
 
-import org.apache.flink.table.api.TableConfig
+import org.apache.flink.configuration.ReadableConfig
 import org.apache.flink.table.planner.codegen.CodeGenUtils.{ROW_DATA, newName}
 import org.apache.flink.table.planner.codegen.Indenter.toISC
 import org.apache.flink.table.planner.codegen.{CodeGenUtils, CodeGeneratorContext, GenerateUtils}
@@ -30,7 +30,7 @@ import org.apache.flink.table.types.logical.RowType
   * RANGE allow the compound ORDER BY and the random type when the bound is current row.
   */
 class MultiFieldRangeBoundComparatorCodeGenerator(
-    tableConfig: TableConfig,
+    tableConfig: ReadableConfig,
     inputType: RowType,
     sortSpec: SortSpec,
     isLowerBound: Boolean = true) {
@@ -76,7 +76,7 @@ class MultiFieldRangeBoundComparatorCodeGenerator(
       """.stripMargin
 
     new GeneratedRecordComparator(
-      className, code, ctx.references.toArray, ctx.tableConfig.getConfiguration)
+      className, code, ctx.references.toArray, ctx.tableConfig)
   }
 }
 
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/over/RangeBoundComparatorCodeGenerator.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/over/RangeBoundComparatorCodeGenerator.scala
index 011f56e..3915acd 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/over/RangeBoundComparatorCodeGenerator.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/over/RangeBoundComparatorCodeGenerator.scala
@@ -18,7 +18,7 @@
 
 package org.apache.flink.table.planner.codegen.over
 
-import org.apache.flink.table.api.TableConfig
+import org.apache.flink.configuration.ReadableConfig
 import org.apache.flink.table.planner.calcite.FlinkTypeFactory
 import org.apache.flink.table.planner.codegen.CodeGenUtils.{ROW_DATA, newName}
 import org.apache.flink.table.planner.codegen.Indenter.toISC
@@ -46,7 +46,7 @@ import java.math.BigDecimal
   */
 class RangeBoundComparatorCodeGenerator(
     relBuilder: RelBuilder,
-    tableConfig: TableConfig,
+    tableConfig: ReadableConfig,
     inputType: RowType,
     bound: Any,
     key: Int = -1,
@@ -123,7 +123,7 @@ class RangeBoundComparatorCodeGenerator(
       """.stripMargin
 
     new GeneratedRecordComparator(
-      className, code, ctx.references.toArray, ctx.tableConfig.getConfiguration)
+      className, code, ctx.references.toArray, ctx.tableConfig)
   }
 
   private def getComparatorCode(inputValue: String, currentValue: String): String = {
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/sort/ComparatorCodeGenerator.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/sort/ComparatorCodeGenerator.scala
index 0829073..ee0d81a 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/sort/ComparatorCodeGenerator.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/sort/ComparatorCodeGenerator.scala
@@ -18,7 +18,7 @@
 
 package org.apache.flink.table.planner.codegen.sort
 
-import org.apache.flink.table.api.TableConfig
+import org.apache.flink.configuration.ReadableConfig
 import org.apache.flink.table.planner.codegen.CodeGenUtils.{ROW_DATA, newName}
 import org.apache.flink.table.planner.codegen.Indenter.toISC
 import org.apache.flink.table.planner.codegen.{CodeGeneratorContext, GenerateUtils}
@@ -42,7 +42,7 @@ object ComparatorCodeGenerator {
     * @return A GeneratedRecordComparator
     */
   def gen(
-      tableConfig: TableConfig,
+      tableConfig: ReadableConfig,
       name: String,
       inputType: RowType,
       sortSpec: SortSpec): GeneratedRecordComparator = {
@@ -75,7 +75,7 @@ object ComparatorCodeGenerator {
       """.stripMargin
 
     new GeneratedRecordComparator(
-      className, code, ctx.references.toArray, ctx.tableConfig.getConfiguration)
+      className, code, ctx.references.toArray, ctx.tableConfig)
   }
 
 }
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/sort/SortCodeGenerator.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/sort/SortCodeGenerator.scala
index 28e279e..5c49d32 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/sort/SortCodeGenerator.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/sort/SortCodeGenerator.scala
@@ -18,7 +18,7 @@
 
 package org.apache.flink.table.planner.codegen.sort
 
-import org.apache.flink.table.api.TableConfig
+import org.apache.flink.configuration.ReadableConfig
 import org.apache.flink.table.data.binary.BinaryRowData
 import org.apache.flink.table.data.{DecimalData, TimestampData}
 import org.apache.flink.table.planner.codegen.CodeGenUtils.{ROW_DATA, SEGMENT, newName}
@@ -40,7 +40,7 @@ import scala.collection.mutable
   * @param sortSpec     sort specification.
   */
 class SortCodeGenerator(
-    tableConfig: TableConfig,
+    tableConfig: ReadableConfig,
     val input: RowType,
     val sortSpec: SortSpec) {
 
@@ -183,7 +183,7 @@ class SortCodeGenerator(
       }
     """.stripMargin
 
-    new GeneratedNormalizedKeyComputer(className, code, tableConfig.getConfiguration)
+    new GeneratedNormalizedKeyComputer(className, code, tableConfig)
   }
 
   def generatePutNormalizedKeys(numKeyBytes: Int): mutable.ArrayBuffer[String] = {
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchPhysicalJoinBase.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchPhysicalJoinBase.scala
index 37c7201..afb9e00 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchPhysicalJoinBase.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchPhysicalJoinBase.scala
@@ -51,7 +51,7 @@ abstract class BatchPhysicalJoinBase(
       tableConfig: TableConfig,
       leftType: RowType,
       rightType: RowType): GeneratedJoinCondition = {
-    val ctx = CodeGeneratorContext(tableConfig)
+    val ctx = CodeGeneratorContext(tableConfig.getConfiguration)
     val exprGenerator = new ExprCodeGenerator(ctx, false)
         .bindInput(leftType)
         .bindSecondInput(rightType)
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/PushPartitionIntoLegacyTableSourceScanRule.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/PushPartitionIntoLegacyTableSourceScanRule.scala
index 347ac1f..b3522ad 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/PushPartitionIntoLegacyTableSourceScanRule.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/PushPartitionIntoLegacyTableSourceScanRule.scala
@@ -27,7 +27,7 @@ import org.apache.flink.table.planner.calcite.{FlinkContext, FlinkTypeFactory}
 import org.apache.flink.table.planner.plan.schema.LegacyTableSourceTable
 import org.apache.flink.table.planner.plan.stats.FlinkStatistic
 import org.apache.flink.table.planner.plan.utils.{FlinkRelOptUtil, PartitionPruner, RexNodeExtractor, RexNodeToExpressionConverter}
-import org.apache.flink.table.planner.utils.CatalogTableStatisticsConverter
+import org.apache.flink.table.planner.utils.{CatalogTableStatisticsConverter, TableConfigUtils}
 import org.apache.flink.table.planner.utils.JavaScalaConversionUtil.toScala
 import org.apache.flink.table.sources.PartitionableTableSource
 
@@ -71,7 +71,7 @@ class PushPartitionIntoLegacyTableSourceScanRule extends RelOptRule(
     val filter: Filter = call.rel(0)
     val scan: LogicalTableScan = call.rel(1)
     val context = call.getPlanner.getContext.unwrap(classOf[FlinkContext])
-    val config = context.getTableConfig
+    val tableConfig = context.getTableConfig
     val tableSourceTable = scan.getTable.unwrap(classOf[LegacyTableSourceTable[_]])
     val tableIdentifier = tableSourceTable.tableIdentifier
     val catalogOption = toScala(context.getCatalogManager.getCatalog(
@@ -132,7 +132,7 @@ class PushPartitionIntoLegacyTableSourceScanRule extends RelOptRule(
         partitionPredicate
       )
       PartitionPruner.prunePartitions(
-        config,
+        tableConfig,
         partitionFieldNames,
         partitionFieldTypes,
         allPartitions,
@@ -150,7 +150,7 @@ class PushPartitionIntoLegacyTableSourceScanRule extends RelOptRule(
               inputFields,
               context.getFunctionCatalog,
               context.getCatalogManager,
-              TimeZone.getTimeZone(config.getLocalTimeZone))
+              TimeZone.getTimeZone(TableConfigUtils.getLocalTimeZone(tableConfig.getConfiguration)))
             def toExpressions: Option[Seq[Expression]] = {
               val expressions = new mutable.ArrayBuffer[Expression]()
               for (predicate <- partitionPredicates) {
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/JoinUtil.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/JoinUtil.scala
index dff51b9..5c99db2 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/JoinUtil.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/JoinUtil.scala
@@ -18,7 +18,8 @@
 
 package org.apache.flink.table.planner.plan.utils
 
-import org.apache.flink.table.api.{TableConfig, TableException}
+import org.apache.flink.configuration.ReadableConfig
+import org.apache.flink.table.api.TableException
 import org.apache.flink.table.data.RowData
 import org.apache.flink.table.planner.calcite.FlinkTypeFactory
 import org.apache.flink.table.planner.codegen.{CodeGeneratorContext, ExprCodeGenerator, FunctionCodeGenerator}
@@ -29,15 +30,15 @@ import org.apache.flink.table.planner.plan.utils.TemporalJoinUtil.satisfyTempora
 import org.apache.flink.table.planner.plan.utils.WindowJoinUtil.satisfyWindowJoin
 import org.apache.flink.table.runtime.generated.GeneratedJoinCondition
 import org.apache.flink.table.runtime.operators.join.stream.state.JoinInputSideSpec
-import org.apache.flink.table.runtime.typeutils.InternalTypeInfo
 import org.apache.flink.table.runtime.types.PlannerTypeUtils
+import org.apache.flink.table.runtime.typeutils.InternalTypeInfo
 import org.apache.flink.table.types.logical.{LogicalType, RowType}
 
 import org.apache.calcite.plan.RelOptUtil
-import org.apache.calcite.rel.core.{Join, JoinInfo, JoinRelType}
 import org.apache.calcite.rel.RelNode
-import org.apache.calcite.rex.{RexCall, RexInputRef, RexNode, RexUtil}
 import org.apache.calcite.rel.`type`.{RelDataType, RelDataTypeField}
+import org.apache.calcite.rel.core.{Join, JoinInfo, JoinRelType}
+import org.apache.calcite.rex.{RexCall, RexInputRef, RexNode, RexUtil}
 import org.apache.calcite.sql.validate.SqlValidatorUtil
 import org.apache.calcite.util.ImmutableIntList
 
@@ -124,7 +125,7 @@ object JoinUtil {
   }
 
   def generateConditionFunction(
-      tableConfig: TableConfig,
+      tableConfig: ReadableConfig,
       joinSpec: JoinSpec,
       leftType: LogicalType,
       rightType: LogicalType): GeneratedJoinCondition = {
@@ -136,7 +137,7 @@ object JoinUtil {
   }
 
   def generateConditionFunction(
-        tableConfig: TableConfig,
+        tableConfig: ReadableConfig,
         nonEquiCondition: RexNode,
         leftType: LogicalType,
         rightType: LogicalType): GeneratedJoinCondition = {
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/PartitionPruner.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/PartitionPruner.scala
index 9f1c47b..73a21ee 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/PartitionPruner.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/PartitionPruner.scala
@@ -25,11 +25,11 @@ import org.apache.flink.table.api.{TableConfig, TableException}
 import org.apache.flink.table.data.{DecimalDataUtils, GenericRowData, StringData, TimestampData}
 import org.apache.flink.table.planner.codegen.CodeGenUtils.DEFAULT_COLLECTOR_TERM
 import org.apache.flink.table.planner.codegen.{ConstantCodeGeneratorContext, ExprCodeGenerator, FunctionCodeGenerator}
-import org.apache.flink.table.utils.DateTimeUtils
 import org.apache.flink.table.runtime.typeutils.InternalTypeInfo
 import org.apache.flink.table.types.logical.LogicalTypeRoot._
 import org.apache.flink.table.types.logical.utils.LogicalTypeChecks
 import org.apache.flink.table.types.logical.{BooleanType, DecimalType, LogicalType}
+import org.apache.flink.table.utils.DateTimeUtils
 
 import org.apache.calcite.rex.RexNode
 
@@ -87,7 +87,7 @@ object PartitionPruner {
     val inputType = InternalTypeInfo.ofFields(partitionFieldTypes, partitionFieldNames).toRowType
     val returnType: LogicalType = new BooleanType(false)
 
-    val ctx = new ConstantCodeGeneratorContext(tableConfig)
+    val ctx = new ConstantCodeGeneratorContext(tableConfig.getConfiguration)
     val collectorTerm = DEFAULT_COLLECTOR_TERM
 
     val exprGenerator = new ExprCodeGenerator(ctx, false)
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/codegen/CodeSplitTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/codegen/CodeSplitTest.java
index 1650465..8b7f3b4 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/codegen/CodeSplitTest.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/codegen/CodeSplitTest.java
@@ -18,8 +18,9 @@
 
 package org.apache.flink.table.planner.codegen;
 
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.ReadableConfig;
 import org.apache.flink.core.testutils.FlinkMatchers;
-import org.apache.flink.table.api.TableConfig;
 import org.apache.flink.table.api.config.TableConfigOptions;
 import org.apache.flink.table.data.GenericRowData;
 import org.apache.flink.table.data.binary.BinaryRowData;
@@ -92,7 +93,7 @@ public class CodeSplitTest {
             rowData1.setField(random.nextInt(numFields), 1);
         }
 
-        Consumer<TableConfig> consumer =
+        Consumer<ReadableConfig> consumer =
                 tableConfig -> {
                     JoinCondition instance =
                             JoinUtil.generateConditionFunction(
@@ -120,7 +121,7 @@ public class CodeSplitTest {
             rowData.setField(i, i);
         }
 
-        Consumer<TableConfig> consumer =
+        Consumer<ReadableConfig> consumer =
                 tableConfig -> {
                     HashFunction instance =
                             HashCodeGenerator.generateRowHash(
@@ -162,7 +163,7 @@ public class CodeSplitTest {
             rowData1.setField(random.nextInt(numFields), 100);
         }
 
-        Consumer<TableConfig> consumer =
+        Consumer<ReadableConfig> consumer =
                 tableConfig -> {
                     RecordComparator instance =
                             ComparatorCodeGenerator.gen(tableConfig, "", rowType, sortSpec)
@@ -197,7 +198,7 @@ public class CodeSplitTest {
         }
         outputWriter.complete();
 
-        Consumer<TableConfig> consumer =
+        Consumer<ReadableConfig> consumer =
                 tableConfig -> {
                     Projection instance =
                             ProjectionCodeGenerator.generateProjection(
@@ -220,13 +221,13 @@ public class CodeSplitTest {
         return RowType.of(fieldTypes);
     }
 
-    private void runTest(Consumer<TableConfig> consumer) {
-        TableConfig splitTableConfig = new TableConfig();
+    private void runTest(Consumer<ReadableConfig> consumer) {
+        Configuration splitTableConfig = new Configuration();
         splitTableConfig.set(TableConfigOptions.MAX_LENGTH_GENERATED_CODE, 4000);
         splitTableConfig.set(TableConfigOptions.MAX_MEMBERS_GENERATED_CODE, 10000);
         consumer.accept(splitTableConfig);
 
-        TableConfig noSplitTableConfig = new TableConfig();
+        Configuration noSplitTableConfig = new Configuration();
         noSplitTableConfig.set(TableConfigOptions.MAX_LENGTH_GENERATED_CODE, Integer.MAX_VALUE);
         noSplitTableConfig.set(TableConfigOptions.MAX_MEMBERS_GENERATED_CODE, Integer.MAX_VALUE);
         PrintStream originalStdOut = System.out;
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/codegen/LongHashJoinGeneratorTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/codegen/LongHashJoinGeneratorTest.java
index 65cd67c..f9c8437 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/codegen/LongHashJoinGeneratorTest.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/codegen/LongHashJoinGeneratorTest.java
@@ -19,7 +19,7 @@
 package org.apache.flink.table.planner.codegen;
 
 import org.apache.flink.api.common.functions.AbstractRichFunction;
-import org.apache.flink.table.api.TableConfig;
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.runtime.generated.GeneratedJoinCondition;
 import org.apache.flink.table.runtime.generated.JoinCondition;
@@ -40,7 +40,7 @@ public class LongHashJoinGeneratorTest extends Int2HashJoinOperatorTest {
         RowType keyType = RowType.of(new IntType());
         assertThat(LongHashJoinGenerator.support(type, keyType, new boolean[] {true})).isTrue();
         return LongHashJoinGenerator.gen(
-                new TableConfig(),
+                new Configuration(),
                 type,
                 keyType,
                 RowType.of(new IntType(), new IntType()),
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/codegen/SortCodeGeneratorTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/codegen/SortCodeGeneratorTest.java
index 2c4f0eb..4f31594 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/codegen/SortCodeGeneratorTest.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/codegen/SortCodeGeneratorTest.java
@@ -24,11 +24,11 @@ import org.apache.flink.api.common.typeutils.TypeComparator;
 import org.apache.flink.api.common.typeutils.base.IntSerializer;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.memory.MemorySegment;
 import org.apache.flink.core.memory.MemorySegmentFactory;
 import org.apache.flink.runtime.operators.sort.QuickSort;
 import org.apache.flink.table.api.DataTypes;
-import org.apache.flink.table.api.TableConfig;
 import org.apache.flink.table.data.DecimalData;
 import org.apache.flink.table.data.GenericRowData;
 import org.apache.flink.table.data.RawValueData;
@@ -638,7 +638,8 @@ public class SortCodeGeneratorTest {
 
     public static Tuple2<NormalizedKeyComputer, RecordComparator> getSortBaseWithNulls(
             String namePrefix, RowType inputType, SortSpec sortSpec) {
-        SortCodeGenerator generator = new SortCodeGenerator(new TableConfig(), inputType, sortSpec);
+        SortCodeGenerator generator =
+                new SortCodeGenerator(new Configuration(), inputType, sortSpec);
         GeneratedNormalizedKeyComputer computer =
                 generator.generateNormalizedKeyComputer(namePrefix + "Computer");
         GeneratedRecordComparator comparator =
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/codegen/HashCodeGeneratorTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/codegen/HashCodeGeneratorTest.scala
index a4b3ad6..d8cc0af 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/codegen/HashCodeGeneratorTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/codegen/HashCodeGeneratorTest.scala
@@ -18,7 +18,7 @@
 
 package org.apache.flink.table.planner.codegen
 
-import org.apache.flink.table.api.TableConfig
+import org.apache.flink.configuration.Configuration
 import org.apache.flink.table.data.GenericRowData
 import org.apache.flink.table.types.logical.{BigIntType, IntType, RowType, VarBinaryType}
 
@@ -34,14 +34,14 @@ class HashCodeGeneratorTest {
   @Test
   def testHash(): Unit = {
     val hashFunc1 = HashCodeGenerator.generateRowHash(
-      new CodeGeneratorContext(new TableConfig),
+      new CodeGeneratorContext(new Configuration),
       RowType.of(new IntType(), new BigIntType(), new VarBinaryType(VarBinaryType.MAX_LENGTH)),
       "name",
       Array(1, 0)
     ).newInstance(classLoader)
 
     val hashFunc2 = HashCodeGenerator.generateRowHash(
-      new CodeGeneratorContext(new TableConfig),
+      new CodeGeneratorContext(new Configuration),
       RowType.of(new IntType(), new BigIntType(), new VarBinaryType(VarBinaryType.MAX_LENGTH)),
       "name",
       Array(1, 2, 0)
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/codegen/ProjectionCodeGeneratorTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/codegen/ProjectionCodeGeneratorTest.scala
index 8311bde..b8120f7 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/codegen/ProjectionCodeGeneratorTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/codegen/ProjectionCodeGeneratorTest.scala
@@ -18,7 +18,7 @@
 
 package org.apache.flink.table.planner.codegen
 
-import org.apache.flink.table.api.TableConfig
+import org.apache.flink.configuration.Configuration
 import org.apache.flink.table.data.binary.BinaryRowData
 import org.apache.flink.table.data.writer.BinaryRowWriter
 import org.apache.flink.table.data.{DecimalData, GenericRowData, RowData, TimestampData}
@@ -39,7 +39,7 @@ class ProjectionCodeGeneratorTest {
   @Test
   def testProjectionBinaryRow(): Unit = {
     val projection = ProjectionCodeGenerator.generateProjection(
-      new CodeGeneratorContext(new TableConfig),
+      new CodeGeneratorContext(new Configuration),
       "name",
       RowType.of(new IntType(), new BigIntType()),
       RowType.of(new BigIntType(), new IntType()),
@@ -53,7 +53,7 @@ class ProjectionCodeGeneratorTest {
   @Test
   def testProjectionGenericRow(): Unit = {
     val projection = ProjectionCodeGenerator.generateProjection(
-      new CodeGeneratorContext(new TableConfig),
+      new CodeGeneratorContext(new Configuration),
       "name",
       RowType.of(new IntType(), new BigIntType()),
       RowType.of(new BigIntType(), new IntType()),
@@ -69,7 +69,7 @@ class ProjectionCodeGeneratorTest {
   def testProjectionManyField(): Unit = {
     val rowType = RowType.of((0 until 100).map(_ => new IntType()).toArray: _*)
     val projection = ProjectionCodeGenerator.generateProjection(
-      new CodeGeneratorContext(new TableConfig),
+      new CodeGeneratorContext(new Configuration),
       "name",
       rowType,
       rowType,
@@ -87,7 +87,7 @@ class ProjectionCodeGeneratorTest {
   def testProjectionManyFieldGenericRow(): Unit = {
     val rowType = RowType.of((0 until 100).map(_ => new IntType()).toArray: _*)
     val projection = ProjectionCodeGenerator.generateProjection(
-      new CodeGeneratorContext(new TableConfig),
+      new CodeGeneratorContext(new Configuration),
       "name",
       rowType,
       rowType,
@@ -105,7 +105,7 @@ class ProjectionCodeGeneratorTest {
   @Test
   def testProjectionBinaryRowWithVariableLengthData(): Unit = {
     val projection = ProjectionCodeGenerator.generateProjection(
-      new CodeGeneratorContext(new TableConfig),
+      new CodeGeneratorContext(new Configuration),
       "name",
       RowType.of(
         new DecimalType(38, 0),
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/codegen/WatermarkGeneratorCodeGenTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/codegen/WatermarkGeneratorCodeGenTest.scala
index 6609ca6..5003abe 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/codegen/WatermarkGeneratorCodeGenTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/codegen/WatermarkGeneratorCodeGenTest.scala
@@ -22,15 +22,13 @@ import org.apache.flink.api.common.eventtime.WatermarkGeneratorSupplier
 import org.apache.flink.configuration.Configuration
 import org.apache.flink.metrics.MetricGroup
 import org.apache.flink.streaming.util.MockStreamingRuntimeContext
-import org.apache.flink.table.api.{TableConfig, TableSchema}
+import org.apache.flink.table.api.TableConfig
 import org.apache.flink.table.catalog.{CatalogManager, FunctionCatalog, ObjectIdentifier, UnresolvedIdentifier}
 import org.apache.flink.table.data.{GenericRowData, TimestampData}
-import org.apache.flink.table.delegation.Parser
 import org.apache.flink.table.module.ModuleManager
-import org.apache.flink.table.planner.calcite.{FlinkContext, FlinkPlannerImpl, FlinkTypeFactory, SqlExprToRexConverter, SqlExprToRexConverterFactory}
+import org.apache.flink.table.planner.calcite.{FlinkContext, FlinkPlannerImpl, FlinkTypeFactory}
 import org.apache.flink.table.planner.catalog.CatalogManagerCalciteSchema
-import org.apache.flink.table.planner.delegation.{ParserImpl, PlannerContext}
-import org.apache.flink.table.planner.parse.CalciteParser
+import org.apache.flink.table.planner.delegation.PlannerContext
 import org.apache.flink.table.planner.runtime.utils.JavaUserDefinedScalarFunctions.JavaFunc5
 import org.apache.flink.table.runtime.generated.WatermarkGenerator
 import org.apache.flink.table.types.logical.{IntType, TimestampType}
@@ -38,17 +36,15 @@ import org.apache.flink.table.utils.CatalogManagerMocks
 
 import org.apache.calcite.jdbc.CalciteSchemaBuilder.asRootSchema
 import org.apache.calcite.plan.ConventionTraitDef
-import org.apache.calcite.rel.`type`.RelDataType
-
-import java.lang.{Integer => JInt, Long => JLong}
-import java.util
-import java.util.Collections
-import java.util.function.{Function => JFunction, Supplier => JSupplier}
 
 import org.junit.Assert.{assertEquals, assertTrue}
+import org.junit.Test
 import org.junit.runner.RunWith
 import org.junit.runners.Parameterized
-import org.junit.Test
+
+import java.lang.{Integer => JInt, Long => JLong}
+import java.util
+import java.util.Collections
 
 /**
   * Tests the generated [[WatermarkGenerator]] from [[WatermarkGeneratorCodeGenerator]].
@@ -186,7 +182,7 @@ class WatermarkGeneratorCodeGenTest(useDefinedConstructor: Boolean) {
 
     if (useDefinedConstructor) {
       val generated = WatermarkGeneratorCodeGenerator
-        .generateWatermarkGenerator(new TableConfig(), rowType, rexNode, Option.apply("context"))
+        .generateWatermarkGenerator(new Configuration, rowType, rexNode, Option.apply("context"))
       val newReferences = generated.getReferences :+
           new WatermarkGeneratorSupplier.Context {
             override def getMetricGroup: MetricGroup = null
@@ -194,7 +190,7 @@ class WatermarkGeneratorCodeGenTest(useDefinedConstructor: Boolean) {
       generated.newInstance(Thread.currentThread().getContextClassLoader, newReferences)
     } else {
       val generated = WatermarkGeneratorCodeGenerator
-        .generateWatermarkGenerator(new TableConfig(), rowType, rexNode)
+        .generateWatermarkGenerator(new Configuration, rowType, rexNode)
       generated.newInstance(Thread.currentThread().getContextClassLoader)
     }
   }
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/codegen/agg/AggTestBase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/codegen/agg/AggTestBase.scala
index e3bca00..222d13a 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/codegen/agg/AggTestBase.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/codegen/agg/AggTestBase.scala
@@ -33,10 +33,10 @@ import org.apache.flink.table.runtime.context.ExecutionContext
 import org.apache.flink.table.runtime.dataview.DataViewSpec
 import org.apache.flink.table.types.logical._
 import org.apache.flink.table.types.utils.TypeConversions.fromLegacyInfoToDataType
+import org.powermock.api.mockito.PowerMockito.{mock, when}
 
 import org.apache.calcite.rel.core.AggregateCall
 import org.apache.calcite.tools.RelBuilder
-import org.powermock.api.mockito.PowerMockito.{mock, when}
 
 /**
   * Agg test base to mock agg information and etc.
@@ -110,7 +110,7 @@ abstract class AggTestBase(isBatchMode: Boolean) {
 
   val aggInfoList = AggregateInfoList(
     Array(aggInfo1, aggInfo2, aggInfo3), None, countStarInserted = false, Array())
-  val ctx = new CodeGeneratorContext(tEnv.getConfig)
+  val ctx = new CodeGeneratorContext(tEnv.getConfig.getConfiguration)
   val classLoader: ClassLoader = Thread.currentThread().getContextClassLoader
   val context: ExecutionContext = mock(classOf[ExecutionContext])
   when(context, "getRuntimeContext").thenReturn(mock(classOf[RuntimeContext]))
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/expressions/utils/ExpressionTestBase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/expressions/utils/ExpressionTestBase.scala
index 52fc78b..5732dd7 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/expressions/utils/ExpressionTestBase.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/expressions/utils/ExpressionTestBase.scala
@@ -365,7 +365,7 @@ abstract class ExpressionTestBase {
 
   private def getCodeGenFunction(rexNodes: List[RexNode]):
     GeneratedFunction[MapFunction[RowData, BinaryRowData]] = {
-    val ctx = CodeGeneratorContext(config)
+    val ctx = CodeGeneratorContext(config.getConfiguration)
     val inputType = if (containsLegacyTypes) {
       fromTypeInfoToLogicalType(typeInfo)
     } else {
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/match/PatternTranslatorTestBase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/match/PatternTranslatorTestBase.scala
index 1b839c4..4fffe60 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/match/PatternTranslatorTestBase.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/match/PatternTranslatorTestBase.scala
@@ -21,6 +21,7 @@ package org.apache.flink.table.planner.`match`
 import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
 import org.apache.flink.api.java.typeutils.RowTypeInfo
 import org.apache.flink.cep.pattern.Pattern
+import org.apache.flink.configuration.Configuration
 import org.apache.flink.streaming.api.datastream.{DataStream => JDataStream}
 import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
 import org.apache.flink.table.api._
@@ -37,13 +38,14 @@ import org.apache.flink.table.planner.utils.TableTestUtil
 import org.apache.flink.table.types.logical.{IntType, RowType}
 import org.apache.flink.types.Row
 import org.apache.flink.util.TestLogger
+import org.mockito.Mockito.{mock, when}
 
 import org.apache.calcite.rel.RelNode
 import org.apache.calcite.tools.RelBuilder
+
 import org.junit.Assert._
 import org.junit.rules.ExpectedException
 import org.junit.{ComparisonFailure, Rule}
-import org.mockito.Mockito.{mock, when}
 
 abstract class PatternTranslatorTestBase extends TestLogger {
 
@@ -105,7 +107,7 @@ abstract class PatternTranslatorTestBase extends TestLogger {
     val dataMatch = optimized.asInstanceOf[StreamPhysicalMatch]
     val p = StreamExecMatch.translatePattern(
       MatchUtil.createMatchSpec(dataMatch.logicalMatch),
-      new TableConfig,
+      new Configuration,
       context._1,
       testTableRowType).f0
 
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/utils/PartitionPrunerTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/utils/PartitionPrunerTest.scala
index 1fbdf3e..53ae7e5 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/utils/PartitionPrunerTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/utils/PartitionPrunerTest.scala
@@ -24,14 +24,14 @@ import org.apache.flink.table.planner.functions.utils.ScalarSqlFunction
 
 import org.apache.calcite.rex.RexUtil
 import org.apache.calcite.sql.`type`.SqlTypeName
-import org.apache.calcite.sql.`type`.SqlTypeName.DATE
 import org.apache.calcite.sql.fun.SqlStdOperatorTable
 import org.apache.calcite.util.{DateString, TimeString, TimestampString}
+
 import org.junit.Assert.assertEquals
 import org.junit.Test
 
 import java.math.BigDecimal
-import java.time.{ZoneId, ZoneOffset}
+import java.time.ZoneOffset
 import java.util.{List => JList, Map => JMap}
 
 import scala.collection.JavaConversions._
@@ -67,9 +67,9 @@ class PartitionPrunerTest extends RexNodeTestBase {
       Map("amount" -> "200", "name" -> "Test3", "flag" -> "false").asJava
     ).asJava
 
-    val config = new TableConfig
+    val tableConfig = TableConfig.getDefault
     val prunedPartitions = PartitionPruner.prunePartitions(
-      config,
+      tableConfig,
       partitionFieldNames,
       partitionFieldTypes,
       allPartitions,
diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/generated/GeneratedAggsHandleFunction.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/generated/GeneratedAggsHandleFunction.java
index ec983f9..ed164c7 100644
--- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/generated/GeneratedAggsHandleFunction.java
+++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/generated/GeneratedAggsHandleFunction.java
@@ -20,6 +20,7 @@ package org.apache.flink.table.runtime.generated;
 
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.ReadableConfig;
 
 /** Describes a generated {@link AggsHandleFunction}. */
 public class GeneratedAggsHandleFunction extends GeneratedClass<AggsHandleFunction> {
@@ -32,7 +33,7 @@ public class GeneratedAggsHandleFunction extends GeneratedClass<AggsHandleFuncti
     }
 
     public GeneratedAggsHandleFunction(
-            String className, String code, Object[] references, Configuration conf) {
+            String className, String code, Object[] references, ReadableConfig conf) {
         super(className, code, references, conf);
     }
 }
diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/generated/GeneratedCollector.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/generated/GeneratedCollector.java
index 4932144..b434efa 100644
--- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/generated/GeneratedCollector.java
+++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/generated/GeneratedCollector.java
@@ -20,6 +20,7 @@ package org.apache.flink.table.runtime.generated;
 
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.ReadableConfig;
 import org.apache.flink.util.Collector;
 
 /**
@@ -45,7 +46,7 @@ public class GeneratedCollector<C extends Collector<?>> extends GeneratedClass<C
      * @param conf configuration when generating Collector.
      */
     public GeneratedCollector(
-            String className, String code, Object[] references, Configuration conf) {
+            String className, String code, Object[] references, ReadableConfig conf) {
         super(className, code, references, conf);
     }
 }
diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/generated/GeneratedFunction.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/generated/GeneratedFunction.java
index 5c99a68..008e5b0 100644
--- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/generated/GeneratedFunction.java
+++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/generated/GeneratedFunction.java
@@ -21,6 +21,7 @@ package org.apache.flink.table.runtime.generated;
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.functions.Function;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.ReadableConfig;
 
 /**
  * Describes a generated {@link Function}.
@@ -45,7 +46,7 @@ public class GeneratedFunction<F extends Function> extends GeneratedClass<F> {
      * @param conf configuration when generating Function.
      */
     public GeneratedFunction(
-            String className, String code, Object[] references, Configuration conf) {
+            String className, String code, Object[] references, ReadableConfig conf) {
         super(className, code, references, conf);
     }
 }
diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/generated/GeneratedHashFunction.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/generated/GeneratedHashFunction.java
index 1896556..72de372 100644
--- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/generated/GeneratedHashFunction.java
+++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/generated/GeneratedHashFunction.java
@@ -18,7 +18,7 @@
 
 package org.apache.flink.table.runtime.generated;
 
-import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.ReadableConfig;
 
 /** Describes a generated {@link HashFunction}. */
 public class GeneratedHashFunction extends GeneratedClass<HashFunction> {
@@ -34,7 +34,7 @@ public class GeneratedHashFunction extends GeneratedClass<HashFunction> {
      * @param conf configuration when generating Function.
      */
     public GeneratedHashFunction(
-            String className, String code, Object[] references, Configuration conf) {
+            String className, String code, Object[] references, ReadableConfig conf) {
         super(className, code, references, conf);
     }
 }
diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/generated/GeneratedInput.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/generated/GeneratedInput.java
index 80cb268..048b743 100644
--- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/generated/GeneratedInput.java
+++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/generated/GeneratedInput.java
@@ -19,7 +19,7 @@
 package org.apache.flink.table.runtime.generated;
 
 import org.apache.flink.api.common.io.InputFormat;
-import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.ReadableConfig;
 
 /**
  * Describes a generated {@link InputFormat}.
@@ -38,7 +38,7 @@ public class GeneratedInput<F extends InputFormat<?, ?>> extends GeneratedClass<
      * @param references referenced objects of the generated Function.
      * @param conf configuration when generating Function.
      */
-    public GeneratedInput(String className, String code, Object[] references, Configuration conf) {
+    public GeneratedInput(String className, String code, Object[] references, ReadableConfig conf) {
         super(className, code, references, conf);
     }
 }
diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/generated/GeneratedJoinCondition.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/generated/GeneratedJoinCondition.java
index d1c5c01..a202bc7 100644
--- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/generated/GeneratedJoinCondition.java
+++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/generated/GeneratedJoinCondition.java
@@ -20,6 +20,7 @@ package org.apache.flink.table.runtime.generated;
 
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.ReadableConfig;
 
 /** Describes a generated {@link JoinCondition}. */
 public class GeneratedJoinCondition extends GeneratedClass<JoinCondition> {
@@ -40,7 +41,7 @@ public class GeneratedJoinCondition extends GeneratedClass<JoinCondition> {
      * @param conf configuration when generating JoinCondition.
      */
     public GeneratedJoinCondition(
-            String className, String code, Object[] references, Configuration conf) {
+            String className, String code, Object[] references, ReadableConfig conf) {
         super(className, code, references, conf);
     }
 }
diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/generated/GeneratedNamespaceAggsHandleFunction.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/generated/GeneratedNamespaceAggsHandleFunction.java
index 312331b..49db1b4 100644
--- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/generated/GeneratedNamespaceAggsHandleFunction.java
+++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/generated/GeneratedNamespaceAggsHandleFunction.java
@@ -20,6 +20,7 @@ package org.apache.flink.table.runtime.generated;
 
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.ReadableConfig;
 
 /** Describes a generated {@link NamespaceAggsHandleFunction}. */
 public class GeneratedNamespaceAggsHandleFunction<N>
@@ -34,7 +35,7 @@ public class GeneratedNamespaceAggsHandleFunction<N>
     }
 
     public GeneratedNamespaceAggsHandleFunction(
-            String className, String code, Object[] references, Configuration conf) {
+            String className, String code, Object[] references, ReadableConfig conf) {
         super(className, code, references, conf);
     }
 }
diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/generated/GeneratedNamespaceTableAggsHandleFunction.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/generated/GeneratedNamespaceTableAggsHandleFunction.java
index b596e36..cbdaa25 100644
--- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/generated/GeneratedNamespaceTableAggsHandleFunction.java
+++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/generated/GeneratedNamespaceTableAggsHandleFunction.java
@@ -20,6 +20,7 @@ package org.apache.flink.table.runtime.generated;
 
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.ReadableConfig;
 
 /** Describes a generated {@link NamespaceTableAggsHandleFunction}. */
 public class GeneratedNamespaceTableAggsHandleFunction<N>
@@ -34,7 +35,7 @@ public class GeneratedNamespaceTableAggsHandleFunction<N>
     }
 
     public GeneratedNamespaceTableAggsHandleFunction(
-            String className, String code, Object[] references, Configuration conf) {
+            String className, String code, Object[] references, ReadableConfig conf) {
         super(className, code, references, conf);
     }
 }
diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/generated/GeneratedOperator.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/generated/GeneratedOperator.java
index b13398fa..37635ae3 100644
--- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/generated/GeneratedOperator.java
+++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/generated/GeneratedOperator.java
@@ -18,7 +18,7 @@
 
 package org.apache.flink.table.runtime.generated;
 
-import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.ReadableConfig;
 import org.apache.flink.streaming.api.operators.StreamOperator;
 
 /**
@@ -39,7 +39,7 @@ public class GeneratedOperator<C extends StreamOperator<?>> extends GeneratedCla
      * @param conf configuration when generating StreamOperator.
      */
     public GeneratedOperator(
-            String className, String code, Object[] references, Configuration conf) {
+            String className, String code, Object[] references, ReadableConfig conf) {
         super(className, code, references, conf);
     }
 }
diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/generated/GeneratedProjection.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/generated/GeneratedProjection.java
index 20daf9b..5e67e74 100644
--- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/generated/GeneratedProjection.java
+++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/generated/GeneratedProjection.java
@@ -20,6 +20,7 @@ package org.apache.flink.table.runtime.generated;
 
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.ReadableConfig;
 
 /** Describes a generated {@link Projection}. */
 public class GeneratedProjection extends GeneratedClass<Projection> {
@@ -40,7 +41,7 @@ public class GeneratedProjection extends GeneratedClass<Projection> {
      * @param conf configuration when generating Function.
      */
     public GeneratedProjection(
-            String className, String code, Object[] references, Configuration conf) {
+            String className, String code, Object[] references, ReadableConfig conf) {
         super(className, code, references, conf);
     }
 }
diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/generated/GeneratedRecordComparator.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/generated/GeneratedRecordComparator.java
index 6e2b799..2746621 100644
--- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/generated/GeneratedRecordComparator.java
+++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/generated/GeneratedRecordComparator.java
@@ -20,6 +20,7 @@ package org.apache.flink.table.runtime.generated;
 
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.ReadableConfig;
 
 /** Describes a generated {@link RecordComparator}. */
 public class GeneratedRecordComparator extends GeneratedClass<RecordComparator> {
@@ -40,7 +41,7 @@ public class GeneratedRecordComparator extends GeneratedClass<RecordComparator>
      * @param conf configuration when generating the generated class.
      */
     public GeneratedRecordComparator(
-            String className, String code, Object[] references, Configuration conf) {
+            String className, String code, Object[] references, ReadableConfig conf) {
         super(className, code, references, conf);
     }
 }
diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/generated/GeneratedRecordEqualiser.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/generated/GeneratedRecordEqualiser.java
index 1f6f020..0a1d8bb 100644
--- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/generated/GeneratedRecordEqualiser.java
+++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/generated/GeneratedRecordEqualiser.java
@@ -20,6 +20,7 @@ package org.apache.flink.table.runtime.generated;
 
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.ReadableConfig;
 
 /** Describes a generated {@link RecordEqualiser}. */
 public class GeneratedRecordEqualiser extends GeneratedClass<RecordEqualiser> {
@@ -40,7 +41,7 @@ public class GeneratedRecordEqualiser extends GeneratedClass<RecordEqualiser> {
      * @param conf configuration when generating the generated class.
      */
     public GeneratedRecordEqualiser(
-            String className, String code, Object[] references, Configuration conf) {
+            String className, String code, Object[] references, ReadableConfig conf) {
         super(className, code, references, conf);
     }
 }
diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/generated/GeneratedResultFuture.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/generated/GeneratedResultFuture.java
index 3d949da..9cd034f 100644
--- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/generated/GeneratedResultFuture.java
+++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/generated/GeneratedResultFuture.java
@@ -20,6 +20,7 @@ package org.apache.flink.table.runtime.generated;
 
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.ReadableConfig;
 import org.apache.flink.streaming.api.functions.async.ResultFuture;
 
 /**
@@ -45,7 +46,7 @@ public class GeneratedResultFuture<T extends ResultFuture<?>> extends GeneratedC
      * @param conf configuration when generating ResultFuture.
      */
     public GeneratedResultFuture(
-            String className, String code, Object[] references, Configuration conf) {
+            String className, String code, Object[] references, ReadableConfig conf) {
         super(className, code, references, conf);
     }
 }
diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/generated/GeneratedTableAggsHandleFunction.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/generated/GeneratedTableAggsHandleFunction.java
index 6d95ab7..09073f8 100644
--- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/generated/GeneratedTableAggsHandleFunction.java
+++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/generated/GeneratedTableAggsHandleFunction.java
@@ -18,7 +18,7 @@
 
 package org.apache.flink.table.runtime.generated;
 
-import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.ReadableConfig;
 
 /** Describes a generated {@link TableAggsHandleFunction}. */
 public class GeneratedTableAggsHandleFunction extends GeneratedClass<TableAggsHandleFunction> {
@@ -26,7 +26,7 @@ public class GeneratedTableAggsHandleFunction extends GeneratedClass<TableAggsHa
     private static final long serialVersionUID = 2L;
 
     public GeneratedTableAggsHandleFunction(
-            String className, String code, Object[] references, Configuration conf) {
+            String className, String code, Object[] references, ReadableConfig conf) {
         super(className, code, references, conf);
     }
 }
diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/generated/GeneratedWatermarkGenerator.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/generated/GeneratedWatermarkGenerator.java
index e189eae..b5ef4da 100644
--- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/generated/GeneratedWatermarkGenerator.java
+++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/generated/GeneratedWatermarkGenerator.java
@@ -19,6 +19,7 @@
 package org.apache.flink.table.runtime.generated;
 
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.ReadableConfig;
 
 /** Describes a generated {@link WatermarkGenerator}. */
 public class GeneratedWatermarkGenerator extends GeneratedClass<WatermarkGenerator> {
@@ -30,7 +31,7 @@ public class GeneratedWatermarkGenerator extends GeneratedClass<WatermarkGenerat
     }
 
     public GeneratedWatermarkGenerator(
-            String className, String code, Object[] references, Configuration conf) {
+            String className, String code, Object[] references, ReadableConfig conf) {
         super(className, code, references, conf);
     }
 }