Posted to commits@doris.apache.org by GitBox <gi...@apache.org> on 2020/09/27 06:49:31 UTC

[GitHub] [incubator-doris] HappenLee opened a new pull request #4677: Support Bucket Shuffle Join

HappenLee opened a new pull request #4677:
URL: https://github.com/apache/incubator-doris/pull/4677


   Issue: #4394
   
   ## Proposed changes
   
   Describe the big picture of your changes here to communicate to the maintainers why we should accept this pull request. If it fixes a bug or resolves a feature request, be sure to link to that issue.
   
   ## Types of changes
   
   What types of changes does your code introduce to Doris?
   _Put an `x` in the boxes that apply_
   
   - [ ] Bugfix (non-breaking change which fixes an issue)
   - [x] New feature (non-breaking change which adds functionality)
   - [ ] Breaking change (fix or feature that would cause existing functionality to not work as expected)
   - [ ] Documentation Update (if none of the other choices apply)
   - [ ] Code refactor (Modify the code structure, format the code, etc...)
   
   ## Checklist
   
   _Put an `x` in the boxes that apply. You can also fill these out after creating the PR. If you're unsure about any of them, don't hesitate to ask. We're here to help! This is simply a reminder of what we are going to look for before merging your code._
   
   - [x] I have created an issue (Fix #ISSUE) and described the bug/feature there in detail
   - [x] Compiling and unit tests pass locally with my changes
   - [x] I have added tests that prove my fix is effective or that my feature works
   - [x] If this change needs a document change, I have updated the document
   - [x] Any dependent changes have been merged
   
   ## Further comments
   
   If this is a relatively large or complex change, kick off the discussion at dev@doris.apache.org by explaining why you chose the solution you did and what alternatives you considered, etc...
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org


[GitHub] [incubator-doris] morningman commented on a change in pull request #4677: Support Bucket Shuffle Join

Posted by GitBox <gi...@apache.org>.
morningman commented on a change in pull request #4677:
URL: https://github.com/apache/incubator-doris/pull/4677#discussion_r495541353



##########
File path: fe/fe-core/src/main/java/org/apache/doris/planner/DistributedPlanner.java
##########
@@ -390,6 +390,28 @@ private PlanFragment createHashJoinFragment(HashJoinNode node, PlanFragment righ
             node.setColocate(false, reason.get(0));
         }
 
+        // bucket shuffle join is better than boradcast and shuffle join

Review comment:
       ```suggestion
           // bucket shuffle join is better than broadcast and shuffle join
   ```
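A rough network-cost comparison makes this comment's claim concrete. The sketch below is a hypothetical Java model, not the planner code in this PR. `JoinStrategy`, `JoinStrategyChooser`, and the row-count cost formula are illustrative assumptions.

Under this model, bucket shuffle leaves the left table in place and sends each right-side row to exactly one bucket owner, so it moves `rightRows` rows. Broadcast moves `rightRows * numNodes` rows, and a plain shuffle moves `leftRows + rightRows` rows.

```java
/** Hypothetical distributed join strategies, listed from cheapest to most expensive in network terms. */
enum JoinStrategy { COLOCATE, BUCKET_SHUFFLE, BROADCAST, SHUFFLE }

final class JoinStrategyChooser {
    private JoinStrategyChooser() {}

    /**
     * Picks a strategy under a simplified cost model (rows sent over the network).
     * canColocate / canBucketShuffle stand in for the planner's real eligibility checks.
     */
    static JoinStrategy choose(boolean canColocate, boolean canBucketShuffle,
                               long leftRows, long rightRows, int numNodes) {
        if (canColocate) {
            return JoinStrategy.COLOCATE;        // both sides already co-located: nothing moves
        }
        if (canBucketShuffle) {
            return JoinStrategy.BUCKET_SHUFFLE;  // left side stays put; each right row is sent once
        }
        long broadcastCost = rightRows * numNodes;  // right side copied to every node
        long shuffleCost = leftRows + rightRows;    // both sides repartitioned once
        return broadcastCost <= shuffleCost ? JoinStrategy.BROADCAST : JoinStrategy.SHUFFLE;
    }
}
```

For `numNodes >= 1`, `rightRows` is never larger than either `rightRows * numNodes` or `leftRows + rightRows`. In this simplified model, bucket shuffle therefore never moves more rows than broadcast or shuffle, which is the sense in which it is "better".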

##########
File path: fe/fe-core/src/main/java/org/apache/doris/planner/DistributedPlanner.java
##########
@@ -498,6 +520,72 @@ private boolean canColocateJoin(HashJoinNode node, PlanFragment leftChildFragmen
         return false;
     }
 
+    private boolean canBucketShuffleJoin(HashJoinNode node, PlanFragment leftChildFragment,
+                                   List<Expr> rhsHashExprs) {
+        if (!ConnectContext.get().getSessionVariable().isEnableBucketShuffleJoin()) {
+            return false;
+        }
+        // If user have a join hint to use proper way of join, can not be colocate join

Review comment:
       ```suggestion
           // If user have a join hint to use proper way of join, can not be bucket join
   ```

##########
File path: fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
##########
@@ -1399,6 +1429,175 @@ public boolean isDone() {
 
     }
 
+    class BucketShuffleJoinController {

Review comment:
       Please add a comment describing this class.
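The reviewer asks for a class comment. As an illustration of what such a comment might describe, here is a hypothetical, heavily simplified Java sketch of the bookkeeping a bucket shuffle join controller could keep.

Only `isBucketShuffleJoin(int)` appears in the diff. The field names, `register`, `assignBucket`, and `hostOfBucket` are assumptions, and hosts are modeled as plain strings.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/**
 * Hypothetical sketch: records which plan fragments execute a bucket shuffle join and,
 * for each such fragment, which backend host owns each bucket of the left table, so the
 * coordinator can send every right-side row to the host holding the matching bucket.
 */
final class BucketShuffleJoinControllerSketch {
    // Fragment ids whose hash join was planned as a bucket shuffle join.
    private final Set<Integer> bucketShuffleFragmentIds = new HashSet<>();
    // Fragment id -> number of buckets in the left table's distribution.
    private final Map<Integer, Integer> fragmentToBucketNum = new HashMap<>();
    // Fragment id -> (bucket sequence number -> host that scans that bucket).
    private final Map<Integer, Map<Integer, String>> fragmentToBucketHost = new HashMap<>();

    void register(int fragmentId, int bucketNum) {
        bucketShuffleFragmentIds.add(fragmentId);
        fragmentToBucketNum.put(fragmentId, bucketNum);
        fragmentToBucketHost.put(fragmentId, new HashMap<>());
    }

    boolean isBucketShuffleJoin(int fragmentId) {
        return bucketShuffleFragmentIds.contains(fragmentId);
    }

    void assignBucket(int fragmentId, int bucketSeq, String host) {
        Map<Integer, String> owners = fragmentToBucketHost.get(fragmentId);
        if (owners == null) {
            throw new IllegalStateException("fragment not registered: " + fragmentId);
        }
        int bucketNum = fragmentToBucketNum.get(fragmentId);
        if (bucketSeq < 0 || bucketSeq >= bucketNum) {
            throw new IllegalArgumentException("bucket out of range: " + bucketSeq);
        }
        owners.put(bucketSeq, host);
    }

    /** Returns the owning host, or null if the bucket has not been assigned yet. */
    String hostOfBucket(int fragmentId, int bucketSeq) {
        Map<Integer, String> owners = fragmentToBucketHost.get(fragmentId);
        return owners == null ? null : owners.get(bucketSeq);
    }
}
```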

##########
File path: fe/fe-core/src/main/java/org/apache/doris/planner/DistributedPlanner.java
##########
@@ -498,6 +520,72 @@ private boolean canColocateJoin(HashJoinNode node, PlanFragment leftChildFragmen
         return false;
     }
 
+    private boolean canBucketShuffleJoin(HashJoinNode node, PlanFragment leftChildFragment,
+                                   List<Expr> rhsHashExprs) {
+        if (!ConnectContext.get().getSessionVariable().isEnableBucketShuffleJoin()) {
+            return false;
+        }
+        // If user have a join hint to use proper way of join, can not be colocate join
+        if (node.getInnerRef().hasJoinHints()) {
+            return false;
+        }
+
+        PlanNode leftRoot = leftChildFragment.getPlanRoot();
+        //leftRoot should be ScanNode or HashJoinNode, rightRoot should be ScanNode

Review comment:
       This comment is wrong.

##########
File path: fe/fe-core/src/main/java/org/apache/doris/planner/DistributedPlanner.java
##########
@@ -498,6 +520,72 @@ private boolean canColocateJoin(HashJoinNode node, PlanFragment leftChildFragmen
         return false;
     }
 
+    private boolean canBucketShuffleJoin(HashJoinNode node, PlanFragment leftChildFragment,
+                                   List<Expr> rhsHashExprs) {
+        if (!ConnectContext.get().getSessionVariable().isEnableBucketShuffleJoin()) {
+            return false;
+        }
+        // If user have a join hint to use proper way of join, can not be colocate join

Review comment:
       Also, I think that if the user specifies a [SHUFFLE] hint, we should still try bucket shuffle join.
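The reasoning behind this suggestion is that bucket shuffle is itself a form of shuffle in which only the right side is repartitioned, so a [SHUFFLE] hint does not rule it out. A minimal Java sketch of that rule follows.

The `JoinHint` enum and `hintAllowsBucketShuffle` are hypothetical. Treating a broadcast hint as disabling bucket shuffle is an assumption; the review does not state it.

```java
/** Hypothetical join hints; only [SHUFFLE] is named in the review comment. */
enum JoinHint { NONE, BROADCAST, SHUFFLE }

final class BucketShuffleHintCheck {
    private BucketShuffleHintCheck() {}

    /** Returns whether the planner may still try a bucket shuffle join under the given hint. */
    static boolean hintAllowsBucketShuffle(JoinHint hint) {
        switch (hint) {
            case NONE:
                return true;   // no user preference: try the cheapest eligible strategy
            case SHUFFLE:
                return true;   // bucket shuffle is a shuffle that repartitions only the right side
            case BROADCAST:
                return false;  // assumption: an explicit broadcast request is honored as-is
            default:
                return false;
        }
    }
}
```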

##########
File path: fe/fe-core/src/main/java/org/apache/doris/planner/DistributedPlanner.java
##########
@@ -498,6 +520,72 @@ private boolean canColocateJoin(HashJoinNode node, PlanFragment leftChildFragmen
         return false;
     }
 
+    private boolean canBucketShuffleJoin(HashJoinNode node, PlanFragment leftChildFragment,
+                                   List<Expr> rhsHashExprs) {
+        if (!ConnectContext.get().getSessionVariable().isEnableBucketShuffleJoin()) {
+            return false;
+        }
+        // If user have a join hint to use proper way of join, can not be colocate join
+        if (node.getInnerRef().hasJoinHints()) {
+            return false;
+        }
+
+        PlanNode leftRoot = leftChildFragment.getPlanRoot();
+        //leftRoot should be ScanNode or HashJoinNode, rightRoot should be ScanNode
+        if (leftRoot instanceof OlapScanNode) {
+            return canBucketShuffleJoin(node, leftRoot, rhsHashExprs);
+        }
+
+        return false;
+    }
+
+    //the join expr must contian left table distribute column
+    private boolean canBucketShuffleJoin(HashJoinNode node, PlanNode leftRoot,
+                                    List<Expr> rhsJoinExprs) {
+        OlapScanNode leftScanNode = ((OlapScanNode) leftRoot);
+
+        //1 the left table must be only one partition
+        if (leftScanNode.getSelectedPartitionIds().size() > 1) {
+            return false;
+        }
+
+        DistributionInfo leftDistribution = leftScanNode.getOlapTable().getDefaultDistributionInfo();
+
+        if (leftDistribution instanceof HashDistributionInfo ) {
+            List<Column> leftDistributeColumns = ((HashDistributionInfo) leftDistribution).getDistributionColumns();
+
+            List<Column> leftJoinColumns = new ArrayList<>();
+            List<Expr> rightExprs = new ArrayList<>();
+            List<BinaryPredicate> eqJoinConjuncts = node.getEqJoinConjuncts();
+
+            for (BinaryPredicate eqJoinPredicate : eqJoinConjuncts) {
+                Expr lhsJoinExpr = eqJoinPredicate.getChild(0);
+                Expr rhsJoinExpr = eqJoinPredicate.getChild(1);
+                if (lhsJoinExpr.unwrapSlotRef() == null || rhsJoinExpr.unwrapSlotRef() == null) {
+                    continue;
+                }
+
+                SlotDescriptor leftSlot = lhsJoinExpr.unwrapSlotRef().getDesc();
+
+                leftJoinColumns.add(leftSlot.getColumn());
+                rightExprs.add(rhsJoinExpr);
+            }
+
+            //2 the join columns should contains all left table distribute columns to enable bucket shuffle join
+            for (Column distributeColumn : leftDistributeColumns) {
+                int loc = leftJoinColumns.indexOf(distributeColumn);
+                // TODO: now support bucket shuffle join when distribute column type different with
+                // right expr type
+                if (loc == -1 || !rightExprs.get(loc).getType().equals(distributeColumn.getType())) {
+                    return false;
+                }
+                rhsJoinExprs.add(rightExprs.get(loc));
+            }
+        }

Review comment:
       ```suggestion
           } else {
               return false;
           }
   ```
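This function enforces two rules:

- the left table has a single selected partition, and
- the equi-join columns cover every hash distribution column with a matching type.

Together with the `else { return false; }` branch suggested above, these rules can be restated as a small self-contained Java sketch. `Col`, `EqConjunct`, and `BucketShuffleEligibility` are hypothetical stand-ins for Doris's `Column`, `BinaryPredicate`, and planner code, and types are compared as plain strings.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;

/** Hypothetical stand-in for a table column: name plus type name. */
final class Col {
    final String name;
    final String type;

    Col(String name, String type) {
        this.name = name;
        this.type = type;
    }

    @Override
    public boolean equals(Object o) {
        if (!(o instanceof Col)) {
            return false;
        }
        Col c = (Col) o;
        return name.equals(c.name) && type.equals(c.type);
    }

    @Override
    public int hashCode() {
        return Objects.hash(name, type);
    }
}

/** Hypothetical equi-join conjunct "left = right"; a null side means it is not a plain slot ref. */
final class EqConjunct {
    final Col left;
    final String rightExpr;
    final String rightType;

    EqConjunct(Col left, String rightExpr, String rightType) {
        this.left = left;
        this.rightExpr = rightExpr;
        this.rightType = rightType;
    }
}

final class BucketShuffleEligibility {
    private BucketShuffleEligibility() {}

    /**
     * Returns the right-side expressions ordered like the left table's distribution columns,
     * or null if bucket shuffle join does not apply.
     */
    static List<String> rightHashExprs(int selectedPartitions, boolean hashDistributed,
                                       List<Col> distributionCols, List<EqConjunct> conjuncts) {
        if (selectedPartitions > 1 || !hashDistributed) {
            return null;  // rule 1, plus the suggested else-branch for non-hash distribution
        }
        List<Col> leftJoinCols = new ArrayList<>();
        List<EqConjunct> usable = new ArrayList<>();
        for (EqConjunct c : conjuncts) {
            if (c.left == null || c.rightExpr == null) {
                continue;  // skip conjuncts that are not column = column
            }
            leftJoinCols.add(c.left);
            usable.add(c);
        }
        // Rule 2: every distribution column must be joined to an identically typed right side.
        List<String> result = new ArrayList<>();
        for (Col dist : distributionCols) {
            int loc = leftJoinCols.indexOf(dist);
            if (loc == -1 || !usable.get(loc).rightType.equals(dist.type)) {
                return null;
            }
            result.add(usable.get(loc).rightExpr);
        }
        return result;
    }
}
```

The diff appends to a caller-supplied `rhsJoinExprs`. This sketch collects into a fresh list instead, so a check that fails on a later distribution column does not leave a partially filled list behind.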






[GitHub] [incubator-doris] kangkaisen commented on a change in pull request #4677: Support Bucket Shuffle Join

Posted by GitBox <gi...@apache.org>.
kangkaisen commented on a change in pull request #4677:
URL: https://github.com/apache/incubator-doris/pull/4677#discussion_r495684304



##########
File path: fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
##########
@@ -783,13 +784,39 @@ private void computeFragmentExecParams() throws Exception {
                         params.instanceExecParams.size() + destParams.perExchNumSenders.get(exchId.asInt()));
             }
 
-            // add destination host to this fragment's destination
-            for (int j = 0; j < destParams.instanceExecParams.size(); ++j) {
-                TPlanFragmentDestination dest = new TPlanFragmentDestination();
-                dest.fragment_instance_id = destParams.instanceExecParams.get(j).instanceId;
-                dest.server = toRpcHost(destParams.instanceExecParams.get(j).host);
-                dest.setBrpcServer(toBrpcHost(destParams.instanceExecParams.get(j).host));
-                params.destinations.add(dest);
+            if (bucketShuffleJoinController.isBucketShuffleJoin(destFragment.getFragmentId().asInt())) {

Review comment:
       OK






[GitHub] [incubator-doris] HappenLee commented on a change in pull request #4677: Support Bucket Shuffle Join

Posted by GitBox <gi...@apache.org>.
HappenLee commented on a change in pull request #4677:
URL: https://github.com/apache/incubator-doris/pull/4677#discussion_r495566516



##########
File path: be/src/runtime/data_stream_sender.cpp
##########
@@ -479,7 +495,30 @@ Status DataStreamSender::send(RuntimeState* state, RowBatch* batch) {
                 hash_val = RawValue::get_hash_value_fvn(
                     partition_val, ctx->root()->type(), hash_val);
             }
-            RETURN_IF_ERROR(_channels[hash_val % num_channels]->add_row(row));
+            auto target_channel_id = hash_val % num_channels;
+            RETURN_IF_ERROR(_channels[target_channel_id]->add_row(row));
+        }
+    } else if (_part_type == TPartitionType::BUCKET_SHFFULE_HASH_PARTITIONED) {
+        // hash-partition batch's rows across channels
+        int num_channels = _channel_shared_ptrs.size();
+
+        for (int i = 0; i < batch->num_rows(); ++i) {
+            TupleRow* row = batch->get_row(i);
+            size_t hash_val = 0;
+
+            for (auto ctx : _partition_expr_ctxs) {
+                void* partition_val = ctx->get_value(row);
+                // We can't use the crc hash function here because it does not result
+                // in uncorrelated hashes with different seeds.  Instead we must use
+                // fvn hash.
+                // TODO: fix crc hash/GetHashValue()
+                //hash_val = RawValue::get_hash_value_fvn(
+                //    partition_val, ctx->root()->type(), hash_val);
+                hash_val = RawValue::zlib_crc32(

Review comment:
       This comment is not right; I will change this part.






[GitHub] [incubator-doris] HappenLee commented on a change in pull request #4677: Support Bucket Shuffle Join

Posted by GitBox <gi...@apache.org>.
HappenLee commented on a change in pull request #4677:
URL: https://github.com/apache/incubator-doris/pull/4677#discussion_r495566448



##########
File path: fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
##########
@@ -783,13 +784,39 @@ private void computeFragmentExecParams() throws Exception {
                         params.instanceExecParams.size() + destParams.perExchNumSenders.get(exchId.asInt()));
             }
 
-            // add destination host to this fragment's destination
-            for (int j = 0; j < destParams.instanceExecParams.size(); ++j) {
-                TPlanFragmentDestination dest = new TPlanFragmentDestination();
-                dest.fragment_instance_id = destParams.instanceExecParams.get(j).instanceId;
-                dest.server = toRpcHost(destParams.instanceExecParams.get(j).host);
-                dest.setBrpcServer(toBrpcHost(destParams.instanceExecParams.get(j).host));
-                params.destinations.add(dest);
+            if (bucketShuffleJoinController.isBucketShuffleJoin(destFragment.getFragmentId().asInt())) {

Review comment:
       Yes, that is future work. But this part is complex and has a large impact, so I think we need to do this work gradually.






[GitHub] [incubator-doris] kangkaisen commented on a change in pull request #4677: Support Bucket Shuffle Join

Posted by GitBox <gi...@apache.org>.
kangkaisen commented on a change in pull request #4677:
URL: https://github.com/apache/incubator-doris/pull/4677#discussion_r495564022



##########
File path: be/src/runtime/data_stream_sender.cpp
##########
@@ -479,7 +495,30 @@ Status DataStreamSender::send(RuntimeState* state, RowBatch* batch) {
                 hash_val = RawValue::get_hash_value_fvn(
                     partition_val, ctx->root()->type(), hash_val);
             }
-            RETURN_IF_ERROR(_channels[hash_val % num_channels]->add_row(row));
+            auto target_channel_id = hash_val % num_channels;
+            RETURN_IF_ERROR(_channels[target_channel_id]->add_row(row));
+        }
+    } else if (_part_type == TPartitionType::BUCKET_SHFFULE_HASH_PARTITIONED) {
+        // hash-partition batch's rows across channels
+        int num_channels = _channel_shared_ptrs.size();
+
+        for (int i = 0; i < batch->num_rows(); ++i) {
+            TupleRow* row = batch->get_row(i);
+            size_t hash_val = 0;
+
+            for (auto ctx : _partition_expr_ctxs) {
+                void* partition_val = ctx->get_value(row);
+                // We can't use the crc hash function here because it does not result
+                // in uncorrelated hashes with different seeds.  Instead we must use
+                // fvn hash.
+                // TODO: fix crc hash/GetHashValue()
+                //hash_val = RawValue::get_hash_value_fvn(
+                //    partition_val, ctx->root()->type(), hash_val);
+                hash_val = RawValue::zlib_crc32(

Review comment:
       Why does the comment say fvn hash while the code uses crc32 hash?
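One plausible answer to this question: in a bucket shuffle join, each right-side row must land on the instance that owns the same bucket of the left table. The sender therefore has to reproduce the hash that was used to bucket the left table's tablets. Assuming that hash is zlib crc32 over the distribution column values, that would explain why the code calls `zlib_crc32` and why the FNV comment copied from the plain shuffle branch no longer applies.

The Java sketch below shows crc32-based routing with `java.util.zip.CRC32`. Hashing raw byte arrays is a simplification, since the real code hashes each value according to its column type, and `BucketRouter` is a hypothetical name.

```java
import java.util.List;
import java.util.zip.CRC32;

/** Hypothetical sketch of routing a row to a bucket with a chained CRC32 hash. */
final class BucketRouter {
    private BucketRouter() {}

    /**
     * Feeds each partition value into one CRC32 in order. Updating a single CRC32 with
     * consecutive buffers gives the same result as chaining zlib's crc32(prev, buf, len).
     */
    static long rowHash(List<byte[]> partitionValues) {
        CRC32 crc = new CRC32();
        for (byte[] value : partitionValues) {
            crc.update(value, 0, value.length);
        }
        return crc.getValue();  // unsigned 32-bit value held in a long
    }

    /** Bucket (and hence channel) index for the row; bucketNum must be positive. */
    static int targetBucket(List<byte[]> partitionValues, int bucketNum) {
        return (int) (rowHash(partitionValues) % bucketNum);
    }
}
```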

##########
File path: fe/fe-core/src/main/java/org/apache/doris/planner/DistributedPlanner.java
##########
@@ -498,6 +520,72 @@ private boolean canColocateJoin(HashJoinNode node, PlanFragment leftChildFragmen
         return false;
     }
 
+    private boolean canBucketShuffleJoin(HashJoinNode node, PlanFragment leftChildFragment,
+                                   List<Expr> rhsHashExprs) {
+        if (!ConnectContext.get().getSessionVariable().isEnableBucketShuffleJoin()) {
+            return false;
+        }
+        // If user have a join hint to use proper way of join, can not be colocate join

Review comment:
       ```suggestion
           // If user have a join hint to use proper way of join, can not be bucket shuffle join
   ```

##########
File path: fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
##########
@@ -783,13 +784,39 @@ private void computeFragmentExecParams() throws Exception {
                         params.instanceExecParams.size() + destParams.perExchNumSenders.get(exchId.asInt()));
             }
 
-            // add destination host to this fragment's destination
-            for (int j = 0; j < destParams.instanceExecParams.size(); ++j) {
-                TPlanFragmentDestination dest = new TPlanFragmentDestination();
-                dest.fragment_instance_id = destParams.instanceExecParams.get(j).instanceId;
-                dest.server = toRpcHost(destParams.instanceExecParams.get(j).host);
-                dest.setBrpcServer(toBrpcHost(destParams.instanceExecParams.get(j).host));
-                params.destinations.add(dest);
+            if (bucketShuffleJoinController.isBucketShuffleJoin(destFragment.getFragmentId().asInt())) {

Review comment:
       It would be great if you could abstract the query scheduling strategy, as `Presto` does, and refactor this class.
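Here is a rough sketch of the kind of abstraction suggested here. The loop removed in the diff produces one destination per receiving instance. A bucket shuffle join instead needs one destination per bucket of the left table, in bucket order, so that a channel index computed as `hash % number_of_destinations` is also the bucket sequence number.

The Java below is hypothetical: `Dest`, `DestinationStrategy`, and both implementations are illustrative names, not Doris or Presto classes. It simply rejects buckets that no instance owns; a real coordinator would need its own policy for them.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/** Hypothetical destination: a fragment instance id and the host it runs on. */
final class Dest {
    final String instanceId;
    final String host;

    Dest(String instanceId, String host) {
        this.instanceId = instanceId;
        this.host = host;
    }
}

/** Hypothetical strategy for computing a sender fragment's destination list. */
interface DestinationStrategy {
    List<Dest> destinations();
}

/** Default behavior: one destination per receiving instance. */
final class PerInstanceDestinations implements DestinationStrategy {
    private final List<Dest> instances;

    PerInstanceDestinations(List<Dest> instances) {
        this.instances = instances;
    }

    @Override
    public List<Dest> destinations() {
        return new ArrayList<>(instances);
    }
}

/** Bucket shuffle: destination i is the instance that owns bucket i of the left table. */
final class PerBucketDestinations implements DestinationStrategy {
    private final int bucketNum;
    private final Map<Integer, Dest> bucketOwner;

    PerBucketDestinations(int bucketNum, Map<Integer, Dest> bucketOwner) {
        this.bucketNum = bucketNum;
        this.bucketOwner = bucketOwner;
    }

    @Override
    public List<Dest> destinations() {
        List<Dest> result = new ArrayList<>(bucketNum);
        for (int bucket = 0; bucket < bucketNum; bucket++) {
            Dest owner = bucketOwner.get(bucket);
            if (owner == null) {
                throw new IllegalStateException("no instance owns bucket " + bucket);
            }
            result.add(owner);  // an instance owning several buckets appears several times
        }
        return result;
    }
}
```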






[GitHub] [incubator-doris] HappenLee commented on a change in pull request #4677: Support Bucket Shuffle Join

Posted by GitBox <gi...@apache.org>.
HappenLee commented on a change in pull request #4677:
URL: https://github.com/apache/incubator-doris/pull/4677#discussion_r495566098



##########
File path: be/src/runtime/data_stream_sender.cpp
##########
@@ -479,7 +495,30 @@ Status DataStreamSender::send(RuntimeState* state, RowBatch* batch) {
                 hash_val = RawValue::get_hash_value_fvn(
                     partition_val, ctx->root()->type(), hash_val);
             }
-            RETURN_IF_ERROR(_channels[hash_val % num_channels]->add_row(row));
+            auto target_channel_id = hash_val % num_channels;
+            RETURN_IF_ERROR(_channels[target_channel_id]->add_row(row));
+        }
+    } else if (_part_type == TPartitionType::BUCKET_SHFFULE_HASH_PARTITIONED) {
+        // hash-partition batch's rows across channels
+        int num_channels = _channel_shared_ptrs.size();
+
+        for (int i = 0; i < batch->num_rows(); ++i) {
+            TupleRow* row = batch->get_row(i);
+            size_t hash_val = 0;
+
+            for (auto ctx : _partition_expr_ctxs) {
+                void* partition_val = ctx->get_value(row);
+                // We can't use the crc hash function here because it does not result
+                // in uncorrelated hashes with different seeds.  Instead we must use
+                // fvn hash.
+                // TODO: fix crc hash/GetHashValue()
+                //hash_val = RawValue::get_hash_value_fvn(
+                //    partition_val, ctx->root()->type(), hash_val);
+                hash_val = RawValue::zlib_crc32(

Review comment:
       This comment is not right; it should be deleted.






[GitHub] [incubator-doris] morningman merged pull request #4677: Support Bucket Shuffle Join

Posted by GitBox <gi...@apache.org>.
morningman merged pull request #4677:
URL: https://github.com/apache/incubator-doris/pull/4677


   

