You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by wa...@apache.org on 2023/12/14 09:26:38 UTC
(flink) 08/11: Add switch for consider benefit
This is an automated email from the ASF dual-hosted git repository.
wanglijie pushed a commit to branch debug_q58
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 8d5d27aab6647840c3883a04bfc9b892495a0755
Author: Lijie Wang <wa...@gmail.com>
AuthorDate: Tue Aug 22 13:23:37 2023 +0800
Add switch for consider benefit
---
.../flink/table/api/config/OptimizerConfigOptions.java | 7 +++++++
.../optimize/program/FlinkRuntimeFilterProgram.java | 17 +++++++++++------
2 files changed, 18 insertions(+), 6 deletions(-)
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/OptimizerConfigOptions.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/OptimizerConfigOptions.java
index 15783ffc442..e38bcce654a 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/OptimizerConfigOptions.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/OptimizerConfigOptions.java
@@ -186,6 +186,13 @@ public class OptimizerConfigOptions {
"A flag to enable or disable the runtime filter. "
+ "When it is true, the optimizer will try to inject a runtime filter for eligible join.");
+ @Documentation.TableOption(execMode = Documentation.ExecMode.BATCH)
+ public static final ConfigOption<Boolean> TABLE_OPTIMIZER_RUNTIME_FILTER_CONSIDER_BENEFIT =
+ key("table.optimizer.runtime-filter.consider-benefit")
+ .booleanType()
+ .defaultValue(false)
+ .withDescription("Whether consider benefit.");
+
/**
* The data volume of build side needs to be under this value. If the data volume of build side
* is too large, the building overhead will be too large, which may lead to a negative impact on
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/optimize/program/FlinkRuntimeFilterProgram.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/optimize/program/FlinkRuntimeFilterProgram.java
index 1e8101fdf50..39e82be20de 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/optimize/program/FlinkRuntimeFilterProgram.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/optimize/program/FlinkRuntimeFilterProgram.java
@@ -503,17 +503,17 @@ public class FlinkRuntimeFilterProgram implements FlinkOptimizeProgram<BatchOpti
// we may find more cases later
}
- if (filterHasBenefit) {
+ if (considerBenefit(rel) && !filterHasBenefit) {
+ // If the probe side is a direct table source, or only simple Calc, no other operations,
+ // we will not inject runtime filter, because we believe the benefit to be small or even
+ // negative
+ return rel;
+ } else {
return createNewProbeWithRuntimeFilter(
ignoreExchange(buildSideInfo.buildSide),
ignoreExchange(rel),
buildSideInfo.buildIndices,
probeIndices);
- } else {
- // If the probe side is a direct table source, or only simple Calc, no other operations,
- // we will not inject runtime filter, because we believe the benefit to be small or even
- // negative
- return rel;
}
}
@@ -690,6 +690,11 @@ public class FlinkRuntimeFilterProgram implements FlinkOptimizeProgram<BatchOpti
.get(OptimizerConfigOptions.TABLE_OPTIMIZER_RUNTIME_FILTER_ENABLED);
}
+ private static boolean considerBenefit(RelNode relNode) {
+ return unwrapTableConfig(relNode)
+ .get(OptimizerConfigOptions.TABLE_OPTIMIZER_RUNTIME_FILTER_CONSIDER_BENEFIT);
+ }
+
private static long getMaxBuildDataSize(RelNode relNode) {
return unwrapTableConfig(relNode)
.get(OptimizerConfigOptions.TABLE_OPTIMIZER_RUNTIME_FILTER_MAX_BUILD_DATA_SIZE)