You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by mo...@apache.org on 2022/04/09 11:00:34 UTC

[incubator-doris] branch master updated: [improvement](join) update broadcast join cost algorithm (#8695)

This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git


The following commit(s) were added to refs/heads/master by this push:
     new ddf7ef9327 [improvement](join) update broadcast join cost algorithm (#8695)
ddf7ef9327 is described below

commit ddf7ef9327bde5498426cf1269327977c6d93ea0
Author: morrySnow <10...@users.noreply.github.com>
AuthorDate: Sat Apr 9 19:00:27 2022 +0800

    [improvement](join) update broadcast join cost algorithm (#8695)
    
    broadcast join cost is used compressed data size currently.
    The amount of memory used may be significantly more than estimated.
    This patch:
    1. add a compressed ratio to broadcast join cost and set to 5 according to the experience.
    2. add a new session variable `auto_broadcast_join_threshold` to limit memory used by broadcast in bytes, the default value is 1073741824(1GB)
---
 docs/en/getting-started/advance-usage.md           | 22 ++++++++++------
 docs/zh-CN/getting-started/advance-usage.md        | 14 ++++++++---
 .../java/org/apache/doris/analysis/Analyzer.java   | 19 +++++++++++++-
 .../apache/doris/planner/DistributedPlanner.java   | 29 ++++++++++------------
 .../org/apache/doris/planner/OlapScanNode.java     |  5 +++-
 .../java/org/apache/doris/qe/SessionVariable.java  |  8 ++++++
 .../doris/planner/DistributedPlannerTest.java      | 24 ++++++++++++++++++
 7 files changed, 91 insertions(+), 30 deletions(-)

diff --git a/docs/en/getting-started/advance-usage.md b/docs/en/getting-started/advance-usage.md
index 5429b53c39..cfaa54dffe 100644
--- a/docs/en/getting-started/advance-usage.md
+++ b/docs/en/getting-started/advance-usage.md
@@ -179,8 +179,8 @@ mysql> SHOW VARIABLES LIKE "%mem_limit%";
 1 row in set (0.00 sec)
 ```
 
->* The above modification is session level and is only valid within the current connection session. Disconnecting and reconnecting will change back to the default value.
->* If you need to modify the global variable, you can set it as follows: `SET GLOBAL exec_mem_limit = 8589934592;` When the setup is complete, disconnect the session and log in again, and the parameters will take effect permanently.
+> * The above modification is session level and is only valid within the current connection session. Disconnecting and reconnecting will change back to the default value.
+> * If you need to modify the global variable, you can set it as follows: `SET GLOBAL exec_mem_limit = 8589934592;` When the setup is complete, disconnect the session and log in again, and the parameters will take effect permanently.
 
 ### 2.2 Query timeout
 
@@ -202,18 +202,24 @@ Modify the timeout to 1 minute:
 
 `SET query timeout =60;`
 
->* The current timeout check interval is 5 seconds, so timeouts less than 5 seconds are not very accurate.
->* The above modifications are also session level. Global validity can be modified by `SET GLOBAL`.
+> * The current timeout check interval is 5 seconds, so timeouts less than 5 seconds are not very accurate.
+> * The above modifications are also session level. Global validity can be modified by `SET GLOBAL`.
 
 ### 2.3 Broadcast/Shuffle Join
 
-By default, the system implements Join by conditionally filtering small tables, broadcasting them to the nodes where the large tables are located, forming a memory Hash table, and then streaming out the data of the large tables Hash Join. However, if the amount of data filtered by small tables cannot be put into memory, Join will not be able to complete at this time. The usual error should be caused by memory overrun first.
+The system implements Join operator in two ways:
 
-If you encounter the above situation, it is recommended to use Shuffle Join explicitly, also known as Partitioned Join. That is, small and large tables are Hash according to Join's key, and then distributed Join. This memory consumption is allocated to all computing nodes in the cluster.
+Broadcast join: conditionally filtering right hand tables, broadcasting them to the nodes where the large tables are located, forming a memory Hash table, and then streaming out the data of the large tables Hash Join.
 
-Doris will try to use Broadcast Join first. If small tables are too large to broadcasting, Doris will switch to Shuffle Join automatically. Note that if you use Broadcast Join explicitly in this case, Doris will still switch to Shuffle Join automatically.
+Shuffle join: tables in both side are Hash according to Join's key, and then distributed Join. This memory consumption is allocated to all computing nodes in the cluster.
 
-Use Broadcast Join (default):
+Broadcast join is perfermance better when right hand table size is really small, vice versa.
+
+Doris will try to use Broadcast Join first. You can specify how each join operator is implemented explicitly. System provides configurable parameter `auto_broadcast_join_threshold` to configure the maximum percentage of execute memory could used for build hash table for broadcast join. The meaningful values range from 0 to 1, and the default value is 0.8. System will use shuffle join when broadcast join used memory more than it.
+
+You can turn off broadcast join by set `auto_broadcast_join_threshold` to negative or zero.
+
+Choose join implementation automaticaly (default):
 
 ```
 mysql> select sum(table1.pv) from table1 join table2 where table1.siteid = 2;
diff --git a/docs/zh-CN/getting-started/advance-usage.md b/docs/zh-CN/getting-started/advance-usage.md
index c32ab0ed14..8be1eb5ee2 100644
--- a/docs/zh-CN/getting-started/advance-usage.md
+++ b/docs/zh-CN/getting-started/advance-usage.md
@@ -207,13 +207,19 @@ mysql> SHOW VARIABLES LIKE "%query_timeout%";
 
 ### 2.3 Broadcast/Shuffle Join
 
-系统默认实现 Join 的方式,是将小表进行条件过滤后,将其广播到大表所在的各个节点上,形成一个内存 Hash 表,然后流式读出大表的数据进行Hash Join。但是如果当小表过滤后的数据量无法放入内存的话,此时 Join 将无法完成,通常的报错应该是首先造成内存超限。
+系统提供了两种Join的实现方式,broadcast join和shuffle join(partitioned Join)。
 
-如果遇到上述情况,建议显式指定 Shuffle Join,也被称作 Partitioned Join。即将小表和大表都按照 Join 的 key 进行 Hash,然后进行分布式的 Join。这个对内存的消耗就会分摊到集群的所有计算节点上。
+broadcast join是指将小表进行条件过滤后,将其广播到大表所在的各个节点上,形成一个内存 Hash 表,然后流式读出大表的数据进行Hash Join。
 
-Doris会自动尝试进行 Broadcast Join,如果预估小表过大则会自动切换至 Shuffle Join。注意,如果此时显式指定了 Broadcast Join 也会自动切换至 Shuffle Join。
+shuffle join是指将小表和大表都按照 Join 的 key 进行 Hash,然后进行分布式的 Join。
 
-使用 Broadcast Join(默认):
+当小表的数据量较小时,broadcast join拥有更好的性能。反之,则shuffle join拥有更好的性能。
+
+系统会自动尝试进行 Broadcast Join,也可以显式指定每个join算子的实现方式。系统提供了可配置的参数`auto_broadcast_join_threshold`,指定使用broadcast join时,hash table使用的内存占整体执行内存比例的上限,取值范围为0到1,默认值为0.8。当系统计算hash table使用的内存会超过此限制时,会自动转换为使用shuffle join。
+
+当`auto_broadcast_join_threshold`被设置为小于等于0时,所有的join都将使用shuffle join。
+
+自动选择join方式(默认):
 
 ```
 mysql> select sum(table1.pv) from table1 join table2 where table1.siteid = 2;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/Analyzer.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/Analyzer.java
index 0e3ed6d39b..e46831100d 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/Analyzer.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/Analyzer.java
@@ -178,6 +178,10 @@ public class Analyzer {
     public List<RuntimeFilter> getAssignedRuntimeFilter() { return assignedRuntimeFilters; }
     public void clearAssignedRuntimeFilters() { assignedRuntimeFilters.clear(); }
 
+    public long getAutoBroadcastJoinThreshold() {
+        return globalState.autoBroadcastJoinThreshold;
+    }
+
     // state shared between all objects of an Analyzer tree
     // TODO: Many maps here contain properties about tuples, e.g., whether
     // a tuple is outer/semi joined, etc. Remove the maps in favor of making
@@ -291,6 +295,8 @@ public class Analyzer {
 
         private final ExprRewriter mvExprRewriter;
 
+        private final long autoBroadcastJoinThreshold;
+
         public GlobalState(Catalog catalog, ConnectContext context) {
             this.catalog = catalog;
             this.context = context;
@@ -323,8 +329,19 @@ public class Analyzer {
             mvRewriteRules.add(HLLHashToSlotRefRule.INSTANCE);
             mvRewriteRules.add(CountFieldToSum.INSTANCE);
             mvExprRewriter = new ExprRewriter(mvRewriteRules);
+
+            // compute max exec mem could be used for broadcast join
+            long perNodeMemLimit = context.getSessionVariable().getMaxExecMemByte();
+            double autoBroadcastJoinThresholdPercentage = context.getSessionVariable().autoBroadcastJoinThreshold;
+            if (autoBroadcastJoinThresholdPercentage > 1) {
+                autoBroadcastJoinThresholdPercentage = 1.0;
+            } else if (autoBroadcastJoinThresholdPercentage <= 0) {
+                autoBroadcastJoinThresholdPercentage = -1.0;
+            }
+            autoBroadcastJoinThreshold = (long)(perNodeMemLimit * autoBroadcastJoinThresholdPercentage);
         }
-    };
+    }
+
     private final GlobalState globalState;
 
     // An analyzer stores analysis state for a single select block. A select block can be
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/DistributedPlanner.java b/fe/fe-core/src/main/java/org/apache/doris/planner/DistributedPlanner.java
index 90dda46638..5d7d30aec2 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/DistributedPlanner.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/DistributedPlanner.java
@@ -96,12 +96,10 @@ public class DistributedPlanner {
             Preconditions.checkState(!queryStmt.hasOffset());
             isPartitioned = true;
         }
-        long perNodeMemLimit = ctx_.getQueryOptions().mem_limit;
         if (LOG.isDebugEnabled()) {
             LOG.debug("create plan fragments");
-            LOG.debug("memlimit=" + Long.toString(perNodeMemLimit));
         }
-        createPlanFragments(singleNodePlan, isPartitioned, perNodeMemLimit, fragments);
+        createPlanFragments(singleNodePlan, isPartitioned, fragments);
         return fragments;
     }
 
@@ -181,8 +179,7 @@ public class DistributedPlanner {
      * partitioned; the partition function is derived from the inputs.
      */
     private PlanFragment createPlanFragments(
-            PlanNode root, boolean isPartitioned,
-            long perNodeMemLimit, ArrayList<PlanFragment> fragments) throws UserException {
+            PlanNode root, boolean isPartitioned, ArrayList<PlanFragment> fragments) throws UserException {
         ArrayList<PlanFragment> childFragments = Lists.newArrayList();
         for (PlanNode child : root.getChildren()) {
             // allow child fragments to be partitioned, unless they contain a limit clause
@@ -193,7 +190,7 @@ public class DistributedPlanner {
             // TODO()
             // if (root instanceof SubplanNode && child == root.getChild(1)) continue;
             childFragments.add(
-                    createPlanFragments(child, childIsPartitioned, perNodeMemLimit, fragments));
+                    createPlanFragments(child, childIsPartitioned, fragments));
         }
 
         PlanFragment result = null;
@@ -204,8 +201,8 @@ public class DistributedPlanner {
             result = createTableFunctionFragment(root, childFragments.get(0));
         } else if (root instanceof HashJoinNode) {
             Preconditions.checkState(childFragments.size() == 2);
-            result = createHashJoinFragment((HashJoinNode) root, childFragments.get(1),
-                    childFragments.get(0), perNodeMemLimit, fragments);
+            result = createHashJoinFragment((HashJoinNode) root,
+                    childFragments.get(1), childFragments.get(0), fragments);
         } else if (root instanceof CrossJoinNode) {
             result = createCrossJoinFragment((CrossJoinNode) root, childFragments.get(1),
                     childFragments.get(0));
@@ -306,9 +303,9 @@ public class DistributedPlanner {
      * This function is mainly used to choose the most suitable distributed method for the 'node',
      * and transform it into PlanFragment.
      */
-    private PlanFragment createHashJoinFragment(HashJoinNode node, PlanFragment rightChildFragment,
-                                                PlanFragment leftChildFragment, long perNodeMemLimit,
-                                                ArrayList<PlanFragment> fragments)
+    private PlanFragment createHashJoinFragment(
+            HashJoinNode node, PlanFragment rightChildFragment,
+            PlanFragment leftChildFragment, ArrayList<PlanFragment> fragments)
             throws UserException {
         List<String> reason = Lists.newArrayList();
         if (canColocateJoin(node, leftChildFragment, rightChildFragment, reason)) {
@@ -352,16 +349,16 @@ public class DistributedPlanner {
         // - or if it's cheaper and we weren't explicitly told to do a partitioned join
         // - and we're not doing a full or right outer join (those require the left-hand
         //   side to be partitioned for correctness)
-        // - and the expected size of the hash tbl doesn't exceed perNodeMemLimit
+        // - and the expected size of the hash tbl doesn't exceed autoBroadcastThreshold
         // we set partition join as default when broadcast join cost equals partition join cost
+
         if (node.getJoinOp() != JoinOperator.RIGHT_OUTER_JOIN && node.getJoinOp() != JoinOperator.FULL_OUTER_JOIN) {
             if (node.getInnerRef().isBroadcastJoin()) {
                 // respect user join hint
                 doBroadcast = true;
-            } else if (!node.getInnerRef().isPartitionJoin()
-                    && joinCostEvaluation.isBroadcastCostSmaller()
-                    && (perNodeMemLimit == 0
-                    || joinCostEvaluation.constructHashTableSpace() <= perNodeMemLimit)) {
+            } else if (!node.getInnerRef().isPartitionJoin() && joinCostEvaluation.isBroadcastCostSmaller()
+                    && joinCostEvaluation.constructHashTableSpace()
+                    <= ctx_.getRootAnalyzer().getAutoBroadcastJoinThreshold()) {
                 doBroadcast = true;
             } else {
                 doBroadcast = false;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java
index 13ccc90407..6310a7aaf8 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java
@@ -89,6 +89,9 @@ import java.util.stream.Collectors;
 public class OlapScanNode extends ScanNode {
     private static final Logger LOG = LogManager.getLogger(OlapScanNode.class);
 
+    // average compression ratio in doris storage engine
+    private final static int COMPRESSION_RATIO = 5;
+
     private List<TScanRangeLocations> result = new ArrayList<>();
     /*
      * When the field value is ON, the storage engine can return the data directly without pre-aggregation.
@@ -376,7 +379,7 @@ public class OlapScanNode extends ScanNode {
     public void computeStats(Analyzer analyzer) {
         super.computeStats(analyzer);
         if (cardinality > 0) {
-            avgRowSize = totalBytes / (float) cardinality;
+            avgRowSize = totalBytes / (float) cardinality * COMPRESSION_RATIO;
             capCardinalityAtLimit();
         }
         // when node scan has no data, cardinality should be 0 instead of a invalid value after computeStats()
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
index e7725eccbb..20b1187e84 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
@@ -176,6 +176,8 @@ public class SessionVariable implements Serializable, Writable {
 
     public static final String BLOCK_ENCRYPTION_MODE = "block_encryption_mode";
 
+    public static final String AUTO_BROADCAST_JOIN_THRESHOLD = "auto_broadcast_join_threshold";
+
     public static final String ENABLE_PROJECTION = "enable_projection";
 
     // session origin value
@@ -431,6 +433,12 @@ public class SessionVariable implements Serializable, Writable {
     @VariableMgr.VarAttr(name = BLOCK_ENCRYPTION_MODE)
     private String blockEncryptionMode = "";
 
+    // the maximum size in bytes for a table that will be broadcast to all be nodes
+    // when performing a join, By setting this value to -1 broadcasting can be disabled.
+    // Default value is 1Gto
+    @VariableMgr.VarAttr(name = AUTO_BROADCAST_JOIN_THRESHOLD)
+    public double autoBroadcastJoinThreshold = 0.8;
+  
     @VariableMgr.VarAttr(name = ENABLE_PROJECTION)
     private boolean enableProjection = false;
 
diff --git a/fe/fe-core/src/test/java/org/apache/doris/planner/DistributedPlannerTest.java b/fe/fe-core/src/test/java/org/apache/doris/planner/DistributedPlannerTest.java
index 0d304e992c..94b68b295d 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/planner/DistributedPlannerTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/planner/DistributedPlannerTest.java
@@ -147,4 +147,28 @@ public class DistributedPlannerTest {
         plan = planner.getExplainString(fragments, new ExplainOptions(false, false));
         Assert.assertEquals(1, StringUtils.countMatches(plan, "INNER JOIN (PARTITIONED)"));
     }
+
+    @Test
+    public void testBroadcastJoinCostThreshold() throws Exception {
+        String sql = "explain select * from db1.tbl1 join db1.tbl2 on tbl1.k1 = tbl2.k3";
+        StmtExecutor stmtExecutor = new StmtExecutor(ctx, sql);
+        stmtExecutor.execute();
+        Planner planner = stmtExecutor.planner();
+        List<PlanFragment> fragments = planner.getFragments();
+        String plan = planner.getExplainString(fragments, new ExplainOptions(false, false));
+        Assert.assertEquals(1, StringUtils.countMatches(plan, "INNER JOIN (BROADCAST)"));
+
+        double originThreshold = ctx.getSessionVariable().autoBroadcastJoinThreshold;
+        try {
+            ctx.getSessionVariable().autoBroadcastJoinThreshold = -1.0;
+            stmtExecutor = new StmtExecutor(ctx, sql);
+            stmtExecutor.execute();
+            planner = stmtExecutor.planner();
+            fragments = planner.getFragments();
+            plan = planner.getExplainString(fragments, new ExplainOptions(false, false));
+            Assert.assertEquals(1, StringUtils.countMatches(plan, "INNER JOIN (PARTITIONED)"));
+        } finally {
+            ctx.getSessionVariable().autoBroadcastJoinThreshold = originThreshold;
+        }
+    }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org