Posted to commits@doris.apache.org by GitBox <gi...@apache.org> on 2021/03/04 08:38:07 UTC

[GitHub] [incubator-doris] HappenLee opened a new pull request #5459: [Bucket Shuffle Join] Support the some featrue of Bucket Shuffle Join

HappenLee opened a new pull request #5459:
URL: https://github.com/apache/incubator-doris/pull/5459


   ## Proposed changes
   
   1. Support Bucket Shuffle Join when the left table is a colocate table, or when the left child is itself a Colocate Join or Bucket Shuffle Join
   2. Enable the local Runtime Filter for Bucket Shuffle Join and Colocate Join
   3. Add documentation for Bucket Shuffle Join
   
   ## Types of changes
   
   What types of changes does your code introduce to Doris?
   _Put an `x` in the boxes that apply_
   
   - [ ] Bugfix (non-breaking change which fixes an issue)
   - [x] New feature (non-breaking change which adds functionality)
   - [ ] Breaking change (fix or feature that would cause existing functionality to not work as expected)
   - [ ] Documentation Update (if none of the other choices apply)
   - [ ] Code refactor (Modify the code structure, format the code, etc...)
   
   ## Checklist
   
   _Put an `x` in the boxes that apply. You can also fill these out after creating the PR. If you're unsure about any of them, don't hesitate to ask. We're here to help! This is simply a reminder of what we are going to look for before merging your code._
   
   - [x] I have created an issue (Fix #5438) and described the bug/feature there in detail
   - [x] Compiling and unit tests pass locally with my changes
   - [x] I have added tests that prove my fix is effective or that my feature works
   - [x] If these changes need document changes, I have updated the document
   - [x] Any dependent changes have been merged
   
   ## Further comments
   
   If this is a relatively large or complex change, kick off the discussion at dev@doris.apache.org by explaining why you chose the solution you did and what alternatives you considered, etc...
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org


[GitHub] [incubator-doris] EmmyMiao87 merged pull request #5459: [Bucket Shuffle Join] Support the some featrue of Bucket Shuffle Join

Posted by GitBox <gi...@apache.org>.
EmmyMiao87 merged pull request #5459:
URL: https://github.com/apache/incubator-doris/pull/5459


   




[GitHub] [incubator-doris] EmmyMiao87 commented on a change in pull request #5459: [Bucket Shuffle Join] Support the some featrue of Bucket Shuffle Join

Posted by GitBox <gi...@apache.org>.
EmmyMiao87 commented on a change in pull request #5459:
URL: https://github.com/apache/incubator-doris/pull/5459#discussion_r587974095



##########
File path: fe/fe-core/src/main/java/org/apache/doris/planner/DistributedPlanner.java
##########
@@ -529,21 +533,40 @@ private boolean canBucketShuffleJoin(HashJoinNode node, PlanFragment leftChildFr
         }
 
         PlanNode leftRoot = leftChildFragment.getPlanRoot();
-        // leftRoot should be OlapScanNode
+        // 1.leftRoot be OlapScanNode
         if (leftRoot instanceof OlapScanNode) {
             return canBucketShuffleJoin(node, leftRoot, rhsHashExprs);
         }
 
+        // 2.leftRoot be colocate hashjoin node or bucket shuffle join node
+        PlanNode rightRoot = rightChildFragment.getPlanRoot();
+        if (leftRoot instanceof HashJoinNode) {
+            while (leftRoot instanceof HashJoinNode) {
+                if (((HashJoinNode)leftRoot).isColocate() || ((HashJoinNode)leftRoot).isBucketShuffle()) {
+                    leftRoot = leftRoot.getChild(0);
+                } else {
+                    return false;
+                }
+            }
+            if (leftRoot instanceof OlapScanNode) {
+                return canBucketShuffleJoin(node, leftRoot, rhsHashExprs);
+            }
+        }
+
         return false;
     }
 
     //the join expr must contian left table distribute column
     private boolean canBucketShuffleJoin(HashJoinNode node, PlanNode leftRoot,
                                     List<Expr> rhsJoinExprs) {
         OlapScanNode leftScanNode = ((OlapScanNode) leftRoot);
+        OlapTable leftTable = leftScanNode.getOlapTable();
 
-        //1 the left table must be only one partition
+        //1 the left table has more than one partition or left table is not a stable colocate table
         if (leftScanNode.getSelectedPartitionIds().size() != 1) {

Review comment:
       Why is it related to the number of partitions here?

##########
File path: fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
##########
@@ -1240,11 +1244,16 @@ private void computeScanRangeAssignment() throws Exception {
             scanNodeIds.add(scanNode.getId().asInt());
 
             FragmentScanRangeAssignment assignment = fragmentExecParamsMap.get(scanNode.getFragmentId()).scanRangeAssignment;
-            if (isColocateJoin(scanNode.getFragment().getPlanRoot())) {
+            boolean fragmentContainsColocateJoin = isColocateJoin(scanNode.getFragment().getPlanRoot());
+            boolean fragmentContainsBucketShuffleJoin = bucketShuffleJoinController.isBucketShuffleJoin(scanNode.getFragmentId().asInt(), scanNode.getFragment().getPlanRoot());
+
+            if (fragmentContainsColocateJoin) {
                 computeScanRangeAssignmentByColocate((OlapScanNode) scanNode);
-            } else if (bucketShuffleJoinController.isBucketShuffleJoin(scanNode.getFragmentId().asInt(), scanNode.getFragment().getPlanRoot())) {
+            }
+            if (fragmentContainsBucketShuffleJoin) {

Review comment:
       ```suggestion
              else if (fragmentContainsBucketShuffleJoin) {
   ```

##########
File path: fe/fe-core/src/main/java/org/apache/doris/planner/DistributedPlanner.java
##########
@@ -529,21 +533,40 @@ private boolean canBucketShuffleJoin(HashJoinNode node, PlanFragment leftChildFr
         }
 
         PlanNode leftRoot = leftChildFragment.getPlanRoot();
-        // leftRoot should be OlapScanNode
+        // 1.leftRoot be OlapScanNode
         if (leftRoot instanceof OlapScanNode) {
             return canBucketShuffleJoin(node, leftRoot, rhsHashExprs);
         }
 
+        // 2.leftRoot be colocate hashjoin node or bucket shuffle join node
+        PlanNode rightRoot = rightChildFragment.getPlanRoot();
+        if (leftRoot instanceof HashJoinNode) {
+            while (leftRoot instanceof HashJoinNode) {
+                if (((HashJoinNode)leftRoot).isColocate() || ((HashJoinNode)leftRoot).isBucketShuffle()) {

Review comment:
       What about broadcast join?
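The loop added to `canBucketShuffleJoin` descends through the left children of colocate and Bucket Shuffle joins until it reaches a scan node. The Java sketch below reproduces that walk with hypothetical, simplified classes (`PlanNode`, `OlapScanNode`, `HashJoinNode` and `JoinKind` here are stand-ins, not the Doris planner classes). As written in the diff, a broadcast join anywhere in the chain makes the check fail, even though a broadcast join leaves its left input where it is; that is the case the reviewer is asking about.

```java
// Hypothetical, simplified stand-ins for Doris planner nodes; not the real classes.
public class BucketShuffleChainCheck {
    enum JoinKind { BROADCAST, SHUFFLE, COLOCATE, BUCKET_SHUFFLE }

    static class PlanNode {}

    static class OlapScanNode extends PlanNode {
        final String table;
        OlapScanNode(String table) { this.table = table; }
    }

    static class HashJoinNode extends PlanNode {
        final JoinKind kind;
        final PlanNode left;
        final PlanNode right;
        HashJoinNode(JoinKind kind, PlanNode left, PlanNode right) {
            this.kind = kind;
            this.left = left;
            this.right = right;
        }
    }

    // Follows left children through colocate / bucket shuffle joins and returns the
    // scan node at the bottom; returns null for any other join kind or a non-scan
    // leaf, mirroring the loop added in the diff.
    static OlapScanNode findBucketedLeftScan(PlanNode leftRoot) {
        PlanNode node = leftRoot;
        while (node instanceof HashJoinNode) {
            HashJoinNode join = (HashJoinNode) node;
            if (join.kind != JoinKind.COLOCATE && join.kind != JoinKind.BUCKET_SHUFFLE) {
                return null; // BROADCAST or SHUFFLE: rejected by the check as written
            }
            node = join.left;
        }
        return (node instanceof OlapScanNode) ? (OlapScanNode) node : null;
    }

    public static void main(String[] args) {
        OlapScanNode t1 = new OlapScanNode("t1");
        PlanNode colocate = new HashJoinNode(JoinKind.COLOCATE, t1, new OlapScanNode("t2"));
        PlanNode broadcast = new HashJoinNode(JoinKind.BROADCAST, t1, new OlapScanNode("t3"));
        System.out.println(findBucketedLeftScan(colocate).table); // t1
        System.out.println(findBucketedLeftScan(broadcast));      // null
    }
}
```

Allowing `BROADCAST` in the loop condition would be a one-line change in this sketch; whether it is safe in Doris depends on how the broadcast fragment's instances are placed, which the thread does not settle.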

##########
File path: fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
##########
@@ -1180,32 +1180,36 @@ private void computeColocateJoinInstanceParam(PlanFragmentId fragmentId, int par
                 }
             }
 
+            // set bucket for scanrange

Review comment:
       It is recommended to enrich the comments and give a practical example to illustrate what is stored in this nested type.

##########
File path: fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
##########
@@ -1240,11 +1244,16 @@ private void computeScanRangeAssignment() throws Exception {
             scanNodeIds.add(scanNode.getId().asInt());
 
             FragmentScanRangeAssignment assignment = fragmentExecParamsMap.get(scanNode.getFragmentId()).scanRangeAssignment;
-            if (isColocateJoin(scanNode.getFragment().getPlanRoot())) {
+            boolean fragmentContainsColocateJoin = isColocateJoin(scanNode.getFragment().getPlanRoot());
+            boolean fragmentContainsBucketShuffleJoin = bucketShuffleJoinController.isBucketShuffleJoin(scanNode.getFragmentId().asInt(), scanNode.getFragment().getPlanRoot());
+
+            if (fragmentContainsColocateJoin) {
                 computeScanRangeAssignmentByColocate((OlapScanNode) scanNode);
-            } else if (bucketShuffleJoinController.isBucketShuffleJoin(scanNode.getFragmentId().asInt(), scanNode.getFragment().getPlanRoot())) {
+            }
+            if (fragmentContainsBucketShuffleJoin) {
                 bucketShuffleJoinController.computeScanRangeAssignmentByBucket((OlapScanNode) scanNode, idToBackend, addressToBackendID);
-            } else {
+            }
+            if (!(fragmentContainsColocateJoin | fragmentContainsBucketShuffleJoin)) {

Review comment:
       ```suggestion
               else {
   ```
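The two `suggestion` blocks turn the three independent `if` statements into a single `if / else if / else` chain. The sketch below, where strings stand in for the real assignment calls and the method names are hypothetical, isolates the one case in which the two forms behave differently: a fragment that contains both a colocate join and a Bucket Shuffle Join.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: strings stand in for the three real assignment calls.
public class ScanRangeAssignmentSketch {
    // Control flow as written in the diff: independent if statements.
    static List<String> asInDiff(boolean colocate, boolean bucketShuffle) {
        List<String> applied = new ArrayList<>();
        if (colocate) {
            applied.add("colocate");
        }
        if (bucketShuffle) {
            applied.add("bucket");
        }
        if (!(colocate | bucketShuffle)) {
            applied.add("default");
        }
        return applied;
    }

    // Control flow after both review suggestions: exactly one branch runs.
    static List<String> asSuggested(boolean colocate, boolean bucketShuffle) {
        List<String> applied = new ArrayList<>();
        if (colocate) {
            applied.add("colocate");
        } else if (bucketShuffle) {
            applied.add("bucket");
        } else {
            applied.add("default");
        }
        return applied;
    }

    public static void main(String[] args) {
        System.out.println(asInDiff(true, true));    // [colocate, bucket]
        System.out.println(asSuggested(true, true)); // [colocate]
    }
}
```

With the diff as written, such a fragment runs both the colocate and the bucket assignment for the same scan node; with the suggested chain only the colocate assignment runs. Which behaviour is intended depends on the rest of the PR, so it may be worth confirming before applying the suggestion. For two `boolean` operands, `|` and `||` give the same result; `||` only adds short-circuiting.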






[GitHub] [incubator-doris] HappenLee commented on a change in pull request #5459: [Bucket Shuffle Join] Support the some featrue of Bucket Shuffle Join

Posted by GitBox <gi...@apache.org>.
HappenLee commented on a change in pull request #5459:
URL: https://github.com/apache/incubator-doris/pull/5459#discussion_r587998229



##########
File path: fe/fe-core/src/main/java/org/apache/doris/planner/DistributedPlanner.java
##########
@@ -375,6 +375,15 @@ private PlanFragment createHashJoinFragment(HashJoinNode node, PlanFragment righ
             doBroadcast = false;
         }
 
+        // Push down the predicates constructed by the right child when the

Review comment:
       OK, I will rewrite this code.






[GitHub] [incubator-doris] HappenLee commented on a change in pull request #5459: [Bucket Shuffle Join] Support the some featrue of Bucket Shuffle Join

Posted by GitBox <gi...@apache.org>.
HappenLee commented on a change in pull request #5459:
URL: https://github.com/apache/incubator-doris/pull/5459#discussion_r587998800



##########
File path: fe/fe-core/src/main/java/org/apache/doris/planner/DistributedPlanner.java
##########
@@ -529,21 +533,40 @@ private boolean canBucketShuffleJoin(HashJoinNode node, PlanFragment leftChildFr
         }
 
         PlanNode leftRoot = leftChildFragment.getPlanRoot();
-        // leftRoot should be OlapScanNode
+        // 1.leftRoot be OlapScanNode
         if (leftRoot instanceof OlapScanNode) {
             return canBucketShuffleJoin(node, leftRoot, rhsHashExprs);
         }
 
+        // 2.leftRoot be colocate hashjoin node or bucket shuffle join node
+        PlanNode rightRoot = rightChildFragment.getPlanRoot();
+        if (leftRoot instanceof HashJoinNode) {
+            while (leftRoot instanceof HashJoinNode) {
+                if (((HashJoinNode)leftRoot).isColocate() || ((HashJoinNode)leftRoot).isBucketShuffle()) {
+                    leftRoot = leftRoot.getChild(0);
+                } else {
+                    return false;
+                }
+            }
+            if (leftRoot instanceof OlapScanNode) {
+                return canBucketShuffleJoin(node, leftRoot, rhsHashExprs);
+            }
+        }
+
         return false;
     }
 
     //the join expr must contian left table distribute column
     private boolean canBucketShuffleJoin(HashJoinNode node, PlanNode leftRoot,
                                     List<Expr> rhsJoinExprs) {
         OlapScanNode leftScanNode = ((OlapScanNode) leftRoot);
+        OlapTable leftTable = leftScanNode.getOlapTable();
 
-        //1 the left table must be only one partition
+        //1 the left table has more than one partition or left table is not a stable colocate table
         if (leftScanNode.getSelectedPartitionIds().size() != 1) {

Review comment:
       For partitioned tables, the data distribution rules of each partition may differ, so Bucket Shuffle Join can only guarantee correctness when the left table has a single partition.
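Read together with the code comment "the join expr must contain left table distribute column", this answer implies two eligibility conditions for a plain (non-colocate) left table. A minimal sketch, assuming column names can stand in for the `Expr` objects the real code compares, and ignoring the stable-colocate-table exception mentioned in the new code comment; `isEligible` is a hypothetical name:

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical sketch: plain strings stand in for Doris Expr / SlotRef objects.
public class BucketShuffleEligibility {
    static boolean isEligible(int selectedPartitionCount,
                              List<String> leftDistributionColumns,
                              List<String> leftJoinColumns) {
        // 1. Partitions may use different distribution rules, so only a
        //    single selected partition is accepted.
        if (selectedPartitionCount != 1) {
            return false;
        }
        // 2. Every distribution column of the left table must appear among the
        //    left-side equi-join columns; otherwise a right-side row cannot be
        //    routed to the bucket that holds its matches.
        Set<String> joinColumns = new HashSet<>(leftJoinColumns);
        return joinColumns.containsAll(leftDistributionColumns);
    }

    public static void main(String[] args) {
        List<String> dist = Arrays.asList("user_id");
        System.out.println(isEligible(1, dist, Arrays.asList("user_id", "dt"))); // true
        System.out.println(isEligible(2, dist, Arrays.asList("user_id")));       // false
        System.out.println(isEligible(1, dist, Arrays.asList("dt")));            // false
    }
}
```

A superset of join columns is fine in this sketch: rows that match on every join column necessarily match on the distribution columns, so they hash to the same bucket.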






[GitHub] [incubator-doris] EmmyMiao87 commented on a change in pull request #5459: [Bucket Shuffle Join] Support the some featrue of Bucket Shuffle Join

Posted by GitBox <gi...@apache.org>.
EmmyMiao87 commented on a change in pull request #5459:
URL: https://github.com/apache/incubator-doris/pull/5459#discussion_r587448251



##########
File path: docs/zh-CN/administrator-guide/bucket-shuffle-join.md
##########
@@ -0,0 +1,104 @@
+{
+    "title": "Bucket Shuffle Join",
+    "language": "zh-CN"
+}
+
+<!-- 
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+# Bucket Shuffle Join
+
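+Bucket Shuffle Join is a new feature officially added in Doris 0.14. It aims to provide locality optimization for certain join queries, reducing the time spent transferring data between nodes and thereby speeding up queries.
+
+Its design, implementation and results are described in [ISSUE 4394](https://github.com/apache/incubator-doris/issues/4394).
+
+## Glossary
+
+* FE: Frontend, the front-end node of Doris. Responsible for metadata management and request access.
+* BE: Backend, the back-end node of Doris. Responsible for query execution and data storage.
+* Left table: the table on the left side of a join query. It performs the Probe operation. Its order can be adjusted by Join Reorder.
+* Right table: the table on the right side of a join query. It performs the Build operation. Its order can be adjusted by Join Reorder.
+
+## Principle
+Besides Bucket Shuffle Join, Doris supports 3 types of join: `Shuffle Join, Broadcast Join, Colocate Join`. Except for `Colocate Join`, the other types of join incur relatively large network and memory overhead.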

Review comment:
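       How about phrasing this sentence as follows? "The conventional distributed join methods are shuffle join and broadcast join. Both of these incur considerable network overhead."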






[GitHub] [incubator-doris] qidaye commented on a change in pull request #5459: [Bucket Shuffle Join] Support the some featrue of Bucket Shuffle Join

Posted by GitBox <gi...@apache.org>.
qidaye commented on a change in pull request #5459:
URL: https://github.com/apache/incubator-doris/pull/5459#discussion_r587297331



##########
File path: docs/en/administrator-guide/bucket-shuffle-join.md
##########
@@ -0,0 +1,106 @@
+```
+---
+{
+    "title": "Bucket Shuffle Join",
+    "language": "en"
+}
+---
+
+<!-- 
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+```
+# Bucket Shuffle Join
+
+Bucket Shuffle Join is a new feature officially added in Doris 0.14. It provides locality optimization for some join queries, reducing the time spent transferring data between nodes and speeding up queries.
+
+Its design and implementation are described in [ISSUE 4394](https://github.com/apache/incubator-doris/issues/4394).
+
+## Glossary
+
+* FE: Frontend, the front-end node of Doris. Responsible for metadata management and request access.
+* BE: Backend, the back-end node of Doris. Responsible for query execution and data storage.
+* Left table: the left table in a join query. It performs the probe. Its order can be adjusted by join reorder.
+* Right table: the right table in a join query. It performs the build. Its order can be adjusted by join reorder.
+
+## Principle
+In addition to Bucket Shuffle Join, Doris supports three types of join: `Shuffle Join`, `Broadcast Join` and `Colocate Join`. Except for `Colocate Join`, all of them incur some network overhead.
+
+For example, consider a join between table A and table B that uses hash join. The costs of the different join types are as follows:
+* **Broadcast Join**: If, according to the data distribution, table A has three executing HashJoinNodes, table B must be sent to all three of them. Its network overhead is `3B` and its memory overhead is `3B`.
+* **Shuffle Join**: Shuffle Join distributes the data of tables A and B across the nodes of the cluster by hash, so its network overhead is `A + B` and its memory overhead is `B`.
+
+The data distribution information of every Doris table is stored in FE. If the join condition hits the distribution columns of the left table, this information can be used to reduce the network and memory overhead of the join query. This is the idea behind Bucket Shuffle Join.
+
+![image.png](https://upload-images.jianshu.io/upload_images/8552201-c383fe84aeee13bc.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)

Review comment:
       This image should be included in the Doris repository and should not rely on a third-party link.
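The overhead figures in the Principle section are simple arithmetic. The sketch below encodes them for abstract table sizes `A` and `B` and `n` left-side HashJoinNode instances (the doc's example uses `n = 3`). The Bucket Shuffle Join figures are an assumption based on the idea the doc describes: only the right table's rows move, each to the node holding the matching bucket of the left table, so network and memory overhead would both be `B`.

```java
// Illustrative cost arithmetic only; sizes are abstract units, not measurements.
public class JoinCostSketch {
    // Broadcast Join: all of B is copied to each of the n HashJoinNode instances of A.
    static long broadcastNetwork(long b, int n) { return n * b; }
    static long broadcastMemory(long b, int n) { return n * b; }

    // Shuffle Join: both A and B are hash-redistributed; the hash tables built from B total B.
    static long shuffleNetwork(long a, long b) { return a + b; }
    static long shuffleMemory(long b) { return b; }

    // Bucket Shuffle Join (assumption): only B moves, routed to A's buckets; A stays in place.
    static long bucketShuffleNetwork(long b) { return b; }
    static long bucketShuffleMemory(long b) { return b; }

    public static void main(String[] args) {
        long a = 100;
        long b = 10;
        int n = 3;
        System.out.println("broadcast      network=" + broadcastNetwork(b, n) + " memory=" + broadcastMemory(b, n));
        System.out.println("shuffle        network=" + shuffleNetwork(a, b) + " memory=" + shuffleMemory(b));
        System.out.println("bucket shuffle network=" + bucketShuffleNetwork(b) + " memory=" + bucketShuffleMemory(b));
    }
}
```

With `A = 100`, `B = 10` and `n = 3`, this gives network 30 / 110 / 10 and memory 30 / 10 / 10 for broadcast, shuffle and bucket shuffle respectively, which is why the doc motivates Bucket Shuffle Join for a large left table and a smaller right table.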






[GitHub] [incubator-doris] HappenLee commented on a change in pull request #5459: [Bucket Shuffle Join] Support the some featrue of Bucket Shuffle Join

Posted by GitBox <gi...@apache.org>.
HappenLee commented on a change in pull request #5459:
URL: https://github.com/apache/incubator-doris/pull/5459#discussion_r587303102



##########
File path: docs/en/administrator-guide/bucket-shuffle-join.md
##########
@@ -0,0 +1,106 @@
+```
+---
+{
+    "title": "Bucket Shuffle Join",
+    "language": "en"
+}
+---
+
+<!-- 
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+```
+# Bucket Shuffle Join
+
+Bucket Shuffle Join is a new feature officially added in Doris 0.14. It provides locality optimization for some join queries, reducing the time spent transferring data between nodes and speeding up queries.
+
+Its design and implementation are described in [ISSUE 4394](https://github.com/apache/incubator-doris/issues/4394).
+
+## Glossary
+
+* FE: Frontend, the front-end node of Doris. Responsible for metadata management and request access.
+* BE: Backend, the back-end node of Doris. Responsible for query execution and data storage.
+* Left table: the left table in a join query. It performs the probe. Its order can be adjusted by join reorder.
+* Right table: the right table in a join query. It performs the build. Its order can be adjusted by join reorder.
+
+## Principle
+In addition to Bucket Shuffle Join, Doris supports three types of join: `Shuffle Join`, `Broadcast Join` and `Colocate Join`. Except for `Colocate Join`, all of them incur some network overhead.
+
+For example, consider a join between table A and table B that uses hash join. The costs of the different join types are as follows:
+* **Broadcast Join**: If, according to the data distribution, table A has three executing HashJoinNodes, table B must be sent to all three of them. Its network overhead is `3B` and its memory overhead is `3B`.
+* **Shuffle Join**: Shuffle Join distributes the data of tables A and B across the nodes of the cluster by hash, so its network overhead is `A + B` and its memory overhead is `B`.
+
+The data distribution information of every Doris table is stored in FE. If the join condition hits the distribution columns of the left table, this information can be used to reduce the network and memory overhead of the join query. This is the idea behind Bucket Shuffle Join.
+
+![image.png](https://upload-images.jianshu.io/upload_images/8552201-c383fe84aeee13bc.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)

Review comment:
       Yes, I will change the image link.






[GitHub] [incubator-doris] EmmyMiao87 commented on a change in pull request #5459: [Bucket Shuffle Join] Support the some featrue of Bucket Shuffle Join

Posted by GitBox <gi...@apache.org>.
EmmyMiao87 commented on a change in pull request #5459:
URL: https://github.com/apache/incubator-doris/pull/5459#discussion_r587473989



##########
File path: fe/fe-core/src/main/java/org/apache/doris/planner/DistributedPlanner.java
##########
@@ -375,6 +375,15 @@ private PlanFragment createHashJoinFragment(HashJoinNode node, PlanFragment righ
             doBroadcast = false;
         }
 
+        // Push down the predicates constructed by the right child when the

Review comment:
       It feels weird to set a value here uniformly, and then change it later.



