You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by mo...@apache.org on 2021/07/08 07:44:55 UTC
[incubator-doris] branch master updated: [Bug] Fix the bug of bucket shuffle join cause error plan (#6172)
This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
The following commit(s) were added to refs/heads/master by this push:
new cca813a [Bug] Fix the bug of bucket shuffle join cause error plan (#6172)
cca813a is described below
commit cca813a57b7466bd12af48232815148eebe477f8
Author: HappenLee <ha...@hotmail.com>
AuthorDate: Thu Jul 8 02:44:39 2021 -0500
[Bug] Fix the bug of bucket shuffle join cause error plan (#6172)
---
.../apache/doris/planner/DistributedPlanner.java | 52 +++++++++++++---------
.../org/apache/doris/planner/QueryPlanTest.java | 15 +++++++
2 files changed, 46 insertions(+), 21 deletions(-)
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/DistributedPlanner.java b/fe/fe-core/src/main/java/org/apache/doris/planner/DistributedPlanner.java
index f03a5e2..677bde5 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/DistributedPlanner.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/DistributedPlanner.java
@@ -23,7 +23,6 @@ import org.apache.doris.analysis.Expr;
import org.apache.doris.analysis.InsertStmt;
import org.apache.doris.analysis.JoinOperator;
import org.apache.doris.analysis.QueryStmt;
-import org.apache.doris.analysis.SlotDescriptor;
import org.apache.doris.analysis.SlotRef;
import org.apache.doris.catalog.Catalog;
import org.apache.doris.catalog.ColocateTableIndex;
@@ -49,6 +48,7 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
+import java.util.stream.Collectors;
import avro.shaded.com.google.common.collect.Maps;
@@ -329,7 +329,7 @@ public class DistributedPlanner {
// bucket shuffle join is better than broadcast and shuffle join
// it can reduce the network cost of join, so doris chose it first
List<Expr> rhsPartitionxprs = Lists.newArrayList();
- if (canBucketShuffleJoin(node, leftChildFragment, rightChildFragment, rhsPartitionxprs)) {
+ if (canBucketShuffleJoin(node, leftChildFragment, rhsPartitionxprs)) {
node.setDistributionMode(HashJoinNode.DistributionMode.BUCKET_SHUFFLE);
DataPartition rhsJoinPartition =
new DataPartition(TPartitionType.BUCKET_SHFFULE_HASH_PARTITIONED, rhsPartitionxprs);
@@ -633,7 +633,7 @@ public class DistributedPlanner {
return false;
}
- private boolean canBucketShuffleJoin(HashJoinNode node, PlanFragment leftChildFragment, PlanFragment rightChildFragment,
+ private boolean canBucketShuffleJoin(HashJoinNode node, PlanFragment leftChildFragment,
List<Expr> rhsHashExprs) {
if (!ConnectContext.get().getSessionVariable().isEnableBucketShuffleJoin()) {
return false;
@@ -649,14 +649,10 @@ public class DistributedPlanner {
return canBucketShuffleJoin(node, leftRoot, rhsHashExprs);
}
- // 2.leftRoot be hashjoin node and not shuffle join
+ // 2.leftRoot be hashjoin node
if (leftRoot instanceof HashJoinNode) {
while (leftRoot instanceof HashJoinNode) {
- if (!((HashJoinNode)leftRoot).isShuffleJoin()) {
- leftRoot = leftRoot.getChild(0);
- } else {
- return false;
- }
+ leftRoot = leftRoot.getChild(0);
}
if (leftRoot instanceof OlapScanNode) {
return canBucketShuffleJoin(node, leftRoot, rhsHashExprs);
@@ -683,9 +679,12 @@ public class DistributedPlanner {
DistributionInfo leftDistribution = leftScanNode.getOlapTable().getDefaultDistributionInfo();
if (leftDistribution instanceof HashDistributionInfo) {
+ // use the table_name + '-' + column_name as check condition
List<Column> leftDistributeColumns = ((HashDistributionInfo) leftDistribution).getDistributionColumns();
+ List<String> leftDistributeColumnNames = leftDistributeColumns.stream().
+ map(col -> leftTable.getName() + "." + col.getName()).collect(Collectors.toList());
- List<Column> leftJoinColumns = new ArrayList<>();
+ List<String> leftJoinColumnNames = new ArrayList<>();
List<Expr> rightExprs = new ArrayList<>();
List<BinaryPredicate> eqJoinConjuncts = node.getEqJoinConjuncts();
@@ -696,21 +695,32 @@ public class DistributedPlanner {
continue;
}
- SlotDescriptor leftSlot = lhsJoinExpr.unwrapSlotRef().getDesc();
-
- leftJoinColumns.add(leftSlot.getColumn());
- rightExprs.add(rhsJoinExpr);
+ SlotRef leftSlot = lhsJoinExpr.unwrapSlotRef();
+ if (leftSlot.getTable() instanceof OlapTable) {
+ // table name in SlotRef is not the really name. `select * from test as t`
+ // table name in SlotRef is `t`, but here we need is `test`.
+ leftJoinColumnNames.add(leftSlot.getTable().getName() + "." + leftSlot.getColumnName());
+ rightExprs.add(rhsJoinExpr);
+ }
}
//2 the join columns should contains all left table distribute columns to enable bucket shuffle join
- for (Column distributeColumn : leftDistributeColumns) {
- int loc = leftJoinColumns.indexOf(distributeColumn);
- // TODO: now support bucket shuffle join when distribute column type different with
- // right expr type
- if (loc == -1 || !rightExprs.get(loc).getType().equals(distributeColumn.getType())) {
- return false;
+ for (int i = 0; i < leftDistributeColumnNames.size(); i++) {
+ String distributeColumnName = leftDistributeColumnNames.get(i);
+ boolean findRhsExprs = false;
+ // check the join column name is same as distribute column name and
+ // check the rhs join expr type is same as distribute column
+ for (int j = 0; j < leftJoinColumnNames.size(); j++) {
+ if (leftJoinColumnNames.get(j).equals(distributeColumnName)) {
+ if (rightExprs.get(j).getType().equals(leftDistributeColumns.get(i).getType())) {
+ rhsJoinExprs.add(rightExprs.get(j));
+ findRhsExprs = true;
+ break;
+ }
+ }
}
- rhsJoinExprs.add(rightExprs.get(loc));
+
+ if (!findRhsExprs) return false;
}
} else {
return false;
diff --git a/fe/fe-core/src/test/java/org/apache/doris/planner/QueryPlanTest.java b/fe/fe-core/src/test/java/org/apache/doris/planner/QueryPlanTest.java
index 53fede1..4aac012 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/planner/QueryPlanTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/planner/QueryPlanTest.java
@@ -1121,6 +1121,21 @@ public class QueryPlanTest {
explainString = UtFrameUtils.getSQLPlanOrErrorMsg(connectContext, queryStr);
Assert.assertTrue(explainString.contains("BUCKET_SHFFULE_HASH_PARTITIONED: `t1`.`k1`, `t1`.`k1`"));
Assert.assertTrue(explainString.contains("BUCKET_SHFFULE_HASH_PARTITIONED: `t3`.`k1`, `t3`.`k2`"));
+
+ // support recurse of bucket shuffle because t4 join t2 and join column name is same as t2 distribute column name
+ queryStr = "explain select * from test.jointest t1 join test.bucket_shuffle1 t2 on t1.k1 = t2.k1 and t1.k1 = t2.k2 join test.colocate1 t3 " +
+ "on t2.k1 = t3.k1 join test.jointest t4 on t4.k1 = t2.k1 and t4.k1 = t2.k2";
+ explainString = UtFrameUtils.getSQLPlanOrErrorMsg(connectContext, queryStr);
+ Assert.assertTrue(explainString.contains("BUCKET_SHFFULE_HASH_PARTITIONED: `t1`.`k1`, `t1`.`k1`"));
+ Assert.assertTrue(explainString.contains("BUCKET_SHFFULE_HASH_PARTITIONED: `t4`.`k1`, `t4`.`k1`"));
+
+ // some column name in join expr t3 join t4 and t1 distribute column name, so should not be bucket shuffle join
+ queryStr = "explain select * from test.jointest t1 join test.bucket_shuffle1 t2 on t1.k1 = t2.k1 and t1.k1 = t2.k2 join test.colocate1 t3 " +
+ "on t2.k1 = t3.k1 join test.jointest t4 on t4.k1 = t3.k1 and t4.k2 = t3.k2";
+ explainString = UtFrameUtils.getSQLPlanOrErrorMsg(connectContext, queryStr);
+ Assert.assertTrue(explainString.contains("BUCKET_SHFFULE_HASH_PARTITIONED: `t1`.`k1`, `t1`.`k1`"));
+ Assert.assertTrue(!explainString.contains("BUCKET_SHFFULE_HASH_PARTITIONED: `t4`.`k1`, `t4`.`k1`"));
+
// disable bucket shuffle join again
Deencapsulation.setField(connectContext.getSessionVariable(), "enableBucketShuffleJoin", false);
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org