You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by wa...@apache.org on 2023/12/14 09:26:38 UTC

(flink) 08/11: Add switch for consider benefit

This is an automated email from the ASF dual-hosted git repository.

wanglijie pushed a commit to branch debug_q58
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 8d5d27aab6647840c3883a04bfc9b892495a0755
Author: Lijie Wang <wa...@gmail.com>
AuthorDate: Tue Aug 22 13:23:37 2023 +0800

    Add switch for consider benefit
---
 .../flink/table/api/config/OptimizerConfigOptions.java  |  7 +++++++
 .../optimize/program/FlinkRuntimeFilterProgram.java     | 17 +++++++++++------
 2 files changed, 18 insertions(+), 6 deletions(-)

diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/OptimizerConfigOptions.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/OptimizerConfigOptions.java
index 15783ffc442..e38bcce654a 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/OptimizerConfigOptions.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/OptimizerConfigOptions.java
@@ -186,6 +186,13 @@ public class OptimizerConfigOptions {
                             "A flag to enable or disable the runtime filter. "
                                     + "When it is true, the optimizer will try to inject a runtime filter for eligible join.");
 
+    @Documentation.TableOption(execMode = Documentation.ExecMode.BATCH)
+    public static final ConfigOption<Boolean> TABLE_OPTIMIZER_RUNTIME_FILTER_CONSIDER_BENEFIT =
+            key("table.optimizer.runtime-filter.consider-benefit")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription("Whether consider benefit.");
+
     /**
      * The data volume of build side needs to be under this value. If the data volume of build side
      * is too large, the building overhead will be too large, which may lead to a negative impact on
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/optimize/program/FlinkRuntimeFilterProgram.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/optimize/program/FlinkRuntimeFilterProgram.java
index 1e8101fdf50..39e82be20de 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/optimize/program/FlinkRuntimeFilterProgram.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/optimize/program/FlinkRuntimeFilterProgram.java
@@ -503,17 +503,17 @@ public class FlinkRuntimeFilterProgram implements FlinkOptimizeProgram<BatchOpti
             // we may find more cases later
         }
 
-        if (filterHasBenefit) {
+        if (considerBenefit(rel) && !filterHasBenefit) {
+            // If the probe side is a direct table source, or only simple Calc, no other operations,
+            // we will not inject runtime filter, because we believe the benefit to be small or even
+            // negative
+            return rel;
+        } else {
             return createNewProbeWithRuntimeFilter(
                     ignoreExchange(buildSideInfo.buildSide),
                     ignoreExchange(rel),
                     buildSideInfo.buildIndices,
                     probeIndices);
-        } else {
-            // If the probe side is a direct table source, or only simple Calc, no other operations,
-            // we will not inject runtime filter, because we believe the benefit to be small or even
-            // negative
-            return rel;
         }
     }
 
@@ -690,6 +690,11 @@ public class FlinkRuntimeFilterProgram implements FlinkOptimizeProgram<BatchOpti
                 .get(OptimizerConfigOptions.TABLE_OPTIMIZER_RUNTIME_FILTER_ENABLED);
     }
 
+    private static boolean considerBenefit(RelNode relNode) {
+        return unwrapTableConfig(relNode)
+                .get(OptimizerConfigOptions.TABLE_OPTIMIZER_RUNTIME_FILTER_CONSIDER_BENEFIT);
+    }
+
     private static long getMaxBuildDataSize(RelNode relNode) {
         return unwrapTableConfig(relNode)
                 .get(OptimizerConfigOptions.TABLE_OPTIMIZER_RUNTIME_FILTER_MAX_BUILD_DATA_SIZE)