Posted to commits@doris.apache.org by mo...@apache.org on 2020/10/11 07:37:45 UTC

[incubator-doris] branch master updated: [SQL] Support Bucket Shuffle Join (#4677)

This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 04f26e4  [SQL] Support Bucket Shuffle Join (#4677)
04f26e4 is described below

commit 04f26e4b7f9b929c9c713fb01e513f5c80b494c0
Author: HappenLee <ha...@hotmail.com>
AuthorDate: Sun Oct 11 15:37:32 2020 +0800

    [SQL] Support Bucket Shuffle Join (#4677)
    
    Support Bucket Shuffle Join
    issue:#4394
---
 be/src/runtime/data_stream_sender.cpp              |  61 +++--
 .../org/apache/doris/planner/DataPartition.java    |   3 +-
 .../apache/doris/planner/DistributedPlanner.java   |  90 ++++++++
 .../org/apache/doris/planner/HashJoinNode.java     |  10 +-
 .../main/java/org/apache/doris/qe/Coordinator.java | 227 +++++++++++++++++-
 .../java/org/apache/doris/qe/SessionVariable.java  |   8 +
 .../org/apache/doris/planner/QueryPlanTest.java    |  86 +++++++
 .../java/org/apache/doris/qe/CoordinatorTest.java  | 257 +++++++++++++++++++++
 gensrc/thrift/Partitions.thrift                    |   6 +-
 9 files changed, 720 insertions(+), 28 deletions(-)
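
Bucket shuffle join keeps the OLAP table whose hash distribution already matches the
join local to its buckets and only repartitions the other side: the DataStreamSender
hashes each outgoing row with the same crc32 function used to bucket the table's data
and sends it to the instance that owns the matching bucket, instead of broadcasting the
whole table or shuffling both sides. The feature is gated by the new
enable_bucket_shuffle_join session variable, which defaults to false in this patch.
A minimal sketch of exercising it, using the tables created in QueryPlanTest below
(table names and the expected plan string are taken from the test; the statement forms
are illustrative):

    -- jointest is DISTRIBUTED BY HASH(k1), bucket_shuffle1 by HASH(k1, k2),
    -- both single partition, with enable_bucket_shuffle_join switched on
    EXPLAIN SELECT * FROM test.jointest t1, test.bucket_shuffle1 t2
    WHERE t1.k1 = t2.k1 AND t1.k1 = t2.k2;
    -- the plan is expected to show the exchange as
    -- BUCKET_SHFFULE_HASH_PARTITIONED: `t1`.`k1`, `t1`.`k1`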

diff --git a/be/src/runtime/data_stream_sender.cpp b/be/src/runtime/data_stream_sender.cpp
index 543c1f2..b2173e8 100644
--- a/be/src/runtime/data_stream_sender.cpp
+++ b/be/src/runtime/data_stream_sender.cpp
@@ -132,6 +132,10 @@ public:
         return uid.to_string();
     }
 
+    TUniqueId get_fragment_instance_id() {
+        return _fragment_instance_id;
+    }
+
 private:
     inline Status _wait_last_brpc() {
         auto cntl = &_closure->cntl;
@@ -241,6 +245,9 @@ Status DataStreamSender::Channel::send_batch(PRowBatch* batch, bool eos) {
 }
 
 Status DataStreamSender::Channel::add_row(TupleRow* row) {
+    if (_fragment_instance_id.lo == -1) {
+        return Status::OK();
+    }
     int row_num = _batch->add_row();
 
     if (row_num == RowBatch::INVALID_ROW_INDEX) {
@@ -329,18 +336,27 @@ DataStreamSender::DataStreamSender(
     DCHECK(sink.output_partition.type == TPartitionType::UNPARTITIONED
             || sink.output_partition.type == TPartitionType::HASH_PARTITIONED
             || sink.output_partition.type == TPartitionType::RANDOM
-            || sink.output_partition.type == TPartitionType::RANGE_PARTITIONED);
-    // TODO: use something like google3's linked_ptr here (scoped_ptr isn't copyable)
+            || sink.output_partition.type == TPartitionType::RANGE_PARTITIONED
+            || sink.output_partition.type == TPartitionType::BUCKET_SHFFULE_HASH_PARTITIONED);
+    // TODO: use something like google3's linked_ptr here (scoped_ptr isn't copyable)
+
+    std::map<int64_t, int64_t> fragment_id_to_channel_index;
     for (int i = 0; i < destinations.size(); ++i) {
         // Select first dest as transfer chain.
         bool is_transfer_chain = (i == 0);
-        _channel_shared_ptrs.emplace_back(
-            new Channel(this, row_desc,
-                        destinations[i].brpc_server,
-                        destinations[i].fragment_instance_id,
-                        sink.dest_node_id, per_channel_buffer_size, 
-                        is_transfer_chain, send_query_statistics_with_every_batch));
-        _channels.push_back(_channel_shared_ptrs[i].get());
+        const auto& fragment_instance_id = destinations[i].fragment_instance_id;
+        if (fragment_id_to_channel_index.find(fragment_instance_id.lo) == fragment_id_to_channel_index.end()) {
+            _channel_shared_ptrs.emplace_back(
+                    new Channel(this, row_desc,
+                                destinations[i].brpc_server,
+                                fragment_instance_id,
+                                sink.dest_node_id, per_channel_buffer_size,
+                                is_transfer_chain, send_query_statistics_with_every_batch));
+            fragment_id_to_channel_index.insert({fragment_instance_id.lo, _channel_shared_ptrs.size() - 1});
+            _channels.push_back(_channel_shared_ptrs.back().get());
+        } else {
+            _channel_shared_ptrs.emplace_back(_channel_shared_ptrs[fragment_id_to_channel_index[fragment_instance_id.lo]]);
+        }
     }
 }
 
@@ -353,7 +369,7 @@ static bool compare_part_use_range(const PartitionInfo* v1, const PartitionInfo*
 Status DataStreamSender::init(const TDataSink& tsink) {
     RETURN_IF_ERROR(DataSink::init(tsink));
     const TDataStreamSink& t_stream_sink = tsink.stream_sink;
-    if (_part_type == TPartitionType::HASH_PARTITIONED) {
+    if (_part_type == TPartitionType::HASH_PARTITIONED || _part_type == TPartitionType::BUCKET_SHFFULE_HASH_PARTITIONED) {
         RETURN_IF_ERROR(Expr::create_expr_trees(
                 _pool, t_stream_sink.output_partition.partition_exprs, &_partition_expr_ctxs));
     } else if (_part_type == TPartitionType::RANGE_PARTITIONED) {
@@ -402,7 +418,7 @@ Status DataStreamSender::prepare(RuntimeState* state) {
         // Randomize the order we open/transmit to channels to avoid thundering herd problems.
         srand(reinterpret_cast<uint64_t>(this));
         random_shuffle(_channels.begin(), _channels.end());
-    } else if (_part_type == TPartitionType::HASH_PARTITIONED) {
+    } else if (_part_type == TPartitionType::HASH_PARTITIONED || _part_type == TPartitionType::BUCKET_SHFFULE_HASH_PARTITIONED) {
         RETURN_IF_ERROR(Expr::prepare(_partition_expr_ctxs, state, _row_desc, _expr_mem_tracker));
     } else {
         RETURN_IF_ERROR(Expr::prepare(_partition_expr_ctxs, state, _row_desc, _expr_mem_tracker));
@@ -449,7 +465,7 @@ Status DataStreamSender::send(RuntimeState* state, RowBatch* batch) {
     SCOPED_TIMER(_profile->total_time_counter());
 
     // Unpartition or _channel size
-    if (_part_type == TPartitionType::UNPARTITIONED || _channels.size() == 1) {
+    if (_part_type == TPartitionType::UNPARTITIONED || _channels.size() == 1 ) {
         RETURN_IF_ERROR(serialize_batch(batch, _current_pb_batch, _channels.size()));
         for (auto channel : _channels) {
             RETURN_IF_ERROR(channel->send_batch(_current_pb_batch));
@@ -479,7 +495,26 @@ Status DataStreamSender::send(RuntimeState* state, RowBatch* batch) {
                 hash_val = RawValue::get_hash_value_fvn(
                     partition_val, ctx->root()->type(), hash_val);
             }
-            RETURN_IF_ERROR(_channels[hash_val % num_channels]->add_row(row));
+            auto target_channel_id = hash_val % num_channels;
+            RETURN_IF_ERROR(_channels[target_channel_id]->add_row(row));
+        }
+    } else if (_part_type == TPartitionType::BUCKET_SHFFULE_HASH_PARTITIONED) {
+        // hash-partition batch's rows across channels
+        int num_channels = _channel_shared_ptrs.size();
+
+        for (int i = 0; i < batch->num_rows(); ++i) {
+            TupleRow* row = batch->get_row(i);
+            size_t hash_val = 0;
+
+            for (auto ctx : _partition_expr_ctxs) {
+                void* partition_val = ctx->get_value(row);
+                // We must use the crc hash function so that the hash value matches
+                // the hash value used to distribute the left table's data
+                hash_val = RawValue::zlib_crc32(
+                        partition_val, ctx->root()->type(), hash_val);
+            }
+            auto target_channel_id = hash_val % num_channels;
+            RETURN_IF_ERROR(_channel_shared_ptrs[target_channel_id]->add_row(row));
         }
     } else {
         // Range partition
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/DataPartition.java b/fe/fe-core/src/main/java/org/apache/doris/planner/DataPartition.java
index 2e9941d..1a293ab 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/DataPartition.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/DataPartition.java
@@ -59,7 +59,8 @@ public class DataPartition {
         Preconditions.checkNotNull(exprs);
         Preconditions.checkState(!exprs.isEmpty());
         Preconditions.checkState(
-          type == TPartitionType.HASH_PARTITIONED || type == TPartitionType.RANGE_PARTITIONED);
+          type == TPartitionType.HASH_PARTITIONED || type == TPartitionType.RANGE_PARTITIONED
+                  || type == TPartitionType.BUCKET_SHFFULE_HASH_PARTITIONED);
         this.type = type;
         this.partitionExprs = ImmutableList.copyOf(exprs);
     }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/DistributedPlanner.java b/fe/fe-core/src/main/java/org/apache/doris/planner/DistributedPlanner.java
index 2d3305d..efaa5c1 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/DistributedPlanner.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/DistributedPlanner.java
@@ -390,6 +390,28 @@ public class DistributedPlanner {
             node.setColocate(false, reason.get(0));
         }
 
+        // Bucket shuffle join is preferred over broadcast and shuffle join
+        // because it reduces the network cost of the join, so Doris tries it first
+        List<Expr> rhsPartitionxprs = Lists.newArrayList();
+        if (canBucketShuffleJoin(node, leftChildFragment, rhsPartitionxprs)) {
+            node.setDistributionMode(HashJoinNode.DistributionMode.BUCKET_SHUFFLE);
+            DataPartition rhsJoinPartition =
+                    new DataPartition(TPartitionType.BUCKET_SHFFULE_HASH_PARTITIONED, rhsPartitionxprs);
+            ExchangeNode rhsExchange =
+                    new ExchangeNode(ctx_.getNextNodeId(), rightChildFragment.getPlanRoot(), false);
+            rhsExchange.setNumInstances(rightChildFragment.getPlanRoot().getNumInstances());
+            rhsExchange.init(ctx_.getRootAnalyzer());
+
+            node.setChild(0, leftChildFragment.getPlanRoot());
+            node.setChild(1, rhsExchange);
+            leftChildFragment.setPlanRoot(node);
+
+            rightChildFragment.setDestination(rhsExchange);
+            rightChildFragment.setOutputPartition(rhsJoinPartition);
+
+            return leftChildFragment;
+        }
+
         if (doBroadcast) {
             node.setDistributionMode(HashJoinNode.DistributionMode.BROADCAST);
             // Doesn't create a new fragment, but modifies leftChildFragment to execute
@@ -498,6 +520,74 @@ public class DistributedPlanner {
         return false;
     }
 
+    private boolean canBucketShuffleJoin(HashJoinNode node, PlanFragment leftChildFragment,
+                                   List<Expr> rhsHashExprs) {
+        if (!ConnectContext.get().getSessionVariable().isEnableBucketShuffleJoin()) {
+            return false;
+        }
+        // If the user gave a join hint to force a specific join method, bucket shuffle join cannot be used
+        if (node.getInnerRef().hasJoinHints()) {
+            return false;
+        }
+
+        PlanNode leftRoot = leftChildFragment.getPlanRoot();
+        // leftRoot should be OlapScanNode
+        if (leftRoot instanceof OlapScanNode) {
+            return canBucketShuffleJoin(node, leftRoot, rhsHashExprs);
+        }
+
+        return false;
+    }
+
+    // the join exprs must contain the left table's distribution columns
+    private boolean canBucketShuffleJoin(HashJoinNode node, PlanNode leftRoot,
+                                    List<Expr> rhsJoinExprs) {
+        OlapScanNode leftScanNode = ((OlapScanNode) leftRoot);
+
+        // 1. at most one partition of the left table may be selected
+        if (leftScanNode.getSelectedPartitionIds().size() > 1) {
+            return false;
+        }
+
+        DistributionInfo leftDistribution = leftScanNode.getOlapTable().getDefaultDistributionInfo();
+
+        if (leftDistribution instanceof HashDistributionInfo) {
+            List<Column> leftDistributeColumns = ((HashDistributionInfo) leftDistribution).getDistributionColumns();
+
+            List<Column> leftJoinColumns = new ArrayList<>();
+            List<Expr> rightExprs = new ArrayList<>();
+            List<BinaryPredicate> eqJoinConjuncts = node.getEqJoinConjuncts();
+
+            for (BinaryPredicate eqJoinPredicate : eqJoinConjuncts) {
+                Expr lhsJoinExpr = eqJoinPredicate.getChild(0);
+                Expr rhsJoinExpr = eqJoinPredicate.getChild(1);
+                if (lhsJoinExpr.unwrapSlotRef() == null || rhsJoinExpr.unwrapSlotRef() == null) {
+                    continue;
+                }
+
+                SlotDescriptor leftSlot = lhsJoinExpr.unwrapSlotRef().getDesc();
+
+                leftJoinColumns.add(leftSlot.getColumn());
+                rightExprs.add(rhsJoinExpr);
+            }
+
+            // 2. the join columns must contain all of the left table's distribution columns to enable bucket shuffle join
+            for (Column distributeColumn : leftDistributeColumns) {
+                int loc = leftJoinColumns.indexOf(distributeColumn);
+                // TODO: support bucket shuffle join when the distribution column type
+                // differs from the right expr type
+                if (loc == -1 || !rightExprs.get(loc).getType().equals(distributeColumn.getType())) {
+                    return false;
+                }
+                rhsJoinExprs.add(rightExprs.get(loc));
+            }
+        } else {
+            return false;
+        }
+
+        return true;
+    }
+
     //the table must be colocate
     //the colocate group must be stable
     //the eqJoinConjuncts must contain the distributionColumns
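
canBucketShuffleJoin above only fires when enable_bucket_shuffle_join is set, the join
carries no join hint, the left child fragment is a single OlapScanNode with at most one
selected partition, and the equi-join conjuncts cover every distribution column of that
table with an expression of the same type on the other side. The EXPLAIN statements
below, adapted from the cases added to QueryPlanTest in this patch, sketch which query
shapes qualify:

    -- qualifies: both distribution columns (k1, k2) of bucket_shuffle1 are covered
    -- by equi-join conjuncts of matching type
    EXPLAIN SELECT * FROM test.jointest t1, test.bucket_shuffle1 t2
    WHERE t1.k1 = t2.k1 AND t1.k1 = t2.k2;

    -- does not qualify: k2 of bucket_shuffle1 is not covered by the join condition
    EXPLAIN SELECT * FROM test.jointest t1, test.bucket_shuffle1 t2
    WHERE t1.k1 = t2.k1;

    -- does not qualify: the explicit cast gives the join expression a type different
    -- from the distribution column, which is not supported yet
    EXPLAIN SELECT * FROM test.jointest t1, test.bucket_shuffle1 t2
    WHERE CAST(t1.k1 AS tinyint) = t2.k1 AND t1.k1 = t2.k2;
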
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/HashJoinNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/HashJoinNode.java
index d3ead3f..1768307 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/HashJoinNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/HashJoinNode.java
@@ -62,6 +62,7 @@ public class HashJoinNode extends PlanNode {
     private DistributionMode distrMode;
     private boolean isColocate = false; //the flag for colocate join
     private String colocateReason = ""; // if can not do colocate join, set reason here
+    private boolean isBucketShuffle = false; // the flag for bucket shuffle join
 
     public HashJoinNode(PlanNodeId id, PlanNode outer, PlanNode inner, TableRef innerRef,
                         List<Expr> eqJoinConjuncts, List<Expr> otherJoinConjuncts) {
@@ -122,6 +123,10 @@ public class HashJoinNode extends PlanNode {
         return isColocate;
     }
 
+    public boolean isBucketShuffle() {
+        return distrMode.equals(DistributionMode.BUCKET_SHUFFLE);
+    }
+
     public void setColocate(boolean colocate, String reason) {
         isColocate = colocate;
         colocateReason = reason;
@@ -299,10 +304,11 @@ public class HashJoinNode extends PlanNode {
         return distrMode == DistributionMode.PARTITIONED;
     }
 
-    enum DistributionMode {
+    public enum DistributionMode {
         NONE("NONE"),
         BROADCAST("BROADCAST"),
-        PARTITIONED("PARTITIONED");
+        PARTITIONED("PARTITIONED"),
+        BUCKET_SHUFFLE("BUCKET_SHUFFLE");
 
         private final String description;
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
index 3164903..0fc2680 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
@@ -746,6 +746,7 @@ public class Coordinator {
             if (LOG.isDebugEnabled()) {
                 LOG.debug("fragment {} has instances {}", params.fragment.getFragmentId(), params.instanceExecParams.size());
             }
+
             for (int j = 0; j < params.instanceExecParams.size(); ++j) {
                 // we add instance_num to query_id.lo to create a
                 // globally-unique instance id
@@ -783,13 +784,39 @@ public class Coordinator {
                         params.instanceExecParams.size() + destParams.perExchNumSenders.get(exchId.asInt()));
             }
 
-            // add destination host to this fragment's destination
-            for (int j = 0; j < destParams.instanceExecParams.size(); ++j) {
-                TPlanFragmentDestination dest = new TPlanFragmentDestination();
-                dest.fragment_instance_id = destParams.instanceExecParams.get(j).instanceId;
-                dest.server = toRpcHost(destParams.instanceExecParams.get(j).host);
-                dest.setBrpcServer(toBrpcHost(destParams.instanceExecParams.get(j).host));
-                params.destinations.add(dest);
+            if (bucketShuffleJoinController.isBucketShuffleJoin(destFragment.getFragmentId().asInt())) {
+                int bucketSeq = 0;
+                int bucketNum = bucketShuffleJoinController.getFragmentBucketNum(destFragment.getFragmentId());
+                TNetworkAddress dummyServer = new TNetworkAddress("0.0.0.0", 0);
+
+                while (bucketSeq < bucketNum) {
+                    TPlanFragmentDestination dest = new TPlanFragmentDestination();
+
+                    dest.fragment_instance_id = new TUniqueId(-1, -1);
+                    dest.server = dummyServer;
+                    dest.setBrpcServer(dummyServer);
+
+                    for (FInstanceExecParam instanceExecParams : destParams.instanceExecParams) {
+                        if (instanceExecParams.bucketSeqSet.contains(bucketSeq)) {
+                            dest.fragment_instance_id = instanceExecParams.instanceId;
+                            dest.server = toRpcHost(instanceExecParams.host);
+                            dest.setBrpcServer(toBrpcHost(instanceExecParams.host));
+                            break;
+                        }
+                    }
+
+                    bucketSeq++;
+                    params.destinations.add(dest);
+                }
+            } else {
+                // add destination host to this fragment's destination
+                for (int j = 0; j < destParams.instanceExecParams.size(); ++j) {
+                    TPlanFragmentDestination dest = new TPlanFragmentDestination();
+                    dest.fragment_instance_id = destParams.instanceExecParams.get(j).instanceId;
+                    dest.server = toRpcHost(destParams.instanceExecParams.get(j).host);
+                    dest.setBrpcServer(toBrpcHost(destParams.instanceExecParams.get(j).host));
+                    params.destinations.add(dest);
+                }
             }
         }
     }
@@ -985,9 +1012,11 @@ public class Coordinator {
 
             int parallelExecInstanceNum = fragment.getParallelExecNum();
             //for ColocateJoin fragment
-            if (isColocateJoin(fragment.getPlanRoot()) && fragmentIdToSeqToAddressMap.containsKey(fragment.getFragmentId())
-                    && fragmentIdToSeqToAddressMap.get(fragment.getFragmentId()).size() > 0) {
+            if ((isColocateJoin(fragment.getPlanRoot()) && fragmentIdToSeqToAddressMap.containsKey(fragment.getFragmentId())
+                    && fragmentIdToSeqToAddressMap.get(fragment.getFragmentId()).size() > 0)) {
                 computeColocateJoinInstanceParam(fragment.getFragmentId(), parallelExecInstanceNum, params);
+            } else if (bucketShuffleJoinController.isBucketShuffleJoin(fragment.getFragmentId().asInt())) {
+                bucketShuffleJoinController.computeInstanceParam(fragment.getFragmentId(), parallelExecInstanceNum, params);
             } else {
                 // case A
                 Iterator iter = fragmentExecParamsMap.get(fragment.getFragmentId()).scanRangeAssignment.entrySet().iterator();
@@ -1162,6 +1191,8 @@ public class Coordinator {
                     fragmentExecParamsMap.get(scanNode.getFragmentId()).scanRangeAssignment;
             if (isColocateJoin(scanNode.getFragment().getPlanRoot())) {
                 computeScanRangeAssignmentByColocate((OlapScanNode) scanNode, assignment);
+            } else if (bucketShuffleJoinController.isBucketShuffleJoin(scanNode.getFragmentId().asInt(), scanNode.getFragment().getPlanRoot())) {
+                bucketShuffleJoinController.computeScanRangeAssignmentByBucket((OlapScanNode) scanNode, idToBackend, addressToBackendID);
             } else {
                 computeScanRangeAssignmentByScheduler(scanNode, locations, assignment);
             }
@@ -1172,7 +1203,6 @@ public class Coordinator {
     private void computeScanRangeAssignmentByColocate(
             final OlapScanNode scanNode,
             FragmentScanRangeAssignment assignment) throws Exception {
-
         if (!fragmentIdToSeqToAddressMap.containsKey(scanNode.getFragmentId())) {
             fragmentIdToSeqToAddressMap.put(scanNode.getFragmentId(), new HashedMap());
         }
@@ -1399,6 +1429,175 @@ public class Coordinator {
 
     }
 
+    class BucketShuffleJoinController {
+        // fragment_id -> < bucket_seq -> < scannode_id -> scan_range_params >>
+        private Map<PlanFragmentId, BucketSeqToScanRange> fragmentIdBucketSeqToScanRangeMap = Maps.newHashMap();
+        // fragment_id -> < bucket_seq -> be_address >
+        private Map<PlanFragmentId, Map<Integer, TNetworkAddress>> fragmentIdToSeqToAddressMap = Maps.newHashMap();
+        // fragment_id -> < be_id -> bucket_count >
+        private Map<PlanFragmentId, Map<Long, Integer>> fragmentIdToBuckendIdBucketCountMap = Maps.newHashMap();
+        // fragment_id -> bucket_num
+        private Map<PlanFragmentId, Integer> fragmentIdToBucketNumMap = Maps.newHashMap();
+
+        // cache the bucketShuffleFragmentIds
+        private Set<Integer> bucketShuffleFragmentIds = new HashSet<>();
+
+        // check whether the fragment containing this node is a bucket shuffle join fragment
+        private boolean isBucketShuffleJoin(int fragmentId, PlanNode node) {
+            if (ConnectContext.get() != null) {
+                if (!ConnectContext.get().getSessionVariable().isEnableBucketShuffleJoin()) {
+                    return false;
+                }
+            }
+
+            // check that the node belongs to the fragment
+            if (fragmentId != node.getFragmentId().asInt()) {
+                return false;
+            }
+
+            if (bucketShuffleFragmentIds.contains(fragmentId)) {
+                return true;
+            }
+
+            // One fragment can only have one HashJoinNode
+            if (node instanceof HashJoinNode) {
+                HashJoinNode joinNode = (HashJoinNode) node;
+                if (joinNode.isBucketShuffle()) {
+                    bucketShuffleFragmentIds.add(joinNode.getFragmentId().asInt());
+                    return true;
+                }
+            }
+
+            for (PlanNode childNode : node.getChildren()) {
+                return isBucketShuffleJoin(fragmentId, childNode);
+            }
+
+            return false;
+        }
+
+        private boolean isBucketShuffleJoin(int fragmentId) {
+            return bucketShuffleFragmentIds.contains(fragmentId);
+        }
+
+        private int getFragmentBucketNum(PlanFragmentId fragmentId) {
+            return fragmentIdToBucketNumMap.get(fragmentId);
+        }
+
+        // make sure each host has roughly the same number of buckets to scan
+        private void getExecHostPortForFragmentIDAndBucketSeq(TScanRangeLocations seqLocation, PlanFragmentId fragmentId, Integer bucketSeq,
+            ImmutableMap<Long, Backend> idToBackend, Map<TNetworkAddress, Long> addressToBackendID) throws Exception {
+            Map<Long, Integer> buckendIdToBucketCountMap = fragmentIdToBuckendIdBucketCountMap.get(fragmentId);
+            int maxBucketNum = Integer.MAX_VALUE;
+            long buckendId = Long.MAX_VALUE;
+            for (TScanRangeLocation location : seqLocation.locations) {
+                if (buckendIdToBucketCountMap.containsKey(location.backend_id)) {
+                    if (buckendIdToBucketCountMap.get(location.backend_id) < maxBucketNum) {
+                        maxBucketNum = buckendIdToBucketCountMap.get(location.backend_id);
+                        buckendId = location.backend_id;
+                    }
+                } else {
+                    maxBucketNum = 0;
+                    buckendId = location.backend_id;
+                    buckendIdToBucketCountMap.put(buckendId, 0);
+                    break;
+                }
+            }
+
+            buckendIdToBucketCountMap.put(buckendId, buckendIdToBucketCountMap.get(buckendId) + 1);
+            Reference<Long> backendIdRef = new Reference<Long>();
+            TNetworkAddress execHostPort = SimpleScheduler.getHost(buckendId, seqLocation.locations, idToBackend, backendIdRef);
+            if (execHostPort == null) {
+                throw new UserException("there is no scanNode Backend");
+            }
+
+            addressToBackendID.put(execHostPort, backendIdRef.getRef());
+            this.fragmentIdToSeqToAddressMap.get(fragmentId).put(bucketSeq, execHostPort);
+        }
+
+        // ensure that tablets with the same bucketSeq are assigned to the same execHostPort
+        private void computeScanRangeAssignmentByBucket(
+                final OlapScanNode scanNode, ImmutableMap<Long, Backend> idToBackend, Map<TNetworkAddress, Long> addressToBackendID) throws Exception {
+            if (!fragmentIdToSeqToAddressMap.containsKey(scanNode.getFragmentId())) {
+                fragmentIdToBucketNumMap.put(scanNode.getFragmentId(), scanNode.getOlapTable().getDefaultDistributionInfo().getBucketNum());
+                fragmentIdToSeqToAddressMap.put(scanNode.getFragmentId(), new HashedMap());
+                fragmentIdBucketSeqToScanRangeMap.put(scanNode.getFragmentId(), new BucketSeqToScanRange());
+                fragmentIdToBuckendIdBucketCountMap.put(scanNode.getFragmentId(), new HashMap<>());
+            }
+            Map<Integer, TNetworkAddress> bucketSeqToAddress = fragmentIdToSeqToAddressMap.get(scanNode.getFragmentId());
+            BucketSeqToScanRange bucketSeqToScanRange = fragmentIdBucketSeqToScanRangeMap.get(scanNode.getFragmentId());
+
+            for(Integer bucketSeq: scanNode.bucketSeq2locations.keySet()) {
+                //fill scanRangeParamsList
+                List<TScanRangeLocations> locations = scanNode.bucketSeq2locations.get(bucketSeq);
+                if (!bucketSeqToAddress.containsKey(bucketSeq)) {
+                    getExecHostPortForFragmentIDAndBucketSeq(locations.get(0), scanNode.getFragmentId(), bucketSeq, idToBackend, addressToBackendID);
+                }
+
+                for(TScanRangeLocations location: locations) {
+                    Map<Integer, List<TScanRangeParams>> scanRanges =
+                            findOrInsert(bucketSeqToScanRange, bucketSeq, new HashMap<Integer, List<TScanRangeParams>>());
+
+                    List<TScanRangeParams> scanRangeParamsList =
+                            findOrInsert(scanRanges, scanNode.getId().asInt(), new ArrayList<TScanRangeParams>());
+
+                    // add scan range
+                    TScanRangeParams scanRangeParams = new TScanRangeParams();
+                    scanRangeParams.scan_range = location.scan_range;
+                    scanRangeParamsList.add(scanRangeParams);
+                }
+            }
+        }
+
+        private void computeInstanceParam(PlanFragmentId fragmentId, int parallelExecInstanceNum, FragmentExecParams params) {
+            Map<Integer, TNetworkAddress> bucketSeqToAddress = fragmentIdToSeqToAddressMap.get(fragmentId);
+            BucketSeqToScanRange bucketSeqToScanRange = fragmentIdBucketSeqToScanRangeMap.get(fragmentId);
+
+            // 1. count how many tablets each node in the fragment should scan, and gather them per host address
+            Map<TNetworkAddress, List<Map.Entry<Integer, Map<Integer, List<TScanRangeParams>>>>> addressToScanRanges = Maps.newHashMap();
+            for (Map.Entry<Integer, Map<Integer, List<TScanRangeParams>>> scanRanges : bucketSeqToScanRange.entrySet()) {
+                TNetworkAddress address = bucketSeqToAddress.get(scanRanges.getKey());
+                Map<Integer, List<TScanRangeParams>> nodeScanRanges = scanRanges.getValue();
+
+                if (!addressToScanRanges.containsKey(address)) {
+                    addressToScanRanges.put(address, Lists.newArrayList());
+                }
+                addressToScanRanges.get(address).add(scanRanges);
+            }
+
+            for (Map.Entry<TNetworkAddress, List<Map.Entry<Integer, Map<Integer, List<TScanRangeParams>>>>> addressScanRange : addressToScanRanges.entrySet()) {
+                List<Map.Entry<Integer, Map<Integer, List<TScanRangeParams>>>> scanRange = addressScanRange.getValue();
+                int expectedInstanceNum = 1;
+                if (parallelExecInstanceNum > 1) {
+                    // the scan instance num should not be larger than the number of tablets
+                    expectedInstanceNum = Math.min(scanRange.size(), parallelExecInstanceNum);
+                }
+
+                // 2. split the scan ranges among the instances
+                List<List<Map.Entry<Integer, Map<Integer, List<TScanRangeParams>>>>> perInstanceScanRanges = ListUtil.splitBySize(scanRange,
+                        expectedInstanceNum);
+
+                // 3. construct an instanceExecParam and add the scan ranges that this instance should scan
+                for (List<Map.Entry<Integer, Map<Integer, List<TScanRangeParams>>>> perInstanceScanRange : perInstanceScanRanges) {
+                    FInstanceExecParam instanceParam = new FInstanceExecParam(null, addressScanRange.getKey(), 0, params);
+
+                    for (Map.Entry<Integer, Map<Integer, List<TScanRangeParams>>> nodeScanRangeMap : perInstanceScanRange) {
+                        instanceParam.addBucketSeq(nodeScanRangeMap.getKey());
+                        for (Map.Entry<Integer, List<TScanRangeParams>> nodeScanRange : nodeScanRangeMap.getValue().entrySet()) {
+                            if (!instanceParam.perNodeScanRanges.containsKey(nodeScanRange.getKey())) {
+                                instanceParam.perNodeScanRanges.put(nodeScanRange.getKey(), nodeScanRange.getValue());
+                            } else {
+                                instanceParam.perNodeScanRanges.get(nodeScanRange.getKey()).addAll(nodeScanRange.getValue());
+                            }
+                        }
+                    }
+                    params.instanceExecParams.add(instanceParam);
+                }
+            }
+        }
+    }
+
+    private BucketShuffleJoinController bucketShuffleJoinController = new BucketShuffleJoinController();
+
     private BucketSeqToScanRange bucketSeqToScanRange = new BucketSeqToScanRange();
     private Map<PlanFragmentId, Map<Integer, TNetworkAddress>> fragmentIdToSeqToAddressMap = Maps.newHashMap();
     private Set<Integer> colocateFragmentIds = new HashSet<>();
@@ -1702,9 +1901,15 @@ public class Coordinator {
         
         int perFragmentInstanceIdx;
         int senderId;
+
+        Set<Integer> bucketSeqSet = Sets.newHashSet();
   
         FragmentExecParams fragmentExecParams;
-        
+
+        public void addBucketSeq(int bucketSeq) {
+            this.bucketSeqSet.add(bucketSeq);
+        }
+
         public FInstanceExecParam(TUniqueId id, TNetworkAddress host,
                 int perFragmentInstanceIdx, FragmentExecParams fragmentExecParams) {
             this.instanceId = id;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
index 6b61e1f..bc2e624 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
@@ -71,6 +71,7 @@ public class SessionVariable implements Serializable, Writable {
     public static final String BATCH_SIZE = "batch_size";
     public static final String DISABLE_STREAMING_PREAGGREGATIONS = "disable_streaming_preaggregations";
     public static final String DISABLE_COLOCATE_JOIN = "disable_colocate_join";
+    public static final String ENABLE_BUCKET_SHUFFLE_JOIN = "enable_bucket_shuffle_join";
     public static final String PARALLEL_FRAGMENT_EXEC_INSTANCE_NUM = "parallel_fragment_exec_instance_num";
     public static final String ENABLE_INSERT_STRICT = "enable_insert_strict";
     public static final String ENABLE_SPILLING = "enable_spilling";
@@ -215,6 +216,9 @@ public class SessionVariable implements Serializable, Writable {
     @VariableMgr.VarAttr(name = DISABLE_COLOCATE_JOIN)
     private boolean disableColocateJoin = false;
 
+    @VariableMgr.VarAttr(name = ENABLE_BUCKET_SHUFFLE_JOIN)
+    private boolean enableBucketShuffleJoin = false;
+
     @VariableMgr.VarAttr(name = PREFER_JOIN_METHOD)
     private String preferJoinMethod = "broadcast";
 
@@ -414,6 +418,10 @@ public class SessionVariable implements Serializable, Writable {
         return disableColocateJoin;
     }
 
+    public boolean isEnableBucketShuffleJoin() {
+        return enableBucketShuffleJoin;
+    }
+
     public String getPreferJoinMethod() {return preferJoinMethod; }
 
     public void setPreferJoinMethod(String preferJoinMethod) {this.preferJoinMethod = preferJoinMethod; }
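
All of the planner and coordinator changes above are guarded by this switch, so existing
plans are unchanged unless it is enabled. A short sketch of inspecting and toggling it
per session, assuming the usual MySQL-compatible SHOW/SET syntax that Doris session
variables use:

    SHOW VARIABLES LIKE 'enable_bucket_shuffle_join';
    SET enable_bucket_shuffle_join = true;   -- opt in for the current session
    SET enable_bucket_shuffle_join = false;  -- back to this patch's default
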
diff --git a/fe/fe-core/src/test/java/org/apache/doris/planner/QueryPlanTest.java b/fe/fe-core/src/test/java/org/apache/doris/planner/QueryPlanTest.java
index 970a82e..a946dc6 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/planner/QueryPlanTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/planner/QueryPlanTest.java
@@ -38,6 +38,7 @@ import org.apache.doris.catalog.Tablet;
 import org.apache.doris.catalog.Type;
 import org.apache.doris.common.Config;
 import org.apache.doris.common.FeConstants;
+import org.apache.doris.common.jmockit.Deencapsulation;
 import org.apache.doris.load.EtlJobType;
 import org.apache.doris.qe.ConnectContext;
 import org.apache.doris.qe.QueryState.MysqlStateType;
@@ -68,6 +69,10 @@ public class QueryPlanTest {
 
         // create connect context
         connectContext = UtFrameUtils.createDefaultCtx();
+
+        // disable bucket shuffle join
+        Deencapsulation.setField(connectContext.getSessionVariable(), "enableBucketShuffleJoin", false);
+
         // create database
         String createDbStmtStr = "create database test;";
         CreateDbStmt createDbStmt = (CreateDbStmt) UtFrameUtils.parseAndAnalyzeStmt(createDbStmtStr, connectContext);
@@ -288,6 +293,27 @@ public class QueryPlanTest {
                 "(k1 int, k2 int) distributed by hash(k1) buckets 1\n" +
                 "properties(\"replication_num\" = \"1\");");
 
+        createTable("create table test.bucket_shuffle1\n" +
+                "(k1 int, k2 int, k3 int) distributed by hash(k1, k2) buckets 5\n" +
+                "properties(\"replication_num\" = \"1\"" +
+                ");");
+
+        createTable("CREATE TABLE test.`bucket_shuffle2` (\n" +
+                "  `k1` int NULL COMMENT \"\",\n" +
+                "  `k2` smallint(6) NULL COMMENT \"\"\n" +
+                ") ENGINE=OLAP\n" +
+                "COMMENT \"OLAP\"\n" +
+                "PARTITION BY RANGE(`k1`)\n" +
+                "(PARTITION p1 VALUES [(\"-128\"), (\"-64\")),\n" +
+                "PARTITION p2 VALUES [(\"-64\"), (\"0\")),\n" +
+                "PARTITION p3 VALUES [(\"0\"), (\"64\")))\n" +
+                "DISTRIBUTED BY HASH(k1, k2) BUCKETS 5\n" +
+                "PROPERTIES (\n" +
+                "\"replication_num\" = \"1\",\n" +
+                "\"in_memory\" = \"false\",\n" +
+                "\"storage_format\" = \"DEFAULT\"\n" +
+                ");");
+
         createTable("create table test.colocate1\n" +
                 "(k1 int, k2 int, k3 int) distributed by hash(k1, k2) buckets 1\n" +
                 "properties(\"replication_num\" = \"1\"," +
@@ -1017,6 +1043,66 @@ public class QueryPlanTest {
     }
 
     @Test
+    public void testBucketShuffleJoin() throws Exception {
+        FeConstants.runningUnitTest = true;
+        // enable bucket shuffle join
+        Deencapsulation.setField(connectContext.getSessionVariable(), "enableBucketShuffleJoin", true);
+
+        // set data size and row count for the olap table
+        Database db = Catalog.getCurrentCatalog().getDb("default_cluster:test");
+        OlapTable tbl = (OlapTable) db.getTable("bucket_shuffle1");
+        for (Partition partition : tbl.getPartitions()) {
+            partition.updateVisibleVersionAndVersionHash(2, 0);
+            for (MaterializedIndex mIndex : partition.getMaterializedIndices(IndexExtState.VISIBLE)) {
+                mIndex.setRowCount(10000);
+                for (Tablet tablet : mIndex.getTablets()) {
+                    for (Replica replica : tablet.getReplicas()) {
+                        replica.updateVersionInfo(2, 0, 200000, 10000);
+                    }
+                }
+            }
+        }
+
+        db = Catalog.getCurrentCatalog().getDb("default_cluster:test");
+        tbl = (OlapTable) db.getTable("bucket_shuffle2");
+        for (Partition partition : tbl.getPartitions()) {
+            partition.updateVisibleVersionAndVersionHash(2, 0);
+            for (MaterializedIndex mIndex : partition.getMaterializedIndices(IndexExtState.VISIBLE)) {
+                mIndex.setRowCount(10000);
+                for (Tablet tablet : mIndex.getTablets()) {
+                    for (Replica replica : tablet.getReplicas()) {
+                        replica.updateVersionInfo(2, 0, 200000, 10000);
+                    }
+                }
+            }
+        }
+
+        // single partition
+        String queryStr = "explain select * from test.jointest t1, test.bucket_shuffle1 t2 where t1.k1 = t2.k1 and t1.k1 = t2.k2";
+        String explainString = UtFrameUtils.getSQLPlanOrErrorMsg(connectContext, queryStr);
+        Assert.assertTrue(explainString.contains("BUCKET_SHFFULE"));
+        Assert.assertTrue(explainString.contains("BUCKET_SHFFULE_HASH_PARTITIONED: `t1`.`k1`, `t1`.`k1`"));
+
+        // should not be bucket shuffle join: different column types are not supported
+        queryStr = "explain select * from test.jointest t1, test.bucket_shuffle1 t2 where cast (t1.k1 as tinyint) = t2.k1 and t1.k1 = t2.k2";
+        explainString = UtFrameUtils.getSQLPlanOrErrorMsg(connectContext, queryStr);
+        Assert.assertTrue(!explainString.contains("BUCKET_SHFFULE"));
+
+        // the join columns do not cover the left table's distribution columns
+        queryStr = "explain select * from test.jointest t1, test.bucket_shuffle1 t2 where t1.k1 = t2.k1";
+        explainString = UtFrameUtils.getSQLPlanOrErrorMsg(connectContext, queryStr);
+        Assert.assertTrue(!explainString.contains("BUCKET_SHFFULE"));
+
+        // multiple partitions, should not be bucket shuffle join
+        queryStr = "explain select * from test.jointest t1, test.bucket_shuffle2 t2 where t1.k1 = t2.k1 and t1.k1 = t2.k2";
+        explainString = UtFrameUtils.getSQLPlanOrErrorMsg(connectContext, queryStr);
+        Assert.assertTrue(!explainString.contains("BUCKET_SHFFULE"));
+
+        // disable bucket shuffle join again
+        Deencapsulation.setField(connectContext.getSessionVariable(), "enableBucketShuffleJoin", false);
+    }
+
+    @Test
     public void testJoinWithMysqlTable() throws Exception {
         connectContext.setDatabase("default_cluster:test");
 
diff --git a/fe/fe-core/src/test/java/org/apache/doris/qe/CoordinatorTest.java b/fe/fe-core/src/test/java/org/apache/doris/qe/CoordinatorTest.java
index 68d2675..11dc8d1 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/qe/CoordinatorTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/qe/CoordinatorTest.java
@@ -17,15 +17,40 @@
 
 package org.apache.doris.qe;
 
+import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Maps;
 import mockit.Mocked;
 import org.apache.doris.analysis.Analyzer;
+import org.apache.doris.analysis.BinaryPredicate;
+import org.apache.doris.analysis.Expr;
+import org.apache.doris.analysis.TableRef;
+import org.apache.doris.analysis.TupleDescriptor;
+import org.apache.doris.analysis.TupleId;
 import org.apache.doris.catalog.Catalog;
+import org.apache.doris.catalog.HashDistributionInfo;
+import org.apache.doris.catalog.OlapTable;
 import org.apache.doris.common.jmockit.Deencapsulation;
 import org.apache.doris.persist.EditLog;
+import org.apache.doris.planner.DataPartition;
+import org.apache.doris.planner.EmptySetNode;
+import org.apache.doris.planner.ExchangeNode;
+import org.apache.doris.planner.HashJoinNode;
+import org.apache.doris.planner.MysqlScanNode;
+import org.apache.doris.planner.OlapScanNode;
+import org.apache.doris.planner.PlanFragment;
 import org.apache.doris.planner.PlanFragmentId;
+import org.apache.doris.planner.PlanNode;
+import org.apache.doris.planner.PlanNodeId;
 import org.apache.doris.planner.Planner;
+import org.apache.doris.planner.ScanNode;
 import org.apache.doris.service.FrontendOptions;
+import org.apache.doris.system.Backend;
 import org.apache.doris.thrift.TNetworkAddress;
+import org.apache.doris.thrift.TPartitionType;
+import org.apache.doris.thrift.TPlanNode;
+import org.apache.doris.thrift.TScanRangeLocation;
+import org.apache.doris.thrift.TScanRangeLocations;
 import org.apache.doris.thrift.TScanRangeParams;
 import org.apache.doris.thrift.TUniqueId;
 
@@ -98,5 +123,237 @@ public class CoordinatorTest extends Coordinator {
         Deencapsulation.invoke(coordinator, "computeColocateJoinInstanceParam", planFragmentId, 5, params);
         Assert.assertEquals(3, params.instanceExecParams.size());
     }
+
+    @Test
+    public void testIsBucketShuffleJoin()  {
+        Coordinator.BucketShuffleJoinController bucketShuffleJoinController = new Coordinator.BucketShuffleJoinController();
+
+        PlanNodeId testPaloNodeId = new PlanNodeId(-1);
+        TupleId testTupleId = new TupleId(-1);
+        ArrayList<TupleId> tupleIdArrayList = new ArrayList<>();
+        tupleIdArrayList.add(testTupleId);
+
+        ArrayList<Expr> testJoinexprs = new ArrayList<>();
+        BinaryPredicate binaryPredicate = new BinaryPredicate();
+        testJoinexprs.add(binaryPredicate);
+
+        HashJoinNode hashJoinNode = new HashJoinNode(testPaloNodeId, new EmptySetNode(testPaloNodeId, tupleIdArrayList),
+                new EmptySetNode(testPaloNodeId, tupleIdArrayList) , new TableRef(), testJoinexprs, new ArrayList<>());
+        hashJoinNode.setFragment(new PlanFragment(new PlanFragmentId(-1), hashJoinNode,
+                new DataPartition(TPartitionType.BUCKET_SHFFULE_HASH_PARTITIONED, testJoinexprs)));
+
+        // hash join node is not bucket shuffle join
+        Assert.assertEquals(false,
+                Deencapsulation.invoke(bucketShuffleJoinController, "isBucketShuffleJoin", -1, hashJoinNode));
+
+        // the fragment id is different from the hash join node's fragment id
+        hashJoinNode.setFragment(new PlanFragment(new PlanFragmentId(-2), hashJoinNode,
+                new DataPartition(TPartitionType.BUCKET_SHFFULE_HASH_PARTITIONED, testJoinexprs)));
+        hashJoinNode.setDistributionMode(HashJoinNode.DistributionMode.BUCKET_SHUFFLE);
+        Assert.assertEquals(false,
+                Deencapsulation.invoke(bucketShuffleJoinController, "isBucketShuffleJoin", -1, hashJoinNode));
+
+        hashJoinNode.setFragment(new PlanFragment(new PlanFragmentId(-1), hashJoinNode,
+                new DataPartition(TPartitionType.BUCKET_SHFFULE_HASH_PARTITIONED, testJoinexprs)));
+        Assert.assertEquals(true,
+                Deencapsulation.invoke(bucketShuffleJoinController, "isBucketShuffleJoin", -1, hashJoinNode));
+
+        // the fragment id is in the cache, so the node is not checked again
+        Assert.assertEquals(true,
+                Deencapsulation.invoke(bucketShuffleJoinController, "isBucketShuffleJoin", -1));
+
+    }
+
+    @Test
+    public void testComputeScanRangeAssignmentByBucketq()  {
+        Coordinator.BucketShuffleJoinController bucketShuffleJoinController = new Coordinator.BucketShuffleJoinController();
+
+        // init olap scan node of bucket shuffle join
+        TupleDescriptor tupleDescriptor = new TupleDescriptor(new TupleId(-1));
+        OlapTable olapTable = new OlapTable();
+        HashDistributionInfo hashDistributionInfo = new HashDistributionInfo(66, new ArrayList<>());
+        Deencapsulation.setField(olapTable, "defaultDistributionInfo", hashDistributionInfo);
+        tupleDescriptor.setTable(olapTable);
+
+        OlapScanNode olapScanNode = new OlapScanNode(new PlanNodeId(-1), tupleDescriptor, "test");
+        ArrayListMultimap<Integer, TScanRangeLocations> bucketseq2localtion = ArrayListMultimap.create();
+
+        // each OlapTable bucket has the same TScanRangeLocations; the BE ids are {0, 1, 2}
+        TScanRangeLocations tScanRangeLocations = new TScanRangeLocations();
+        TScanRangeLocation tScanRangeLocation0 = new TScanRangeLocation();
+        tScanRangeLocation0.backend_id = 0;
+        TScanRangeLocation tScanRangeLocation1 = new TScanRangeLocation();
+        tScanRangeLocation1.backend_id = 1;
+        TScanRangeLocation tScanRangeLocation2 = new TScanRangeLocation();
+        tScanRangeLocation2.backend_id = 2;
+
+        tScanRangeLocations.locations = new ArrayList<>();
+        tScanRangeLocations.locations.add(tScanRangeLocation0);
+        tScanRangeLocations.locations.add(tScanRangeLocation1);
+        tScanRangeLocations.locations.add(tScanRangeLocation2);
+        for (int i = 0; i < 66; i++) {
+            bucketseq2localtion.put(i, tScanRangeLocations);
+        }
+
+        Deencapsulation.setField(olapScanNode, "bucketSeq2locations", bucketseq2localtion);
+        olapScanNode.setFragment(new PlanFragment(new PlanFragmentId(1), olapScanNode,
+                new DataPartition(TPartitionType.UNPARTITIONED)));
+
+
+        // init all backend
+        Backend backend0 = new Backend();
+        backend0.setAlive(true);
+        Backend backend1 = new Backend();
+        backend1.setAlive(true);
+        Backend backend2 = new Backend();
+        backend2.setAlive(true);
+
+        // init all be network address
+        TNetworkAddress be0 = new TNetworkAddress("0.0.0.0", 1000);
+        TNetworkAddress be1 = new TNetworkAddress("0.0.0.1", 2000);
+        TNetworkAddress be2 = new TNetworkAddress("0.0.0.2", 3000);
+
+        HashMap<Long, Backend> idToBackend = new HashMap<>();
+        idToBackend.put(0l, backend0);
+        idToBackend.put(1l, backend1);
+        idToBackend.put(2l, backend2);
+
+        Map<TNetworkAddress, Long> addressToBackendID = new HashMap<>();
+        addressToBackendID.put(be0, 0l);
+        addressToBackendID.put(be1, 1l);
+        addressToBackendID.put(be2, 2l);
+
+        Deencapsulation.invoke(bucketShuffleJoinController, "computeScanRangeAssignmentByBucket",
+                olapScanNode, ImmutableMap.copyOf(idToBackend), addressToBackendID);
+
+        Assert.assertEquals(java.util.Optional.of(66).get(),
+                Deencapsulation.invoke(bucketShuffleJoinController, "getFragmentBucketNum", new PlanFragmentId(1)));
+
+        Map<PlanFragmentId, Map<Long, Integer>> fragmentIdToBuckendIdBucketCountMap =
+                Deencapsulation.getField(bucketShuffleJoinController, "fragmentIdToBuckendIdBucketCountMap");
+
+        long targetBeCount = fragmentIdToBuckendIdBucketCountMap.values().
+                stream().flatMap(buckend2BucketCountMap -> buckend2BucketCountMap.values().stream())
+                .filter(count -> count == 22).count();
+        Assert.assertEquals(targetBeCount, 3);
+    }
+
+    @Test
+    public void testComputeScanRangeAssignmentByBucket()  {
+        Coordinator.BucketShuffleJoinController bucketShuffleJoinController = new Coordinator.BucketShuffleJoinController();
+
+        // init olap scan node of bucket shuffle join
+        TupleDescriptor tupleDescriptor = new TupleDescriptor(new TupleId(-1));
+        OlapTable olapTable = new OlapTable();
+        HashDistributionInfo hashDistributionInfo = new HashDistributionInfo(66, new ArrayList<>());
+        Deencapsulation.setField(olapTable, "defaultDistributionInfo", hashDistributionInfo);
+        tupleDescriptor.setTable(olapTable);
+
+        OlapScanNode olapScanNode = new OlapScanNode(new PlanNodeId(-1), tupleDescriptor, "test");
+        ArrayListMultimap<Integer, TScanRangeLocations> bucketseq2localtion = ArrayListMultimap.create();
+
+        // each OlapTable bucket has the same TScanRangeLocations; the BE ids are {0, 1, 2}
+        TScanRangeLocations tScanRangeLocations = new TScanRangeLocations();
+        TScanRangeLocation tScanRangeLocation0 = new TScanRangeLocation();
+        tScanRangeLocation0.backend_id = 0;
+        TScanRangeLocation tScanRangeLocation1 = new TScanRangeLocation();
+        tScanRangeLocation1.backend_id = 1;
+        TScanRangeLocation tScanRangeLocation2 = new TScanRangeLocation();
+        tScanRangeLocation2.backend_id = 2;
+
+        tScanRangeLocations.locations = new ArrayList<>();
+        tScanRangeLocations.locations.add(tScanRangeLocation0);
+        tScanRangeLocations.locations.add(tScanRangeLocation1);
+        tScanRangeLocations.locations.add(tScanRangeLocation2);
+        for (int i = 0; i < 66; i++) {
+            bucketseq2localtion.put(i, tScanRangeLocations);
+        }
+
+        Deencapsulation.setField(olapScanNode, "bucketSeq2locations", bucketseq2localtion);
+        olapScanNode.setFragment(new PlanFragment(new PlanFragmentId(1), olapScanNode,
+                new DataPartition(TPartitionType.UNPARTITIONED)));
+
+
+        // init all backend
+        Backend backend0 = new Backend();
+        backend0.setAlive(true);
+        Backend backend1 = new Backend();
+        backend1.setAlive(true);
+        Backend backend2 = new Backend();
+        backend2.setAlive(true);
+
+        // init all be network address
+        TNetworkAddress be0 = new TNetworkAddress("0.0.0.0", 1000);
+        TNetworkAddress be1 = new TNetworkAddress("0.0.0.1", 2000);
+        TNetworkAddress be2 = new TNetworkAddress("0.0.0.2", 3000);
+
+        HashMap<Long, Backend> idToBackend = new HashMap<>();
+        idToBackend.put(0l, backend0);
+        idToBackend.put(1l, backend1);
+        idToBackend.put(2l, backend2);
+
+        Map<TNetworkAddress, Long> addressToBackendID = new HashMap<>();
+        addressToBackendID.put(be0, 0l);
+        addressToBackendID.put(be1, 1l);
+        addressToBackendID.put(be2, 2l);
+
+        Deencapsulation.invoke(bucketShuffleJoinController, "computeScanRangeAssignmentByBucket",
+                olapScanNode, ImmutableMap.copyOf(idToBackend), addressToBackendID);
+
+        Assert.assertEquals(java.util.Optional.of(66).get(),
+                Deencapsulation.invoke(bucketShuffleJoinController, "getFragmentBucketNum", new PlanFragmentId(1)));
+
+        Map<PlanFragmentId, Map<Long, Integer>> fragmentIdToBuckendIdBucketCountMap =
+                Deencapsulation.getField(bucketShuffleJoinController, "fragmentIdToBuckendIdBucketCountMap");
+
+        long targetBeCount = fragmentIdToBuckendIdBucketCountMap.values()
+                .stream()
+                .flatMap(buckend2BucketCountMap -> buckend2BucketCountMap.values().stream())
+                .filter(count -> count == 22).count();
+        Assert.assertEquals(targetBeCount, 3);
+    }
+
+    @Test
+    public void testComputeBucketShuffleJoinInstanceParam()  {
+        Coordinator.BucketShuffleJoinController bucketShuffleJoinController = new Coordinator.BucketShuffleJoinController();
+
+        // 1. set fragmentToBucketSeqToAddress in bucketShuffleJoinController
+        Map<Integer, TNetworkAddress> bucketSeqToAddress = new HashMap<>();
+        TNetworkAddress address = new TNetworkAddress();
+        for (int i = 0; i < 3; i++) {
+            bucketSeqToAddress.put(i, address);
+        }
+        PlanFragmentId planFragmentId = new PlanFragmentId(1);
+        Map<PlanFragmentId, Map<Integer, TNetworkAddress>> fragmentToBucketSeqToAddress = new HashMap<>();
+        fragmentToBucketSeqToAddress.put(planFragmentId, bucketSeqToAddress);
+        Deencapsulation.setField(bucketShuffleJoinController, "fragmentIdToSeqToAddressMap", fragmentToBucketSeqToAddress);
+
+        // 2. set bucketSeqToScanRange in bucketShuffleJoinController
+        Map<PlanFragmentId, BucketSeqToScanRange> fragmentIdBucketSeqToScanRangeMap = new HashMap<>();
+        BucketSeqToScanRange bucketSeqToScanRange = new BucketSeqToScanRange();
+        Map<Integer, List<TScanRangeParams>> ScanRangeMap = new HashMap<>();
+        ScanRangeMap.put(1, new ArrayList<>());
+        for (int i = 0; i < 3; i++) {
+            bucketSeqToScanRange.put(i, ScanRangeMap);
+        }
+        fragmentIdBucketSeqToScanRangeMap.put(planFragmentId, bucketSeqToScanRange);
+        Deencapsulation.setField(bucketShuffleJoinController, "fragmentIdBucketSeqToScanRangeMap", fragmentIdBucketSeqToScanRangeMap);
+
+        FragmentExecParams params = new FragmentExecParams(null);
+        Deencapsulation.invoke(bucketShuffleJoinController, "computeInstanceParam", planFragmentId, 1, params);
+        Assert.assertEquals(1, params.instanceExecParams.size());
+
+        params = new FragmentExecParams(null);
+        Deencapsulation.invoke(bucketShuffleJoinController, "computeInstanceParam", planFragmentId, 2, params);
+        Assert.assertEquals(2, params.instanceExecParams.size());
+
+        params = new FragmentExecParams(null);
+        Deencapsulation.invoke(bucketShuffleJoinController, "computeInstanceParam", planFragmentId, 3, params);
+        Assert.assertEquals(3, params.instanceExecParams.size());
+
+        params = new FragmentExecParams(null);
+        Deencapsulation.invoke(bucketShuffleJoinController, "computeInstanceParam", planFragmentId, 5, params);
+        Assert.assertEquals(3, params.instanceExecParams.size());
+    }
 }
 
diff --git a/gensrc/thrift/Partitions.thrift b/gensrc/thrift/Partitions.thrift
index 64de8a3..9a3305b 100644
--- a/gensrc/thrift/Partitions.thrift
+++ b/gensrc/thrift/Partitions.thrift
@@ -33,7 +33,11 @@ enum TPartitionType {
 
   // ordered partition on a list of exprs
   // (partition bounds don't overlap)
-  RANGE_PARTITIONED
+  RANGE_PARTITIONED,
+
+  // unordered partition on a set of exprs
+  // (only used in bucket shuffle join)
+  BUCKET_SHFFULE_HASH_PARTITIONED
 }
 
 enum TDistributionType {

