You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by yi...@apache.org on 2024/01/23 02:13:22 UTC
(doris) 12/43: [opt](nereids) do not change RuntimeFilter Type from IN-OR_BLOOM to BLOOM on broadcast join (#30148)
This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git
commit 332b9cb6199bcfd533a7863bf2315e579f45c330
Author: minghong <en...@gmail.com>
AuthorDate: Mon Jan 22 09:22:19 2024 +0800
[opt](nereids) do not change RuntimeFilter Type from IN-OR_BLOOM to BLOOM on broadcast join (#30148)
1. do not change the RuntimeFilter type from IN_OR_BLOOM to BLOOM on broadcast join
tpcds1T, q48 improved from 4.x sec to 1.x sec
2. skip some redundant runtime filters
example: A join B on A.a1=B.b and A.a1 = A.a2
RF B.b->(A.a1, A.a2)
however, RF(B.b->A.a2) is implied by RF(B.b->A.a1) and A.a1=A.a2
we skip RF(B.b->A.a2)
Issue Number: close #xxx
---
.../glue/translator/RuntimeFilterTranslator.java | 6 -----
.../processor/post/RuntimeFilterGenerator.java | 2 +-
.../trees/plans/physical/AbstractPhysicalJoin.java | 15 ++++++++++++
.../trees/plans/physical/AbstractPhysicalPlan.java | 21 ++++++++++------
.../trees/plans/physical/RuntimeFilter.java | 4 ++++
.../rf_prune/query64.out | 16 ++++++-------
.../nereids_tpcds_shape_sf100_p0/shape/query64.out | 28 +++++++++++-----------
7 files changed, 56 insertions(+), 36 deletions(-)
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/RuntimeFilterTranslator.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/RuntimeFilterTranslator.java
index 91f97906097..30a69ff97de 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/RuntimeFilterTranslator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/RuntimeFilterTranslator.java
@@ -38,7 +38,6 @@ import org.apache.doris.planner.JoinNodeBase;
import org.apache.doris.planner.RuntimeFilter.RuntimeFilterTarget;
import org.apache.doris.planner.ScanNode;
import org.apache.doris.qe.ConnectContext;
-import org.apache.doris.qe.SessionVariable;
import org.apache.doris.statistics.StatisticalType;
import org.apache.doris.thrift.TRuntimeFilterType;
@@ -185,11 +184,6 @@ public class RuntimeFilterTranslator {
origFilter.markFinalized();
origFilter.assignToPlanNodes();
origFilter.extractTargetsPosition();
- // Number of parallel instances are large for pipeline engine, so we prefer bloom filter.
- if (origFilter.hasRemoteTargets() && origFilter.getType() == TRuntimeFilterType.IN_OR_BLOOM
- && SessionVariable.enablePipelineEngine()) {
- origFilter.setType(TRuntimeFilterType.BLOOM);
- }
return origFilter;
}
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterGenerator.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterGenerator.java
index 76d189ba63d..8a75ad36c5a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterGenerator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterGenerator.java
@@ -567,7 +567,7 @@ public class RuntimeFilterGenerator extends PlanPostProcessor {
PhysicalHashJoin<? extends Plan, ? extends Plan> join = innerEntry.getValue();
Preconditions.checkState(join != null);
TRuntimeFilterType type = TRuntimeFilterType.IN_OR_BLOOM;
- if (ctx.getSessionVariable().getEnablePipelineEngine()) {
+ if (ctx.getSessionVariable().getEnablePipelineEngine() && !join.isBroadCastJoin()) {
type = TRuntimeFilterType.BLOOM;
}
EqualTo newEqualTo = ((EqualTo) JoinUtils.swapEqualToForChildrenOrder(
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/AbstractPhysicalJoin.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/AbstractPhysicalJoin.java
index 16ba68aac62..7d1c65b5899 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/AbstractPhysicalJoin.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/AbstractPhysicalJoin.java
@@ -19,6 +19,8 @@ package org.apache.doris.nereids.trees.plans.physical;
import org.apache.doris.nereids.hint.DistributeHint;
import org.apache.doris.nereids.memo.GroupExpression;
+import org.apache.doris.nereids.properties.DistributionSpec;
+import org.apache.doris.nereids.properties.DistributionSpecReplicated;
import org.apache.doris.nereids.properties.LogicalProperties;
import org.apache.doris.nereids.properties.PhysicalProperties;
import org.apache.doris.nereids.trees.expressions.Expression;
@@ -251,4 +253,17 @@ public abstract class AbstractPhysicalJoin<
return Utils.toSqlString(this.getClass().getSimpleName() + "[" + id.asInt() + "]" + getGroupIdWithPrefix(),
args.toArray());
}
+
+ /**
+ * true if this is a broadcast join
+ */
+ public boolean isBroadCastJoin() {
+ if (child(1) instanceof PhysicalDistribute) {
+ DistributionSpec distSpec = ((PhysicalDistribute) child(1)).getDistributionSpec();
+ if (distSpec instanceof DistributionSpecReplicated) {
+ return true;
+ }
+ }
+ return false;
+ }
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/AbstractPhysicalPlan.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/AbstractPhysicalPlan.java
index a2968ca8089..6a19abcc8fc 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/AbstractPhysicalPlan.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/AbstractPhysicalPlan.java
@@ -112,19 +112,26 @@ public abstract class AbstractPhysicalPlan extends AbstractPlan implements Physi
// in-filter is not friendly to pipeline
if (type == TRuntimeFilterType.IN_OR_BLOOM
&& ctx.getSessionVariable().getEnablePipelineEngine()
- && RuntimeFilterGenerator.hasRemoteTarget(builderNode, scan)) {
+ && RuntimeFilterGenerator.hasRemoteTarget(builderNode, scan)
+ && !builderNode.isBroadCastJoin()) {
type = TRuntimeFilterType.BLOOM;
}
org.apache.doris.nereids.trees.plans.physical.RuntimeFilter filter =
ctx.getRuntimeFilterBySrcAndType(src, type, builderNode);
Preconditions.checkState(scanSlot != null, "scan slot is null");
if (filter != null) {
- this.addAppliedRuntimeFilter(filter);
- filter.addTargetSlot(scanSlot, scan);
- filter.addTargetExpression(scanSlot);
- ctx.addJoinToTargetMap(builderNode, scanSlot.getExprId());
- ctx.setTargetExprIdToFilter(scanSlot.getExprId(), filter);
- ctx.setTargetsOnScanNode(ctx.getAliasTransferPair((NamedExpression) probeExpr).first, scanSlot);
+ if (!filter.hasTargetScan(scan)) {
+ // A join B on A.a1=B.b and A.a1 = A.a2
+ // RF B.b->(A.a1, A.a2)
+ // however, RF(B.b->A.a2) is implied by RF(B.a->A.a1) and A.a1=A.a2
+ // we skip RF(B.b->A.a2)
+ this.addAppliedRuntimeFilter(filter);
+ filter.addTargetSlot(scanSlot, scan);
+ filter.addTargetExpression(scanSlot);
+ ctx.addJoinToTargetMap(builderNode, scanSlot.getExprId());
+ ctx.setTargetExprIdToFilter(scanSlot.getExprId(), filter);
+ ctx.setTargetsOnScanNode(ctx.getAliasTransferPair((NamedExpression) probeExpr).first, scanSlot);
+ }
} else {
filter = new RuntimeFilter(generator.getNextId(),
src, ImmutableList.of(scanSlot), type, exprOrder, builderNode, buildSideNdv, scan);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/RuntimeFilter.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/RuntimeFilter.java
index a8f4c3cd7c8..3a3b01daecd 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/RuntimeFilter.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/RuntimeFilter.java
@@ -148,6 +148,10 @@ public class RuntimeFilter {
return targetScans;
}
+ public boolean hasTargetScan(PhysicalRelation scan) {
+ return targetScans.contains(scan);
+ }
+
@Override
public String toString() {
StringBuilder sb = new StringBuilder();
diff --git a/regression-test/data/nereids_tpcds_shape_sf100_p0/rf_prune/query64.out b/regression-test/data/nereids_tpcds_shape_sf100_p0/rf_prune/query64.out
index e5e915d4422..367d659e25c 100644
--- a/regression-test/data/nereids_tpcds_shape_sf100_p0/rf_prune/query64.out
+++ b/regression-test/data/nereids_tpcds_shape_sf100_p0/rf_prune/query64.out
@@ -16,22 +16,22 @@ PhysicalCteAnchor ( cteId=CTEId#1 )
--------------------------PhysicalProject
----------------------------hashJoin[INNER_JOIN] hashCondition=((store_sales.ss_promo_sk = promotion.p_promo_sk)) otherCondition=()
------------------------------PhysicalProject
---------------------------------hashJoin[INNER_JOIN] hashCondition=((store_sales.ss_addr_sk = ad1.ca_address_sk)) otherCondition=() build RFs:RF16 ss_addr_sk->[ca_address_sk]
+--------------------------------hashJoin[INNER_JOIN] hashCondition=((store_sales.ss_addr_sk = ad1.ca_address_sk)) otherCondition=() build RFs:RF15 ss_addr_sk->[ca_address_sk]
----------------------------------PhysicalProject
-------------------------------------PhysicalOlapScan[customer_address] apply RFs: RF16
+------------------------------------PhysicalOlapScan[customer_address] apply RFs: RF15
----------------------------------PhysicalDistribute[DistributionSpecHash]
------------------------------------PhysicalProject
---------------------------------------hashJoin[INNER_JOIN] hashCondition=((store_sales.ss_item_sk = store_returns.sr_item_sk) and (store_sales.ss_ticket_number = store_returns.sr_ticket_number)) otherCondition=() build RFs:RF14 ss_item_sk->[sr_item_sk];RF15 ss_ticket_number->[sr_ticket_number]
+--------------------------------------hashJoin[INNER_JOIN] hashCondition=((store_sales.ss_item_sk = store_returns.sr_item_sk) and (store_sales.ss_ticket_number = store_returns.sr_ticket_number)) otherCondition=() build RFs:RF13 ss_item_sk->[sr_item_sk];RF14 ss_ticket_number->[sr_ticket_number]
----------------------------------------PhysicalProject
-------------------------------------------PhysicalOlapScan[store_returns] apply RFs: RF14 RF15
+------------------------------------------PhysicalOlapScan[store_returns] apply RFs: RF13 RF14
----------------------------------------PhysicalDistribute[DistributionSpecHash]
------------------------------------------PhysicalProject
---------------------------------------------hashJoin[INNER_JOIN] hashCondition=((store_sales.ss_cdemo_sk = cd1.cd_demo_sk)) otherCondition=() build RFs:RF13 ss_cdemo_sk->[cd_demo_sk]
+--------------------------------------------hashJoin[INNER_JOIN] hashCondition=((store_sales.ss_cdemo_sk = cd1.cd_demo_sk)) otherCondition=() build RFs:RF12 ss_cdemo_sk->[cd_demo_sk]
----------------------------------------------PhysicalDistribute[DistributionSpecHash]
------------------------------------------------PhysicalProject
---------------------------------------------------PhysicalOlapScan[customer_demographics] apply RFs: RF13
+--------------------------------------------------PhysicalOlapScan[customer_demographics] apply RFs: RF12
----------------------------------------------PhysicalDistribute[DistributionSpecHash]
-------------------------------------------------hashJoin[INNER_JOIN] hashCondition=((store_sales.ss_item_sk = item.i_item_sk)) otherCondition=() build RFs:RF11 i_item_sk->[ss_item_sk];RF12 i_item_sk->[cs_item_sk]
+------------------------------------------------hashJoin[INNER_JOIN] hashCondition=((store_sales.ss_item_sk = item.i_item_sk)) otherCondition=() build RFs:RF11 i_item_sk->[ss_item_sk,cs_item_sk]
--------------------------------------------------PhysicalProject
----------------------------------------------------hashJoin[INNER_JOIN] hashCondition=((store_sales.ss_store_sk = store.s_store_sk)) otherCondition=()
------------------------------------------------------PhysicalProject
@@ -53,7 +53,7 @@ PhysicalCteAnchor ( cteId=CTEId#1 )
----------------------------------------------------------------------------------PhysicalProject
------------------------------------------------------------------------------------hashJoin[INNER_JOIN] hashCondition=((catalog_sales.cs_item_sk = catalog_returns.cr_item_sk) and (catalog_sales.cs_order_number = catalog_returns.cr_order_number)) otherCondition=() build RFs:RF4 cr_order_number->[cs_order_number];RF5 cr_item_sk->[cs_item_sk]
--------------------------------------------------------------------------------------PhysicalProject
-----------------------------------------------------------------------------------------PhysicalOlapScan[catalog_sales] apply RFs: RF4 RF5 RF12
+----------------------------------------------------------------------------------------PhysicalOlapScan[catalog_sales] apply RFs: RF4 RF5 RF11
--------------------------------------------------------------------------------------PhysicalProject
----------------------------------------------------------------------------------------PhysicalOlapScan[catalog_returns]
------------------------------------------------------------------PhysicalDistribute[DistributionSpecReplicated]
diff --git a/regression-test/data/nereids_tpcds_shape_sf100_p0/shape/query64.out b/regression-test/data/nereids_tpcds_shape_sf100_p0/shape/query64.out
index 43bdb50fcce..d40500b774a 100644
--- a/regression-test/data/nereids_tpcds_shape_sf100_p0/shape/query64.out
+++ b/regression-test/data/nereids_tpcds_shape_sf100_p0/shape/query64.out
@@ -7,31 +7,31 @@ PhysicalCteAnchor ( cteId=CTEId#1 )
--------PhysicalDistribute[DistributionSpecHash]
----------hashAgg[LOCAL]
------------PhysicalProject
---------------hashJoin[INNER_JOIN] hashCondition=((customer.c_first_shipto_date_sk = d3.d_date_sk)) otherCondition=() build RFs:RF20 d_date_sk->[c_first_shipto_date_sk]
+--------------hashJoin[INNER_JOIN] hashCondition=((customer.c_first_shipto_date_sk = d3.d_date_sk)) otherCondition=() build RFs:RF19 d_date_sk->[c_first_shipto_date_sk]
----------------PhysicalProject
-------------------hashJoin[INNER_JOIN] hashCondition=((customer.c_first_sales_date_sk = d2.d_date_sk)) otherCondition=() build RFs:RF19 d_date_sk->[c_first_sales_date_sk]
+------------------hashJoin[INNER_JOIN] hashCondition=((customer.c_first_sales_date_sk = d2.d_date_sk)) otherCondition=() build RFs:RF18 d_date_sk->[c_first_sales_date_sk]
--------------------PhysicalProject
-----------------------hashJoin[INNER_JOIN] hashCondition=((store_sales.ss_customer_sk = customer.c_customer_sk)) otherCondition=(( not (cd_marital_status = cd_marital_status))) build RFs:RF18 c_customer_sk->[ss_customer_sk]
+----------------------hashJoin[INNER_JOIN] hashCondition=((store_sales.ss_customer_sk = customer.c_customer_sk)) otherCondition=(( not (cd_marital_status = cd_marital_status))) build RFs:RF17 c_customer_sk->[ss_customer_sk]
------------------------PhysicalDistribute[DistributionSpecHash]
--------------------------PhysicalProject
-----------------------------hashJoin[INNER_JOIN] hashCondition=((store_sales.ss_promo_sk = promotion.p_promo_sk)) otherCondition=() build RFs:RF17 p_promo_sk->[ss_promo_sk]
+----------------------------hashJoin[INNER_JOIN] hashCondition=((store_sales.ss_promo_sk = promotion.p_promo_sk)) otherCondition=() build RFs:RF16 p_promo_sk->[ss_promo_sk]
------------------------------PhysicalProject
---------------------------------hashJoin[INNER_JOIN] hashCondition=((store_sales.ss_addr_sk = ad1.ca_address_sk)) otherCondition=() build RFs:RF16 ss_addr_sk->[ca_address_sk]
+--------------------------------hashJoin[INNER_JOIN] hashCondition=((store_sales.ss_addr_sk = ad1.ca_address_sk)) otherCondition=() build RFs:RF15 ss_addr_sk->[ca_address_sk]
----------------------------------PhysicalProject
-------------------------------------PhysicalOlapScan[customer_address] apply RFs: RF16
+------------------------------------PhysicalOlapScan[customer_address] apply RFs: RF15
----------------------------------PhysicalDistribute[DistributionSpecHash]
------------------------------------PhysicalProject
---------------------------------------hashJoin[INNER_JOIN] hashCondition=((store_sales.ss_item_sk = store_returns.sr_item_sk) and (store_sales.ss_ticket_number = store_returns.sr_ticket_number)) otherCondition=() build RFs:RF14 ss_item_sk->[sr_item_sk];RF15 ss_ticket_number->[sr_ticket_number]
+--------------------------------------hashJoin[INNER_JOIN] hashCondition=((store_sales.ss_item_sk = store_returns.sr_item_sk) and (store_sales.ss_ticket_number = store_returns.sr_ticket_number)) otherCondition=() build RFs:RF13 ss_item_sk->[sr_item_sk];RF14 ss_ticket_number->[sr_ticket_number]
----------------------------------------PhysicalProject
-------------------------------------------PhysicalOlapScan[store_returns] apply RFs: RF14 RF15
+------------------------------------------PhysicalOlapScan[store_returns] apply RFs: RF13 RF14
----------------------------------------PhysicalDistribute[DistributionSpecHash]
------------------------------------------PhysicalProject
---------------------------------------------hashJoin[INNER_JOIN] hashCondition=((store_sales.ss_cdemo_sk = cd1.cd_demo_sk)) otherCondition=() build RFs:RF13 ss_cdemo_sk->[cd_demo_sk]
+--------------------------------------------hashJoin[INNER_JOIN] hashCondition=((store_sales.ss_cdemo_sk = cd1.cd_demo_sk)) otherCondition=() build RFs:RF12 ss_cdemo_sk->[cd_demo_sk]
----------------------------------------------PhysicalDistribute[DistributionSpecHash]
------------------------------------------------PhysicalProject
---------------------------------------------------PhysicalOlapScan[customer_demographics] apply RFs: RF13
+--------------------------------------------------PhysicalOlapScan[customer_demographics] apply RFs: RF12
----------------------------------------------PhysicalDistribute[DistributionSpecHash]
-------------------------------------------------hashJoin[INNER_JOIN] hashCondition=((store_sales.ss_item_sk = item.i_item_sk)) otherCondition=() build RFs:RF11 i_item_sk->[ss_item_sk];RF12 i_item_sk->[cs_item_sk]
+------------------------------------------------hashJoin[INNER_JOIN] hashCondition=((store_sales.ss_item_sk = item.i_item_sk)) otherCondition=() build RFs:RF11 i_item_sk->[ss_item_sk,cs_item_sk]
--------------------------------------------------PhysicalProject
----------------------------------------------------hashJoin[INNER_JOIN] hashCondition=((store_sales.ss_store_sk = store.s_store_sk)) otherCondition=() build RFs:RF10 s_store_sk->[ss_store_sk]
------------------------------------------------------PhysicalProject
@@ -43,7 +43,7 @@ PhysicalCteAnchor ( cteId=CTEId#1 )
------------------------------------------------------------------PhysicalProject
--------------------------------------------------------------------hashJoin[INNER_JOIN] hashCondition=((store_sales.ss_item_sk = cs_ui.cs_item_sk)) otherCondition=() build RFs:RF6 cs_item_sk->[ss_item_sk]
----------------------------------------------------------------------PhysicalProject
-------------------------------------------------------------------------PhysicalOlapScan[store_sales] apply RFs: RF6 RF7 RF8 RF10 RF11 RF17 RF18
+------------------------------------------------------------------------PhysicalOlapScan[store_sales] apply RFs: RF6 RF7 RF8 RF10 RF11 RF16 RF17
----------------------------------------------------------------------PhysicalDistribute[DistributionSpecReplicated]
------------------------------------------------------------------------PhysicalProject
--------------------------------------------------------------------------filter((sale > (2 * refund)))
@@ -53,7 +53,7 @@ PhysicalCteAnchor ( cteId=CTEId#1 )
----------------------------------------------------------------------------------PhysicalProject
------------------------------------------------------------------------------------hashJoin[INNER_JOIN] hashCondition=((catalog_sales.cs_item_sk = catalog_returns.cr_item_sk) and (catalog_sales.cs_order_number = catalog_returns.cr_order_number)) otherCondition=() build RFs:RF4 cr_order_number->[cs_order_number];RF5 cr_item_sk->[cs_item_sk]
--------------------------------------------------------------------------------------PhysicalProject
-----------------------------------------------------------------------------------------PhysicalOlapScan[catalog_sales] apply RFs: RF4 RF5 RF12
+----------------------------------------------------------------------------------------PhysicalOlapScan[catalog_sales] apply RFs: RF4 RF5 RF11
--------------------------------------------------------------------------------------PhysicalProject
----------------------------------------------------------------------------------------PhysicalOlapScan[catalog_returns]
------------------------------------------------------------------PhysicalDistribute[DistributionSpecReplicated]
@@ -86,7 +86,7 @@ PhysicalCteAnchor ( cteId=CTEId#1 )
--------------------------------------PhysicalProject
----------------------------------------hashJoin[INNER_JOIN] hashCondition=((customer.c_current_hdemo_sk = hd2.hd_demo_sk)) otherCondition=() build RFs:RF1 hd_demo_sk->[c_current_hdemo_sk]
------------------------------------------PhysicalProject
---------------------------------------------PhysicalOlapScan[customer] apply RFs: RF1 RF2 RF3 RF19 RF20
+--------------------------------------------PhysicalOlapScan[customer] apply RFs: RF1 RF2 RF3 RF18 RF19
------------------------------------------PhysicalDistribute[DistributionSpecReplicated]
--------------------------------------------PhysicalProject
----------------------------------------------hashJoin[INNER_JOIN] hashCondition=((hd2.hd_income_band_sk = ib2.ib_income_band_sk)) otherCondition=() build RFs:RF0 ib_income_band_sk->[hd_income_band_sk]
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org