You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tw...@apache.org on 2022/02/24 07:37:00 UTC
[flink] 04/05: [FLINK-26297][table] Use `tableConfig` as name for `TableConfig` variables
This is an automated email from the ASF dual-hosted git repository.
twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit e2b743c98d2aabfa028c0cc79a54b62ad30507da
Author: Marios Trivyzas <ma...@gmail.com>
AuthorDate: Tue Feb 22 19:47:13 2022 +0200
[FLINK-26297][table] Use `tableConfig` as name for `TableConfig` variables
To clearly separate the configurations in use, and to prepare for the future
effort to use `ReadableConfig` instead of `TableConfig`, rename variables
and function args that require `TableConfig` to `tableConfig` instead of
plain `config` or `conf`.
Minor code improvements to remove IDE warnings in the touched classes.
---
.../client/gateway/context/ExecutionContext.java | 12 ++++-----
.../api/bridge/java/StreamTableEnvironment.java | 6 ++---
.../internal/StreamTableEnvironmentImplTest.java | 6 ++---
.../org/apache/flink/table/api/TableConfig.java | 10 ++++----
.../flink/table/catalog/FunctionCatalog.java | 4 +--
.../expressions/resolver/ExpressionResolver.java | 23 ++++++++++-------
.../operations/utils/OperationTreeBuilder.java | 12 ++++-----
.../flink/table/utils/TableEnvironmentMock.java | 10 ++++----
.../internal/StreamTableEnvironmentImpl.scala | 4 +--
.../planner/connectors/DynamicSourceUtils.java | 7 +++--
.../logical/PushFilterIntoSourceScanRuleBase.java | 5 ++--
.../PushProjectIntoTableSourceScanRule.java | 4 +--
.../table/planner/codegen/CalcCodeGenerator.scala | 4 +--
.../planner/codegen/CodeGeneratorContext.scala | 10 +++-----
.../planner/codegen/CorrelateCodeGenerator.scala | 12 ++++-----
.../table/planner/codegen/ExpressionReducer.scala | 6 ++---
.../planner/codegen/FunctionCodeGenerator.scala | 2 +-
.../planner/codegen/LongHashJoinGenerator.scala | 17 ++++++------
.../planner/codegen/LookupJoinCodeGenerator.scala | 30 +++++++++++-----------
.../planner/codegen/ValuesCodeGenerator.scala | 4 +--
.../codegen/WatermarkGeneratorCodeGenerator.scala | 6 ++---
.../codegen/agg/AggsHandlerCodeGenerator.scala | 3 +--
.../planner/codegen/calls/FunctionGenerator.scala | 7 ++---
...ltiFieldRangeBoundComparatorCodeGenerator.scala | 4 +--
.../over/RangeBoundComparatorCodeGenerator.scala | 6 ++---
.../planner/codegen/sort/SortCodeGenerator.scala | 8 +++---
.../table/planner/delegation/BatchPlanner.scala | 6 ++---
.../table/planner/delegation/PlannerBase.scala | 28 ++++++++------------
.../table/planner/delegation/StreamPlanner.scala | 13 +++++-----
.../physical/batch/BatchPhysicalJoinBase.scala | 4 +--
.../BatchCommonSubGraphBasedOptimizer.scala | 3 +--
.../table/planner/plan/optimize/RelNodeBlock.scala | 18 ++++++-------
.../StreamCommonSubGraphBasedOptimizer.scala | 7 +++--
.../planner/plan/utils/IntervalJoinUtil.scala | 21 +++++++--------
.../flink/table/planner/plan/utils/JoinUtil.scala | 10 ++++----
.../table/planner/plan/utils/PartitionPruner.scala | 10 ++++----
.../flink/table/planner/plan/utils/RankUtil.scala | 22 ++++++++--------
.../runtime/stream/sql/DataStreamJavaITCase.java | 8 +++---
.../planner/runtime/batch/sql/CalcITCase.scala | 16 ++++++------
.../planner/runtime/batch/sql/LimitITCase.scala | 2 +-
.../runtime/batch/sql/SortLimitITCase.scala | 2 +-
.../runtime/batch/sql/join/InnerJoinITCase.scala | 4 +--
.../planner/runtime/batch/table/LimitITCase.scala | 2 +-
.../planner/runtime/utils/BatchTestBase.scala | 11 ++++----
44 files changed, 204 insertions(+), 205 deletions(-)
diff --git a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/context/ExecutionContext.java b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/context/ExecutionContext.java
index 49e8853..3805499 100644
--- a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/context/ExecutionContext.java
+++ b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/context/ExecutionContext.java
@@ -106,8 +106,8 @@ public class ExecutionContext {
"The old planner is not supported anymore. Please update to new default planner.");
}
- TableConfig config = new TableConfig();
- config.addConfiguration(flinkConfig);
+ TableConfig tableConfig = new TableConfig();
+ tableConfig.addConfiguration(flinkConfig);
StreamExecutionEnvironment streamExecEnv = createStreamExecutionEnvironment();
@@ -115,7 +115,7 @@ public class ExecutionContext {
return createStreamTableEnvironment(
streamExecEnv,
settings,
- config,
+ tableConfig,
executor,
sessionState.catalogManager,
sessionState.moduleManager,
@@ -126,7 +126,7 @@ public class ExecutionContext {
private StreamTableEnvironment createStreamTableEnvironment(
StreamExecutionEnvironment env,
EnvironmentSettings settings,
- TableConfig config,
+ TableConfig tableConfig,
Executor executor,
CatalogManager catalogManager,
ModuleManager moduleManager,
@@ -137,7 +137,7 @@ public class ExecutionContext {
PlannerFactoryUtil.createPlanner(
settings.getPlanner(),
executor,
- config,
+ tableConfig,
moduleManager,
catalogManager,
functionCatalog);
@@ -146,7 +146,7 @@ public class ExecutionContext {
catalogManager,
moduleManager,
functionCatalog,
- config,
+ tableConfig,
env,
planner,
executor,
diff --git a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/bridge/java/StreamTableEnvironment.java b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/bridge/java/StreamTableEnvironment.java
index f32172b..6481acf 100644
--- a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/bridge/java/StreamTableEnvironment.java
+++ b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/bridge/java/StreamTableEnvironment.java
@@ -122,9 +122,9 @@ public interface StreamTableEnvironment extends TableEnvironment {
*/
static StreamTableEnvironment create(
StreamExecutionEnvironment executionEnvironment, EnvironmentSettings settings) {
- TableConfig config = new TableConfig();
- config.addConfiguration(settings.toConfiguration());
- return StreamTableEnvironmentImpl.create(executionEnvironment, settings, config);
+ TableConfig tableConfig = new TableConfig();
+ tableConfig.addConfiguration(settings.toConfiguration());
+ return StreamTableEnvironmentImpl.create(executionEnvironment, settings, tableConfig);
}
/**
diff --git a/flink-table/flink-table-api-java-bridge/src/test/java/org/apache/flink/table/api/bridge/java/internal/StreamTableEnvironmentImplTest.java b/flink-table/flink-table-api-java-bridge/src/test/java/org/apache/flink/table/api/bridge/java/internal/StreamTableEnvironmentImplTest.java
index d9a9336..2337e1d 100644
--- a/flink-table/flink-table-api-java-bridge/src/test/java/org/apache/flink/table/api/bridge/java/internal/StreamTableEnvironmentImplTest.java
+++ b/flink-table/flink-table-api-java-bridge/src/test/java/org/apache/flink/table/api/bridge/java/internal/StreamTableEnvironmentImplTest.java
@@ -74,14 +74,14 @@ public class StreamTableEnvironmentImplTest {
private StreamTableEnvironmentImpl getStreamTableEnvironment(
StreamExecutionEnvironment env, DataStreamSource<Integer> elements) {
- TableConfig config = new TableConfig();
+ TableConfig tableConfig = new TableConfig();
CatalogManager catalogManager = CatalogManagerMocks.createEmptyCatalogManager();
ModuleManager moduleManager = new ModuleManager();
return new StreamTableEnvironmentImpl(
catalogManager,
moduleManager,
- new FunctionCatalog(config, catalogManager, moduleManager),
- config,
+ new FunctionCatalog(tableConfig, catalogManager, moduleManager),
+ tableConfig,
env,
new TestPlanner(elements.getTransformation()),
new ExecutorMock(),
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableConfig.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableConfig.java
index 3493b4f..ba0a2ac 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableConfig.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableConfig.java
@@ -134,8 +134,8 @@ public class TableConfig {
*
* <pre>{@code
* TableEnvironment tEnv = ...
- * TableConfig config = tEnv.getConfig
- * config.setLocalTimeZone(ZoneOffset.ofHours(2));
+ * TableConfig tableConfig = tEnv.getConfig
+ * tableConfig.setLocalTimeZone(ZoneOffset.ofHours(2));
* tEnv("CREATE TABLE testTable (id BIGINT, tmstmp TIMESTAMP WITH LOCAL TIME ZONE)");
* tEnv("INSERT INTO testTable VALUES ((1, '2000-01-01 2:00:00'), (2, TIMESTAMP '2000-01-01 2:00:00'))");
* tEnv("SELECT * FROM testTable"); // query with local time zone set to UTC+2
@@ -154,7 +154,7 @@ public class TableConfig {
* <p>If we change the local time zone and query the same table:
*
* <pre>{@code
- * config.setLocalTimeZone(ZoneOffset.ofHours(0));
+ * tableConfig.setLocalTimeZone(ZoneOffset.ofHours(0));
* tEnv("SELECT * FROM testTable"); // query with local time zone set to UTC+0
* }</pre>
*
@@ -352,8 +352,8 @@ public class TableConfig {
*
* <pre>{@code
* Map<String, String> params = ...
- * TableConfig config = tEnv.getConfig;
- * config.getConfiguration().set(PipelineOptions.GLOBAL_JOB_PARAMETERS, params);
+ * TableConfig tableConfig = tEnv.getConfig;
+ * tableConfig.getConfiguration().set(PipelineOptions.GLOBAL_JOB_PARAMETERS, params);
* }</pre>
*/
@Experimental
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/FunctionCatalog.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/FunctionCatalog.java
index f27dd84..afa626b 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/FunctionCatalog.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/FunctionCatalog.java
@@ -77,8 +77,8 @@ public final class FunctionCatalog {
private PlannerTypeInferenceUtil plannerTypeInferenceUtil;
public FunctionCatalog(
- TableConfig config, CatalogManager catalogManager, ModuleManager moduleManager) {
- this(checkNotNull(config).getConfiguration(), catalogManager, moduleManager);
+ TableConfig tableConfig, CatalogManager catalogManager, ModuleManager moduleManager) {
+ this(checkNotNull(tableConfig).getConfiguration(), catalogManager, moduleManager);
}
public FunctionCatalog(
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/ExpressionResolver.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/ExpressionResolver.java
index 0bb90fc..281866f 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/ExpressionResolver.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/ExpressionResolver.java
@@ -127,7 +127,7 @@ public class ExpressionResolver {
private final boolean isGroupedAggregation;
private ExpressionResolver(
- TableConfig config,
+ TableConfig tableConfig,
TableReferenceLookup tableLookup,
FunctionLookup functionLookup,
DataTypeFactory typeFactory,
@@ -137,7 +137,7 @@ public class ExpressionResolver {
List<LocalReferenceExpression> localReferences,
@Nullable DataType outputDataType,
boolean isGroupedAggregation) {
- this.config = Preconditions.checkNotNull(config).getConfiguration();
+ this.config = Preconditions.checkNotNull(tableConfig).getConfiguration();
this.tableLookup = Preconditions.checkNotNull(tableLookup);
this.fieldLookup = Preconditions.checkNotNull(fieldLookup);
this.functionLookup = Preconditions.checkNotNull(functionLookup);
@@ -165,7 +165,7 @@ public class ExpressionResolver {
* resolver like e.g. {@link GroupWindow} or {@link OverWindow}. You can also add additional
* {@link ResolverRule}.
*
- * @param config general configuration
+ * @param tableConfig general configuration
* @param tableCatalog a way to lookup a table reference by name
* @param functionLookup a way to lookup call by name
* @param typeFactory a way to lookup and create data types
@@ -173,14 +173,19 @@ public class ExpressionResolver {
* @return builder for resolver
*/
public static ExpressionResolverBuilder resolverFor(
- TableConfig config,
+ TableConfig tableConfig,
TableReferenceLookup tableCatalog,
FunctionLookup functionLookup,
DataTypeFactory typeFactory,
SqlExpressionResolver sqlExpressionResolver,
QueryOperation... inputs) {
return new ExpressionResolverBuilder(
- inputs, config, tableCatalog, functionLookup, typeFactory, sqlExpressionResolver);
+ inputs,
+ tableConfig,
+ tableCatalog,
+ functionLookup,
+ typeFactory,
+ sqlExpressionResolver);
}
/**
@@ -420,7 +425,7 @@ public class ExpressionResolver {
/** Builder for creating {@link ExpressionResolver}. */
public static class ExpressionResolverBuilder {
- private final TableConfig config;
+ private final TableConfig tableConfig;
private final List<QueryOperation> queryOperations;
private final TableReferenceLookup tableCatalog;
private final FunctionLookup functionLookup;
@@ -433,12 +438,12 @@ public class ExpressionResolver {
private ExpressionResolverBuilder(
QueryOperation[] queryOperations,
- TableConfig config,
+ TableConfig tableConfig,
TableReferenceLookup tableCatalog,
FunctionLookup functionLookup,
DataTypeFactory typeFactory,
SqlExpressionResolver sqlExpressionResolver) {
- this.config = config;
+ this.tableConfig = tableConfig;
this.queryOperations = Arrays.asList(queryOperations);
this.tableCatalog = tableCatalog;
this.functionLookup = functionLookup;
@@ -469,7 +474,7 @@ public class ExpressionResolver {
public ExpressionResolver build() {
return new ExpressionResolver(
- config,
+ tableConfig,
tableCatalog,
functionLookup,
typeFactory,
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/utils/OperationTreeBuilder.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/utils/OperationTreeBuilder.java
index c756029..14dee92 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/utils/OperationTreeBuilder.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/utils/OperationTreeBuilder.java
@@ -82,7 +82,7 @@ import static org.apache.flink.table.types.logical.LogicalTypeRoot.ROW;
@Internal
public final class OperationTreeBuilder {
- private final TableConfig config;
+ private final TableConfig tableConfig;
private final FunctionLookup functionCatalog;
private final DataTypeFactory typeFactory;
private final TableReferenceLookup tableReferenceLookup;
@@ -100,7 +100,7 @@ public final class OperationTreeBuilder {
private final ValuesOperationFactory valuesOperationFactory;
private OperationTreeBuilder(
- TableConfig config,
+ TableConfig tableConfig,
FunctionLookup functionLookup,
DataTypeFactory typeFactory,
TableReferenceLookup tableReferenceLookup,
@@ -112,7 +112,7 @@ public final class OperationTreeBuilder {
AggregateOperationFactory aggregateOperationFactory,
JoinOperationFactory joinOperationFactory,
ValuesOperationFactory valuesOperationFactory) {
- this.config = config;
+ this.tableConfig = tableConfig;
this.functionCatalog = functionLookup;
this.typeFactory = typeFactory;
this.tableReferenceLookup = tableReferenceLookup;
@@ -128,14 +128,14 @@ public final class OperationTreeBuilder {
}
public static OperationTreeBuilder create(
- TableConfig config,
+ TableConfig tableConfig,
FunctionLookup functionCatalog,
DataTypeFactory typeFactory,
TableReferenceLookup tableReferenceLookup,
SqlExpressionResolver sqlExpressionResolver,
boolean isStreamingMode) {
return new OperationTreeBuilder(
- config,
+ tableConfig,
functionCatalog,
typeFactory,
tableReferenceLookup,
@@ -390,7 +390,7 @@ public final class OperationTreeBuilder {
public ExpressionResolver.ExpressionResolverBuilder getResolverBuilder(
QueryOperation... tableOperation) {
return ExpressionResolver.resolverFor(
- config,
+ tableConfig,
tableReferenceLookup,
functionCatalog,
typeFactory,
diff --git a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/utils/TableEnvironmentMock.java b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/utils/TableEnvironmentMock.java
index 9020ea5..cb09387 100644
--- a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/utils/TableEnvironmentMock.java
+++ b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/utils/TableEnvironmentMock.java
@@ -69,15 +69,15 @@ public class TableEnvironmentMock extends TableEnvironmentImpl {
}
private static TableEnvironmentMock getInstance(boolean isStreamingMode) {
- final TableConfig config = createTableConfig();
+ final TableConfig tableConfig = createTableConfig();
final CatalogManager catalogManager = CatalogManagerMocks.createEmptyCatalogManager();
final ModuleManager moduleManager = new ModuleManager();
return new TableEnvironmentMock(
catalogManager,
moduleManager,
- config,
+ tableConfig,
createExecutor(),
- createFunctionCatalog(config, catalogManager, moduleManager),
+ createFunctionCatalog(tableConfig, catalogManager, moduleManager),
createPlanner(),
isStreamingMode);
}
@@ -91,8 +91,8 @@ public class TableEnvironmentMock extends TableEnvironmentImpl {
}
private static FunctionCatalog createFunctionCatalog(
- TableConfig config, CatalogManager catalogManager, ModuleManager moduleManager) {
- return new FunctionCatalog(config, catalogManager, moduleManager);
+ TableConfig tableConfig, CatalogManager catalogManager, ModuleManager moduleManager) {
+ return new FunctionCatalog(tableConfig, catalogManager, moduleManager);
}
private static PlannerMock createPlanner() {
diff --git a/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/bridge/scala/internal/StreamTableEnvironmentImpl.scala b/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/bridge/scala/internal/StreamTableEnvironmentImpl.scala
index 8bbc63c..f56ba48 100644
--- a/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/bridge/scala/internal/StreamTableEnvironmentImpl.scala
+++ b/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/bridge/scala/internal/StreamTableEnvironmentImpl.scala
@@ -51,7 +51,7 @@ class StreamTableEnvironmentImpl (
catalogManager: CatalogManager,
moduleManager: ModuleManager,
functionCatalog: FunctionCatalog,
- config: TableConfig,
+ tableConfig: TableConfig,
scalaExecutionEnvironment: StreamExecutionEnvironment,
planner: Planner,
executor: Executor,
@@ -60,7 +60,7 @@ class StreamTableEnvironmentImpl (
extends AbstractStreamTableEnvironmentImpl(
catalogManager,
moduleManager,
- config,
+ tableConfig,
executor,
functionCatalog,
planner,
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/connectors/DynamicSourceUtils.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/connectors/DynamicSourceUtils.java
index c61dc46..b5fcf2d 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/connectors/DynamicSourceUtils.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/connectors/DynamicSourceUtils.java
@@ -227,7 +227,9 @@ public final class DynamicSourceUtils {
/** Returns true if the table source produces duplicate change events. */
public static boolean isSourceChangeEventsDuplicate(
- ResolvedSchema resolvedSchema, DynamicTableSource tableSource, TableConfig config) {
+ ResolvedSchema resolvedSchema,
+ DynamicTableSource tableSource,
+ TableConfig tableConfig) {
if (!(tableSource instanceof ScanTableSource)) {
return false;
}
@@ -235,7 +237,8 @@ public final class DynamicSourceUtils {
boolean isCDCSource =
!mode.containsOnly(RowKind.INSERT) && !isUpsertSource(resolvedSchema, tableSource);
boolean changeEventsDuplicate =
- config.getConfiguration()
+ tableConfig
+ .getConfiguration()
.getBoolean(ExecutionConfigOptions.TABLE_EXEC_SOURCE_CDC_EVENTS_DUPLICATE);
boolean hasPrimaryKey = resolvedSchema.getPrimaryKey().isPresent();
return isCDCSource && changeEventsDuplicate && hasPrimaryKey;
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushFilterIntoSourceScanRuleBase.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushFilterIntoSourceScanRuleBase.java
index 87fa42c..e3203cf 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushFilterIntoSourceScanRuleBase.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushFilterIntoSourceScanRuleBase.java
@@ -57,8 +57,9 @@ public abstract class PushFilterIntoSourceScanRuleBase extends RelOptRule {
@Override
public boolean matches(RelOptRuleCall call) {
- TableConfig config = ShortcutUtils.unwrapContext(call.getPlanner()).getTableConfig();
- return config.getConfiguration()
+ TableConfig tableConfig = ShortcutUtils.unwrapContext(call.getPlanner()).getTableConfig();
+ return tableConfig
+ .getConfiguration()
.getBoolean(
OptimizerConfigOptions.TABLE_OPTIMIZER_SOURCE_PREDICATE_PUSHDOWN_ENABLED);
}
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushProjectIntoTableSourceScanRule.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushProjectIntoTableSourceScanRule.java
index f68866d..a4044bf 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushProjectIntoTableSourceScanRule.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushProjectIntoTableSourceScanRule.java
@@ -202,13 +202,13 @@ public class PushProjectIntoTableSourceScanRule
return projections;
}
- private static boolean requiresPrimaryKey(TableSourceTable table, TableConfig config) {
+ private static boolean requiresPrimaryKey(TableSourceTable table, TableConfig tableConfig) {
return DynamicSourceUtils.isUpsertSource(
table.contextResolvedTable().getResolvedSchema(), table.tableSource())
|| DynamicSourceUtils.isSourceChangeEventsDuplicate(
table.contextResolvedTable().getResolvedSchema(),
table.tableSource(),
- config);
+ tableConfig);
}
private List<RexNode> getPrimaryKeyProjections(LogicalTableScan scan) {
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/CalcCodeGenerator.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/CalcCodeGenerator.scala
index 606a6bd..2b5e9c5 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/CalcCodeGenerator.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/CalcCodeGenerator.scala
@@ -74,8 +74,8 @@ object CalcCodeGenerator {
outRowClass: Class[_ <: RowData],
calcProjection: Seq[RexNode],
calcCondition: Option[RexNode],
- config: TableConfig): GeneratedFunction[FlatMapFunction[RowData, RowData]] = {
- val ctx = CodeGeneratorContext(config)
+ tableConfig: TableConfig): GeneratedFunction[FlatMapFunction[RowData, RowData]] = {
+ val ctx = CodeGeneratorContext(tableConfig)
val inputTerm = CodeGenUtils.DEFAULT_INPUT1_TERM
val collectorTerm = CodeGenUtils.DEFAULT_COLLECTOR_TERM
val processCode = generateProcessCode(
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/CodeGeneratorContext.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/CodeGeneratorContext.scala
index d900490..ab8ba8f 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/CodeGeneratorContext.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/CodeGeneratorContext.scala
@@ -213,7 +213,7 @@ class CodeGeneratorContext(val tableConfig: TableConfig) {
/**
* @return Comment to be added as a header comment on the generated class
*/
- def getClassHeaderComment(): String = {
+ def getClassHeaderComment: String = {
s"""
|/*
| * ${reusableHeaderComments.mkString("\n * ")}
@@ -857,9 +857,7 @@ class CodeGeneratorContext(val tableConfig: TableConfig) {
* @param constant constant expression
* @return generated expression with the fieldTerm and nullTerm
*/
- def addReusableConstant(
- constant: GeneratedExpression,
- nullCheck: Boolean): GeneratedExpression = {
+ def addReusableConstant(constant: GeneratedExpression): GeneratedExpression = {
require(constant.literal, "Literal expected")
val fieldTerm = newName("constant")
@@ -977,7 +975,7 @@ class CodeGeneratorContext(val tableConfig: TableConfig) {
}
object CodeGeneratorContext {
- def apply(config: TableConfig): CodeGeneratorContext = {
- new CodeGeneratorContext(config)
+ def apply(tableConfig: TableConfig): CodeGeneratorContext = {
+ new CodeGeneratorContext(tableConfig)
}
}
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/CorrelateCodeGenerator.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/CorrelateCodeGenerator.scala
index 2c76516..9b3bcc9 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/CorrelateCodeGenerator.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/CorrelateCodeGenerator.scala
@@ -40,7 +40,7 @@ import org.apache.calcite.rex._
object CorrelateCodeGenerator {
def generateCorrelateTransformation(
- config: TableConfig,
+ tableConfig: TableConfig,
operatorCtx: CodeGeneratorContext,
inputTransformation: Transformation[RowData],
inputType: RowType,
@@ -74,7 +74,7 @@ object CorrelateCodeGenerator {
val substituteStreamOperator = generateOperator(
operatorCtx,
- config,
+ tableConfig,
inputType,
condition.map(_.accept(changeInputRefIndexShuttle)),
outputType,
@@ -97,7 +97,7 @@ object CorrelateCodeGenerator {
*/
private[flink] def generateOperator[T <: Function](
ctx: CodeGeneratorContext,
- config: TableConfig,
+ tableConfig: TableConfig,
inputType: RowType,
condition: Option[RexNode],
returnType: RowType,
@@ -114,7 +114,7 @@ object CorrelateCodeGenerator {
// 1.1 compile correlate collector
val correlateCollectorTerm = generateCorrelateCollector(
ctx,
- config,
+ tableConfig,
inputType,
functionResultType,
returnType,
@@ -181,7 +181,7 @@ object CorrelateCodeGenerator {
*/
private def generateCorrelateCollector(
ctx: CodeGeneratorContext,
- config: TableConfig,
+ tableConfig: TableConfig,
inputType: RowType,
functionResultType: RowType,
resultType: RowType,
@@ -193,7 +193,7 @@ object CorrelateCodeGenerator {
val inputTerm = CodeGenUtils.DEFAULT_INPUT1_TERM
val udtfInputTerm = CodeGenUtils.DEFAULT_INPUT2_TERM
- val collectorCtx = CodeGeneratorContext(config)
+ val collectorCtx = CodeGeneratorContext(tableConfig)
val body = {
// completely output left input + right
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/ExpressionReducer.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/ExpressionReducer.scala
index be7a953..33fe3e3 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/ExpressionReducer.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/ExpressionReducer.scala
@@ -48,7 +48,7 @@ import scala.collection.mutable.ListBuffer
* not null.
*/
class ExpressionReducer(
- config: TableConfig,
+ tableConfig: TableConfig,
allowChangeNullability: Boolean = false)
extends RexExecutor
with Logging {
@@ -71,7 +71,7 @@ class ExpressionReducer(
val resultType = RowType.of(literalTypes: _*)
// generate MapFunction
- val ctx = new ConstantCodeGeneratorContext(config)
+ val ctx = new ConstantCodeGeneratorContext(tableConfig)
val exprGenerator = new ExprCodeGenerator(ctx, false)
.bindInput(EMPTY_ROW_TYPE)
@@ -98,7 +98,7 @@ class ExpressionReducer(
throw new TableException("RichMapFunction[GenericRowData, GenericRowData] required here")
}
- val parameters = config.getConfiguration
+ val parameters = tableConfig.getConfiguration
val reduced = try {
richMapFunction.open(parameters)
// execute
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/FunctionCodeGenerator.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/FunctionCodeGenerator.scala
index 24c286f..4038daa 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/FunctionCodeGenerator.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/FunctionCodeGenerator.scala
@@ -125,7 +125,7 @@ object FunctionCodeGenerator {
val funcCode =
j"""
- ${ctx.getClassHeaderComment()}
+ ${ctx.getClassHeaderComment}
public class $funcName
extends ${samHeader._1.getCanonicalName} {
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/LongHashJoinGenerator.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/LongHashJoinGenerator.scala
index fbbe7df..637260e 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/LongHashJoinGenerator.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/LongHashJoinGenerator.scala
@@ -65,7 +65,6 @@ object LongHashJoinGenerator {
}
private def genGetLongKey(
- ctx: CodeGeneratorContext,
keyType: RowType,
keyMapping: Array[Int],
rowTerm: String): String = {
@@ -93,10 +92,10 @@ object LongHashJoinGenerator {
""".stripMargin, anyNullTerm)
}
- def genProjection(conf: TableConfig, types: Array[LogicalType]): GeneratedProjection = {
+ def genProjection(tableConfig: TableConfig, types: Array[LogicalType]): GeneratedProjection = {
val rowType = RowType.of(types: _*)
ProjectionCodeGenerator.generateProjection(
- CodeGeneratorContext.apply(conf),
+ CodeGeneratorContext.apply(tableConfig),
"Projection",
rowType,
rowType,
@@ -104,7 +103,7 @@ object LongHashJoinGenerator {
}
def gen(
- conf: TableConfig,
+ tableConfig: TableConfig,
hashJoinType: HashJoinType,
keyType: RowType,
buildType: RowType,
@@ -120,13 +119,13 @@ object LongHashJoinGenerator {
val probeSer = new BinaryRowDataSerializer(probeType.getFieldCount)
val tableTerm = newName("LongHashTable")
- val ctx = CodeGeneratorContext(conf)
+ val ctx = CodeGeneratorContext(tableConfig)
val buildSerTerm = ctx.addReusableObject(buildSer, "buildSer")
val probeSerTerm = ctx.addReusableObject(probeSer, "probeSer")
- val bGenProj = genProjection(conf, buildType.getChildren.toArray(Array[LogicalType]()))
+ val bGenProj = genProjection(tableConfig, buildType.getChildren.toArray(Array[LogicalType]()))
ctx.addReusableInnerClass(bGenProj.getClassName, bGenProj.getCode)
- val pGenProj = genProjection(conf, probeType.getChildren.toArray(Array[LogicalType]()))
+ val pGenProj = genProjection(tableConfig, probeType.getChildren.toArray(Array[LogicalType]()))
ctx.addReusableInnerClass(pGenProj.getClassName, pGenProj.getCode)
ctx.addReusableInnerClass(condFunc.getClassName, condFunc.getCode)
@@ -186,12 +185,12 @@ object LongHashJoinGenerator {
|
| @Override
| public long getBuildLongKey($ROW_DATA row) {
- | ${genGetLongKey(ctx, keyType, buildKeyMapping, "row")}
+ | ${genGetLongKey(keyType, buildKeyMapping, "row")}
| }
|
| @Override
| public long getProbeLongKey($ROW_DATA row) {
- | ${genGetLongKey(ctx, keyType, probeKeyMapping, "row")}
+ | ${genGetLongKey(keyType, probeKeyMapping, "row")}
| }
|
| @Override
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/LookupJoinCodeGenerator.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/LookupJoinCodeGenerator.scala
index f0615a7..ef7b5b0 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/LookupJoinCodeGenerator.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/LookupJoinCodeGenerator.scala
@@ -64,7 +64,7 @@ object LookupJoinCodeGenerator {
* Generates a lookup function ([[TableFunction]])
*/
def generateSyncLookupFunction(
- config: TableConfig,
+ tableConfig: TableConfig,
dataTypeFactory: DataTypeFactory,
inputType: LogicalType,
tableSourceType: LogicalType,
@@ -86,7 +86,7 @@ object LookupJoinCodeGenerator {
generateLookupFunction(
classOf[FlatMapFunction[RowData, RowData]],
- config,
+ tableConfig,
dataTypeFactory,
inputType,
tableSourceType,
@@ -104,7 +104,7 @@ object LookupJoinCodeGenerator {
* Generates a async lookup function ([[AsyncTableFunction]])
*/
def generateAsyncLookupFunction(
- config: TableConfig,
+ tableConfig: TableConfig,
dataTypeFactory: DataTypeFactory,
inputType: LogicalType,
tableSourceType: LogicalType,
@@ -117,7 +117,7 @@ object LookupJoinCodeGenerator {
generateLookupFunction(
classOf[AsyncFunction[RowData, AnyRef]],
- config,
+ tableConfig,
dataTypeFactory,
inputType,
tableSourceType,
@@ -133,7 +133,7 @@ object LookupJoinCodeGenerator {
private def generateLookupFunction[F <: Function](
generatedClass: Class[F],
- config: TableConfig,
+ tableConfig: TableConfig,
dataTypeFactory: DataTypeFactory,
inputType: LogicalType,
tableSourceType: LogicalType,
@@ -161,7 +161,7 @@ object LookupJoinCodeGenerator {
lookupFunction,
callContext,
classOf[PlannerBase].getClassLoader,
- config.getConfiguration)
+ tableConfig.getConfiguration)
val inference = createLookupTypeInference(
dataTypeFactory,
@@ -170,7 +170,7 @@ object LookupJoinCodeGenerator {
udf,
functionName)
- val ctx = CodeGeneratorContext(config)
+ val ctx = CodeGeneratorContext(tableConfig)
val operands = prepareOperands(
ctx,
inputType,
@@ -415,16 +415,16 @@ object LookupJoinCodeGenerator {
/**
* Generates a [[TableFunctionResultFuture]] that can be passed to Java compiler.
*
- * @param config The TableConfig
- * @param name Class name of the table function collector. Must not be unique but has to be a
- * valid Java class identifier.
+ * @param tableConfig The TableConfig
+ * @param name Class name of the table function collector. Must not be unique but has
+ * to be a valid Java class identifier.
* @param leftInputType The type information of the element being collected
* @param collectedType The type information of the element collected by the collector
- * @param condition The filter condition before collect elements
+ * @param condition The filter condition before collect elements
* @return instance of GeneratedCollector
*/
def generateTableAsyncCollector(
- config: TableConfig,
+ tableConfig: TableConfig,
name: String,
leftInputType: RowType,
collectedType: RowType,
@@ -438,7 +438,7 @@ object LookupJoinCodeGenerator {
val input2Term = DEFAULT_INPUT2_TERM
val outTerm = "resultCollection"
- val ctx = CodeGeneratorContext(config)
+ val ctx = CodeGeneratorContext(tableConfig)
val body = if (condition.isEmpty) {
"getResultFuture().complete(records);"
@@ -508,7 +508,7 @@ object LookupJoinCodeGenerator {
* to projection/filter the dimension table results
*/
def generateCalcMapFunction(
- config: TableConfig,
+ tableConfig: TableConfig,
projection: Seq[RexNode],
condition: RexNode,
outputType: RelDataType,
@@ -521,6 +521,6 @@ object LookupJoinCodeGenerator {
classOf[GenericRowData],
projection,
Option(condition),
- config)
+ tableConfig)
}
}
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/ValuesCodeGenerator.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/ValuesCodeGenerator.scala
index 7fc68a8..aabb7fd 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/ValuesCodeGenerator.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/ValuesCodeGenerator.scala
@@ -33,11 +33,11 @@ import scala.collection.JavaConversions._
object ValuesCodeGenerator {
def generatorInputFormat(
- config: TableConfig,
+ tableConfig: TableConfig,
outputType: RowType,
tuples: util.List[util.List[RexLiteral]],
description: String): ValuesInputFormat = {
- val ctx = CodeGeneratorContext(config)
+ val ctx = CodeGeneratorContext(tableConfig)
val exprGenerator = new ExprCodeGenerator(ctx, false)
// generate code for every record
val generatedRecords = tuples.map { r =>
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/WatermarkGeneratorCodeGenerator.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/WatermarkGeneratorCodeGenerator.scala
index 780a0e0..c1b83c6 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/WatermarkGeneratorCodeGenerator.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/WatermarkGeneratorCodeGenerator.scala
@@ -42,7 +42,7 @@ import java.util
object WatermarkGeneratorCodeGenerator {
def generateWatermarkGenerator(
- config: TableConfig,
+ tableConfig: TableConfig,
inputType: RowType,
watermarkExpr: RexNode,
contextTerm: Option[String] = None): GeneratedWatermarkGenerator = {
@@ -56,9 +56,9 @@ object WatermarkGeneratorCodeGenerator {
}
val funcName = newName("WatermarkGenerator")
val ctx = if (contextTerm.isDefined) {
- new WatermarkGeneratorFunctionContext(config, contextTerm.get)
+ new WatermarkGeneratorFunctionContext(tableConfig, contextTerm.get)
} else {
- CodeGeneratorContext(config)
+ CodeGeneratorContext(tableConfig)
}
val generator = new ExprCodeGenerator(ctx, false)
.bindInput(inputType, inputTerm = "row")
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/agg/AggsHandlerCodeGenerator.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/agg/AggsHandlerCodeGenerator.scala
index f212c4e..bbb7f4f 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/agg/AggsHandlerCodeGenerator.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/agg/AggsHandlerCodeGenerator.scala
@@ -144,7 +144,7 @@ class AggsHandlerCodeGenerator(
this.constants = literals
val exprGenerator = new ExprCodeGenerator(ctx, INPUT_NOT_NULL)
val exprs = literals.map(exprGenerator.generateExpression)
- this.constantExprs = exprs.map(ctx.addReusableConstant(_, nullCheck = true))
+ this.constantExprs = exprs.map(ctx.addReusableConstant)
this
}
@@ -309,7 +309,6 @@ class AggsHandlerCodeGenerator(
aggName: String): Option[Expression] = {
if (filterArg > 0) {
- val name = s"agg_${aggIndex}_filter"
val filterType = inputFieldTypes(filterArg)
if (!filterType.isInstanceOf[BooleanType]) {
throw new TableException(s"filter arg must be boolean, but is $filterType, " +
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/FunctionGenerator.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/FunctionGenerator.scala
index 22cb1a9..44c0b28 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/FunctionGenerator.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/FunctionGenerator.scala
@@ -30,7 +30,7 @@ import org.apache.flink.table.types.logical.{LogicalType, LogicalTypeRoot}
import java.lang.reflect.Method
import scala.collection.mutable
-class FunctionGenerator private(config: TableConfig) {
+class FunctionGenerator private(tableConfig: TableConfig) {
val INTEGRAL_TYPES = Array(
TINYINT,
@@ -44,7 +44,7 @@ class FunctionGenerator private(config: TableConfig) {
mutable.Map()
val isStreamingMode = RuntimeExecutionMode.STREAMING.equals(
- config.getConfiguration.get(ExecutionOptions.RUNTIME_MODE))
+ tableConfig.getConfiguration.get(ExecutionOptions.RUNTIME_MODE))
// ----------------------------------------------------------------------------------------------
// Arithmetic functions
// ----------------------------------------------------------------------------------------------
@@ -942,5 +942,6 @@ class FunctionGenerator private(config: TableConfig) {
}
object FunctionGenerator {
- def getInstance(config: TableConfig): FunctionGenerator = new FunctionGenerator(config)
+ def getInstance(tableConfig: TableConfig): FunctionGenerator =
+ new FunctionGenerator(tableConfig)
}
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/over/MultiFieldRangeBoundComparatorCodeGenerator.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/over/MultiFieldRangeBoundComparatorCodeGenerator.scala
index 9b101aa..35b31ec 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/over/MultiFieldRangeBoundComparatorCodeGenerator.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/over/MultiFieldRangeBoundComparatorCodeGenerator.scala
@@ -30,7 +30,7 @@ import org.apache.flink.table.types.logical.RowType
* RANGE allow the compound ORDER BY and the random type when the bound is current row.
*/
class MultiFieldRangeBoundComparatorCodeGenerator(
- conf: TableConfig,
+ tableConfig: TableConfig,
inputType: RowType,
sortSpec: SortSpec,
isLowerBound: Boolean = true) {
@@ -45,7 +45,7 @@ class MultiFieldRangeBoundComparatorCodeGenerator(
if (isLowerBound) s"return $comp >= 0 ? 1 : -1;" else s"return $comp > 0 ? 1 : -1;"
}
- val ctx = CodeGeneratorContext(conf)
+ val ctx = CodeGeneratorContext(tableConfig)
val compareCode = GenerateUtils.generateRowCompare(ctx, inputType, sortSpec, input, current)
val code =
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/over/RangeBoundComparatorCodeGenerator.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/over/RangeBoundComparatorCodeGenerator.scala
index ff2a468..011f56e 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/over/RangeBoundComparatorCodeGenerator.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/over/RangeBoundComparatorCodeGenerator.scala
@@ -46,7 +46,7 @@ import java.math.BigDecimal
*/
class RangeBoundComparatorCodeGenerator(
relBuilder: RelBuilder,
- config: TableConfig,
+ tableConfig: TableConfig,
inputType: RowType,
bound: Any,
key: Int = -1,
@@ -59,7 +59,7 @@ class RangeBoundComparatorCodeGenerator(
val input = CodeGenUtils.DEFAULT_INPUT1_TERM
val current = CodeGenUtils.DEFAULT_INPUT2_TERM
- val ctx = CodeGeneratorContext(config)
+ val ctx = CodeGeneratorContext(tableConfig)
val inputExpr = GenerateUtils.generateFieldAccess(ctx, inputType, inputTerm = input, key)
val currentExpr = GenerateUtils.generateFieldAccess(ctx, inputType, inputTerm = current, key)
@@ -142,7 +142,7 @@ class RangeBoundComparatorCodeGenerator(
val relKeyType = typeFactory.createFieldTypeFromLogicalType(realKeyType)
//minus between inputValue and currentValue
- val ctx = CodeGeneratorContext(config)
+ val ctx = CodeGeneratorContext(tableConfig)
val exprCodeGenerator = new ExprCodeGenerator(ctx, false)
val minusCall = if (keyOrder) {
relBuilder.call(
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/sort/SortCodeGenerator.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/sort/SortCodeGenerator.scala
index dd5df2c..28e279e 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/sort/SortCodeGenerator.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/sort/SortCodeGenerator.scala
@@ -35,12 +35,12 @@ import scala.collection.mutable
/**
* A code generator for generating [[NormalizedKeyComputer]] and [[RecordComparator]].
*
- * @param conf config of the planner.
+ * @param tableConfig config of the planner.
* @param input input type.
* @param sortSpec sort specification.
*/
class SortCodeGenerator(
- conf: TableConfig,
+ tableConfig: TableConfig,
val input: RowType,
val sortSpec: SortSpec) {
@@ -183,7 +183,7 @@ class SortCodeGenerator(
}
""".stripMargin
- new GeneratedNormalizedKeyComputer(className, code, conf.getConfiguration)
+ new GeneratedNormalizedKeyComputer(className, code, tableConfig.getConfiguration)
}
def generatePutNormalizedKeys(numKeyBytes: Int): mutable.ArrayBuffer[String] = {
@@ -383,7 +383,7 @@ class SortCodeGenerator(
*/
def generateRecordComparator(name: String): GeneratedRecordComparator = {
ComparatorCodeGenerator.gen(
- conf,
+ tableConfig,
name,
input,
sortSpec)
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/delegation/BatchPlanner.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/delegation/BatchPlanner.scala
index 8acfa41..006cb3c 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/delegation/BatchPlanner.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/delegation/BatchPlanner.scala
@@ -47,11 +47,11 @@ import scala.collection.JavaConversions._
class BatchPlanner(
executor: Executor,
- config: TableConfig,
+ tableConfig: TableConfig,
moduleManager: ModuleManager,
functionCatalog: FunctionCatalog,
catalogManager: CatalogManager)
- extends PlannerBase(executor, config, moduleManager, functionCatalog, catalogManager,
+ extends PlannerBase(executor, tableConfig, moduleManager, functionCatalog, catalogManager,
isStreamingMode = false) {
override protected def getTraitDefs: Array[RelTraitDef[_ <: RelTrait]] = {
@@ -132,7 +132,7 @@ class BatchPlanner(
private def createDummyPlanner(): BatchPlanner = {
val dummyExecEnv = new DummyStreamExecutionEnvironment(getExecEnv)
val executor = new DefaultExecutor(dummyExecEnv)
- new BatchPlanner(executor, config, moduleManager, functionCatalog, catalogManager)
+ new BatchPlanner(executor, tableConfig, moduleManager, functionCatalog, catalogManager)
}
override def loadPlan(planReference: PlanReference): CompiledPlanInternal = {
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/delegation/PlannerBase.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/delegation/PlannerBase.scala
index b830141..4442160 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/delegation/PlannerBase.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/delegation/PlannerBase.scala
@@ -23,10 +23,8 @@ import org.apache.flink.api.dag.Transformation
import org.apache.flink.configuration.ReadableConfig
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import org.apache.flink.streaming.api.graph.StreamGraph
-import org.apache.flink.table.api.PlanReference.{ContentPlanReference, FilePlanReference, ResourcePlanReference}
import org.apache.flink.table.api._
import org.apache.flink.table.api.config.{ExecutionConfigOptions, TableConfigOptions}
-import org.apache.flink.table.api.internal.CompiledPlanInternal
import org.apache.flink.table.catalog.ManagedTableListener.isManagedTable
import org.apache.flink.table.catalog._
import org.apache.flink.table.connector.sink.DynamicTableSink
@@ -45,10 +43,9 @@ import org.apache.flink.table.planner.delegation.ParserFactory.DefaultParserCont
import org.apache.flink.table.planner.expressions.PlannerTypeInferenceUtilImpl
import org.apache.flink.table.planner.hint.FlinkHints
import org.apache.flink.table.planner.operations.PlannerQueryOperation
-import org.apache.flink.table.planner.plan.ExecNodeGraphCompiledPlan
import org.apache.flink.table.planner.plan.nodes.calcite.LogicalLegacySink
import org.apache.flink.table.planner.plan.nodes.exec.processor.{ExecNodeGraphProcessor, ProcessorContext}
-import org.apache.flink.table.planner.plan.nodes.exec.serde.{JsonSerdeUtil, SerdeContext}
+import org.apache.flink.table.planner.plan.nodes.exec.serde.SerdeContext
import org.apache.flink.table.planner.plan.nodes.exec.{ExecNodeGraph, ExecNodeGraphGenerator}
import org.apache.flink.table.planner.plan.nodes.physical.FlinkPhysicalRel
import org.apache.flink.table.planner.plan.optimize.Optimizer
@@ -61,8 +58,6 @@ import org.apache.flink.table.planner.utils.JavaScalaConversionUtil.{toJava, toS
import org.apache.flink.table.sinks.TableSink
import org.apache.flink.table.types.utils.LegacyTypeInfoDataTypeConverter
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectReader
-
import org.apache.calcite.jdbc.CalciteSchemaBuilder.asRootSchema
import org.apache.calcite.plan.{RelTrait, RelTraitDef}
import org.apache.calcite.rel.RelNode
@@ -70,7 +65,6 @@ import org.apache.calcite.rel.hint.RelHint
import org.apache.calcite.rel.logical.LogicalTableModify
import org.apache.calcite.tools.FrameworkConfig
-import java.io.{File, IOException}
import java.lang.{Long => JLong}
import java.util
import java.util.{Collections, TimeZone}
@@ -86,7 +80,7 @@ import scala.collection.mutable
* @param executor instance of [[Executor]], needed to extract
* [[StreamExecutionEnvironment]] for
* [[org.apache.flink.table.sources.StreamTableSource.getDataStream]]
- * @param config mutable configuration passed from corresponding [[TableEnvironment]]
+ * @param tableConfig mutable configuration passed from corresponding [[TableEnvironment]]
* @param moduleManager manager for modules
* @param functionCatalog catalog of functions
* @param catalogManager manager of catalog meta objects such as tables, views, databases etc.
@@ -95,7 +89,7 @@ import scala.collection.mutable
*/
abstract class PlannerBase(
executor: Executor,
- config: TableConfig,
+ tableConfig: TableConfig,
val moduleManager: ModuleManager,
val functionCatalog: FunctionCatalog,
val catalogManager: CatalogManager,
@@ -109,14 +103,14 @@ abstract class PlannerBase(
private var currentDialect: SqlDialect = getTableConfig.getSqlDialect
private val plannerConfiguration: ReadableConfig = new PlannerConfiguration(
- config.getConfiguration,
+ tableConfig.getConfiguration,
executor.getConfiguration)
@VisibleForTesting
private[flink] val plannerContext: PlannerContext =
new PlannerContext(
!isStreamingMode,
- config,
+ tableConfig,
moduleManager,
functionCatalog,
catalogManager,
@@ -148,7 +142,7 @@ abstract class PlannerBase(
/** Returns specific query [[Optimizer]] depends on the concrete type of this TableEnvironment. */
protected def getOptimizer: Optimizer
- def getTableConfig: TableConfig = config
+ def getTableConfig: TableConfig = tableConfig
def getFlinkContext: FlinkContext = plannerContext.getFlinkContext
@@ -349,7 +343,7 @@ abstract class PlannerBase(
val shuttle = new SameRelObjectShuttle()
val relsWithoutSameObj = optimizedRelNodes.map(_.accept(shuttle))
// reuse subplan
- val reusedPlan = SubplanReuser.reuseDuplicatedSubplan(relsWithoutSameObj, config)
+ val reusedPlan = SubplanReuser.reuseDuplicatedSubplan(relsWithoutSameObj, tableConfig)
// convert FlinkPhysicalRel DAG to ExecNodeGraph
val generator = new ExecNodeGraphGenerator()
val execGraph = generator.generate(reusedPlan.map(_.asInstanceOf[FlinkPhysicalRel]))
@@ -488,7 +482,7 @@ abstract class PlannerBase(
* the configuration before planner do optimization with [[ModifyOperation]] or other works.
*/
protected def validateAndOverrideConfiguration(): Unit = {
- val configuration = config.getConfiguration
+ val configuration = tableConfig.getConfiguration
if (!configuration.get(TableConfigOptions.TABLE_PLANNER).equals(PlannerType.BLINK)) {
throw new IllegalArgumentException(
"Mismatch between configured planner and actual planner. " +
@@ -502,7 +496,7 @@ abstract class PlannerBase(
val epochTime :JLong = System.currentTimeMillis()
configuration.set(TABLE_QUERY_START_EPOCH_TIME, epochTime)
val localTime :JLong = epochTime +
- TimeZone.getTimeZone(config.getLocalTimeZone).getOffset(epochTime)
+ TimeZone.getTimeZone(tableConfig.getLocalTimeZone).getOffset(epochTime)
configuration.set(TABLE_QUERY_START_LOCAL_TIME, localTime)
getExecEnv.configure(
@@ -521,7 +515,7 @@ abstract class PlannerBase(
* Cleanup all internal configuration after plan translation finished.
*/
protected def cleanupInternalConfigurations(): Unit = {
- val configuration = config.getConfiguration
+ val configuration = tableConfig.getConfiguration
configuration.removeConfig(TABLE_QUERY_START_EPOCH_TIME)
configuration.removeConfig(TABLE_QUERY_START_LOCAL_TIME)
}
@@ -563,7 +557,7 @@ abstract class PlannerBase(
val transformations = translateToPlan(execGraph)
cleanupInternalConfigurations()
- val streamGraph = executor.createPipeline(transformations, config.getConfiguration, null)
+ val streamGraph = executor.createPipeline(transformations, tableConfig.getConfiguration, null)
.asInstanceOf[StreamGraph]
(sinkRelNodes, optimizedRelNodes, execGraph, streamGraph)
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/delegation/StreamPlanner.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/delegation/StreamPlanner.scala
index cfef7d7..29e2115 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/delegation/StreamPlanner.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/delegation/StreamPlanner.scala
@@ -52,11 +52,11 @@ import _root_.scala.collection.JavaConversions._
class StreamPlanner(
executor: Executor,
- config: TableConfig,
+ tableConfig: TableConfig,
moduleManager: ModuleManager,
functionCatalog: FunctionCatalog,
catalogManager: CatalogManager)
- extends PlannerBase(executor, config, moduleManager, functionCatalog, catalogManager,
+ extends PlannerBase(executor, tableConfig, moduleManager, functionCatalog, catalogManager,
isStreamingMode = true) {
override protected def getTraitDefs: Array[RelTraitDef[_ <: RelTrait]] = {
@@ -131,7 +131,7 @@ class StreamPlanner(
private def createDummyPlanner(): StreamPlanner = {
val dummyExecEnv = new DummyStreamExecutionEnvironment(getExecEnv)
val executor = new DefaultExecutor(dummyExecEnv)
- new StreamPlanner(executor, config, moduleManager, functionCatalog, catalogManager)
+ new StreamPlanner(executor, tableConfig, moduleManager, functionCatalog, catalogManager)
}
override def loadPlan(planReference: PlanReference): CompiledPlanInternal = {
@@ -142,15 +142,14 @@ class StreamPlanner(
objectReader.readValue(filePlanReference.getFile, classOf[ExecNodeGraph])
case contentPlanReference: ContentPlanReference =>
objectReader.readValue(contentPlanReference.getContent, classOf[ExecNodeGraph])
- case resourcePlanReference: ResourcePlanReference => {
+ case resourcePlanReference: ResourcePlanReference =>
val url = resourcePlanReference.getClassLoader
.getResource(resourcePlanReference.getResourcePath)
if (url == null) {
throw new IOException(
- "Cannot load the plan reference from classpath: " + planReference);
+ "Cannot load the plan reference from classpath: " + planReference)
}
objectReader.readValue(new File(url.toURI), classOf[ExecNodeGraph])
- }
case _ => throw new IllegalStateException(
"Unknown PlanReference. This is a bug, please contact the developers")
}
@@ -192,7 +191,7 @@ class StreamPlanner(
val transformations = translateToPlan(execGraph)
cleanupInternalConfigurations()
- val streamGraph = executor.createPipeline(transformations, config.getConfiguration, null)
+ val streamGraph = executor.createPipeline(transformations, tableConfig.getConfiguration, null)
.asInstanceOf[StreamGraph]
val sb = new StringBuilder
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchPhysicalJoinBase.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchPhysicalJoinBase.scala
index 50da830..37c7201 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchPhysicalJoinBase.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchPhysicalJoinBase.scala
@@ -48,10 +48,10 @@ abstract class BatchPhysicalJoinBase(
with BatchPhysicalRel {
private[flink] def generateCondition(
- config: TableConfig,
+ tableConfig: TableConfig,
leftType: RowType,
rightType: RowType): GeneratedJoinCondition = {
- val ctx = CodeGeneratorContext(config)
+ val ctx = CodeGeneratorContext(tableConfig)
val exprGenerator = new ExprCodeGenerator(ctx, false)
.bindInput(leftType)
.bindSecondInput(rightType)
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/BatchCommonSubGraphBasedOptimizer.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/BatchCommonSubGraphBasedOptimizer.scala
index 9e35197..5e3c42b 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/BatchCommonSubGraphBasedOptimizer.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/BatchCommonSubGraphBasedOptimizer.scala
@@ -30,7 +30,6 @@ import org.apache.flink.table.planner.utils.TableConfigUtils
import org.apache.flink.util.Preconditions
import org.apache.calcite.rel.RelNode
-import org.apache.calcite.rex.RexBuilder
import java.util.Collections
@@ -42,7 +41,7 @@ class BatchCommonSubGraphBasedOptimizer(planner: BatchPlanner)
override protected def doOptimize(roots: Seq[RelNode]): Seq[RelNodeBlock] = {
// build RelNodeBlock plan
- val rootBlocks = RelNodeBlockPlanBuilder.buildRelNodeBlockPlan(roots, planner.getTableConfig)
+ val rootBlocks = RelNodeBlockPlanBuilder.buildRelNodeBlockPlan(roots, planner.getConfiguration)
// optimize recursively RelNodeBlock
rootBlocks.foreach(optimizeBlock)
rootBlocks
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/RelNodeBlock.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/RelNodeBlock.scala
index 6ef66e7..761e7fc 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/RelNodeBlock.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/RelNodeBlock.scala
@@ -19,9 +19,8 @@
package org.apache.flink.table.planner.plan.optimize
import org.apache.flink.annotation.Experimental
-import org.apache.flink.configuration.ConfigOption
import org.apache.flink.configuration.ConfigOptions.key
-import org.apache.flink.table.api.TableConfig
+import org.apache.flink.configuration.{ConfigOption, ReadableConfig}
import org.apache.flink.table.planner.plan.`trait`.MiniBatchInterval
import org.apache.flink.table.planner.plan.nodes.calcite.LegacySink
import org.apache.flink.table.planner.plan.reuse.SubplanReuser.{SubplanReuseContext, SubplanReuseShuttle}
@@ -30,6 +29,7 @@ import org.apache.flink.table.planner.plan.utils.{DefaultRelShuttle, ExpandTable
import org.apache.flink.util.Preconditions
import com.google.common.collect.Sets
+
import org.apache.calcite.rel._
import org.apache.calcite.rel.core.{Aggregate, Project, Snapshot, TableFunctionScan, Union}
import org.apache.calcite.rex.RexNode
@@ -258,13 +258,13 @@ class RelNodeWrapper(relNode: RelNode) {
/**
* Builds [[RelNodeBlock]] plan
*/
-class RelNodeBlockPlanBuilder private(config: TableConfig) {
+class RelNodeBlockPlanBuilder private(config: ReadableConfig) {
private val node2Wrapper = new util.IdentityHashMap[RelNode, RelNodeWrapper]()
private val node2Block = new util.IdentityHashMap[RelNode, RelNodeBlock]()
- private val isUnionAllAsBreakPointEnabled = config.getConfiguration.getBoolean(
- RelNodeBlockPlanBuilder.TABLE_OPTIMIZER_UNIONALL_AS_BREAKPOINT_ENABLED)
+ private val isUnionAllAsBreakPointEnabled = config
+ .get(RelNodeBlockPlanBuilder.TABLE_OPTIMIZER_UNIONALL_AS_BREAKPOINT_ENABLED)
/**
* Decompose the [[RelNode]] plan into many [[RelNodeBlock]]s,
@@ -415,7 +415,7 @@ object RelNodeBlockPlanBuilder {
*/
def buildRelNodeBlockPlan(
sinkNodes: Seq[RelNode],
- config: TableConfig): Seq[RelNodeBlock] = {
+ config: ReadableConfig): Seq[RelNodeBlock] = {
require(sinkNodes.nonEmpty)
// expand QueryOperationCatalogViewTable in TableScan
@@ -438,9 +438,9 @@ object RelNodeBlockPlanBuilder {
* @param relNodes RelNode trees
* @return RelNode dag which reuse common subPlan in each tree
*/
- private def reuseRelNodes(relNodes: Seq[RelNode], tableConfig: TableConfig): Seq[RelNode] = {
- val findOpBlockWithDigest = tableConfig.getConfiguration.getBoolean(
- RelNodeBlockPlanBuilder.TABLE_OPTIMIZER_REUSE_OPTIMIZE_BLOCK_WITH_DIGEST_ENABLED)
+ private def reuseRelNodes(relNodes: Seq[RelNode], config: ReadableConfig): Seq[RelNode] = {
+ val findOpBlockWithDigest = config
+ .get(RelNodeBlockPlanBuilder.TABLE_OPTIMIZER_REUSE_OPTIMIZE_BLOCK_WITH_DIGEST_ENABLED)
if (!findOpBlockWithDigest) {
return relNodes
}
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/StreamCommonSubGraphBasedOptimizer.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/StreamCommonSubGraphBasedOptimizer.scala
index 707f52f..6205d93 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/StreamCommonSubGraphBasedOptimizer.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/StreamCommonSubGraphBasedOptimizer.scala
@@ -36,7 +36,6 @@ import org.apache.flink.util.Preconditions
import org.apache.calcite.rel.RelNode
import org.apache.calcite.rel.core.TableScan
-import org.apache.calcite.rex.RexBuilder
import java.util
import java.util.Collections
@@ -50,7 +49,7 @@ class StreamCommonSubGraphBasedOptimizer(planner: StreamPlanner)
extends CommonSubGraphBasedOptimizer {
override protected def doOptimize(roots: Seq[RelNode]): Seq[RelNodeBlock] = {
- val config = planner.getTableConfig
+ val config = planner.getConfiguration
// build RelNodeBlock plan
val sinkBlocks = RelNodeBlockPlanBuilder.buildRelNodeBlockPlan(roots, config)
// infer trait properties for sink block
@@ -58,9 +57,9 @@ class StreamCommonSubGraphBasedOptimizer(planner: StreamPlanner)
// don't require update before by default
sinkBlock.setUpdateBeforeRequired(false)
- val miniBatchInterval: MiniBatchInterval = if (config.getConfiguration.getBoolean(
+ val miniBatchInterval: MiniBatchInterval = if (config.get(
ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ENABLED)) {
- val miniBatchLatency = config.getConfiguration.get(
+ val miniBatchLatency = config.get(
ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ALLOW_LATENCY).toMillis
Preconditions.checkArgument(miniBatchLatency > 0,
"MiniBatch Latency must be greater than 0 ms.", null)
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/IntervalJoinUtil.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/IntervalJoinUtil.scala
index d354b65..dbb60ff 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/IntervalJoinUtil.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/IntervalJoinUtil.scala
@@ -70,11 +70,11 @@ object IntervalJoinUtil {
leftLogicalFieldCnt: Int,
joinRowType: RelDataType,
rexBuilder: RexBuilder,
- config: TableConfig): (Option[WindowBounds], Option[RexNode]) = {
+ tableConfig: TableConfig): (Option[WindowBounds], Option[RexNode]) = {
// Converts the condition to conjunctive normal form (CNF)
val cnfCondition = FlinkRexUtil.toCnf(rexBuilder,
- config.getConfiguration.getInteger(FlinkRexUtil.TABLE_OPTIMIZER_CNF_NODES_LIMIT),
+ tableConfig.getConfiguration.getInteger(FlinkRexUtil.TABLE_OPTIMIZER_CNF_NODES_LIMIT),
predicate)
// split the condition into time predicates and other predicates
@@ -119,7 +119,8 @@ object IntervalJoinUtil {
}
// assemble window bounds from predicates
- val streamTimeOffsets = timePreds.map(computeWindowBoundFromPredicate(_, rexBuilder, config))
+ val streamTimeOffsets = timePreds.map(
+ computeWindowBoundFromPredicate(_, rexBuilder, tableConfig))
val (leftLowerBound, leftUpperBound) =
streamTimeOffsets match {
case Seq(Some(x: WindowBound), Some(y: WindowBound)) if x.isLeftLower && !y.isLeftLower =>
@@ -323,7 +324,7 @@ object IntervalJoinUtil {
private def computeWindowBoundFromPredicate(
timePred: TimePredicate,
rexBuilder: RexBuilder,
- config: TableConfig): Option[WindowBound] = {
+ tableConfig: TableConfig): Option[WindowBound] = {
val isLeftLowerBound: Boolean =
timePred.pred.getKind match {
@@ -338,7 +339,7 @@ object IntervalJoinUtil {
}
// reduce predicate to constants to compute bounds
- val (leftLiteral, rightLiteral) = reduceTimeExpression(timePred, rexBuilder, config)
+ val (leftLiteral, rightLiteral) = reduceTimeExpression(timePred, rexBuilder, tableConfig)
if (leftLiteral.isEmpty || rightLiteral.isEmpty) {
return None
@@ -370,15 +371,15 @@ object IntervalJoinUtil {
* Replaces the time attributes on both sides of a time predicate by a zero literal and
* reduces the expressions on both sides to a long literal.
*
- * @param timePred The time predicate which both sides are reduced.
- * @param rexBuilder A RexBuilder
- * @param config A TableConfig.
+ * @param timePred The time predicate which both sides are reduced.
+ * @param rexBuilder A RexBuilder
+ * @param tableConfig A TableConfig.
* @return The values of the reduced literals on both sides of the time comparison predicate.
*/
private def reduceTimeExpression(
timePred: TimePredicate,
rexBuilder: RexBuilder,
- config: TableConfig): (Option[Long], Option[Long]) = {
+ tableConfig: TableConfig): (Option[Long], Option[Long]) = {
/**
* Checks if the given call is a materialization call for either proctime or rowtime.
@@ -426,7 +427,7 @@ object IntervalJoinUtil {
val rightSideWithLiteral = replaceTimeFieldWithLiteral(rightSide)
// reduce expression to literal
- val exprReducer = new ExpressionReducer(config, allowChangeNullability = true)
+ val exprReducer = new ExpressionReducer(tableConfig, allowChangeNullability = true)
val originList = new util.ArrayList[RexNode]()
originList.add(leftSideWithLiteral)
originList.add(rightSideWithLiteral)
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/JoinUtil.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/JoinUtil.scala
index cf9a3a3..dff51b9 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/JoinUtil.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/JoinUtil.scala
@@ -124,23 +124,23 @@ object JoinUtil {
}
def generateConditionFunction(
- config: TableConfig,
+ tableConfig: TableConfig,
joinSpec: JoinSpec,
leftType: LogicalType,
rightType: LogicalType): GeneratedJoinCondition = {
generateConditionFunction(
- config,
- joinSpec.getNonEquiCondition().orElse(null),
+ tableConfig,
+ joinSpec.getNonEquiCondition.orElse(null),
leftType,
rightType)
}
def generateConditionFunction(
- config: TableConfig,
+ tableConfig: TableConfig,
nonEquiCondition: RexNode,
leftType: LogicalType,
rightType: LogicalType): GeneratedJoinCondition = {
- val ctx = CodeGeneratorContext(config)
+ val ctx = CodeGeneratorContext(tableConfig)
// should consider null fields
val exprGenerator = new ExprCodeGenerator(ctx, false)
.bindInput(leftType)
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/PartitionPruner.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/PartitionPruner.scala
index 99a11d8..9f1c47b 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/PartitionPruner.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/PartitionPruner.scala
@@ -74,7 +74,7 @@ object PartitionPruner {
* @return Pruned partitions.
*/
def prunePartitions(
- config: TableConfig,
+ tableConfig: TableConfig,
partitionFieldNames: Array[String],
partitionFieldTypes: Array[LogicalType],
allPartitions: JList[JMap[String, String]],
@@ -87,7 +87,7 @@ object PartitionPruner {
val inputType = InternalTypeInfo.ofFields(partitionFieldTypes, partitionFieldNames).toRowType
val returnType: LogicalType = new BooleanType(false)
- val ctx = new ConstantCodeGeneratorContext(config)
+ val ctx = new ConstantCodeGeneratorContext(tableConfig)
val collectorTerm = DEFAULT_COLLECTOR_TERM
val exprGenerator = new ExprCodeGenerator(ctx, false)
@@ -119,8 +119,8 @@ object PartitionPruner {
val results: JList[Boolean] = new JArrayList[Boolean](allPartitions.size)
val collector = new ListCollector[Boolean](results)
- val parameters = if (config.getConfiguration != null) {
- config.getConfiguration
+ val parameters = if (tableConfig.getConfiguration != null) {
+ tableConfig.getConfiguration
} else {
new Configuration()
}
@@ -129,7 +129,7 @@ object PartitionPruner {
// do filter against all partitions
allPartitions.foreach { partition =>
val row = convertPartitionToRow(
- config.getLocalTimeZone, partitionFieldNames, partitionFieldTypes, partition)
+ tableConfig.getLocalTimeZone, partitionFieldNames, partitionFieldTypes, partition)
collector.collect(richMapFunction.map(row))
}
} finally {
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/RankUtil.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/RankUtil.scala
index eec4b9f..ae9e522 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/RankUtil.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/RankUtil.scala
@@ -65,17 +65,17 @@ object RankUtil {
* @param oriPred the original predicate
* @param rankFieldIndex the index of rank field
* @param rexBuilder RexBuilder
- * @param config TableConfig
+ * @param tableConfig TableConfig
* @return A Tuple2 of extracted rank range and remaining predicates.
*/
def extractRankRange(
oriPred: RexNode,
rankFieldIndex: Int,
rexBuilder: RexBuilder,
- config: TableConfig): (Option[RankRange], Option[RexNode]) = {
+ tableConfig: TableConfig): (Option[RankRange], Option[RexNode]) = {
val predicate = FlinkRexUtil.expandSearch(rexBuilder, oriPred)
// Converts the condition to conjunctive normal form (CNF)
- val cnfNodeCount = config.getConfiguration.getInteger(
+ val cnfNodeCount = tableConfig.getConfiguration.getInteger(
FlinkRexUtil.TABLE_OPTIMIZER_CNF_NODES_LIMIT)
val cnfCondition = FlinkRexUtil.toCnf(rexBuilder, cnfNodeCount, predicate)
@@ -105,7 +105,7 @@ object RankUtil {
return (None, Some(predicate))
}
- val sortBounds = limitPreds.map(computeWindowBoundFromPredicate(_, rexBuilder, config))
+ val sortBounds = limitPreds.map(computeWindowBoundFromPredicate(_, rexBuilder, tableConfig))
val rankRange = sortBounds match {
case Seq(Some(LowerBoundary(x)), Some(UpperBoundary(y))) =>
new ConstantRankRange(x, y)
@@ -207,7 +207,7 @@ object RankUtil {
private def computeWindowBoundFromPredicate(
limitPred: LimitPredicate,
rexBuilder: RexBuilder,
- config: TableConfig): Option[Boundary] = {
+ tableConfig: TableConfig): Option[Boundary] = {
val bound: BoundDefine = limitPred.pred.getKind match {
case SqlKind.GREATER_THAN | SqlKind.GREATER_THAN_OR_EQUAL if limitPred.rankOnLeftSide =>
@@ -233,7 +233,7 @@ object RankUtil {
case (_: RexInputRef, Lower) => None
case _ =>
// reduce predicate to constants to compute bounds
- val literal = reduceComparisonPredicate(limitPred, rexBuilder, config)
+ val literal = reduceComparisonPredicate(limitPred, rexBuilder, tableConfig)
if (literal.isEmpty) {
None
} else {
@@ -264,15 +264,15 @@ object RankUtil {
* Replaces the rank aggregate reference on of a predicate by a zero literal and
* reduces the expressions on both sides to a long literal.
*
- * @param limitPred The limit predicate which both sides are reduced.
- * @param rexBuilder A RexBuilder
- * @param config A TableConfig.
+ * @param limitPred The limit predicate which both sides are reduced.
+ * @param rexBuilder A RexBuilder
+ * @param tableConfig A TableConfig.
* @return The values of the reduced literals on both sides of the comparison predicate.
*/
private def reduceComparisonPredicate(
limitPred: LimitPredicate,
rexBuilder: RexBuilder,
- config: TableConfig): Option[Long] = {
+ tableConfig: TableConfig): Option[Long] = {
val expression = if (limitPred.rankOnLeftSide) {
limitPred.pred.operands.get(1)
@@ -285,7 +285,7 @@ object RankUtil {
}
// reduce expression to literal
- val exprReducer = new ExpressionReducer(config)
+ val exprReducer = new ExpressionReducer(tableConfig)
val originList = new util.ArrayList[RexNode]()
originList.add(expression)
val reduceList = new util.ArrayList[RexNode]()
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/DataStreamJavaITCase.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/DataStreamJavaITCase.java
index e8cbdf5..e1bea1a 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/DataStreamJavaITCase.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/DataStreamJavaITCase.java
@@ -516,11 +516,11 @@ public class DataStreamJavaITCase extends AbstractTestBase {
public void testToDataStreamCustomEventTime() throws Exception {
final StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
- final TableConfig config = tableEnv.getConfig();
+ final TableConfig tableConfig = tableEnv.getConfig();
// session time zone should not have an impact on the conversion
- final ZoneId originalZone = config.getLocalTimeZone();
- config.setLocalTimeZone(ZoneId.of("Europe/Berlin"));
+ final ZoneId originalZone = tableConfig.getLocalTimeZone();
+ tableConfig.setLocalTimeZone(ZoneId.of("Europe/Berlin"));
final LocalDateTime localDateTime1 = LocalDateTime.parse("1970-01-01T00:00:00.000");
final LocalDateTime localDateTime2 = LocalDateTime.parse("1970-01-01T01:00:00.000");
@@ -567,7 +567,7 @@ public class DataStreamJavaITCase extends AbstractTestBase {
localDateTime1.atOffset(ZoneOffset.UTC).toInstant().toEpochMilli(),
localDateTime2.atOffset(ZoneOffset.UTC).toInstant().toEpochMilli());
- config.setLocalTimeZone(originalZone);
+ tableConfig.setLocalTimeZone(originalZone);
}
@Test
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/CalcITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/CalcITCase.scala
index 5d2e12e..80c4f71 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/CalcITCase.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/CalcITCase.scala
@@ -1312,14 +1312,14 @@ class CalcITCase extends BatchTestBase {
def testCalcBinary(): Unit = {
registerCollection(
"BinaryT",
- nullData3.map((r) => row(r.getField(0), r.getField(1),
+ nullData3.map(r => row(r.getField(0), r.getField(1),
r.getField(2).toString.getBytes(StandardCharsets.UTF_8))),
new RowTypeInfo(INT_TYPE_INFO, LONG_TYPE_INFO, BYTE_PRIMITIVE_ARRAY_TYPE_INFO),
"a, b, c",
nullablesOfNullData3)
checkResult(
"select a, b, c from BinaryT where b < 1000",
- nullData3.map((r) => row(r.getField(0), r.getField(1),
+ nullData3.map(r => row(r.getField(0), r.getField(1),
r.getField(2).toString.getBytes(StandardCharsets.UTF_8)))
)
}
@@ -1328,19 +1328,19 @@ class CalcITCase extends BatchTestBase {
def testOrderByBinary(): Unit = {
registerCollection(
"BinaryT",
- nullData3.map((r) => row(r.getField(0), r.getField(1),
+ nullData3.map(r => row(r.getField(0), r.getField(1),
r.getField(2).toString.getBytes(StandardCharsets.UTF_8))),
new RowTypeInfo(INT_TYPE_INFO, LONG_TYPE_INFO, BYTE_PRIMITIVE_ARRAY_TYPE_INFO),
"a, b, c",
nullablesOfNullData3)
- conf.getConfiguration.setInteger(
+ tableConfig.getConfiguration.setInteger(
ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 1)
- conf.getConfiguration.setBoolean(
+ tableConfig.getConfiguration.setBoolean(
BatchPhysicalSortRule.TABLE_EXEC_RANGE_SORT_ENABLED, true)
checkResult(
"select * from BinaryT order by c",
nullData3.sortBy((x : Row) =>
- x.getField(2).asInstanceOf[String]).map((r) =>
+ x.getField(2).asInstanceOf[String]).map(r =>
row(r.getField(0), r.getField(1),
r.getField(2).toString.getBytes(StandardCharsets.UTF_8))),
isSorted = true
@@ -1351,7 +1351,7 @@ class CalcITCase extends BatchTestBase {
def testGroupByBinary(): Unit = {
registerCollection(
"BinaryT2",
- nullData3.map((r) => row(r.getField(0),
+ nullData3.map(r => row(r.getField(0),
r.getField(1).toString.getBytes(StandardCharsets.UTF_8), r.getField(2))),
new RowTypeInfo(INT_TYPE_INFO, BYTE_PRIMITIVE_ARRAY_TYPE_INFO, STRING_TYPE_INFO),
"a, b, c",
@@ -1443,7 +1443,7 @@ class CalcITCase extends BatchTestBase {
val query = """
|select * from myTable where f0 in (1.0, 2.0, 3.0)
- |""".stripMargin;
+ |""".stripMargin
checkResult(
query,
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/LimitITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/LimitITCase.scala
index 8b87824..312a7bc 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/LimitITCase.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/LimitITCase.scala
@@ -24,7 +24,7 @@ import org.apache.flink.table.planner.runtime.utils.{BatchTestBase, TestData}
class LimitITCase extends LegacyLimitITCase {
override def before(): Unit = {
- BatchTestBase.configForMiniCluster(conf)
+ BatchTestBase.configForMiniCluster(tableConfig)
registerCollection("Table3", data3, type3, "a, b, c", nullablesOfData3)
val myTableDataId = TestValuesTableFactory.registerData(TestData.data3)
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/SortLimitITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/SortLimitITCase.scala
index ff8e3ad..5817fdb 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/SortLimitITCase.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/SortLimitITCase.scala
@@ -109,7 +109,7 @@ class SortLimitITCase extends BatchTestBase {
@Test
def testOrderBehindField(): Unit = {
- conf.getConfiguration.setInteger(
+ tableConfig.getConfiguration.setInteger(
ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 1)
val expected = data3.sortBy((x : Row) => x.getField(2).asInstanceOf[String])
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/join/InnerJoinITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/join/InnerJoinITCase.scala
index f224425..b16d4e7 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/join/InnerJoinITCase.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/join/InnerJoinITCase.scala
@@ -154,7 +154,7 @@ class InnerJoinITCase(expectedJoinType: JoinType) extends BatchTestBase {
@Test
def testBigForSpill(): Unit = {
- conf.getConfiguration.setInteger(TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 1)
+ tableConfig.getConfiguration.setInteger(TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 1)
val bigData = Random.shuffle(
bigIntStringData.union(bigIntStringData).union(bigIntStringData).union(bigIntStringData))
@@ -169,7 +169,7 @@ class InnerJoinITCase(expectedJoinType: JoinType) extends BatchTestBase {
@Test
def testSortMergeJoinOutputOrder(): Unit = {
if (expectedJoinType == SortMergeJoin) {
- conf.getConfiguration.setInteger(TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 1)
+ tableConfig.getConfiguration.setInteger(TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 1)
env.getConfig.setParallelism(1)
val bigData = Random.shuffle(
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/table/LimitITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/table/LimitITCase.scala
index ccda349..0ace286 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/table/LimitITCase.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/table/LimitITCase.scala
@@ -26,7 +26,7 @@ class LimitITCase extends LegacyLimitITCase {
@Before
override def before(): Unit = {
- BatchTestBase.configForMiniCluster(conf)
+ BatchTestBase.configForMiniCluster(tableConfig)
val myTableDataId = TestValuesTableFactory.registerData(TestData.data3)
val ddl =
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/utils/BatchTestBase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/utils/BatchTestBase.scala
index 11f746b..3146565 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/utils/BatchTestBase.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/utils/BatchTestBase.scala
@@ -67,7 +67,7 @@ class BatchTestBase extends BatchAbstractTestBase {
private val planner = tEnv.asInstanceOf[TableEnvironmentImpl].getPlanner.asInstanceOf[PlannerBase]
val env: StreamExecutionEnvironment = planner.getExecEnv
env.getConfig.enableObjectReuse()
- val conf: TableConfig = tEnv.getConfig
+ val tableConfig: TableConfig = tEnv.getConfig
val LINE_COL_PATTERN: Pattern = Pattern.compile("At line ([0-9]+), column ([0-9]+)")
val LINE_COL_TWICE_PATTERN: Pattern = Pattern.compile("(?s)From line ([0-9]+),"
@@ -75,7 +75,7 @@ class BatchTestBase extends BatchAbstractTestBase {
@Before
def before(): Unit = {
- BatchTestBase.configForMiniCluster(conf)
+ BatchTestBase.configForMiniCluster(tableConfig)
}
@After
@@ -517,9 +517,10 @@ object BatchTestBase {
}
}
- def configForMiniCluster(conf: TableConfig): Unit = {
- conf.getConfiguration.setInteger(TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, DEFAULT_PARALLELISM)
- conf.getConfiguration
+ def configForMiniCluster(tableConfig: TableConfig): Unit = {
+ tableConfig.
+ getConfiguration.setInteger(TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, DEFAULT_PARALLELISM)
+ tableConfig.getConfiguration
.set(ExecutionOptions.BATCH_SHUFFLE_MODE, BatchShuffleMode.ALL_EXCHANGES_PIPELINED)
}
}