You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by mo...@apache.org on 2021/07/08 07:44:55 UTC

[incubator-doris] branch master updated: [Bug] Fix the bug of bucket shuffle join cause error plan (#6172)

This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git


The following commit(s) were added to refs/heads/master by this push:
     new cca813a  [Bug] Fix the bug of bucket shuffle join cause error plan (#6172)
cca813a is described below

commit cca813a57b7466bd12af48232815148eebe477f8
Author: HappenLee <ha...@hotmail.com>
AuthorDate: Thu Jul 8 02:44:39 2021 -0500

    [Bug] Fix the bug of bucket shuffle join cause error plan (#6172)
---
 .../apache/doris/planner/DistributedPlanner.java   | 52 +++++++++++++---------
 .../org/apache/doris/planner/QueryPlanTest.java    | 15 +++++++
 2 files changed, 46 insertions(+), 21 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/DistributedPlanner.java b/fe/fe-core/src/main/java/org/apache/doris/planner/DistributedPlanner.java
index f03a5e2..677bde5 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/DistributedPlanner.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/DistributedPlanner.java
@@ -23,7 +23,6 @@ import org.apache.doris.analysis.Expr;
 import org.apache.doris.analysis.InsertStmt;
 import org.apache.doris.analysis.JoinOperator;
 import org.apache.doris.analysis.QueryStmt;
-import org.apache.doris.analysis.SlotDescriptor;
 import org.apache.doris.analysis.SlotRef;
 import org.apache.doris.catalog.Catalog;
 import org.apache.doris.catalog.ColocateTableIndex;
@@ -49,6 +48,7 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
 import java.util.Map;
+import java.util.stream.Collectors;
 
 import avro.shaded.com.google.common.collect.Maps;
 
@@ -329,7 +329,7 @@ public class DistributedPlanner {
         // bucket shuffle join is better than broadcast and shuffle join
         // it can reduce the network cost of join, so doris chose it first
         List<Expr> rhsPartitionxprs = Lists.newArrayList();
-        if (canBucketShuffleJoin(node, leftChildFragment, rightChildFragment, rhsPartitionxprs)) {
+        if (canBucketShuffleJoin(node, leftChildFragment, rhsPartitionxprs)) {
             node.setDistributionMode(HashJoinNode.DistributionMode.BUCKET_SHUFFLE);
             DataPartition rhsJoinPartition =
                     new DataPartition(TPartitionType.BUCKET_SHFFULE_HASH_PARTITIONED, rhsPartitionxprs);
@@ -633,7 +633,7 @@ public class DistributedPlanner {
         return false;
     }
 
-    private boolean canBucketShuffleJoin(HashJoinNode node, PlanFragment leftChildFragment, PlanFragment rightChildFragment,
+    private boolean canBucketShuffleJoin(HashJoinNode node, PlanFragment leftChildFragment,
                                          List<Expr> rhsHashExprs) {
         if (!ConnectContext.get().getSessionVariable().isEnableBucketShuffleJoin()) {
             return false;
@@ -649,14 +649,10 @@ public class DistributedPlanner {
             return canBucketShuffleJoin(node, leftRoot, rhsHashExprs);
         }
 
-        // 2.leftRoot be hashjoin node and not shuffle join
+        // 2.leftRoot be hashjoin node
         if (leftRoot instanceof HashJoinNode) {
             while (leftRoot instanceof HashJoinNode) {
-                if (!((HashJoinNode)leftRoot).isShuffleJoin()) {
-                    leftRoot = leftRoot.getChild(0);
-                } else {
-                    return false;
-                }
+                leftRoot = leftRoot.getChild(0);
             }
             if (leftRoot instanceof OlapScanNode) {
                 return canBucketShuffleJoin(node, leftRoot, rhsHashExprs);
@@ -683,9 +679,12 @@ public class DistributedPlanner {
         DistributionInfo leftDistribution = leftScanNode.getOlapTable().getDefaultDistributionInfo();
 
         if (leftDistribution instanceof HashDistributionInfo) {
+            // use table_name + "." + column_name as the key for the check condition
             List<Column> leftDistributeColumns = ((HashDistributionInfo) leftDistribution).getDistributionColumns();
+            List<String> leftDistributeColumnNames = leftDistributeColumns.stream().
+                    map(col -> leftTable.getName() + "." + col.getName()).collect(Collectors.toList());
 
-            List<Column> leftJoinColumns = new ArrayList<>();
+            List<String> leftJoinColumnNames = new ArrayList<>();
             List<Expr> rightExprs = new ArrayList<>();
             List<BinaryPredicate> eqJoinConjuncts = node.getEqJoinConjuncts();
 
@@ -696,21 +695,32 @@ public class DistributedPlanner {
                     continue;
                 }
 
-                SlotDescriptor leftSlot = lhsJoinExpr.unwrapSlotRef().getDesc();
-
-                leftJoinColumns.add(leftSlot.getColumn());
-                rightExprs.add(rhsJoinExpr);
+                SlotRef leftSlot = lhsJoinExpr.unwrapSlotRef();
+                if (leftSlot.getTable() instanceof OlapTable) {
+                    // the table name written in the query may be an alias, not the real name:
+                    // for `select * from test as t` the alias is `t`, but what we need here is `test`.
+                    leftJoinColumnNames.add(leftSlot.getTable().getName() + "." + leftSlot.getColumnName());
+                    rightExprs.add(rhsJoinExpr);
+                }
             }
 
             //2 the join columns should contains all left table distribute columns to enable bucket shuffle join
-            for (Column distributeColumn : leftDistributeColumns) {
-                int loc = leftJoinColumns.indexOf(distributeColumn);
-                // TODO: now support bucket shuffle join when distribute column type different with
-                // right expr type
-                if (loc == -1 || !rightExprs.get(loc).getType().equals(distributeColumn.getType())) {
-                    return false;
+            for (int i = 0; i < leftDistributeColumnNames.size(); i++) {
+                String distributeColumnName = leftDistributeColumnNames.get(i);
+                boolean findRhsExprs = false;
+                // check that the join column name is the same as the distribute column name and
+                // that the rhs join expr type is the same as the distribute column type
+                for (int j = 0; j < leftJoinColumnNames.size(); j++) {
+                    if (leftJoinColumnNames.get(j).equals(distributeColumnName)) {
+                        if (rightExprs.get(j).getType().equals(leftDistributeColumns.get(i).getType())) {
+                            rhsJoinExprs.add(rightExprs.get(j));
+                            findRhsExprs = true;
+                            break;
+                        }
+                    }
                 }
-                rhsJoinExprs.add(rightExprs.get(loc));
+
+                if (!findRhsExprs) return false;
             }
         } else {
             return false;
diff --git a/fe/fe-core/src/test/java/org/apache/doris/planner/QueryPlanTest.java b/fe/fe-core/src/test/java/org/apache/doris/planner/QueryPlanTest.java
index 53fede1..4aac012 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/planner/QueryPlanTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/planner/QueryPlanTest.java
@@ -1121,6 +1121,21 @@ public class QueryPlanTest {
         explainString = UtFrameUtils.getSQLPlanOrErrorMsg(connectContext, queryStr);
         Assert.assertTrue(explainString.contains("BUCKET_SHFFULE_HASH_PARTITIONED: `t1`.`k1`, `t1`.`k1`"));
         Assert.assertTrue(explainString.contains("BUCKET_SHFFULE_HASH_PARTITIONED: `t3`.`k1`, `t3`.`k2`"));
+
+        // bucket shuffle join is supported recursively here, because t4 joins t2 and the join column names are the same as t2's distribute column names
+        queryStr = "explain select * from test.jointest t1 join test.bucket_shuffle1 t2 on t1.k1 = t2.k1 and t1.k1 = t2.k2 join test.colocate1 t3 " +
+                "on t2.k1 = t3.k1 join test.jointest t4 on t4.k1 = t2.k1 and t4.k1 = t2.k2";
+        explainString = UtFrameUtils.getSQLPlanOrErrorMsg(connectContext, queryStr);
+        Assert.assertTrue(explainString.contains("BUCKET_SHFFULE_HASH_PARTITIONED: `t1`.`k1`, `t1`.`k1`"));
+        Assert.assertTrue(explainString.contains("BUCKET_SHFFULE_HASH_PARTITIONED: `t4`.`k1`, `t4`.`k1`"));
+
+        // only the column names in the join expr of t4 join t3 are the same as t1's distribute column names (the tables differ), so it should not be a bucket shuffle join
+        queryStr = "explain select * from test.jointest t1 join test.bucket_shuffle1 t2 on t1.k1 = t2.k1 and t1.k1 = t2.k2 join test.colocate1 t3 " +
+                "on t2.k1 = t3.k1 join test.jointest t4 on t4.k1 = t3.k1 and t4.k2 = t3.k2";
+        explainString = UtFrameUtils.getSQLPlanOrErrorMsg(connectContext, queryStr);
+        Assert.assertTrue(explainString.contains("BUCKET_SHFFULE_HASH_PARTITIONED: `t1`.`k1`, `t1`.`k1`"));
+        Assert.assertTrue(!explainString.contains("BUCKET_SHFFULE_HASH_PARTITIONED: `t4`.`k1`, `t4`.`k1`"));
+
         // disable bucket shuffle join again
         Deencapsulation.setField(connectContext.getSessionVariable(), "enableBucketShuffleJoin", false);
     }

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org