You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by yi...@apache.org on 2024/01/23 02:13:22 UTC

(doris) 12/43: [opt](nereids) do not change RuntimeFilter Type from IN-OR_BLOOM to BLOOM on broadcast join (#30148)

This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git

commit 332b9cb6199bcfd533a7863bf2315e579f45c330
Author: minghong <en...@gmail.com>
AuthorDate: Mon Jan 22 09:22:19 2024 +0800

    [opt](nereids) do not change RuntimeFilter Type from IN-OR_BLOOM to BLOOM on broadcast join (#30148)
    
    
        1. do not change RuntimeFilter Type from IN-OR_BLOOM to BLOOM on broadcast join
        tpcds1T, q48 improved from 4.x sec to 1.x sec
        2. skip some redundant runtime filters
        example: A join B on A.a1=B.b and A.a1 = A.a2
        RF B.b->(A.a1, A.a2)
        however, RF(B.b->A.a2) is implied by RF(B.b->A.a1) and A.a1=A.a2
        we skip RF(B.b->A.a2)
        Issue Number: close #xxx
---
 .../glue/translator/RuntimeFilterTranslator.java   |  6 -----
 .../processor/post/RuntimeFilterGenerator.java     |  2 +-
 .../trees/plans/physical/AbstractPhysicalJoin.java | 15 ++++++++++++
 .../trees/plans/physical/AbstractPhysicalPlan.java | 21 ++++++++++------
 .../trees/plans/physical/RuntimeFilter.java        |  4 ++++
 .../rf_prune/query64.out                           | 16 ++++++-------
 .../nereids_tpcds_shape_sf100_p0/shape/query64.out | 28 +++++++++++-----------
 7 files changed, 56 insertions(+), 36 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/RuntimeFilterTranslator.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/RuntimeFilterTranslator.java
index 91f97906097..30a69ff97de 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/RuntimeFilterTranslator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/RuntimeFilterTranslator.java
@@ -38,7 +38,6 @@ import org.apache.doris.planner.JoinNodeBase;
 import org.apache.doris.planner.RuntimeFilter.RuntimeFilterTarget;
 import org.apache.doris.planner.ScanNode;
 import org.apache.doris.qe.ConnectContext;
-import org.apache.doris.qe.SessionVariable;
 import org.apache.doris.statistics.StatisticalType;
 import org.apache.doris.thrift.TRuntimeFilterType;
 
@@ -185,11 +184,6 @@ public class RuntimeFilterTranslator {
         origFilter.markFinalized();
         origFilter.assignToPlanNodes();
         origFilter.extractTargetsPosition();
-        // Number of parallel instances are large for pipeline engine, so we prefer bloom filter.
-        if (origFilter.hasRemoteTargets() && origFilter.getType() == TRuntimeFilterType.IN_OR_BLOOM
-                && SessionVariable.enablePipelineEngine()) {
-            origFilter.setType(TRuntimeFilterType.BLOOM);
-        }
         return origFilter;
     }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterGenerator.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterGenerator.java
index 76d189ba63d..8a75ad36c5a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterGenerator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterGenerator.java
@@ -567,7 +567,7 @@ public class RuntimeFilterGenerator extends PlanPostProcessor {
                 PhysicalHashJoin<? extends Plan, ? extends Plan> join = innerEntry.getValue();
                 Preconditions.checkState(join != null);
                 TRuntimeFilterType type = TRuntimeFilterType.IN_OR_BLOOM;
-                if (ctx.getSessionVariable().getEnablePipelineEngine()) {
+                if (ctx.getSessionVariable().getEnablePipelineEngine() && !join.isBroadCastJoin()) {
                     type = TRuntimeFilterType.BLOOM;
                 }
                 EqualTo newEqualTo = ((EqualTo) JoinUtils.swapEqualToForChildrenOrder(
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/AbstractPhysicalJoin.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/AbstractPhysicalJoin.java
index 16ba68aac62..7d1c65b5899 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/AbstractPhysicalJoin.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/AbstractPhysicalJoin.java
@@ -19,6 +19,8 @@ package org.apache.doris.nereids.trees.plans.physical;
 
 import org.apache.doris.nereids.hint.DistributeHint;
 import org.apache.doris.nereids.memo.GroupExpression;
+import org.apache.doris.nereids.properties.DistributionSpec;
+import org.apache.doris.nereids.properties.DistributionSpecReplicated;
 import org.apache.doris.nereids.properties.LogicalProperties;
 import org.apache.doris.nereids.properties.PhysicalProperties;
 import org.apache.doris.nereids.trees.expressions.Expression;
@@ -251,4 +253,17 @@ public abstract class AbstractPhysicalJoin<
         return Utils.toSqlString(this.getClass().getSimpleName() + "[" + id.asInt() + "]" + getGroupIdWithPrefix(),
                 args.toArray());
     }
+
+    /**
+     * true if this is a broadcast join
+     */
+    public boolean isBroadCastJoin() {
+        if (child(1) instanceof PhysicalDistribute) {
+            DistributionSpec distSpec = ((PhysicalDistribute) child(1)).getDistributionSpec();
+            if (distSpec instanceof DistributionSpecReplicated) {
+                return true;
+            }
+        }
+        return false;
+    }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/AbstractPhysicalPlan.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/AbstractPhysicalPlan.java
index a2968ca8089..6a19abcc8fc 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/AbstractPhysicalPlan.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/AbstractPhysicalPlan.java
@@ -112,19 +112,26 @@ public abstract class AbstractPhysicalPlan extends AbstractPlan implements Physi
         // in-filter is not friendly to pipeline
         if (type == TRuntimeFilterType.IN_OR_BLOOM
                 && ctx.getSessionVariable().getEnablePipelineEngine()
-                && RuntimeFilterGenerator.hasRemoteTarget(builderNode, scan)) {
+                && RuntimeFilterGenerator.hasRemoteTarget(builderNode, scan)
+                && !builderNode.isBroadCastJoin()) {
             type = TRuntimeFilterType.BLOOM;
         }
         org.apache.doris.nereids.trees.plans.physical.RuntimeFilter filter =
                 ctx.getRuntimeFilterBySrcAndType(src, type, builderNode);
         Preconditions.checkState(scanSlot != null, "scan slot is null");
         if (filter != null) {
-            this.addAppliedRuntimeFilter(filter);
-            filter.addTargetSlot(scanSlot, scan);
-            filter.addTargetExpression(scanSlot);
-            ctx.addJoinToTargetMap(builderNode, scanSlot.getExprId());
-            ctx.setTargetExprIdToFilter(scanSlot.getExprId(), filter);
-            ctx.setTargetsOnScanNode(ctx.getAliasTransferPair((NamedExpression) probeExpr).first, scanSlot);
+            if (!filter.hasTargetScan(scan)) {
+                // A join B on A.a1=B.b and A.a1 = A.a2
+                // RF B.b->(A.a1, A.a2)
+                // however, RF(B.b->A.a2) is implied by RF(B.a->A.a1) and A.a1=A.a2
+                // we skip RF(B.b->A.a2)
+                this.addAppliedRuntimeFilter(filter);
+                filter.addTargetSlot(scanSlot, scan);
+                filter.addTargetExpression(scanSlot);
+                ctx.addJoinToTargetMap(builderNode, scanSlot.getExprId());
+                ctx.setTargetExprIdToFilter(scanSlot.getExprId(), filter);
+                ctx.setTargetsOnScanNode(ctx.getAliasTransferPair((NamedExpression) probeExpr).first, scanSlot);
+            }
         } else {
             filter = new RuntimeFilter(generator.getNextId(),
                     src, ImmutableList.of(scanSlot), type, exprOrder, builderNode, buildSideNdv, scan);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/RuntimeFilter.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/RuntimeFilter.java
index a8f4c3cd7c8..3a3b01daecd 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/RuntimeFilter.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/RuntimeFilter.java
@@ -148,6 +148,10 @@ public class RuntimeFilter {
         return targetScans;
     }
 
+    public boolean hasTargetScan(PhysicalRelation scan) {
+        return targetScans.contains(scan);
+    }
+
     @Override
     public String toString() {
         StringBuilder sb = new StringBuilder();
diff --git a/regression-test/data/nereids_tpcds_shape_sf100_p0/rf_prune/query64.out b/regression-test/data/nereids_tpcds_shape_sf100_p0/rf_prune/query64.out
index e5e915d4422..367d659e25c 100644
--- a/regression-test/data/nereids_tpcds_shape_sf100_p0/rf_prune/query64.out
+++ b/regression-test/data/nereids_tpcds_shape_sf100_p0/rf_prune/query64.out
@@ -16,22 +16,22 @@ PhysicalCteAnchor ( cteId=CTEId#1 )
 --------------------------PhysicalProject
 ----------------------------hashJoin[INNER_JOIN] hashCondition=((store_sales.ss_promo_sk = promotion.p_promo_sk)) otherCondition=()
 ------------------------------PhysicalProject
---------------------------------hashJoin[INNER_JOIN] hashCondition=((store_sales.ss_addr_sk = ad1.ca_address_sk)) otherCondition=() build RFs:RF16 ss_addr_sk->[ca_address_sk]
+--------------------------------hashJoin[INNER_JOIN] hashCondition=((store_sales.ss_addr_sk = ad1.ca_address_sk)) otherCondition=() build RFs:RF15 ss_addr_sk->[ca_address_sk]
 ----------------------------------PhysicalProject
-------------------------------------PhysicalOlapScan[customer_address] apply RFs: RF16
+------------------------------------PhysicalOlapScan[customer_address] apply RFs: RF15
 ----------------------------------PhysicalDistribute[DistributionSpecHash]
 ------------------------------------PhysicalProject
---------------------------------------hashJoin[INNER_JOIN] hashCondition=((store_sales.ss_item_sk = store_returns.sr_item_sk) and (store_sales.ss_ticket_number = store_returns.sr_ticket_number)) otherCondition=() build RFs:RF14 ss_item_sk->[sr_item_sk];RF15 ss_ticket_number->[sr_ticket_number]
+--------------------------------------hashJoin[INNER_JOIN] hashCondition=((store_sales.ss_item_sk = store_returns.sr_item_sk) and (store_sales.ss_ticket_number = store_returns.sr_ticket_number)) otherCondition=() build RFs:RF13 ss_item_sk->[sr_item_sk];RF14 ss_ticket_number->[sr_ticket_number]
 ----------------------------------------PhysicalProject
-------------------------------------------PhysicalOlapScan[store_returns] apply RFs: RF14 RF15
+------------------------------------------PhysicalOlapScan[store_returns] apply RFs: RF13 RF14
 ----------------------------------------PhysicalDistribute[DistributionSpecHash]
 ------------------------------------------PhysicalProject
---------------------------------------------hashJoin[INNER_JOIN] hashCondition=((store_sales.ss_cdemo_sk = cd1.cd_demo_sk)) otherCondition=() build RFs:RF13 ss_cdemo_sk->[cd_demo_sk]
+--------------------------------------------hashJoin[INNER_JOIN] hashCondition=((store_sales.ss_cdemo_sk = cd1.cd_demo_sk)) otherCondition=() build RFs:RF12 ss_cdemo_sk->[cd_demo_sk]
 ----------------------------------------------PhysicalDistribute[DistributionSpecHash]
 ------------------------------------------------PhysicalProject
---------------------------------------------------PhysicalOlapScan[customer_demographics] apply RFs: RF13
+--------------------------------------------------PhysicalOlapScan[customer_demographics] apply RFs: RF12
 ----------------------------------------------PhysicalDistribute[DistributionSpecHash]
-------------------------------------------------hashJoin[INNER_JOIN] hashCondition=((store_sales.ss_item_sk = item.i_item_sk)) otherCondition=() build RFs:RF11 i_item_sk->[ss_item_sk];RF12 i_item_sk->[cs_item_sk]
+------------------------------------------------hashJoin[INNER_JOIN] hashCondition=((store_sales.ss_item_sk = item.i_item_sk)) otherCondition=() build RFs:RF11 i_item_sk->[ss_item_sk,cs_item_sk]
 --------------------------------------------------PhysicalProject
 ----------------------------------------------------hashJoin[INNER_JOIN] hashCondition=((store_sales.ss_store_sk = store.s_store_sk)) otherCondition=()
 ------------------------------------------------------PhysicalProject
@@ -53,7 +53,7 @@ PhysicalCteAnchor ( cteId=CTEId#1 )
 ----------------------------------------------------------------------------------PhysicalProject
 ------------------------------------------------------------------------------------hashJoin[INNER_JOIN] hashCondition=((catalog_sales.cs_item_sk = catalog_returns.cr_item_sk) and (catalog_sales.cs_order_number = catalog_returns.cr_order_number)) otherCondition=() build RFs:RF4 cr_order_number->[cs_order_number];RF5 cr_item_sk->[cs_item_sk]
 --------------------------------------------------------------------------------------PhysicalProject
-----------------------------------------------------------------------------------------PhysicalOlapScan[catalog_sales] apply RFs: RF4 RF5 RF12
+----------------------------------------------------------------------------------------PhysicalOlapScan[catalog_sales] apply RFs: RF4 RF5 RF11
 --------------------------------------------------------------------------------------PhysicalProject
 ----------------------------------------------------------------------------------------PhysicalOlapScan[catalog_returns]
 ------------------------------------------------------------------PhysicalDistribute[DistributionSpecReplicated]
diff --git a/regression-test/data/nereids_tpcds_shape_sf100_p0/shape/query64.out b/regression-test/data/nereids_tpcds_shape_sf100_p0/shape/query64.out
index 43bdb50fcce..d40500b774a 100644
--- a/regression-test/data/nereids_tpcds_shape_sf100_p0/shape/query64.out
+++ b/regression-test/data/nereids_tpcds_shape_sf100_p0/shape/query64.out
@@ -7,31 +7,31 @@ PhysicalCteAnchor ( cteId=CTEId#1 )
 --------PhysicalDistribute[DistributionSpecHash]
 ----------hashAgg[LOCAL]
 ------------PhysicalProject
---------------hashJoin[INNER_JOIN] hashCondition=((customer.c_first_shipto_date_sk = d3.d_date_sk)) otherCondition=() build RFs:RF20 d_date_sk->[c_first_shipto_date_sk]
+--------------hashJoin[INNER_JOIN] hashCondition=((customer.c_first_shipto_date_sk = d3.d_date_sk)) otherCondition=() build RFs:RF19 d_date_sk->[c_first_shipto_date_sk]
 ----------------PhysicalProject
-------------------hashJoin[INNER_JOIN] hashCondition=((customer.c_first_sales_date_sk = d2.d_date_sk)) otherCondition=() build RFs:RF19 d_date_sk->[c_first_sales_date_sk]
+------------------hashJoin[INNER_JOIN] hashCondition=((customer.c_first_sales_date_sk = d2.d_date_sk)) otherCondition=() build RFs:RF18 d_date_sk->[c_first_sales_date_sk]
 --------------------PhysicalProject
-----------------------hashJoin[INNER_JOIN] hashCondition=((store_sales.ss_customer_sk = customer.c_customer_sk)) otherCondition=(( not (cd_marital_status = cd_marital_status))) build RFs:RF18 c_customer_sk->[ss_customer_sk]
+----------------------hashJoin[INNER_JOIN] hashCondition=((store_sales.ss_customer_sk = customer.c_customer_sk)) otherCondition=(( not (cd_marital_status = cd_marital_status))) build RFs:RF17 c_customer_sk->[ss_customer_sk]
 ------------------------PhysicalDistribute[DistributionSpecHash]
 --------------------------PhysicalProject
-----------------------------hashJoin[INNER_JOIN] hashCondition=((store_sales.ss_promo_sk = promotion.p_promo_sk)) otherCondition=() build RFs:RF17 p_promo_sk->[ss_promo_sk]
+----------------------------hashJoin[INNER_JOIN] hashCondition=((store_sales.ss_promo_sk = promotion.p_promo_sk)) otherCondition=() build RFs:RF16 p_promo_sk->[ss_promo_sk]
 ------------------------------PhysicalProject
---------------------------------hashJoin[INNER_JOIN] hashCondition=((store_sales.ss_addr_sk = ad1.ca_address_sk)) otherCondition=() build RFs:RF16 ss_addr_sk->[ca_address_sk]
+--------------------------------hashJoin[INNER_JOIN] hashCondition=((store_sales.ss_addr_sk = ad1.ca_address_sk)) otherCondition=() build RFs:RF15 ss_addr_sk->[ca_address_sk]
 ----------------------------------PhysicalProject
-------------------------------------PhysicalOlapScan[customer_address] apply RFs: RF16
+------------------------------------PhysicalOlapScan[customer_address] apply RFs: RF15
 ----------------------------------PhysicalDistribute[DistributionSpecHash]
 ------------------------------------PhysicalProject
---------------------------------------hashJoin[INNER_JOIN] hashCondition=((store_sales.ss_item_sk = store_returns.sr_item_sk) and (store_sales.ss_ticket_number = store_returns.sr_ticket_number)) otherCondition=() build RFs:RF14 ss_item_sk->[sr_item_sk];RF15 ss_ticket_number->[sr_ticket_number]
+--------------------------------------hashJoin[INNER_JOIN] hashCondition=((store_sales.ss_item_sk = store_returns.sr_item_sk) and (store_sales.ss_ticket_number = store_returns.sr_ticket_number)) otherCondition=() build RFs:RF13 ss_item_sk->[sr_item_sk];RF14 ss_ticket_number->[sr_ticket_number]
 ----------------------------------------PhysicalProject
-------------------------------------------PhysicalOlapScan[store_returns] apply RFs: RF14 RF15
+------------------------------------------PhysicalOlapScan[store_returns] apply RFs: RF13 RF14
 ----------------------------------------PhysicalDistribute[DistributionSpecHash]
 ------------------------------------------PhysicalProject
---------------------------------------------hashJoin[INNER_JOIN] hashCondition=((store_sales.ss_cdemo_sk = cd1.cd_demo_sk)) otherCondition=() build RFs:RF13 ss_cdemo_sk->[cd_demo_sk]
+--------------------------------------------hashJoin[INNER_JOIN] hashCondition=((store_sales.ss_cdemo_sk = cd1.cd_demo_sk)) otherCondition=() build RFs:RF12 ss_cdemo_sk->[cd_demo_sk]
 ----------------------------------------------PhysicalDistribute[DistributionSpecHash]
 ------------------------------------------------PhysicalProject
---------------------------------------------------PhysicalOlapScan[customer_demographics] apply RFs: RF13
+--------------------------------------------------PhysicalOlapScan[customer_demographics] apply RFs: RF12
 ----------------------------------------------PhysicalDistribute[DistributionSpecHash]
-------------------------------------------------hashJoin[INNER_JOIN] hashCondition=((store_sales.ss_item_sk = item.i_item_sk)) otherCondition=() build RFs:RF11 i_item_sk->[ss_item_sk];RF12 i_item_sk->[cs_item_sk]
+------------------------------------------------hashJoin[INNER_JOIN] hashCondition=((store_sales.ss_item_sk = item.i_item_sk)) otherCondition=() build RFs:RF11 i_item_sk->[ss_item_sk,cs_item_sk]
 --------------------------------------------------PhysicalProject
 ----------------------------------------------------hashJoin[INNER_JOIN] hashCondition=((store_sales.ss_store_sk = store.s_store_sk)) otherCondition=() build RFs:RF10 s_store_sk->[ss_store_sk]
 ------------------------------------------------------PhysicalProject
@@ -43,7 +43,7 @@ PhysicalCteAnchor ( cteId=CTEId#1 )
 ------------------------------------------------------------------PhysicalProject
 --------------------------------------------------------------------hashJoin[INNER_JOIN] hashCondition=((store_sales.ss_item_sk = cs_ui.cs_item_sk)) otherCondition=() build RFs:RF6 cs_item_sk->[ss_item_sk]
 ----------------------------------------------------------------------PhysicalProject
-------------------------------------------------------------------------PhysicalOlapScan[store_sales] apply RFs: RF6 RF7 RF8 RF10 RF11 RF17 RF18
+------------------------------------------------------------------------PhysicalOlapScan[store_sales] apply RFs: RF6 RF7 RF8 RF10 RF11 RF16 RF17
 ----------------------------------------------------------------------PhysicalDistribute[DistributionSpecReplicated]
 ------------------------------------------------------------------------PhysicalProject
 --------------------------------------------------------------------------filter((sale > (2 * refund)))
@@ -53,7 +53,7 @@ PhysicalCteAnchor ( cteId=CTEId#1 )
 ----------------------------------------------------------------------------------PhysicalProject
 ------------------------------------------------------------------------------------hashJoin[INNER_JOIN] hashCondition=((catalog_sales.cs_item_sk = catalog_returns.cr_item_sk) and (catalog_sales.cs_order_number = catalog_returns.cr_order_number)) otherCondition=() build RFs:RF4 cr_order_number->[cs_order_number];RF5 cr_item_sk->[cs_item_sk]
 --------------------------------------------------------------------------------------PhysicalProject
-----------------------------------------------------------------------------------------PhysicalOlapScan[catalog_sales] apply RFs: RF4 RF5 RF12
+----------------------------------------------------------------------------------------PhysicalOlapScan[catalog_sales] apply RFs: RF4 RF5 RF11
 --------------------------------------------------------------------------------------PhysicalProject
 ----------------------------------------------------------------------------------------PhysicalOlapScan[catalog_returns]
 ------------------------------------------------------------------PhysicalDistribute[DistributionSpecReplicated]
@@ -86,7 +86,7 @@ PhysicalCteAnchor ( cteId=CTEId#1 )
 --------------------------------------PhysicalProject
 ----------------------------------------hashJoin[INNER_JOIN] hashCondition=((customer.c_current_hdemo_sk = hd2.hd_demo_sk)) otherCondition=() build RFs:RF1 hd_demo_sk->[c_current_hdemo_sk]
 ------------------------------------------PhysicalProject
---------------------------------------------PhysicalOlapScan[customer] apply RFs: RF1 RF2 RF3 RF19 RF20
+--------------------------------------------PhysicalOlapScan[customer] apply RFs: RF1 RF2 RF3 RF18 RF19
 ------------------------------------------PhysicalDistribute[DistributionSpecReplicated]
 --------------------------------------------PhysicalProject
 ----------------------------------------------hashJoin[INNER_JOIN] hashCondition=((hd2.hd_income_band_sk = ib2.ib_income_band_sk)) otherCondition=() build RFs:RF0 ib_income_band_sk->[hd_income_band_sk]


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org