Posted to commits@flink.apache.org by tw...@apache.org on 2022/02/24 07:37:00 UTC

[flink] 04/05: [FLINK-26297][table] Use `tableConfig` as name for `TableConfig` variables

This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit e2b743c98d2aabfa028c0cc79a54b62ad30507da
Author: Marios Trivyzas <ma...@gmail.com>
AuthorDate: Tue Feb 22 19:47:13 2022 +0200

    [FLINK-26297][table] Use `tableConfig` as name for `TableConfig` variables
    
    To clearly separate the configurations in use, and to prepare for a future
    effort to use `ReadableConfig` instead of `TableConfig`, rename variables
    and function arguments that take a `TableConfig` to `tableConfig` instead
    of plain `config` or `conf`.
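    
    For illustration, a representative rename from the diff (excerpted from
    LongHashJoinGenerator.scala below):
    
        // before
        def genProjection(conf: TableConfig, types: Array[LogicalType]): GeneratedProjection
    
        // after
        def genProjection(tableConfig: TableConfig, types: Array[LogicalType]): GeneratedProjection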
    
    Minor code improvements to remove IDE warnings in the touched classes.
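    
    For example, unused parameters and locals are dropped; a sketch excerpted
    from CodeGeneratorContext.scala in the diff below (body elided):
    
        // before: the nullCheck flag was accepted but never used
        def addReusableConstant(
            constant: GeneratedExpression,
            nullCheck: Boolean): GeneratedExpression = ...
    
        // after
        def addReusableConstant(constant: GeneratedExpression): GeneratedExpression = ...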
---
 .../client/gateway/context/ExecutionContext.java   | 12 ++++-----
 .../api/bridge/java/StreamTableEnvironment.java    |  6 ++---
 .../internal/StreamTableEnvironmentImplTest.java   |  6 ++---
 .../org/apache/flink/table/api/TableConfig.java    | 10 ++++----
 .../flink/table/catalog/FunctionCatalog.java       |  4 +--
 .../expressions/resolver/ExpressionResolver.java   | 23 ++++++++++-------
 .../operations/utils/OperationTreeBuilder.java     | 12 ++++-----
 .../flink/table/utils/TableEnvironmentMock.java    | 10 ++++----
 .../internal/StreamTableEnvironmentImpl.scala      |  4 +--
 .../planner/connectors/DynamicSourceUtils.java     |  7 +++--
 .../logical/PushFilterIntoSourceScanRuleBase.java  |  5 ++--
 .../PushProjectIntoTableSourceScanRule.java        |  4 +--
 .../table/planner/codegen/CalcCodeGenerator.scala  |  4 +--
 .../planner/codegen/CodeGeneratorContext.scala     | 10 +++-----
 .../planner/codegen/CorrelateCodeGenerator.scala   | 12 ++++-----
 .../table/planner/codegen/ExpressionReducer.scala  |  6 ++---
 .../planner/codegen/FunctionCodeGenerator.scala    |  2 +-
 .../planner/codegen/LongHashJoinGenerator.scala    | 17 ++++++------
 .../planner/codegen/LookupJoinCodeGenerator.scala  | 30 +++++++++++-----------
 .../planner/codegen/ValuesCodeGenerator.scala      |  4 +--
 .../codegen/WatermarkGeneratorCodeGenerator.scala  |  6 ++---
 .../codegen/agg/AggsHandlerCodeGenerator.scala     |  3 +--
 .../planner/codegen/calls/FunctionGenerator.scala  |  7 ++---
 ...ltiFieldRangeBoundComparatorCodeGenerator.scala |  4 +--
 .../over/RangeBoundComparatorCodeGenerator.scala   |  6 ++---
 .../planner/codegen/sort/SortCodeGenerator.scala   |  8 +++---
 .../table/planner/delegation/BatchPlanner.scala    |  6 ++---
 .../table/planner/delegation/PlannerBase.scala     | 28 ++++++++------------
 .../table/planner/delegation/StreamPlanner.scala   | 13 +++++-----
 .../physical/batch/BatchPhysicalJoinBase.scala     |  4 +--
 .../BatchCommonSubGraphBasedOptimizer.scala        |  3 +--
 .../table/planner/plan/optimize/RelNodeBlock.scala | 18 ++++++-------
 .../StreamCommonSubGraphBasedOptimizer.scala       |  7 +++--
 .../planner/plan/utils/IntervalJoinUtil.scala      | 21 +++++++--------
 .../flink/table/planner/plan/utils/JoinUtil.scala  | 10 ++++----
 .../table/planner/plan/utils/PartitionPruner.scala | 10 ++++----
 .../flink/table/planner/plan/utils/RankUtil.scala  | 22 ++++++++--------
 .../runtime/stream/sql/DataStreamJavaITCase.java   |  8 +++---
 .../planner/runtime/batch/sql/CalcITCase.scala     | 16 ++++++------
 .../planner/runtime/batch/sql/LimitITCase.scala    |  2 +-
 .../runtime/batch/sql/SortLimitITCase.scala        |  2 +-
 .../runtime/batch/sql/join/InnerJoinITCase.scala   |  4 +--
 .../planner/runtime/batch/table/LimitITCase.scala  |  2 +-
 .../planner/runtime/utils/BatchTestBase.scala      | 11 ++++----
 44 files changed, 204 insertions(+), 205 deletions(-)

diff --git a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/context/ExecutionContext.java b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/context/ExecutionContext.java
index 49e8853..3805499 100644
--- a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/context/ExecutionContext.java
+++ b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/context/ExecutionContext.java
@@ -106,8 +106,8 @@ public class ExecutionContext {
                     "The old planner is not supported anymore. Please update to new default planner.");
         }
 
-        TableConfig config = new TableConfig();
-        config.addConfiguration(flinkConfig);
+        TableConfig tableConfig = new TableConfig();
+        tableConfig.addConfiguration(flinkConfig);
 
         StreamExecutionEnvironment streamExecEnv = createStreamExecutionEnvironment();
 
@@ -115,7 +115,7 @@ public class ExecutionContext {
         return createStreamTableEnvironment(
                 streamExecEnv,
                 settings,
-                config,
+                tableConfig,
                 executor,
                 sessionState.catalogManager,
                 sessionState.moduleManager,
@@ -126,7 +126,7 @@ public class ExecutionContext {
     private StreamTableEnvironment createStreamTableEnvironment(
             StreamExecutionEnvironment env,
             EnvironmentSettings settings,
-            TableConfig config,
+            TableConfig tableConfig,
             Executor executor,
             CatalogManager catalogManager,
             ModuleManager moduleManager,
@@ -137,7 +137,7 @@ public class ExecutionContext {
                 PlannerFactoryUtil.createPlanner(
                         settings.getPlanner(),
                         executor,
-                        config,
+                        tableConfig,
                         moduleManager,
                         catalogManager,
                         functionCatalog);
@@ -146,7 +146,7 @@ public class ExecutionContext {
                 catalogManager,
                 moduleManager,
                 functionCatalog,
-                config,
+                tableConfig,
                 env,
                 planner,
                 executor,
diff --git a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/bridge/java/StreamTableEnvironment.java b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/bridge/java/StreamTableEnvironment.java
index f32172b..6481acf 100644
--- a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/bridge/java/StreamTableEnvironment.java
+++ b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/bridge/java/StreamTableEnvironment.java
@@ -122,9 +122,9 @@ public interface StreamTableEnvironment extends TableEnvironment {
      */
     static StreamTableEnvironment create(
             StreamExecutionEnvironment executionEnvironment, EnvironmentSettings settings) {
-        TableConfig config = new TableConfig();
-        config.addConfiguration(settings.toConfiguration());
-        return StreamTableEnvironmentImpl.create(executionEnvironment, settings, config);
+        TableConfig tableConfig = new TableConfig();
+        tableConfig.addConfiguration(settings.toConfiguration());
+        return StreamTableEnvironmentImpl.create(executionEnvironment, settings, tableConfig);
     }
 
     /**
diff --git a/flink-table/flink-table-api-java-bridge/src/test/java/org/apache/flink/table/api/bridge/java/internal/StreamTableEnvironmentImplTest.java b/flink-table/flink-table-api-java-bridge/src/test/java/org/apache/flink/table/api/bridge/java/internal/StreamTableEnvironmentImplTest.java
index d9a9336..2337e1d 100644
--- a/flink-table/flink-table-api-java-bridge/src/test/java/org/apache/flink/table/api/bridge/java/internal/StreamTableEnvironmentImplTest.java
+++ b/flink-table/flink-table-api-java-bridge/src/test/java/org/apache/flink/table/api/bridge/java/internal/StreamTableEnvironmentImplTest.java
@@ -74,14 +74,14 @@ public class StreamTableEnvironmentImplTest {
 
     private StreamTableEnvironmentImpl getStreamTableEnvironment(
             StreamExecutionEnvironment env, DataStreamSource<Integer> elements) {
-        TableConfig config = new TableConfig();
+        TableConfig tableConfig = new TableConfig();
         CatalogManager catalogManager = CatalogManagerMocks.createEmptyCatalogManager();
         ModuleManager moduleManager = new ModuleManager();
         return new StreamTableEnvironmentImpl(
                 catalogManager,
                 moduleManager,
-                new FunctionCatalog(config, catalogManager, moduleManager),
-                config,
+                new FunctionCatalog(tableConfig, catalogManager, moduleManager),
+                tableConfig,
                 env,
                 new TestPlanner(elements.getTransformation()),
                 new ExecutorMock(),
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableConfig.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableConfig.java
index 3493b4f..ba0a2ac 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableConfig.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableConfig.java
@@ -134,8 +134,8 @@ public class TableConfig {
      *
      * <pre>{@code
      * TableEnvironment tEnv = ...
-     * TableConfig config = tEnv.getConfig
-     * config.setLocalTimeZone(ZoneOffset.ofHours(2));
+     * TableConfig tableConfig = tEnv.getConfig
+     * tableConfig.setLocalTimeZone(ZoneOffset.ofHours(2));
      * tEnv("CREATE TABLE testTable (id BIGINT, tmstmp TIMESTAMP WITH LOCAL TIME ZONE)");
      * tEnv("INSERT INTO testTable VALUES ((1, '2000-01-01 2:00:00'), (2, TIMESTAMP '2000-01-01 2:00:00'))");
      * tEnv("SELECT * FROM testTable"); // query with local time zone set to UTC+2
@@ -154,7 +154,7 @@ public class TableConfig {
      * <p>If we change the local time zone and query the same table:
      *
      * <pre>{@code
-     * config.setLocalTimeZone(ZoneOffset.ofHours(0));
+     * tableConfig.setLocalTimeZone(ZoneOffset.ofHours(0));
      * tEnv("SELECT * FROM testTable"); // query with local time zone set to UTC+0
      * }</pre>
      *
@@ -352,8 +352,8 @@ public class TableConfig {
      *
      * <pre>{@code
      * Map<String, String> params = ...
-     * TableConfig config = tEnv.getConfig;
-     * config.getConfiguration().set(PipelineOptions.GLOBAL_JOB_PARAMETERS, params);
+     * TableConfig tableConfig = tEnv.getConfig;
+     * tableConfig.getConfiguration().set(PipelineOptions.GLOBAL_JOB_PARAMETERS, params);
      * }</pre>
      */
     @Experimental
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/FunctionCatalog.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/FunctionCatalog.java
index f27dd84..afa626b 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/FunctionCatalog.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/FunctionCatalog.java
@@ -77,8 +77,8 @@ public final class FunctionCatalog {
     private PlannerTypeInferenceUtil plannerTypeInferenceUtil;
 
     public FunctionCatalog(
-            TableConfig config, CatalogManager catalogManager, ModuleManager moduleManager) {
-        this(checkNotNull(config).getConfiguration(), catalogManager, moduleManager);
+            TableConfig tableConfig, CatalogManager catalogManager, ModuleManager moduleManager) {
+        this(checkNotNull(tableConfig).getConfiguration(), catalogManager, moduleManager);
     }
 
     public FunctionCatalog(
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/ExpressionResolver.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/ExpressionResolver.java
index 0bb90fc..281866f 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/ExpressionResolver.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/ExpressionResolver.java
@@ -127,7 +127,7 @@ public class ExpressionResolver {
     private final boolean isGroupedAggregation;
 
     private ExpressionResolver(
-            TableConfig config,
+            TableConfig tableConfig,
             TableReferenceLookup tableLookup,
             FunctionLookup functionLookup,
             DataTypeFactory typeFactory,
@@ -137,7 +137,7 @@ public class ExpressionResolver {
             List<LocalReferenceExpression> localReferences,
             @Nullable DataType outputDataType,
             boolean isGroupedAggregation) {
-        this.config = Preconditions.checkNotNull(config).getConfiguration();
+        this.config = Preconditions.checkNotNull(tableConfig).getConfiguration();
         this.tableLookup = Preconditions.checkNotNull(tableLookup);
         this.fieldLookup = Preconditions.checkNotNull(fieldLookup);
         this.functionLookup = Preconditions.checkNotNull(functionLookup);
@@ -165,7 +165,7 @@ public class ExpressionResolver {
      * resolver like e.g. {@link GroupWindow} or {@link OverWindow}. You can also add additional
      * {@link ResolverRule}.
      *
-     * @param config general configuration
+     * @param tableConfig general configuration
      * @param tableCatalog a way to lookup a table reference by name
      * @param functionLookup a way to lookup call by name
      * @param typeFactory a way to lookup and create data types
@@ -173,14 +173,19 @@ public class ExpressionResolver {
      * @return builder for resolver
      */
     public static ExpressionResolverBuilder resolverFor(
-            TableConfig config,
+            TableConfig tableConfig,
             TableReferenceLookup tableCatalog,
             FunctionLookup functionLookup,
             DataTypeFactory typeFactory,
             SqlExpressionResolver sqlExpressionResolver,
             QueryOperation... inputs) {
         return new ExpressionResolverBuilder(
-                inputs, config, tableCatalog, functionLookup, typeFactory, sqlExpressionResolver);
+                inputs,
+                tableConfig,
+                tableCatalog,
+                functionLookup,
+                typeFactory,
+                sqlExpressionResolver);
     }
 
     /**
@@ -420,7 +425,7 @@ public class ExpressionResolver {
     /** Builder for creating {@link ExpressionResolver}. */
     public static class ExpressionResolverBuilder {
 
-        private final TableConfig config;
+        private final TableConfig tableConfig;
         private final List<QueryOperation> queryOperations;
         private final TableReferenceLookup tableCatalog;
         private final FunctionLookup functionLookup;
@@ -433,12 +438,12 @@ public class ExpressionResolver {
 
         private ExpressionResolverBuilder(
                 QueryOperation[] queryOperations,
-                TableConfig config,
+                TableConfig tableConfig,
                 TableReferenceLookup tableCatalog,
                 FunctionLookup functionLookup,
                 DataTypeFactory typeFactory,
                 SqlExpressionResolver sqlExpressionResolver) {
-            this.config = config;
+            this.tableConfig = tableConfig;
             this.queryOperations = Arrays.asList(queryOperations);
             this.tableCatalog = tableCatalog;
             this.functionLookup = functionLookup;
@@ -469,7 +474,7 @@ public class ExpressionResolver {
 
         public ExpressionResolver build() {
             return new ExpressionResolver(
-                    config,
+                    tableConfig,
                     tableCatalog,
                     functionLookup,
                     typeFactory,
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/utils/OperationTreeBuilder.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/utils/OperationTreeBuilder.java
index c756029..14dee92 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/utils/OperationTreeBuilder.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/utils/OperationTreeBuilder.java
@@ -82,7 +82,7 @@ import static org.apache.flink.table.types.logical.LogicalTypeRoot.ROW;
 @Internal
 public final class OperationTreeBuilder {
 
-    private final TableConfig config;
+    private final TableConfig tableConfig;
     private final FunctionLookup functionCatalog;
     private final DataTypeFactory typeFactory;
     private final TableReferenceLookup tableReferenceLookup;
@@ -100,7 +100,7 @@ public final class OperationTreeBuilder {
     private final ValuesOperationFactory valuesOperationFactory;
 
     private OperationTreeBuilder(
-            TableConfig config,
+            TableConfig tableConfig,
             FunctionLookup functionLookup,
             DataTypeFactory typeFactory,
             TableReferenceLookup tableReferenceLookup,
@@ -112,7 +112,7 @@ public final class OperationTreeBuilder {
             AggregateOperationFactory aggregateOperationFactory,
             JoinOperationFactory joinOperationFactory,
             ValuesOperationFactory valuesOperationFactory) {
-        this.config = config;
+        this.tableConfig = tableConfig;
         this.functionCatalog = functionLookup;
         this.typeFactory = typeFactory;
         this.tableReferenceLookup = tableReferenceLookup;
@@ -128,14 +128,14 @@ public final class OperationTreeBuilder {
     }
 
     public static OperationTreeBuilder create(
-            TableConfig config,
+            TableConfig tableConfig,
             FunctionLookup functionCatalog,
             DataTypeFactory typeFactory,
             TableReferenceLookup tableReferenceLookup,
             SqlExpressionResolver sqlExpressionResolver,
             boolean isStreamingMode) {
         return new OperationTreeBuilder(
-                config,
+                tableConfig,
                 functionCatalog,
                 typeFactory,
                 tableReferenceLookup,
@@ -390,7 +390,7 @@ public final class OperationTreeBuilder {
     public ExpressionResolver.ExpressionResolverBuilder getResolverBuilder(
             QueryOperation... tableOperation) {
         return ExpressionResolver.resolverFor(
-                config,
+                tableConfig,
                 tableReferenceLookup,
                 functionCatalog,
                 typeFactory,
diff --git a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/utils/TableEnvironmentMock.java b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/utils/TableEnvironmentMock.java
index 9020ea5..cb09387 100644
--- a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/utils/TableEnvironmentMock.java
+++ b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/utils/TableEnvironmentMock.java
@@ -69,15 +69,15 @@ public class TableEnvironmentMock extends TableEnvironmentImpl {
     }
 
     private static TableEnvironmentMock getInstance(boolean isStreamingMode) {
-        final TableConfig config = createTableConfig();
+        final TableConfig tableConfig = createTableConfig();
         final CatalogManager catalogManager = CatalogManagerMocks.createEmptyCatalogManager();
         final ModuleManager moduleManager = new ModuleManager();
         return new TableEnvironmentMock(
                 catalogManager,
                 moduleManager,
-                config,
+                tableConfig,
                 createExecutor(),
-                createFunctionCatalog(config, catalogManager, moduleManager),
+                createFunctionCatalog(tableConfig, catalogManager, moduleManager),
                 createPlanner(),
                 isStreamingMode);
     }
@@ -91,8 +91,8 @@ public class TableEnvironmentMock extends TableEnvironmentImpl {
     }
 
     private static FunctionCatalog createFunctionCatalog(
-            TableConfig config, CatalogManager catalogManager, ModuleManager moduleManager) {
-        return new FunctionCatalog(config, catalogManager, moduleManager);
+            TableConfig tableConfig, CatalogManager catalogManager, ModuleManager moduleManager) {
+        return new FunctionCatalog(tableConfig, catalogManager, moduleManager);
     }
 
     private static PlannerMock createPlanner() {
diff --git a/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/bridge/scala/internal/StreamTableEnvironmentImpl.scala b/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/bridge/scala/internal/StreamTableEnvironmentImpl.scala
index 8bbc63c..f56ba48 100644
--- a/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/bridge/scala/internal/StreamTableEnvironmentImpl.scala
+++ b/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/bridge/scala/internal/StreamTableEnvironmentImpl.scala
@@ -51,7 +51,7 @@ class StreamTableEnvironmentImpl (
     catalogManager: CatalogManager,
     moduleManager: ModuleManager,
     functionCatalog: FunctionCatalog,
-    config: TableConfig,
+    tableConfig: TableConfig,
     scalaExecutionEnvironment: StreamExecutionEnvironment,
     planner: Planner,
     executor: Executor,
@@ -60,7 +60,7 @@ class StreamTableEnvironmentImpl (
   extends AbstractStreamTableEnvironmentImpl(
     catalogManager,
     moduleManager,
-    config,
+    tableConfig,
     executor,
     functionCatalog,
     planner,
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/connectors/DynamicSourceUtils.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/connectors/DynamicSourceUtils.java
index c61dc46..b5fcf2d 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/connectors/DynamicSourceUtils.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/connectors/DynamicSourceUtils.java
@@ -227,7 +227,9 @@ public final class DynamicSourceUtils {
 
     /** Returns true if the table source produces duplicate change events. */
     public static boolean isSourceChangeEventsDuplicate(
-            ResolvedSchema resolvedSchema, DynamicTableSource tableSource, TableConfig config) {
+            ResolvedSchema resolvedSchema,
+            DynamicTableSource tableSource,
+            TableConfig tableConfig) {
         if (!(tableSource instanceof ScanTableSource)) {
             return false;
         }
@@ -235,7 +237,8 @@ public final class DynamicSourceUtils {
         boolean isCDCSource =
                 !mode.containsOnly(RowKind.INSERT) && !isUpsertSource(resolvedSchema, tableSource);
         boolean changeEventsDuplicate =
-                config.getConfiguration()
+                tableConfig
+                        .getConfiguration()
                         .getBoolean(ExecutionConfigOptions.TABLE_EXEC_SOURCE_CDC_EVENTS_DUPLICATE);
         boolean hasPrimaryKey = resolvedSchema.getPrimaryKey().isPresent();
         return isCDCSource && changeEventsDuplicate && hasPrimaryKey;
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushFilterIntoSourceScanRuleBase.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushFilterIntoSourceScanRuleBase.java
index 87fa42c..e3203cf 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushFilterIntoSourceScanRuleBase.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushFilterIntoSourceScanRuleBase.java
@@ -57,8 +57,9 @@ public abstract class PushFilterIntoSourceScanRuleBase extends RelOptRule {
 
     @Override
     public boolean matches(RelOptRuleCall call) {
-        TableConfig config = ShortcutUtils.unwrapContext(call.getPlanner()).getTableConfig();
-        return config.getConfiguration()
+        TableConfig tableConfig = ShortcutUtils.unwrapContext(call.getPlanner()).getTableConfig();
+        return tableConfig
+                .getConfiguration()
                 .getBoolean(
                         OptimizerConfigOptions.TABLE_OPTIMIZER_SOURCE_PREDICATE_PUSHDOWN_ENABLED);
     }
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushProjectIntoTableSourceScanRule.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushProjectIntoTableSourceScanRule.java
index f68866d..a4044bf 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushProjectIntoTableSourceScanRule.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushProjectIntoTableSourceScanRule.java
@@ -202,13 +202,13 @@ public class PushProjectIntoTableSourceScanRule
         return projections;
     }
 
-    private static boolean requiresPrimaryKey(TableSourceTable table, TableConfig config) {
+    private static boolean requiresPrimaryKey(TableSourceTable table, TableConfig tableConfig) {
         return DynamicSourceUtils.isUpsertSource(
                         table.contextResolvedTable().getResolvedSchema(), table.tableSource())
                 || DynamicSourceUtils.isSourceChangeEventsDuplicate(
                         table.contextResolvedTable().getResolvedSchema(),
                         table.tableSource(),
-                        config);
+                        tableConfig);
     }
 
     private List<RexNode> getPrimaryKeyProjections(LogicalTableScan scan) {
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/CalcCodeGenerator.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/CalcCodeGenerator.scala
index 606a6bd..2b5e9c5 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/CalcCodeGenerator.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/CalcCodeGenerator.scala
@@ -74,8 +74,8 @@ object CalcCodeGenerator {
       outRowClass: Class[_ <: RowData],
       calcProjection: Seq[RexNode],
       calcCondition: Option[RexNode],
-      config: TableConfig): GeneratedFunction[FlatMapFunction[RowData, RowData]] = {
-    val ctx = CodeGeneratorContext(config)
+      tableConfig: TableConfig): GeneratedFunction[FlatMapFunction[RowData, RowData]] = {
+    val ctx = CodeGeneratorContext(tableConfig)
     val inputTerm = CodeGenUtils.DEFAULT_INPUT1_TERM
     val collectorTerm = CodeGenUtils.DEFAULT_COLLECTOR_TERM
     val processCode = generateProcessCode(
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/CodeGeneratorContext.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/CodeGeneratorContext.scala
index d900490..ab8ba8f 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/CodeGeneratorContext.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/CodeGeneratorContext.scala
@@ -213,7 +213,7 @@ class CodeGeneratorContext(val tableConfig: TableConfig) {
   /**
     * @return Comment to be added as a header comment on the generated class
     */
-  def getClassHeaderComment(): String = {
+  def getClassHeaderComment: String = {
     s"""
     |/*
     | * ${reusableHeaderComments.mkString("\n * ")}
@@ -857,9 +857,7 @@ class CodeGeneratorContext(val tableConfig: TableConfig) {
     * @param constant constant expression
     * @return generated expression with the fieldTerm and nullTerm
     */
-  def addReusableConstant(
-      constant: GeneratedExpression,
-      nullCheck: Boolean): GeneratedExpression = {
+  def addReusableConstant(constant: GeneratedExpression): GeneratedExpression = {
     require(constant.literal, "Literal expected")
 
     val fieldTerm = newName("constant")
@@ -977,7 +975,7 @@ class CodeGeneratorContext(val tableConfig: TableConfig) {
 }
 
 object CodeGeneratorContext {
-  def apply(config: TableConfig): CodeGeneratorContext = {
-    new CodeGeneratorContext(config)
+  def apply(tableConfig: TableConfig): CodeGeneratorContext = {
+    new CodeGeneratorContext(tableConfig)
   }
 }
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/CorrelateCodeGenerator.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/CorrelateCodeGenerator.scala
index 2c76516..9b3bcc9 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/CorrelateCodeGenerator.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/CorrelateCodeGenerator.scala
@@ -40,7 +40,7 @@ import org.apache.calcite.rex._
 object CorrelateCodeGenerator {
 
   def generateCorrelateTransformation(
-      config: TableConfig,
+      tableConfig: TableConfig,
       operatorCtx: CodeGeneratorContext,
       inputTransformation: Transformation[RowData],
       inputType: RowType,
@@ -74,7 +74,7 @@ object CorrelateCodeGenerator {
 
     val substituteStreamOperator = generateOperator(
       operatorCtx,
-      config,
+      tableConfig,
       inputType,
       condition.map(_.accept(changeInputRefIndexShuttle)),
       outputType,
@@ -97,7 +97,7 @@ object CorrelateCodeGenerator {
     */
   private[flink] def generateOperator[T <: Function](
       ctx: CodeGeneratorContext,
-      config: TableConfig,
+      tableConfig: TableConfig,
       inputType: RowType,
       condition: Option[RexNode],
       returnType: RowType,
@@ -114,7 +114,7 @@ object CorrelateCodeGenerator {
     // 1.1 compile correlate collector
     val correlateCollectorTerm = generateCorrelateCollector(
       ctx,
-      config,
+      tableConfig,
       inputType,
       functionResultType,
       returnType,
@@ -181,7 +181,7 @@ object CorrelateCodeGenerator {
    */
   private def generateCorrelateCollector(
       ctx: CodeGeneratorContext,
-      config: TableConfig,
+      tableConfig: TableConfig,
       inputType: RowType,
       functionResultType: RowType,
       resultType: RowType,
@@ -193,7 +193,7 @@ object CorrelateCodeGenerator {
     val inputTerm = CodeGenUtils.DEFAULT_INPUT1_TERM
     val udtfInputTerm = CodeGenUtils.DEFAULT_INPUT2_TERM
 
-    val collectorCtx = CodeGeneratorContext(config)
+    val collectorCtx = CodeGeneratorContext(tableConfig)
 
     val body = {
       // completely output left input + right
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/ExpressionReducer.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/ExpressionReducer.scala
index be7a953..33fe3e3 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/ExpressionReducer.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/ExpressionReducer.scala
@@ -48,7 +48,7 @@ import scala.collection.mutable.ListBuffer
   *                               not null.
   */
 class ExpressionReducer(
-    config: TableConfig,
+    tableConfig: TableConfig,
     allowChangeNullability: Boolean = false)
   extends RexExecutor
   with Logging {
@@ -71,7 +71,7 @@ class ExpressionReducer(
     val resultType = RowType.of(literalTypes: _*)
 
     // generate MapFunction
-    val ctx = new ConstantCodeGeneratorContext(config)
+    val ctx = new ConstantCodeGeneratorContext(tableConfig)
 
     val exprGenerator = new ExprCodeGenerator(ctx, false)
       .bindInput(EMPTY_ROW_TYPE)
@@ -98,7 +98,7 @@ class ExpressionReducer(
         throw new TableException("RichMapFunction[GenericRowData, GenericRowData] required here")
     }
 
-    val parameters = config.getConfiguration
+    val parameters = tableConfig.getConfiguration
     val reduced = try {
       richMapFunction.open(parameters)
       // execute
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/FunctionCodeGenerator.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/FunctionCodeGenerator.scala
index 24c286f..4038daa 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/FunctionCodeGenerator.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/FunctionCodeGenerator.scala
@@ -125,7 +125,7 @@ object FunctionCodeGenerator {
 
     val funcCode =
       j"""
-      ${ctx.getClassHeaderComment()}
+      ${ctx.getClassHeaderComment}
       public class $funcName
           extends ${samHeader._1.getCanonicalName} {
 
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/LongHashJoinGenerator.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/LongHashJoinGenerator.scala
index fbbe7df..637260e 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/LongHashJoinGenerator.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/LongHashJoinGenerator.scala
@@ -65,7 +65,6 @@ object LongHashJoinGenerator {
   }
 
   private def genGetLongKey(
-      ctx: CodeGeneratorContext,
       keyType: RowType,
       keyMapping: Array[Int],
       rowTerm: String): String = {
@@ -93,10 +92,10 @@ object LongHashJoinGenerator {
      """.stripMargin, anyNullTerm)
   }
 
-  def genProjection(conf: TableConfig, types: Array[LogicalType]): GeneratedProjection = {
+  def genProjection(tableConfig: TableConfig, types: Array[LogicalType]): GeneratedProjection = {
     val rowType = RowType.of(types: _*)
     ProjectionCodeGenerator.generateProjection(
-      CodeGeneratorContext.apply(conf),
+      CodeGeneratorContext.apply(tableConfig),
       "Projection",
       rowType,
       rowType,
@@ -104,7 +103,7 @@ object LongHashJoinGenerator {
   }
 
   def gen(
-      conf: TableConfig,
+      tableConfig: TableConfig,
       hashJoinType: HashJoinType,
       keyType: RowType,
       buildType: RowType,
@@ -120,13 +119,13 @@ object LongHashJoinGenerator {
     val probeSer = new BinaryRowDataSerializer(probeType.getFieldCount)
 
     val tableTerm = newName("LongHashTable")
-    val ctx = CodeGeneratorContext(conf)
+    val ctx = CodeGeneratorContext(tableConfig)
     val buildSerTerm = ctx.addReusableObject(buildSer, "buildSer")
     val probeSerTerm = ctx.addReusableObject(probeSer, "probeSer")
 
-    val bGenProj = genProjection(conf, buildType.getChildren.toArray(Array[LogicalType]()))
+    val bGenProj = genProjection(tableConfig, buildType.getChildren.toArray(Array[LogicalType]()))
     ctx.addReusableInnerClass(bGenProj.getClassName, bGenProj.getCode)
-    val pGenProj = genProjection(conf, probeType.getChildren.toArray(Array[LogicalType]()))
+    val pGenProj = genProjection(tableConfig, probeType.getChildren.toArray(Array[LogicalType]()))
     ctx.addReusableInnerClass(pGenProj.getClassName, pGenProj.getCode)
     ctx.addReusableInnerClass(condFunc.getClassName, condFunc.getCode)
 
@@ -186,12 +185,12 @@ object LongHashJoinGenerator {
          |
          |  @Override
          |  public long getBuildLongKey($ROW_DATA row) {
-         |    ${genGetLongKey(ctx, keyType, buildKeyMapping, "row")}
+         |    ${genGetLongKey(keyType, buildKeyMapping, "row")}
          |  }
          |
          |  @Override
          |  public long getProbeLongKey($ROW_DATA row) {
-         |    ${genGetLongKey(ctx, keyType, probeKeyMapping, "row")}
+         |    ${genGetLongKey(keyType, probeKeyMapping, "row")}
          |  }
          |
          |  @Override
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/LookupJoinCodeGenerator.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/LookupJoinCodeGenerator.scala
index f0615a7..ef7b5b0 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/LookupJoinCodeGenerator.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/LookupJoinCodeGenerator.scala
@@ -64,7 +64,7 @@ object LookupJoinCodeGenerator {
     * Generates a lookup function ([[TableFunction]])
     */
   def generateSyncLookupFunction(
-      config: TableConfig,
+      tableConfig: TableConfig,
       dataTypeFactory: DataTypeFactory,
       inputType: LogicalType,
       tableSourceType: LogicalType,
@@ -86,7 +86,7 @@ object LookupJoinCodeGenerator {
 
     generateLookupFunction(
       classOf[FlatMapFunction[RowData, RowData]],
-      config,
+      tableConfig,
       dataTypeFactory,
       inputType,
       tableSourceType,
@@ -104,7 +104,7 @@ object LookupJoinCodeGenerator {
     * Generates a async lookup function ([[AsyncTableFunction]])
     */
   def generateAsyncLookupFunction(
-      config: TableConfig,
+      tableConfig: TableConfig,
       dataTypeFactory: DataTypeFactory,
       inputType: LogicalType,
       tableSourceType: LogicalType,
@@ -117,7 +117,7 @@ object LookupJoinCodeGenerator {
 
     generateLookupFunction(
       classOf[AsyncFunction[RowData, AnyRef]],
-      config,
+      tableConfig,
       dataTypeFactory,
       inputType,
       tableSourceType,
@@ -133,7 +133,7 @@ object LookupJoinCodeGenerator {
 
   private def generateLookupFunction[F <: Function](
       generatedClass: Class[F],
-      config: TableConfig,
+      tableConfig: TableConfig,
       dataTypeFactory: DataTypeFactory,
       inputType: LogicalType,
       tableSourceType: LogicalType,
@@ -161,7 +161,7 @@ object LookupJoinCodeGenerator {
       lookupFunction,
       callContext,
       classOf[PlannerBase].getClassLoader,
-      config.getConfiguration)
+      tableConfig.getConfiguration)
 
     val inference = createLookupTypeInference(
       dataTypeFactory,
@@ -170,7 +170,7 @@ object LookupJoinCodeGenerator {
       udf,
       functionName)
 
-    val ctx = CodeGeneratorContext(config)
+    val ctx = CodeGeneratorContext(tableConfig)
     val operands = prepareOperands(
       ctx,
       inputType,
@@ -415,16 +415,16 @@ object LookupJoinCodeGenerator {
   /**
     * Generates a [[TableFunctionResultFuture]] that can be passed to Java compiler.
     *
-    * @param config The TableConfig
-    * @param name Class name of the table function collector. Must not be unique but has to be a
-    *             valid Java class identifier.
+    * @param tableConfig   The TableConfig
+    * @param name          Class name of the table function collector. Must not be unique but has
+    *   to be a valid Java class identifier.
     * @param leftInputType The type information of the element being collected
     * @param collectedType The type information of the element collected by the collector
-    * @param condition The filter condition before collect elements
+    * @param condition     The filter condition before collect elements
     * @return instance of GeneratedCollector
     */
   def generateTableAsyncCollector(
-      config: TableConfig,
+      tableConfig: TableConfig,
       name: String,
       leftInputType: RowType,
       collectedType: RowType,
@@ -438,7 +438,7 @@ object LookupJoinCodeGenerator {
     val input2Term = DEFAULT_INPUT2_TERM
     val outTerm = "resultCollection"
 
-    val ctx = CodeGeneratorContext(config)
+    val ctx = CodeGeneratorContext(tableConfig)
 
     val body = if (condition.isEmpty) {
       "getResultFuture().complete(records);"
@@ -508,7 +508,7 @@ object LookupJoinCodeGenerator {
     * to projection/filter the dimension table results
     */
   def generateCalcMapFunction(
-      config: TableConfig,
+      tableConfig: TableConfig,
       projection: Seq[RexNode],
       condition: RexNode,
       outputType: RelDataType,
@@ -521,6 +521,6 @@ object LookupJoinCodeGenerator {
       classOf[GenericRowData],
       projection,
       Option(condition),
-      config)
+      tableConfig)
   }
 }
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/ValuesCodeGenerator.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/ValuesCodeGenerator.scala
index 7fc68a8..aabb7fd 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/ValuesCodeGenerator.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/ValuesCodeGenerator.scala
@@ -33,11 +33,11 @@ import scala.collection.JavaConversions._
 object ValuesCodeGenerator {
 
   def generatorInputFormat(
-    config: TableConfig,
+    tableConfig: TableConfig,
     outputType: RowType,
     tuples: util.List[util.List[RexLiteral]],
     description: String): ValuesInputFormat = {
-    val ctx = CodeGeneratorContext(config)
+    val ctx = CodeGeneratorContext(tableConfig)
     val exprGenerator = new ExprCodeGenerator(ctx, false)
     // generate code for every record
     val generatedRecords = tuples.map { r =>
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/WatermarkGeneratorCodeGenerator.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/WatermarkGeneratorCodeGenerator.scala
index 780a0e0..c1b83c6 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/WatermarkGeneratorCodeGenerator.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/WatermarkGeneratorCodeGenerator.scala
@@ -42,7 +42,7 @@ import java.util
 object WatermarkGeneratorCodeGenerator {
 
   def generateWatermarkGenerator(
-      config: TableConfig,
+      tableConfig: TableConfig,
       inputType: RowType,
       watermarkExpr: RexNode,
       contextTerm: Option[String] = None): GeneratedWatermarkGenerator = {
@@ -56,9 +56,9 @@ object WatermarkGeneratorCodeGenerator {
     }
     val funcName = newName("WatermarkGenerator")
     val ctx = if (contextTerm.isDefined) {
-      new WatermarkGeneratorFunctionContext(config, contextTerm.get)
+      new WatermarkGeneratorFunctionContext(tableConfig, contextTerm.get)
     } else {
-      CodeGeneratorContext(config)
+      CodeGeneratorContext(tableConfig)
     }
     val generator = new ExprCodeGenerator(ctx, false)
       .bindInput(inputType, inputTerm = "row")
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/agg/AggsHandlerCodeGenerator.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/agg/AggsHandlerCodeGenerator.scala
index f212c4e..bbb7f4f 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/agg/AggsHandlerCodeGenerator.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/agg/AggsHandlerCodeGenerator.scala
@@ -144,7 +144,7 @@ class AggsHandlerCodeGenerator(
     this.constants = literals
     val exprGenerator = new ExprCodeGenerator(ctx, INPUT_NOT_NULL)
     val exprs = literals.map(exprGenerator.generateExpression)
-    this.constantExprs = exprs.map(ctx.addReusableConstant(_, nullCheck = true))
+    this.constantExprs = exprs.map(ctx.addReusableConstant)
     this
   }
 
@@ -309,7 +309,6 @@ class AggsHandlerCodeGenerator(
       aggName: String): Option[Expression] = {
 
     if (filterArg > 0) {
-      val name = s"agg_${aggIndex}_filter"
       val filterType = inputFieldTypes(filterArg)
       if (!filterType.isInstanceOf[BooleanType]) {
         throw new TableException(s"filter arg must be boolean, but is $filterType, " +
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/FunctionGenerator.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/FunctionGenerator.scala
index 22cb1a9..44c0b28 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/FunctionGenerator.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/FunctionGenerator.scala
@@ -30,7 +30,7 @@ import org.apache.flink.table.types.logical.{LogicalType, LogicalTypeRoot}
 import java.lang.reflect.Method
 import scala.collection.mutable
 
-class FunctionGenerator private(config: TableConfig) {
+class FunctionGenerator private(tableConfig: TableConfig) {
 
   val INTEGRAL_TYPES = Array(
     TINYINT,
@@ -44,7 +44,7 @@ class FunctionGenerator private(config: TableConfig) {
     mutable.Map()
 
   val isStreamingMode = RuntimeExecutionMode.STREAMING.equals(
-    config.getConfiguration.get(ExecutionOptions.RUNTIME_MODE))
+    tableConfig.getConfiguration.get(ExecutionOptions.RUNTIME_MODE))
   // ----------------------------------------------------------------------------------------------
   // Arithmetic functions
   // ----------------------------------------------------------------------------------------------
@@ -942,5 +942,6 @@ class FunctionGenerator private(config: TableConfig) {
 }
 
 object FunctionGenerator {
-    def getInstance(config: TableConfig): FunctionGenerator = new FunctionGenerator(config)
+    def getInstance(tableConfig: TableConfig): FunctionGenerator =
+      new FunctionGenerator(tableConfig)
 }
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/over/MultiFieldRangeBoundComparatorCodeGenerator.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/over/MultiFieldRangeBoundComparatorCodeGenerator.scala
index 9b101aa..35b31ec 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/over/MultiFieldRangeBoundComparatorCodeGenerator.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/over/MultiFieldRangeBoundComparatorCodeGenerator.scala
@@ -30,7 +30,7 @@ import org.apache.flink.table.types.logical.RowType
   * RANGE allow the compound ORDER BY and the random type when the bound is current row.
   */
 class MultiFieldRangeBoundComparatorCodeGenerator(
-    conf: TableConfig,
+    tableConfig: TableConfig,
     inputType: RowType,
     sortSpec: SortSpec,
     isLowerBound: Boolean = true) {
@@ -45,7 +45,7 @@ class MultiFieldRangeBoundComparatorCodeGenerator(
       if (isLowerBound) s"return $comp >= 0 ? 1 : -1;" else s"return $comp > 0 ? 1 : -1;"
     }
 
-    val ctx = CodeGeneratorContext(conf)
+    val ctx = CodeGeneratorContext(tableConfig)
     val compareCode = GenerateUtils.generateRowCompare(ctx, inputType, sortSpec, input, current)
 
     val code =
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/over/RangeBoundComparatorCodeGenerator.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/over/RangeBoundComparatorCodeGenerator.scala
index ff2a468..011f56e 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/over/RangeBoundComparatorCodeGenerator.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/over/RangeBoundComparatorCodeGenerator.scala
@@ -46,7 +46,7 @@ import java.math.BigDecimal
   */
 class RangeBoundComparatorCodeGenerator(
     relBuilder: RelBuilder,
-    config: TableConfig,
+    tableConfig: TableConfig,
     inputType: RowType,
     bound: Any,
     key: Int = -1,
@@ -59,7 +59,7 @@ class RangeBoundComparatorCodeGenerator(
     val input = CodeGenUtils.DEFAULT_INPUT1_TERM
     val current = CodeGenUtils.DEFAULT_INPUT2_TERM
 
-    val ctx = CodeGeneratorContext(config)
+    val ctx = CodeGeneratorContext(tableConfig)
 
     val inputExpr = GenerateUtils.generateFieldAccess(ctx, inputType, inputTerm = input, key)
     val currentExpr = GenerateUtils.generateFieldAccess(ctx, inputType, inputTerm = current, key)
@@ -142,7 +142,7 @@ class RangeBoundComparatorCodeGenerator(
     val relKeyType = typeFactory.createFieldTypeFromLogicalType(realKeyType)
 
     //minus between inputValue and currentValue
-    val ctx = CodeGeneratorContext(config)
+    val ctx = CodeGeneratorContext(tableConfig)
     val exprCodeGenerator = new ExprCodeGenerator(ctx, false)
     val minusCall = if (keyOrder) {
       relBuilder.call(
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/sort/SortCodeGenerator.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/sort/SortCodeGenerator.scala
index dd5df2c..28e279e 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/sort/SortCodeGenerator.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/sort/SortCodeGenerator.scala
@@ -35,12 +35,12 @@ import scala.collection.mutable
 /**
   * A code generator for generating [[NormalizedKeyComputer]] and [[RecordComparator]].
   *
-  * @param conf         config of the planner.
+  * @param tableConfig  config of the planner.
   * @param input        input type.
   * @param sortSpec     sort specification.
   */
 class SortCodeGenerator(
-    conf: TableConfig,
+    tableConfig: TableConfig,
     val input: RowType,
     val sortSpec: SortSpec) {
 
@@ -183,7 +183,7 @@ class SortCodeGenerator(
       }
     """.stripMargin
 
-    new GeneratedNormalizedKeyComputer(className, code, conf.getConfiguration)
+    new GeneratedNormalizedKeyComputer(className, code, tableConfig.getConfiguration)
   }
 
   def generatePutNormalizedKeys(numKeyBytes: Int): mutable.ArrayBuffer[String] = {
@@ -383,7 +383,7 @@ class SortCodeGenerator(
     */
   def generateRecordComparator(name: String): GeneratedRecordComparator = {
     ComparatorCodeGenerator.gen(
-        conf,
+        tableConfig,
         name,
         input,
         sortSpec)
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/delegation/BatchPlanner.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/delegation/BatchPlanner.scala
index 8acfa41..006cb3c 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/delegation/BatchPlanner.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/delegation/BatchPlanner.scala
@@ -47,11 +47,11 @@ import scala.collection.JavaConversions._
 
 class BatchPlanner(
     executor: Executor,
-    config: TableConfig,
+    tableConfig: TableConfig,
     moduleManager: ModuleManager,
     functionCatalog: FunctionCatalog,
     catalogManager: CatalogManager)
-  extends PlannerBase(executor, config, moduleManager, functionCatalog, catalogManager,
+  extends PlannerBase(executor, tableConfig, moduleManager, functionCatalog, catalogManager,
     isStreamingMode = false) {
 
   override protected def getTraitDefs: Array[RelTraitDef[_ <: RelTrait]] = {
@@ -132,7 +132,7 @@ class BatchPlanner(
   private def createDummyPlanner(): BatchPlanner = {
     val dummyExecEnv = new DummyStreamExecutionEnvironment(getExecEnv)
     val executor = new DefaultExecutor(dummyExecEnv)
-    new BatchPlanner(executor, config, moduleManager, functionCatalog, catalogManager)
+    new BatchPlanner(executor, tableConfig, moduleManager, functionCatalog, catalogManager)
   }
 
   override def loadPlan(planReference: PlanReference): CompiledPlanInternal = {
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/delegation/PlannerBase.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/delegation/PlannerBase.scala
index b830141..4442160 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/delegation/PlannerBase.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/delegation/PlannerBase.scala
@@ -23,10 +23,8 @@ import org.apache.flink.api.dag.Transformation
 import org.apache.flink.configuration.ReadableConfig
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
 import org.apache.flink.streaming.api.graph.StreamGraph
-import org.apache.flink.table.api.PlanReference.{ContentPlanReference, FilePlanReference, ResourcePlanReference}
 import org.apache.flink.table.api._
 import org.apache.flink.table.api.config.{ExecutionConfigOptions, TableConfigOptions}
-import org.apache.flink.table.api.internal.CompiledPlanInternal
 import org.apache.flink.table.catalog.ManagedTableListener.isManagedTable
 import org.apache.flink.table.catalog._
 import org.apache.flink.table.connector.sink.DynamicTableSink
@@ -45,10 +43,9 @@ import org.apache.flink.table.planner.delegation.ParserFactory.DefaultParserCont
 import org.apache.flink.table.planner.expressions.PlannerTypeInferenceUtilImpl
 import org.apache.flink.table.planner.hint.FlinkHints
 import org.apache.flink.table.planner.operations.PlannerQueryOperation
-import org.apache.flink.table.planner.plan.ExecNodeGraphCompiledPlan
 import org.apache.flink.table.planner.plan.nodes.calcite.LogicalLegacySink
 import org.apache.flink.table.planner.plan.nodes.exec.processor.{ExecNodeGraphProcessor, ProcessorContext}
-import org.apache.flink.table.planner.plan.nodes.exec.serde.{JsonSerdeUtil, SerdeContext}
+import org.apache.flink.table.planner.plan.nodes.exec.serde.SerdeContext
 import org.apache.flink.table.planner.plan.nodes.exec.{ExecNodeGraph, ExecNodeGraphGenerator}
 import org.apache.flink.table.planner.plan.nodes.physical.FlinkPhysicalRel
 import org.apache.flink.table.planner.plan.optimize.Optimizer
@@ -61,8 +58,6 @@ import org.apache.flink.table.planner.utils.JavaScalaConversionUtil.{toJava, toS
 import org.apache.flink.table.sinks.TableSink
 import org.apache.flink.table.types.utils.LegacyTypeInfoDataTypeConverter
 
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectReader
-
 import org.apache.calcite.jdbc.CalciteSchemaBuilder.asRootSchema
 import org.apache.calcite.plan.{RelTrait, RelTraitDef}
 import org.apache.calcite.rel.RelNode
@@ -70,7 +65,6 @@ import org.apache.calcite.rel.hint.RelHint
 import org.apache.calcite.rel.logical.LogicalTableModify
 import org.apache.calcite.tools.FrameworkConfig
 
-import java.io.{File, IOException}
 import java.lang.{Long => JLong}
 import java.util
 import java.util.{Collections, TimeZone}
@@ -86,7 +80,7 @@ import scala.collection.mutable
   * @param executor        instance of [[Executor]], needed to extract
   *                        [[StreamExecutionEnvironment]] for
   *                        [[org.apache.flink.table.sources.StreamTableSource.getDataStream]]
-  * @param config          mutable configuration passed from corresponding [[TableEnvironment]]
+  * @param tableConfig     mutable configuration passed from corresponding [[TableEnvironment]]
   * @param moduleManager   manager for modules
   * @param functionCatalog catalog of functions
   * @param catalogManager  manager of catalog meta objects such as tables, views, databases etc.
@@ -95,7 +89,7 @@ import scala.collection.mutable
   */
 abstract class PlannerBase(
     executor: Executor,
-    config: TableConfig,
+    tableConfig: TableConfig,
     val moduleManager: ModuleManager,
     val functionCatalog: FunctionCatalog,
     val catalogManager: CatalogManager,
@@ -109,14 +103,14 @@ abstract class PlannerBase(
   private var currentDialect: SqlDialect = getTableConfig.getSqlDialect
 
   private val plannerConfiguration: ReadableConfig = new PlannerConfiguration(
-    config.getConfiguration,
+    tableConfig.getConfiguration,
     executor.getConfiguration)
 
   @VisibleForTesting
   private[flink] val plannerContext: PlannerContext =
     new PlannerContext(
       !isStreamingMode,
-      config,
+      tableConfig,
       moduleManager,
       functionCatalog,
       catalogManager,
@@ -148,7 +142,7 @@ abstract class PlannerBase(
   /** Returns specific query [[Optimizer]] depends on the concrete type of this TableEnvironment. */
   protected def getOptimizer: Optimizer
 
-  def getTableConfig: TableConfig = config
+  def getTableConfig: TableConfig = tableConfig
 
   def getFlinkContext: FlinkContext = plannerContext.getFlinkContext
 
@@ -349,7 +343,7 @@ abstract class PlannerBase(
     val shuttle = new SameRelObjectShuttle()
     val relsWithoutSameObj = optimizedRelNodes.map(_.accept(shuttle))
     // reuse subplan
-    val reusedPlan = SubplanReuser.reuseDuplicatedSubplan(relsWithoutSameObj, config)
+    val reusedPlan = SubplanReuser.reuseDuplicatedSubplan(relsWithoutSameObj, tableConfig)
     // convert FlinkPhysicalRel DAG to ExecNodeGraph
     val generator = new ExecNodeGraphGenerator()
     val execGraph = generator.generate(reusedPlan.map(_.asInstanceOf[FlinkPhysicalRel]))
@@ -488,7 +482,7 @@ abstract class PlannerBase(
    * the configuration before the planner performs optimization with [[ModifyOperation]] or other work.
    */
   protected def validateAndOverrideConfiguration(): Unit = {
-    val configuration = config.getConfiguration
+    val configuration = tableConfig.getConfiguration
     if (!configuration.get(TableConfigOptions.TABLE_PLANNER).equals(PlannerType.BLINK)) {
       throw new IllegalArgumentException(
         "Mismatch between configured planner and actual planner. " +
@@ -502,7 +496,7 @@ abstract class PlannerBase(
     val epochTime: JLong = System.currentTimeMillis()
     configuration.set(TABLE_QUERY_START_EPOCH_TIME, epochTime)
     val localTime: JLong = epochTime +
-      TimeZone.getTimeZone(config.getLocalTimeZone).getOffset(epochTime)
+      TimeZone.getTimeZone(tableConfig.getLocalTimeZone).getOffset(epochTime)
     configuration.set(TABLE_QUERY_START_LOCAL_TIME, localTime)
 
     getExecEnv.configure(
@@ -521,7 +515,7 @@ abstract class PlannerBase(
    * Cleans up all internal configurations after plan translation has finished.
    */
   protected def cleanupInternalConfigurations(): Unit = {
-    val configuration = config.getConfiguration
+    val configuration = tableConfig.getConfiguration
     configuration.removeConfig(TABLE_QUERY_START_EPOCH_TIME)
     configuration.removeConfig(TABLE_QUERY_START_LOCAL_TIME)
   }
@@ -563,7 +557,7 @@ abstract class PlannerBase(
     val transformations = translateToPlan(execGraph)
     cleanupInternalConfigurations()
 
-    val streamGraph = executor.createPipeline(transformations, config.getConfiguration, null)
+    val streamGraph = executor.createPipeline(transformations, tableConfig.getConfiguration, null)
       .asInstanceOf[StreamGraph]
 
     (sinkRelNodes, optimizedRelNodes, execGraph, streamGraph)
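
Aside: the `PlannerConfiguration` wired up above layers `tableConfig.getConfiguration`
over `executor.getConfiguration`. A minimal sketch of that layering idea, assuming only
the two-method `ReadableConfig` interface (the class name `LayeredReadableConfig` and the
exact precedence rule are assumptions here, not the actual implementation):

    import java.util.Optional

    import org.apache.flink.configuration.{ConfigOption, ReadableConfig}

    // Read-only view where values set on the primary (table) config win and
    // everything else falls back to the secondary (executor) config.
    class LayeredReadableConfig(primary: ReadableConfig, fallback: ReadableConfig)
      extends ReadableConfig {

      override def get[T](option: ConfigOption[T]): T = {
        val value = primary.getOptional(option)
        if (value.isPresent) value.get() else fallback.get(option)
      }

      override def getOptional[T](option: ConfigOption[T]): Optional[T] = {
        val value = primary.getOptional(option)
        if (value.isPresent) value else fallback.getOptional(option)
      }
    }
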
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/delegation/StreamPlanner.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/delegation/StreamPlanner.scala
index cfef7d7..29e2115 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/delegation/StreamPlanner.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/delegation/StreamPlanner.scala
@@ -52,11 +52,11 @@ import _root_.scala.collection.JavaConversions._
 
 class StreamPlanner(
     executor: Executor,
-    config: TableConfig,
+    tableConfig: TableConfig,
     moduleManager: ModuleManager,
     functionCatalog: FunctionCatalog,
     catalogManager: CatalogManager)
-  extends PlannerBase(executor, config, moduleManager, functionCatalog, catalogManager,
+  extends PlannerBase(executor, tableConfig, moduleManager, functionCatalog, catalogManager,
     isStreamingMode = true) {
 
   override protected def getTraitDefs: Array[RelTraitDef[_ <: RelTrait]] = {
@@ -131,7 +131,7 @@ class StreamPlanner(
   private def createDummyPlanner(): StreamPlanner = {
     val dummyExecEnv = new DummyStreamExecutionEnvironment(getExecEnv)
     val executor = new DefaultExecutor(dummyExecEnv)
-    new StreamPlanner(executor, config, moduleManager, functionCatalog, catalogManager)
+    new StreamPlanner(executor, tableConfig, moduleManager, functionCatalog, catalogManager)
   }
 
   override def loadPlan(planReference: PlanReference): CompiledPlanInternal = {
@@ -142,15 +142,14 @@ class StreamPlanner(
         objectReader.readValue(filePlanReference.getFile, classOf[ExecNodeGraph])
       case contentPlanReference: ContentPlanReference =>
         objectReader.readValue(contentPlanReference.getContent, classOf[ExecNodeGraph])
-      case resourcePlanReference: ResourcePlanReference => {
+      case resourcePlanReference: ResourcePlanReference =>
         val url = resourcePlanReference.getClassLoader
           .getResource(resourcePlanReference.getResourcePath)
         if (url == null) {
           throw new IOException(
-            "Cannot load the plan reference from classpath: " + planReference);
+            "Cannot load the plan reference from classpath: " + planReference)
         }
         objectReader.readValue(new File(url.toURI), classOf[ExecNodeGraph])
-      }
       case _ => throw new IllegalStateException(
         "Unknown PlanReference. This is a bug, please contact the developers")
     }
@@ -192,7 +191,7 @@ class StreamPlanner(
     val transformations = translateToPlan(execGraph)
     cleanupInternalConfigurations()
 
-    val streamGraph = executor.createPipeline(transformations, config.getConfiguration, null)
+    val streamGraph = executor.createPipeline(transformations, tableConfig.getConfiguration, null)
       .asInstanceOf[StreamGraph]
 
     val sb = new StringBuilder
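
The `ResourcePlanReference` branch above follows the standard classpath-lookup pattern:
resolve the URL, fail fast on `null`, then read the plan from the resolved file. The same
pattern in isolation (the helper name `resolveResource` is hypothetical):

    import java.io.{File, IOException}

    def resolveResource(classLoader: ClassLoader, resourcePath: String): File = {
      val url = classLoader.getResource(resourcePath)
      if (url == null) {
        throw new IOException("Cannot load the plan reference from classpath: " + resourcePath)
      }
      // getResource returns a URL; toURI lets the file API address it
      new File(url.toURI)
    }
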
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchPhysicalJoinBase.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchPhysicalJoinBase.scala
index 50da830..37c7201 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchPhysicalJoinBase.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchPhysicalJoinBase.scala
@@ -48,10 +48,10 @@ abstract class BatchPhysicalJoinBase(
   with BatchPhysicalRel {
 
   private[flink] def generateCondition(
-      config: TableConfig,
+      tableConfig: TableConfig,
       leftType: RowType,
       rightType: RowType): GeneratedJoinCondition = {
-    val ctx = CodeGeneratorContext(config)
+    val ctx = CodeGeneratorContext(tableConfig)
     val exprGenerator = new ExprCodeGenerator(ctx, false)
         .bindInput(leftType)
         .bindSecondInput(rightType)
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/BatchCommonSubGraphBasedOptimizer.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/BatchCommonSubGraphBasedOptimizer.scala
index 9e35197..5e3c42b 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/BatchCommonSubGraphBasedOptimizer.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/BatchCommonSubGraphBasedOptimizer.scala
@@ -30,7 +30,6 @@ import org.apache.flink.table.planner.utils.TableConfigUtils
 import org.apache.flink.util.Preconditions
 
 import org.apache.calcite.rel.RelNode
-import org.apache.calcite.rex.RexBuilder
 
 import java.util.Collections
 
@@ -42,7 +41,7 @@ class BatchCommonSubGraphBasedOptimizer(planner: BatchPlanner)
 
   override protected def doOptimize(roots: Seq[RelNode]): Seq[RelNodeBlock] = {
     // build RelNodeBlock plan
-    val rootBlocks = RelNodeBlockPlanBuilder.buildRelNodeBlockPlan(roots, planner.getTableConfig)
+    val rootBlocks = RelNodeBlockPlanBuilder.buildRelNodeBlockPlan(roots, planner.getConfiguration)
     // recursively optimize each RelNodeBlock
     rootBlocks.foreach(optimizeBlock)
     rootBlocks
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/RelNodeBlock.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/RelNodeBlock.scala
index 6ef66e7..761e7fc 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/RelNodeBlock.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/RelNodeBlock.scala
@@ -19,9 +19,8 @@
 package org.apache.flink.table.planner.plan.optimize
 
 import org.apache.flink.annotation.Experimental
-import org.apache.flink.configuration.ConfigOption
 import org.apache.flink.configuration.ConfigOptions.key
-import org.apache.flink.table.api.TableConfig
+import org.apache.flink.configuration.{ConfigOption, ReadableConfig}
 import org.apache.flink.table.planner.plan.`trait`.MiniBatchInterval
 import org.apache.flink.table.planner.plan.nodes.calcite.LegacySink
 import org.apache.flink.table.planner.plan.reuse.SubplanReuser.{SubplanReuseContext, SubplanReuseShuttle}
@@ -30,6 +29,7 @@ import org.apache.flink.table.planner.plan.utils.{DefaultRelShuttle, ExpandTable
 import org.apache.flink.util.Preconditions
 
 import com.google.common.collect.Sets
+
 import org.apache.calcite.rel._
 import org.apache.calcite.rel.core.{Aggregate, Project, Snapshot, TableFunctionScan, Union}
 import org.apache.calcite.rex.RexNode
@@ -258,13 +258,13 @@ class RelNodeWrapper(relNode: RelNode) {
 /**
   * Builds [[RelNodeBlock]] plan
   */
-class RelNodeBlockPlanBuilder private(config: TableConfig) {
+class RelNodeBlockPlanBuilder private(config: ReadableConfig) {
 
   private val node2Wrapper = new util.IdentityHashMap[RelNode, RelNodeWrapper]()
   private val node2Block = new util.IdentityHashMap[RelNode, RelNodeBlock]()
 
-  private val isUnionAllAsBreakPointEnabled = config.getConfiguration.getBoolean(
-    RelNodeBlockPlanBuilder.TABLE_OPTIMIZER_UNIONALL_AS_BREAKPOINT_ENABLED)
+  private val isUnionAllAsBreakPointEnabled = config
+    .get(RelNodeBlockPlanBuilder.TABLE_OPTIMIZER_UNIONALL_AS_BREAKPOINT_ENABLED)
 
   /**
     * Decompose the [[RelNode]] plan into many [[RelNodeBlock]]s,
@@ -415,7 +415,7 @@ object RelNodeBlockPlanBuilder {
     */
   def buildRelNodeBlockPlan(
       sinkNodes: Seq[RelNode],
-      config: TableConfig): Seq[RelNodeBlock] = {
+      config: ReadableConfig): Seq[RelNodeBlock] = {
     require(sinkNodes.nonEmpty)
 
     // expand QueryOperationCatalogViewTable in TableScan
@@ -438,9 +438,9 @@ object RelNodeBlockPlanBuilder {
     * @param relNodes RelNode trees
     * @return RelNode DAG which reuses common sub-plans in each tree
     */
-  private def reuseRelNodes(relNodes: Seq[RelNode], tableConfig: TableConfig): Seq[RelNode] = {
-    val findOpBlockWithDigest = tableConfig.getConfiguration.getBoolean(
-      RelNodeBlockPlanBuilder.TABLE_OPTIMIZER_REUSE_OPTIMIZE_BLOCK_WITH_DIGEST_ENABLED)
+  private def reuseRelNodes(relNodes: Seq[RelNode], config: ReadableConfig): Seq[RelNode] = {
+    val findOpBlockWithDigest = config
+      .get(RelNodeBlockPlanBuilder.TABLE_OPTIMIZER_REUSE_OPTIMIZE_BLOCK_WITH_DIGEST_ENABLED)
     if (!findOpBlockWithDigest) {
       return relNodes
     }
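
The change from `config.getConfiguration.getBoolean(...)` to `config.get(...)` is the typed
`ReadableConfig` access pattern this series moves towards: callers that only read options can
depend on the narrow interface instead of the full `TableConfig`. A self-contained sketch,
assuming a hypothetical boolean option modelled on TABLE_OPTIMIZER_UNIONALL_AS_BREAKPOINT_ENABLED:

    import java.lang.{Boolean => JBoolean}

    import org.apache.flink.configuration.ConfigOptions.key
    import org.apache.flink.configuration.{ConfigOption, Configuration, ReadableConfig}

    object ReadableConfigSketch {

      // hypothetical option, declared the same way as the breakpoint option
      val MY_BREAKPOINT_ENABLED: ConfigOption[JBoolean] =
        key("table.optimizer.my-breakpoint-enabled")
          .booleanType()
          .defaultValue(JBoolean.TRUE)

      def main(args: Array[String]): Unit = {
        val conf = new Configuration()
        conf.set(MY_BREAKPOINT_ENABLED, JBoolean.FALSE)

        // a Configuration already is a ReadableConfig, so narrowing is free
        val readable: ReadableConfig = conf
        println(readable.get(MY_BREAKPOINT_ENABLED)) // false
      }
    }
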
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/StreamCommonSubGraphBasedOptimizer.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/StreamCommonSubGraphBasedOptimizer.scala
index 707f52f..6205d93 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/StreamCommonSubGraphBasedOptimizer.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/StreamCommonSubGraphBasedOptimizer.scala
@@ -36,7 +36,6 @@ import org.apache.flink.util.Preconditions
 
 import org.apache.calcite.rel.RelNode
 import org.apache.calcite.rel.core.TableScan
-import org.apache.calcite.rex.RexBuilder
 
 import java.util
 import java.util.Collections
@@ -50,7 +49,7 @@ class StreamCommonSubGraphBasedOptimizer(planner: StreamPlanner)
   extends CommonSubGraphBasedOptimizer {
 
   override protected def doOptimize(roots: Seq[RelNode]): Seq[RelNodeBlock] = {
-    val config = planner.getTableConfig
+    val config = planner.getConfiguration
     // build RelNodeBlock plan
     val sinkBlocks = RelNodeBlockPlanBuilder.buildRelNodeBlockPlan(roots, config)
     // infer trait properties for sink block
@@ -58,9 +57,9 @@ class StreamCommonSubGraphBasedOptimizer(planner: StreamPlanner)
       // don't require update before by default
       sinkBlock.setUpdateBeforeRequired(false)
 
-      val miniBatchInterval: MiniBatchInterval = if (config.getConfiguration.getBoolean(
+      val miniBatchInterval: MiniBatchInterval = if (config.get(
         ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ENABLED)) {
-        val miniBatchLatency = config.getConfiguration.get(
+        val miniBatchLatency = config.get(
           ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ALLOW_LATENCY).toMillis
         Preconditions.checkArgument(miniBatchLatency > 0,
           "MiniBatch Latency must be greater than 0 ms.", null)
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/IntervalJoinUtil.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/IntervalJoinUtil.scala
index d354b65..dbb60ff 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/IntervalJoinUtil.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/IntervalJoinUtil.scala
@@ -70,11 +70,11 @@ object IntervalJoinUtil {
       leftLogicalFieldCnt: Int,
       joinRowType: RelDataType,
       rexBuilder: RexBuilder,
-      config: TableConfig): (Option[WindowBounds], Option[RexNode]) = {
+      tableConfig: TableConfig): (Option[WindowBounds], Option[RexNode]) = {
 
     // Converts the condition to conjunctive normal form (CNF)
     val cnfCondition = FlinkRexUtil.toCnf(rexBuilder,
-      config.getConfiguration.getInteger(FlinkRexUtil.TABLE_OPTIMIZER_CNF_NODES_LIMIT),
+      tableConfig.getConfiguration.getInteger(FlinkRexUtil.TABLE_OPTIMIZER_CNF_NODES_LIMIT),
       predicate)
 
     // split the condition into time predicates and other predicates
@@ -119,7 +119,8 @@ object IntervalJoinUtil {
     }
 
     // assemble window bounds from predicates
-    val streamTimeOffsets = timePreds.map(computeWindowBoundFromPredicate(_, rexBuilder, config))
+    val streamTimeOffsets = timePreds.map(
+      computeWindowBoundFromPredicate(_, rexBuilder, tableConfig))
     val (leftLowerBound, leftUpperBound) =
       streamTimeOffsets match {
         case Seq(Some(x: WindowBound), Some(y: WindowBound)) if x.isLeftLower && !y.isLeftLower =>
@@ -323,7 +324,7 @@ object IntervalJoinUtil {
   private def computeWindowBoundFromPredicate(
       timePred: TimePredicate,
       rexBuilder: RexBuilder,
-      config: TableConfig): Option[WindowBound] = {
+      tableConfig: TableConfig): Option[WindowBound] = {
 
     val isLeftLowerBound: Boolean =
       timePred.pred.getKind match {
@@ -338,7 +339,7 @@ object IntervalJoinUtil {
       }
 
     // reduce predicate to constants to compute bounds
-    val (leftLiteral, rightLiteral) = reduceTimeExpression(timePred, rexBuilder, config)
+    val (leftLiteral, rightLiteral) = reduceTimeExpression(timePred, rexBuilder, tableConfig)
 
     if (leftLiteral.isEmpty || rightLiteral.isEmpty) {
       return None
@@ -370,15 +371,15 @@ object IntervalJoinUtil {
     * Replaces the time attributes on both sides of a time predicate by a zero literal and
     * reduces the expressions on both sides to a long literal.
     *
-    * @param timePred   The time predicate which both sides are reduced.
-    * @param rexBuilder A RexBuilder
-    * @param config     A TableConfig.
+    * @param timePred    The time predicate which both sides are reduced.
+    * @param rexBuilder  A RexBuilder
+    * @param tableConfig A TableConfig.
     * @return The values of the reduced literals on both sides of the time comparison predicate.
     */
   private def reduceTimeExpression(
       timePred: TimePredicate,
       rexBuilder: RexBuilder,
-      config: TableConfig): (Option[Long], Option[Long]) = {
+      tableConfig: TableConfig): (Option[Long], Option[Long]) = {
 
     /**
      * Checks if the given call is a materialization call for either proctime or rowtime.
@@ -426,7 +427,7 @@ object IntervalJoinUtil {
     val rightSideWithLiteral = replaceTimeFieldWithLiteral(rightSide)
 
     // reduce expression to literal
-     val exprReducer = new ExpressionReducer(config, allowChangeNullability = true)
+    val exprReducer = new ExpressionReducer(tableConfig, allowChangeNullability = true)
     val originList = new util.ArrayList[RexNode]()
     originList.add(leftSideWithLiteral)
     originList.add(rightSideWithLiteral)
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/JoinUtil.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/JoinUtil.scala
index cf9a3a3..dff51b9 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/JoinUtil.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/JoinUtil.scala
@@ -124,23 +124,23 @@ object JoinUtil {
   }
 
   def generateConditionFunction(
-      config: TableConfig,
+      tableConfig: TableConfig,
       joinSpec: JoinSpec,
       leftType: LogicalType,
       rightType: LogicalType): GeneratedJoinCondition = {
     generateConditionFunction(
-        config,
-        joinSpec.getNonEquiCondition().orElse(null),
+        tableConfig,
+        joinSpec.getNonEquiCondition.orElse(null),
         leftType,
         rightType)
   }
 
   def generateConditionFunction(
-        config: TableConfig,
+        tableConfig: TableConfig,
         nonEquiCondition: RexNode,
         leftType: LogicalType,
         rightType: LogicalType): GeneratedJoinCondition = {
-    val ctx = CodeGeneratorContext(config)
+    val ctx = CodeGeneratorContext(tableConfig)
     // should consider null fields
     val exprGenerator = new ExprCodeGenerator(ctx, false)
         .bindInput(leftType)
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/PartitionPruner.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/PartitionPruner.scala
index 99a11d8..9f1c47b 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/PartitionPruner.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/PartitionPruner.scala
@@ -74,7 +74,7 @@ object PartitionPruner {
     * @return Pruned partitions.
     */
   def prunePartitions(
-      config: TableConfig,
+      tableConfig: TableConfig,
       partitionFieldNames: Array[String],
       partitionFieldTypes: Array[LogicalType],
       allPartitions: JList[JMap[String, String]],
@@ -87,7 +87,7 @@ object PartitionPruner {
     val inputType = InternalTypeInfo.ofFields(partitionFieldTypes, partitionFieldNames).toRowType
     val returnType: LogicalType = new BooleanType(false)
 
-    val ctx = new ConstantCodeGeneratorContext(config)
+    val ctx = new ConstantCodeGeneratorContext(tableConfig)
     val collectorTerm = DEFAULT_COLLECTOR_TERM
 
     val exprGenerator = new ExprCodeGenerator(ctx, false)
@@ -119,8 +119,8 @@ object PartitionPruner {
     val results: JList[Boolean] = new JArrayList[Boolean](allPartitions.size)
     val collector = new ListCollector[Boolean](results)
 
-    val parameters = if (config.getConfiguration != null) {
-      config.getConfiguration
+    val parameters = if (tableConfig.getConfiguration != null) {
+      tableConfig.getConfiguration
     } else {
       new Configuration()
     }
@@ -129,7 +129,7 @@ object PartitionPruner {
       // apply the filter to all partitions
       allPartitions.foreach { partition =>
         val row = convertPartitionToRow(
-          config.getLocalTimeZone, partitionFieldNames, partitionFieldTypes, partition)
+          tableConfig.getLocalTimeZone, partitionFieldNames, partitionFieldTypes, partition)
         collector.collect(richMapFunction.map(row))
       }
     } finally {
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/RankUtil.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/RankUtil.scala
index eec4b9f..ae9e522 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/RankUtil.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/RankUtil.scala
@@ -65,17 +65,17 @@ object RankUtil {
     * @param  oriPred             the original predicate
     * @param  rankFieldIndex      the index of rank field
     * @param  rexBuilder          RexBuilder
-    * @param  config              TableConfig
+    * @param  tableConfig         TableConfig
     * @return A Tuple2 of extracted rank range and remaining predicates.
     */
   def extractRankRange(
       oriPred: RexNode,
       rankFieldIndex: Int,
       rexBuilder: RexBuilder,
-      config: TableConfig): (Option[RankRange], Option[RexNode]) = {
+      tableConfig: TableConfig): (Option[RankRange], Option[RexNode]) = {
     val predicate = FlinkRexUtil.expandSearch(rexBuilder, oriPred)
     // Converts the condition to conjunctive normal form (CNF)
-    val cnfNodeCount = config.getConfiguration.getInteger(
+    val cnfNodeCount = tableConfig.getConfiguration.getInteger(
       FlinkRexUtil.TABLE_OPTIMIZER_CNF_NODES_LIMIT)
     val cnfCondition = FlinkRexUtil.toCnf(rexBuilder, cnfNodeCount, predicate)
 
@@ -105,7 +105,7 @@ object RankUtil {
       return (None, Some(predicate))
     }
 
-    val sortBounds = limitPreds.map(computeWindowBoundFromPredicate(_, rexBuilder, config))
+    val sortBounds = limitPreds.map(computeWindowBoundFromPredicate(_, rexBuilder, tableConfig))
     val rankRange = sortBounds match {
       case Seq(Some(LowerBoundary(x)), Some(UpperBoundary(y))) =>
         new ConstantRankRange(x, y)
@@ -207,7 +207,7 @@ object RankUtil {
   private def computeWindowBoundFromPredicate(
       limitPred: LimitPredicate,
       rexBuilder: RexBuilder,
-      config: TableConfig): Option[Boundary] = {
+      tableConfig: TableConfig): Option[Boundary] = {
 
     val bound: BoundDefine = limitPred.pred.getKind match {
       case SqlKind.GREATER_THAN | SqlKind.GREATER_THAN_OR_EQUAL if limitPred.rankOnLeftSide =>
@@ -233,7 +233,7 @@ object RankUtil {
       case (_: RexInputRef, Lower) => None
       case _ =>
         // reduce predicate to constants to compute bounds
-        val literal = reduceComparisonPredicate(limitPred, rexBuilder, config)
+        val literal = reduceComparisonPredicate(limitPred, rexBuilder, tableConfig)
         if (literal.isEmpty) {
           None
         } else {
@@ -264,15 +264,15 @@ object RankUtil {
    * Replaces the rank aggregate reference on one side of a predicate by a zero literal and
     * reduces the expressions on both sides to a long literal.
     *
-    * @param limitPred The limit predicate which both sides are reduced.
-    * @param rexBuilder A RexBuilder
-    * @param config A TableConfig.
+    * @param limitPred   The limit predicate which both sides are reduced.
+    * @param rexBuilder  A RexBuilder
+    * @param tableConfig A TableConfig.
     * @return The values of the reduced literals on both sides of the comparison predicate.
     */
   private def reduceComparisonPredicate(
       limitPred: LimitPredicate,
       rexBuilder: RexBuilder,
-      config: TableConfig): Option[Long] = {
+      tableConfig: TableConfig): Option[Long] = {
 
     val expression = if (limitPred.rankOnLeftSide) {
       limitPred.pred.operands.get(1)
@@ -285,7 +285,7 @@ object RankUtil {
     }
 
     // reduce expression to literal
-     val exprReducer = new ExpressionReducer(config)
+     val exprReducer = new ExpressionReducer(tableConfig)
      val originList = new util.ArrayList[RexNode]()
      originList.add(expression)
      val reduceList = new util.ArrayList[RexNode]()
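
For context, `ExpressionReducer` is the planner's implementation of Calcite's `RexExecutor`,
so the constant folding above follows its two-list contract: pass the original expressions in
one list, and `reduce` appends one reduced `RexNode` per input to the other. A sketch of the
call shape, assuming `tableConfig`, `rexBuilder` and a `RexNode` named `expression` are in scope:

    import java.util

    import org.apache.calcite.rex.RexNode
    import org.apache.flink.table.planner.codegen.ExpressionReducer

    val exprReducer = new ExpressionReducer(tableConfig, allowChangeNullability = true)

    val originList = new util.ArrayList[RexNode]()
    originList.add(expression)

    val reduceList = new util.ArrayList[RexNode]()
    exprReducer.reduce(rexBuilder, originList, reduceList)
    // reduceList.get(0) now holds the folded literal (or the original
    // expression if it could not be reduced)
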
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/DataStreamJavaITCase.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/DataStreamJavaITCase.java
index e8cbdf5..e1bea1a 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/DataStreamJavaITCase.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/DataStreamJavaITCase.java
@@ -516,11 +516,11 @@ public class DataStreamJavaITCase extends AbstractTestBase {
     public void testToDataStreamCustomEventTime() throws Exception {
         final StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
 
-        final TableConfig config = tableEnv.getConfig();
+        final TableConfig tableConfig = tableEnv.getConfig();
 
         // session time zone should not have an impact on the conversion
-        final ZoneId originalZone = config.getLocalTimeZone();
-        config.setLocalTimeZone(ZoneId.of("Europe/Berlin"));
+        final ZoneId originalZone = tableConfig.getLocalTimeZone();
+        tableConfig.setLocalTimeZone(ZoneId.of("Europe/Berlin"));
 
         final LocalDateTime localDateTime1 = LocalDateTime.parse("1970-01-01T00:00:00.000");
         final LocalDateTime localDateTime2 = LocalDateTime.parse("1970-01-01T01:00:00.000");
@@ -567,7 +567,7 @@ public class DataStreamJavaITCase extends AbstractTestBase {
                 localDateTime1.atOffset(ZoneOffset.UTC).toInstant().toEpochMilli(),
                 localDateTime2.atOffset(ZoneOffset.UTC).toInstant().toEpochMilli());
 
-        config.setLocalTimeZone(originalZone);
+        tableConfig.setLocalTimeZone(originalZone);
     }
 
     @Test
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/CalcITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/CalcITCase.scala
index 5d2e12e..80c4f71 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/CalcITCase.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/CalcITCase.scala
@@ -1312,14 +1312,14 @@ class CalcITCase extends BatchTestBase {
   def testCalcBinary(): Unit = {
     registerCollection(
       "BinaryT",
-      nullData3.map((r) => row(r.getField(0), r.getField(1),
+      nullData3.map(r => row(r.getField(0), r.getField(1),
         r.getField(2).toString.getBytes(StandardCharsets.UTF_8))),
       new RowTypeInfo(INT_TYPE_INFO, LONG_TYPE_INFO, BYTE_PRIMITIVE_ARRAY_TYPE_INFO),
       "a, b, c",
       nullablesOfNullData3)
     checkResult(
       "select a, b, c from BinaryT where b < 1000",
-      nullData3.map((r) => row(r.getField(0), r.getField(1),
+      nullData3.map(r => row(r.getField(0), r.getField(1),
         r.getField(2).toString.getBytes(StandardCharsets.UTF_8)))
     )
   }
@@ -1328,19 +1328,19 @@ class CalcITCase extends BatchTestBase {
   def testOrderByBinary(): Unit = {
     registerCollection(
       "BinaryT",
-      nullData3.map((r) => row(r.getField(0), r.getField(1),
+      nullData3.map(r => row(r.getField(0), r.getField(1),
         r.getField(2).toString.getBytes(StandardCharsets.UTF_8))),
       new RowTypeInfo(INT_TYPE_INFO, LONG_TYPE_INFO, BYTE_PRIMITIVE_ARRAY_TYPE_INFO),
       "a, b, c",
       nullablesOfNullData3)
-    conf.getConfiguration.setInteger(
+    tableConfig.getConfiguration.setInteger(
       ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 1)
-    conf.getConfiguration.setBoolean(
+    tableConfig.getConfiguration.setBoolean(
       BatchPhysicalSortRule.TABLE_EXEC_RANGE_SORT_ENABLED, true)
     checkResult(
       "select * from BinaryT order by c",
       nullData3.sortBy((x : Row) =>
-        x.getField(2).asInstanceOf[String]).map((r) =>
+        x.getField(2).asInstanceOf[String]).map(r =>
         row(r.getField(0), r.getField(1),
           r.getField(2).toString.getBytes(StandardCharsets.UTF_8))),
       isSorted = true
@@ -1351,7 +1351,7 @@ class CalcITCase extends BatchTestBase {
   def testGroupByBinary(): Unit = {
     registerCollection(
       "BinaryT2",
-      nullData3.map((r) => row(r.getField(0),
+      nullData3.map(r => row(r.getField(0),
         r.getField(1).toString.getBytes(StandardCharsets.UTF_8), r.getField(2))),
       new RowTypeInfo(INT_TYPE_INFO, BYTE_PRIMITIVE_ARRAY_TYPE_INFO, STRING_TYPE_INFO),
       "a, b, c",
@@ -1443,7 +1443,7 @@ class CalcITCase extends BatchTestBase {
 
     val query = """
                   |select * from myTable where f0 in (1.0, 2.0, 3.0)
-                  |""".stripMargin;
+                  |""".stripMargin
 
     checkResult(
       query,
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/LimitITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/LimitITCase.scala
index 8b87824..312a7bc 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/LimitITCase.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/LimitITCase.scala
@@ -24,7 +24,7 @@ import org.apache.flink.table.planner.runtime.utils.{BatchTestBase, TestData}
 
 class LimitITCase extends LegacyLimitITCase {
   override def before(): Unit = {
-    BatchTestBase.configForMiniCluster(conf)
+    BatchTestBase.configForMiniCluster(tableConfig)
     registerCollection("Table3", data3, type3, "a, b, c", nullablesOfData3)
 
     val myTableDataId = TestValuesTableFactory.registerData(TestData.data3)
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/SortLimitITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/SortLimitITCase.scala
index ff8e3ad..5817fdb 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/SortLimitITCase.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/SortLimitITCase.scala
@@ -109,7 +109,7 @@ class SortLimitITCase extends BatchTestBase {
 
   @Test
   def testOrderBehindField(): Unit = {
-    conf.getConfiguration.setInteger(
+    tableConfig.getConfiguration.setInteger(
       ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 1)
     val expected = data3.sortBy((x : Row) => x.getField(2).asInstanceOf[String])
 
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/join/InnerJoinITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/join/InnerJoinITCase.scala
index f224425..b16d4e7 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/join/InnerJoinITCase.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/join/InnerJoinITCase.scala
@@ -154,7 +154,7 @@ class InnerJoinITCase(expectedJoinType: JoinType) extends BatchTestBase {
   @Test
   def testBigForSpill(): Unit = {
 
-    conf.getConfiguration.setInteger(TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 1)
+    tableConfig.getConfiguration.setInteger(TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 1)
 
     val bigData = Random.shuffle(
       bigIntStringData.union(bigIntStringData).union(bigIntStringData).union(bigIntStringData))
@@ -169,7 +169,7 @@ class InnerJoinITCase(expectedJoinType: JoinType) extends BatchTestBase {
   @Test
   def testSortMergeJoinOutputOrder(): Unit = {
     if (expectedJoinType == SortMergeJoin) {
-      conf.getConfiguration.setInteger(TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 1)
+      tableConfig.getConfiguration.setInteger(TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 1)
       env.getConfig.setParallelism(1)
 
       val bigData = Random.shuffle(
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/table/LimitITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/table/LimitITCase.scala
index ccda349..0ace286 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/table/LimitITCase.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/table/LimitITCase.scala
@@ -26,7 +26,7 @@ class LimitITCase extends LegacyLimitITCase {
 
   @Before
   override def before(): Unit = {
-    BatchTestBase.configForMiniCluster(conf)
+    BatchTestBase.configForMiniCluster(tableConfig)
 
     val myTableDataId = TestValuesTableFactory.registerData(TestData.data3)
     val ddl =
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/utils/BatchTestBase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/utils/BatchTestBase.scala
index 11f746b..3146565 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/utils/BatchTestBase.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/utils/BatchTestBase.scala
@@ -67,7 +67,7 @@ class BatchTestBase extends BatchAbstractTestBase {
   private val planner = tEnv.asInstanceOf[TableEnvironmentImpl].getPlanner.asInstanceOf[PlannerBase]
   val env: StreamExecutionEnvironment = planner.getExecEnv
   env.getConfig.enableObjectReuse()
-  val conf: TableConfig = tEnv.getConfig
+  val tableConfig: TableConfig = tEnv.getConfig
 
   val LINE_COL_PATTERN: Pattern = Pattern.compile("At line ([0-9]+), column ([0-9]+)")
   val LINE_COL_TWICE_PATTERN: Pattern = Pattern.compile("(?s)From line ([0-9]+),"
@@ -75,7 +75,7 @@ class BatchTestBase extends BatchAbstractTestBase {
 
   @Before
   def before(): Unit = {
-    BatchTestBase.configForMiniCluster(conf)
+    BatchTestBase.configForMiniCluster(tableConfig)
   }
 
   @After
@@ -517,9 +517,10 @@ object BatchTestBase {
     }
   }
 
-  def configForMiniCluster(conf: TableConfig): Unit = {
-    conf.getConfiguration.setInteger(TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, DEFAULT_PARALLELISM)
-    conf.getConfiguration
+  def configForMiniCluster(tableConfig: TableConfig): Unit = {
+    tableConfig.getConfiguration
+      .setInteger(TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, DEFAULT_PARALLELISM)
+    tableConfig.getConfiguration
       .set(ExecutionOptions.BATCH_SHUFFLE_MODE, BatchShuffleMode.ALL_EXCHANGES_PIPELINED)
   }
 }