Posted to commits@doris.apache.org by mo...@apache.org on 2020/10/11 07:37:45 UTC
[incubator-doris] branch master updated: [SQL] Support Bucket Shuffle Join (#4677)
This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
The following commit(s) were added to refs/heads/master by this push:
new 04f26e4 [SQL] Support Bucket Shuffle Join (#4677)
04f26e4 is described below
commit 04f26e4b7f9b929c9c713fb01e513f5c80b494c0
Author: HappenLee <ha...@hotmail.com>
AuthorDate: Sun Oct 11 15:37:32 2020 +0800
[SQL] Support Bucket Shuffle Join (#4677)
Support Bucket Shuffle Join
Issue: #4394
---
be/src/runtime/data_stream_sender.cpp | 61 +++--
.../org/apache/doris/planner/DataPartition.java | 3 +-
.../apache/doris/planner/DistributedPlanner.java | 90 ++++++++
.../org/apache/doris/planner/HashJoinNode.java | 10 +-
.../main/java/org/apache/doris/qe/Coordinator.java | 227 +++++++++++++++++-
.../java/org/apache/doris/qe/SessionVariable.java | 8 +
.../org/apache/doris/planner/QueryPlanTest.java | 86 +++++++
.../java/org/apache/doris/qe/CoordinatorTest.java | 257 +++++++++++++++++++++
gensrc/thrift/Partitions.thrift | 6 +-
9 files changed, 720 insertions(+), 28 deletions(-)
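For context: in a bucket shuffle join the left (probe) table is read in place, and only the right table is re-hashed and shipped, using the same crc32 hash the left table used when its data was distributed into buckets, so each right-side row lands exactly on the backend that owns the matching left bucket. As a rough cost comparison for a right table of size R and a left fragment running on N backends: broadcast ships about N * R bytes and shuffle repartitions both sides (L + R), while bucket shuffle ships only R.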
diff --git a/be/src/runtime/data_stream_sender.cpp b/be/src/runtime/data_stream_sender.cpp
index 543c1f2..b2173e8 100644
--- a/be/src/runtime/data_stream_sender.cpp
+++ b/be/src/runtime/data_stream_sender.cpp
@@ -132,6 +132,10 @@ public:
return uid.to_string();
}
+ TUniqueId get_fragment_instance_id() {
+ return _fragment_instance_id;
+ }
+
private:
inline Status _wait_last_brpc() {
auto cntl = &_closure->cntl;
@@ -241,6 +245,9 @@ Status DataStreamSender::Channel::send_batch(PRowBatch* batch, bool eos) {
}
Status DataStreamSender::Channel::add_row(TupleRow* row) {
+ if (_fragment_instance_id.lo == -1) {
+ return Status::OK();
+ }
int row_num = _batch->add_row();
if (row_num == RowBatch::INVALID_ROW_INDEX) {
@@ -329,18 +336,27 @@ DataStreamSender::DataStreamSender(
DCHECK(sink.output_partition.type == TPartitionType::UNPARTITIONED
|| sink.output_partition.type == TPartitionType::HASH_PARTITIONED
|| sink.output_partition.type == TPartitionType::RANDOM
- || sink.output_partition.type == TPartitionType::RANGE_PARTITIONED);
- // TODO: use something like google3's linked_ptr here (scoped_ptr isn't copyable)
+ || sink.output_partition.type == TPartitionType::RANGE_PARTITIONED
+ || sink.output_partition.type == TPartitionType::BUCKET_SHFFULE_HASH_PARTITIONED);
+ // TODO: use something like google3's linked_ptr here (scoped_ptr isn't copyable)
+
+ std::map<int64_t, int64_t> fragment_id_to_channel_index;
for (int i = 0; i < destinations.size(); ++i) {
// Select first dest as transfer chain.
bool is_transfer_chain = (i == 0);
- _channel_shared_ptrs.emplace_back(
- new Channel(this, row_desc,
- destinations[i].brpc_server,
- destinations[i].fragment_instance_id,
- sink.dest_node_id, per_channel_buffer_size,
- is_transfer_chain, send_query_statistics_with_every_batch));
- _channels.push_back(_channel_shared_ptrs[i].get());
+ const auto& fragment_instance_id = destinations[i].fragment_instance_id;
+ if (fragment_id_to_channel_index.find(fragment_instance_id.lo) == fragment_id_to_channel_index.end()) {
+ _channel_shared_ptrs.emplace_back(
+ new Channel(this, row_desc,
+ destinations[i].brpc_server,
+ fragment_instance_id,
+ sink.dest_node_id, per_channel_buffer_size,
+ is_transfer_chain, send_query_statistics_with_every_batch));
+ fragment_id_to_channel_index.insert({fragment_instance_id.lo, _channel_shared_ptrs.size() - 1});
+ _channels.push_back(_channel_shared_ptrs.back().get());
+ } else {
+ _channel_shared_ptrs.emplace_back(_channel_shared_ptrs[fragment_id_to_channel_index[fragment_instance_id.lo]]);
+ }
}
}
@@ -353,7 +369,7 @@ static bool compare_part_use_range(const PartitionInfo* v1, const PartitionInfo*
Status DataStreamSender::init(const TDataSink& tsink) {
RETURN_IF_ERROR(DataSink::init(tsink));
const TDataStreamSink& t_stream_sink = tsink.stream_sink;
- if (_part_type == TPartitionType::HASH_PARTITIONED) {
+ if (_part_type == TPartitionType::HASH_PARTITIONED || _part_type == TPartitionType::BUCKET_SHFFULE_HASH_PARTITIONED) {
RETURN_IF_ERROR(Expr::create_expr_trees(
_pool, t_stream_sink.output_partition.partition_exprs, &_partition_expr_ctxs));
} else if (_part_type == TPartitionType::RANGE_PARTITIONED) {
@@ -402,7 +418,7 @@ Status DataStreamSender::prepare(RuntimeState* state) {
// Randomize the order we open/transmit to channels to avoid thundering herd problems.
srand(reinterpret_cast<uint64_t>(this));
random_shuffle(_channels.begin(), _channels.end());
- } else if (_part_type == TPartitionType::HASH_PARTITIONED) {
+ } else if (_part_type == TPartitionType::HASH_PARTITIONED || _part_type == TPartitionType::BUCKET_SHFFULE_HASH_PARTITIONED) {
RETURN_IF_ERROR(Expr::prepare(_partition_expr_ctxs, state, _row_desc, _expr_mem_tracker));
} else {
RETURN_IF_ERROR(Expr::prepare(_partition_expr_ctxs, state, _row_desc, _expr_mem_tracker));
@@ -449,7 +465,7 @@ Status DataStreamSender::send(RuntimeState* state, RowBatch* batch) {
SCOPED_TIMER(_profile->total_time_counter());
// Unpartition or _channel size
- if (_part_type == TPartitionType::UNPARTITIONED || _channels.size() == 1) {
+ if (_part_type == TPartitionType::UNPARTITIONED || _channels.size() == 1 ) {
RETURN_IF_ERROR(serialize_batch(batch, _current_pb_batch, _channels.size()));
for (auto channel : _channels) {
RETURN_IF_ERROR(channel->send_batch(_current_pb_batch));
@@ -479,7 +495,26 @@ Status DataStreamSender::send(RuntimeState* state, RowBatch* batch) {
hash_val = RawValue::get_hash_value_fvn(
partition_val, ctx->root()->type(), hash_val);
}
- RETURN_IF_ERROR(_channels[hash_val % num_channels]->add_row(row));
+ auto target_channel_id = hash_val % num_channels;
+ RETURN_IF_ERROR(_channels[target_channel_id]->add_row(row));
+ }
+ } else if (_part_type == TPartitionType::BUCKET_SHFFULE_HASH_PARTITIONED) {
+ // hash-partition batch's rows across channels
+ int num_channels = _channel_shared_ptrs.size();
+
+ for (int i = 0; i < batch->num_rows(); ++i) {
+ TupleRow* row = batch->get_row(i);
+ size_t hash_val = 0;
+
+ for (auto ctx : _partition_expr_ctxs) {
+ void* partition_val = ctx->get_value(row);
+ // We must use the crc hash function so that the hash value
+ // equals the one used to distribute the left table's data
+ hash_val = RawValue::zlib_crc32(
+ partition_val, ctx->root()->type(), hash_val);
+ }
+ auto target_channel_id = hash_val % num_channels;
+ RETURN_IF_ERROR(_channel_shared_ptrs[target_channel_id]->add_row(row));
}
} else {
// Range partition
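The routing rule added above can be summarized in a few lines. What follows is a minimal, hypothetical Java sketch of the same idea, not the Doris C++ code; BucketRouter, Channel, and NO_OP are illustrative names, and java.util.zip.CRC32 (the zlib-style crc32) stands in for RawValue::zlib_crc32:

    import java.util.List;
    import java.util.zip.CRC32;

    final class BucketRouter {
        // One slot per bucket sequence; destinations that share a fragment
        // instance share the same underlying channel.
        private final List<Channel> channelsByBucketSeq;

        BucketRouter(List<Channel> channelsByBucketSeq) {
            this.channelsByBucketSeq = channelsByBucketSeq;
        }

        // Hash the serialized join-key columns and pick the matching bucket's channel.
        void route(byte[] joinKey) {
            CRC32 crc = new CRC32();
            crc.update(joinKey);
            int bucketSeq = (int) (crc.getValue() % channelsByBucketSeq.size());
            channelsByBucketSeq.get(bucketSeq).addRow(joinKey);
        }

        interface Channel { void addRow(byte[] row); }

        // Buckets whose destination is the dummy (-1, -1) instance get a sentinel
        // channel that drops rows, mirroring the add_row early-return above.
        static final Channel NO_OP = row -> {};
    }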
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/DataPartition.java b/fe/fe-core/src/main/java/org/apache/doris/planner/DataPartition.java
index 2e9941d..1a293ab 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/DataPartition.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/DataPartition.java
@@ -59,7 +59,8 @@ public class DataPartition {
Preconditions.checkNotNull(exprs);
Preconditions.checkState(!exprs.isEmpty());
Preconditions.checkState(
- type == TPartitionType.HASH_PARTITIONED || type == TPartitionType.RANGE_PARTITIONED);
+ type == TPartitionType.HASH_PARTITIONED || type == TPartitionType.RANGE_PARTITIONED
+ || type == TPartitionType.BUCKET_SHFFULE_HASH_PARTITIONED);
this.type = type;
this.partitionExprs = ImmutableList.copyOf(exprs);
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/DistributedPlanner.java b/fe/fe-core/src/main/java/org/apache/doris/planner/DistributedPlanner.java
index 2d3305d..efaa5c1 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/DistributedPlanner.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/DistributedPlanner.java
@@ -390,6 +390,28 @@ public class DistributedPlanner {
node.setColocate(false, reason.get(0));
}
+ // bucket shuffle join is better than broadcast and shuffle join:
+ // it reduces the network cost of the join, so Doris prefers it
+ List<Expr> rhsPartitionxprs = Lists.newArrayList();
+ if (canBucketShuffleJoin(node, leftChildFragment, rhsPartitionxprs)) {
+ node.setDistributionMode(HashJoinNode.DistributionMode.BUCKET_SHUFFLE);
+ DataPartition rhsJoinPartition =
+ new DataPartition(TPartitionType.BUCKET_SHFFULE_HASH_PARTITIONED, rhsPartitionxprs);
+ ExchangeNode rhsExchange =
+ new ExchangeNode(ctx_.getNextNodeId(), rightChildFragment.getPlanRoot(), false);
+ rhsExchange.setNumInstances(rightChildFragment.getPlanRoot().getNumInstances());
+ rhsExchange.init(ctx_.getRootAnalyzer());
+
+ node.setChild(0, leftChildFragment.getPlanRoot());
+ node.setChild(1, rhsExchange);
+ leftChildFragment.setPlanRoot(node);
+
+ rightChildFragment.setDestination(rhsExchange);
+ rightChildFragment.setOutputPartition(rhsJoinPartition);
+
+ return leftChildFragment;
+ }
+
if (doBroadcast) {
node.setDistributionMode(HashJoinNode.DistributionMode.BROADCAST);
// Doesn't create a new fragment, but modifies leftChildFragment to execute
@@ -498,6 +520,74 @@ public class DistributedPlanner {
return false;
}
+ private boolean canBucketShuffleJoin(HashJoinNode node, PlanFragment leftChildFragment,
+ List<Expr> rhsHashExprs) {
+ if (!ConnectContext.get().getSessionVariable().isEnableBucketShuffleJoin()) {
+ return false;
+ }
+ // If the user has specified a join hint, bucket shuffle join cannot be chosen
+ if (node.getInnerRef().hasJoinHints()) {
+ return false;
+ }
+
+ PlanNode leftRoot = leftChildFragment.getPlanRoot();
+ // leftRoot should be OlapScanNode
+ if (leftRoot instanceof OlapScanNode) {
+ return canBucketShuffleJoin(node, leftRoot, rhsHashExprs);
+ }
+
+ return false;
+ }
+
+ // the join exprs must contain the left table's distribution columns
+ private boolean canBucketShuffleJoin(HashJoinNode node, PlanNode leftRoot,
+ List<Expr> rhsJoinExprs) {
+ OlapScanNode leftScanNode = ((OlapScanNode) leftRoot);
+
+ // 1. the left table must have only one selected partition
+ if (leftScanNode.getSelectedPartitionIds().size() > 1) {
+ return false;
+ }
+
+ DistributionInfo leftDistribution = leftScanNode.getOlapTable().getDefaultDistributionInfo();
+
+ if (leftDistribution instanceof HashDistributionInfo) {
+ List<Column> leftDistributeColumns = ((HashDistributionInfo) leftDistribution).getDistributionColumns();
+
+ List<Column> leftJoinColumns = new ArrayList<>();
+ List<Expr> rightExprs = new ArrayList<>();
+ List<BinaryPredicate> eqJoinConjuncts = node.getEqJoinConjuncts();
+
+ for (BinaryPredicate eqJoinPredicate : eqJoinConjuncts) {
+ Expr lhsJoinExpr = eqJoinPredicate.getChild(0);
+ Expr rhsJoinExpr = eqJoinPredicate.getChild(1);
+ if (lhsJoinExpr.unwrapSlotRef() == null || rhsJoinExpr.unwrapSlotRef() == null) {
+ continue;
+ }
+
+ SlotDescriptor leftSlot = lhsJoinExpr.unwrapSlotRef().getDesc();
+
+ leftJoinColumns.add(leftSlot.getColumn());
+ rightExprs.add(rhsJoinExpr);
+ }
+
+ // 2. the join columns must contain all of the left table's distribution columns to enable bucket shuffle join
+ for (Column distributeColumn : leftDistributeColumns) {
+ int loc = leftJoinColumns.indexOf(distributeColumn);
+ // TODO: support bucket shuffle join when the distribution column type
+ // differs from the right expr type
+ if (loc == -1 || !rightExprs.get(loc).getType().equals(distributeColumn.getType())) {
+ return false;
+ }
+ rhsJoinExprs.add(rightExprs.get(loc));
+ }
+ } else {
+ return false;
+ }
+
+ return true;
+ }
+
//the table must be colocate
//the colocate group must be stable
//the eqJoinConjuncts must contain the distributionColumns
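To summarize the DistributedPlanner change above: the rewrite qualifies only when (a) no join hint forces another method, (b) the left child is a single OlapScanNode with one selected partition, and (c) the equi-join conjuncts cover every distribution column of the left table with right-side exprs of matching type. For example, for a left table created with "distributed by hash(k1, k2)", a predicate equating both k1 and k2 qualifies, while joining on k1 alone, or through a cast that changes the type, does not; the QueryPlanTest cases below exercise exactly these conditions.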
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/HashJoinNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/HashJoinNode.java
index d3ead3f..1768307 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/HashJoinNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/HashJoinNode.java
@@ -62,6 +62,7 @@ public class HashJoinNode extends PlanNode {
private DistributionMode distrMode;
private boolean isColocate = false; //the flag for colocate join
private String colocateReason = ""; // if can not do colocate join, set reason here
+ private boolean isBucketShuffle = false; // the flag for bucket shuffle join
public HashJoinNode(PlanNodeId id, PlanNode outer, PlanNode inner, TableRef innerRef,
List<Expr> eqJoinConjuncts, List<Expr> otherJoinConjuncts) {
@@ -122,6 +123,10 @@ public class HashJoinNode extends PlanNode {
return isColocate;
}
+ public boolean isBucketShuffle() {
+ return distrMode.equals(DistributionMode.BUCKET_SHUFFLE);
+ }
+
public void setColocate(boolean colocate, String reason) {
isColocate = colocate;
colocateReason = reason;
@@ -299,10 +304,11 @@ public class HashJoinNode extends PlanNode {
return distrMode == DistributionMode.PARTITIONED;
}
- enum DistributionMode {
+ public enum DistributionMode {
NONE("NONE"),
BROADCAST("BROADCAST"),
- PARTITIONED("PARTITIONED");
+ PARTITIONED("PARTITIONED"),
+ BUCKET_SHUFFLE("BUCKET_SHUFFLE");
private final String description;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
index 3164903..0fc2680 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
@@ -746,6 +746,7 @@ public class Coordinator {
if (LOG.isDebugEnabled()) {
LOG.debug("fragment {} has instances {}", params.fragment.getFragmentId(), params.instanceExecParams.size());
}
+
for (int j = 0; j < params.instanceExecParams.size(); ++j) {
// we add instance_num to query_id.lo to create a
// globally-unique instance id
@@ -783,13 +784,39 @@ public class Coordinator {
params.instanceExecParams.size() + destParams.perExchNumSenders.get(exchId.asInt()));
}
- // add destination host to this fragment's destination
- for (int j = 0; j < destParams.instanceExecParams.size(); ++j) {
- TPlanFragmentDestination dest = new TPlanFragmentDestination();
- dest.fragment_instance_id = destParams.instanceExecParams.get(j).instanceId;
- dest.server = toRpcHost(destParams.instanceExecParams.get(j).host);
- dest.setBrpcServer(toBrpcHost(destParams.instanceExecParams.get(j).host));
- params.destinations.add(dest);
+ if (bucketShuffleJoinController.isBucketShuffleJoin(destFragment.getFragmentId().asInt())) {
+ int bucketSeq = 0;
+ int bucketNum = bucketShuffleJoinController.getFragmentBucketNum(destFragment.getFragmentId());
+ TNetworkAddress dummyServer = new TNetworkAddress("0.0.0.0", 0);
+
+ while (bucketSeq < bucketNum) {
+ TPlanFragmentDestination dest = new TPlanFragmentDestination();
+
+ dest.fragment_instance_id = new TUniqueId(-1, -1);
+ dest.server = dummyServer;
+ dest.setBrpcServer(dummyServer);
+
+ for (FInstanceExecParam instanceExecParams : destParams.instanceExecParams) {
+ if (instanceExecParams.bucketSeqSet.contains(bucketSeq)) {
+ dest.fragment_instance_id = instanceExecParams.instanceId;
+ dest.server = toRpcHost(instanceExecParams.host);
+ dest.setBrpcServer(toBrpcHost(instanceExecParams.host));
+ break;
+ }
+ }
+
+ bucketSeq++;
+ params.destinations.add(dest);
+ }
+ } else {
+ // add destination host to this fragment's destination
+ for (int j = 0; j < destParams.instanceExecParams.size(); ++j) {
+ TPlanFragmentDestination dest = new TPlanFragmentDestination();
+ dest.fragment_instance_id = destParams.instanceExecParams.get(j).instanceId;
+ dest.server = toRpcHost(destParams.instanceExecParams.get(j).host);
+ dest.setBrpcServer(toBrpcHost(destParams.instanceExecParams.get(j).host));
+ params.destinations.add(dest);
+ }
}
}
}
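Note how this ties into the BE change above: every bucket sequence gets a destination so that the destination list stays index-aligned with bucket sequences, but a bucket that no instance claims is given the dummy 0.0.0.0:0 address and fragment_instance_id (-1, -1). On the backend, Channel::add_row sees _fragment_instance_id.lo == -1 and returns OK without buffering, so rows hashed to a bucket with nothing to join against are silently dropped instead of being sent.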
@@ -985,9 +1012,11 @@ public class Coordinator {
int parallelExecInstanceNum = fragment.getParallelExecNum();
//for ColocateJoin fragment
- if (isColocateJoin(fragment.getPlanRoot()) && fragmentIdToSeqToAddressMap.containsKey(fragment.getFragmentId())
- && fragmentIdToSeqToAddressMap.get(fragment.getFragmentId()).size() > 0) {
+ if ((isColocateJoin(fragment.getPlanRoot()) && fragmentIdToSeqToAddressMap.containsKey(fragment.getFragmentId())
+ && fragmentIdToSeqToAddressMap.get(fragment.getFragmentId()).size() > 0)) {
computeColocateJoinInstanceParam(fragment.getFragmentId(), parallelExecInstanceNum, params);
+ } else if (bucketShuffleJoinController.isBucketShuffleJoin(fragment.getFragmentId().asInt())) {
+ bucketShuffleJoinController.computeInstanceParam(fragment.getFragmentId(), parallelExecInstanceNum, params);
} else {
// case A
Iterator iter = fragmentExecParamsMap.get(fragment.getFragmentId()).scanRangeAssignment.entrySet().iterator();
@@ -1162,6 +1191,8 @@ public class Coordinator {
fragmentExecParamsMap.get(scanNode.getFragmentId()).scanRangeAssignment;
if (isColocateJoin(scanNode.getFragment().getPlanRoot())) {
computeScanRangeAssignmentByColocate((OlapScanNode) scanNode, assignment);
+ } else if (bucketShuffleJoinController.isBucketShuffleJoin(scanNode.getFragmentId().asInt(), scanNode.getFragment().getPlanRoot())) {
+ bucketShuffleJoinController.computeScanRangeAssignmentByBucket((OlapScanNode) scanNode, idToBackend, addressToBackendID);
} else {
computeScanRangeAssignmentByScheduler(scanNode, locations, assignment);
}
@@ -1172,7 +1203,6 @@ public class Coordinator {
private void computeScanRangeAssignmentByColocate(
final OlapScanNode scanNode,
FragmentScanRangeAssignment assignment) throws Exception {
-
if (!fragmentIdToSeqToAddressMap.containsKey(scanNode.getFragmentId())) {
fragmentIdToSeqToAddressMap.put(scanNode.getFragmentId(), new HashedMap());
}
@@ -1399,6 +1429,175 @@ public class Coordinator {
}
+ class BucketShuffleJoinController {
+ // fragment_id -> < bucket_seq -> < scannode_id -> scan_range_params >>
+ private Map<PlanFragmentId, BucketSeqToScanRange> fragmentIdBucketSeqToScanRangeMap = Maps.newHashMap();
+ // fragment_id -> < bucket_seq -> be_address >
+ private Map<PlanFragmentId, Map<Integer, TNetworkAddress>> fragmentIdToSeqToAddressMap = Maps.newHashMap();
+ // fragment_id -> < be_id -> bucket_count >
+ private Map<PlanFragmentId, Map<Long, Integer>> fragmentIdToBuckendIdBucketCountMap = Maps.newHashMap();
+ // fragment_id -> bucket_num
+ private Map<PlanFragmentId, Integer> fragmentIdToBucketNumMap = Maps.newHashMap();
+
+ // cache the bucketShuffleFragmentIds
+ private Set<Integer> bucketShuffleFragmentIds = new HashSet<>();
+
+ // check whether the node's fragment is a bucket shuffle join fragment
+ private boolean isBucketShuffleJoin(int fragmentId, PlanNode node) {
+ if (ConnectContext.get() != null) {
+ if (!ConnectContext.get().getSessionVariable().isEnableBucketShuffleJoin()) {
+ return false;
+ }
+ }
+
+ // check that the node is part of this fragment
+ if (fragmentId != node.getFragmentId().asInt()) {
+ return false;
+ }
+
+ if (bucketShuffleFragmentIds.contains(fragmentId)) {
+ return true;
+ }
+
+ // One fragment can only have one HashJoinNode
+ if (node instanceof HashJoinNode) {
+ HashJoinNode joinNode = (HashJoinNode) node;
+ if (joinNode.isBucketShuffle()) {
+ bucketShuffleFragmentIds.add(joinNode.getFragmentId().asInt());
+ return true;
+ }
+ }
+
+ for (PlanNode childNode : node.getChildren()) {
+ if (isBucketShuffleJoin(fragmentId, childNode)) {
+ return true;
+ }
+ }
+
+ return false;
+ }
+
+ private boolean isBucketShuffleJoin(int fragmentId) {
+ return bucketShuffleFragmentIds.contains(fragmentId);
+ }
+
+ private int getFragmentBucketNum(PlanFragmentId fragmentId) {
+ return fragmentIdToBucketNumMap.get(fragmentId);
+ }
+
+ // make sure each host has roughly the same number of buckets to scan
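+ // e.g. 66 buckets whose replicas all live on BEs {0, 1, 2} end up as
+ // 22 buckets per BE, as the CoordinatorTest below verifies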
+ private void getExecHostPortForFragmentIDAndBucketSeq(TScanRangeLocations seqLocation, PlanFragmentId fragmentId, Integer bucketSeq,
+ ImmutableMap<Long, Backend> idToBackend, Map<TNetworkAddress, Long> addressToBackendID) throws Exception {
+ Map<Long, Integer> buckendIdToBucketCountMap = fragmentIdToBuckendIdBucketCountMap.get(fragmentId);
+ int maxBucketNum = Integer.MAX_VALUE;
+ long buckendId = Long.MAX_VALUE;
+ for (TScanRangeLocation location : seqLocation.locations) {
+ if (buckendIdToBucketCountMap.containsKey(location.backend_id)) {
+ if (buckendIdToBucketCountMap.get(location.backend_id) < maxBucketNum) {
+ maxBucketNum = buckendIdToBucketCountMap.get(location.backend_id);
+ buckendId = location.backend_id;
+ }
+ } else {
+ maxBucketNum = 0;
+ buckendId = location.backend_id;
+ buckendIdToBucketCountMap.put(buckendId, 0);
+ break;
+ }
+ }
+
+ buckendIdToBucketCountMap.put(buckendId, buckendIdToBucketCountMap.get(buckendId) + 1);
+ Reference<Long> backendIdRef = new Reference<Long>();
+ TNetworkAddress execHostPort = SimpleScheduler.getHost(buckendId, seqLocation.locations, idToBackend, backendIdRef);
+ if (execHostPort == null) {
+ throw new UserException("there is no scanNode Backend");
+ }
+
+ addressToBackendID.put(execHostPort, backendIdRef.getRef());
+ this.fragmentIdToSeqToAddressMap.get(fragmentId).put(bucketSeq, execHostPort);
+ }
+
+ // ensure tablets with the same bucketSeq are assigned to the same execHostPort
+ private void computeScanRangeAssignmentByBucket(
+ final OlapScanNode scanNode, ImmutableMap<Long, Backend> idToBackend, Map<TNetworkAddress, Long> addressToBackendID) throws Exception {
+ if (!fragmentIdToSeqToAddressMap.containsKey(scanNode.getFragmentId())) {
+ fragmentIdToBucketNumMap.put(scanNode.getFragmentId(), scanNode.getOlapTable().getDefaultDistributionInfo().getBucketNum());
+ fragmentIdToSeqToAddressMap.put(scanNode.getFragmentId(), new HashedMap());
+ fragmentIdBucketSeqToScanRangeMap.put(scanNode.getFragmentId(), new BucketSeqToScanRange());
+ fragmentIdToBuckendIdBucketCountMap.put(scanNode.getFragmentId(), new HashMap<>());
+ }
+ Map<Integer, TNetworkAddress> bucketSeqToAddress = fragmentIdToSeqToAddressMap.get(scanNode.getFragmentId());
+ BucketSeqToScanRange bucketSeqToScanRange = fragmentIdBucketSeqToScanRangeMap.get(scanNode.getFragmentId());
+
+ for(Integer bucketSeq: scanNode.bucketSeq2locations.keySet()) {
+ //fill scanRangeParamsList
+ List<TScanRangeLocations> locations = scanNode.bucketSeq2locations.get(bucketSeq);
+ if (!bucketSeqToAddress.containsKey(bucketSeq)) {
+ getExecHostPortForFragmentIDAndBucketSeq(locations.get(0), scanNode.getFragmentId(), bucketSeq, idToBackend, addressToBackendID);
+ }
+
+ for(TScanRangeLocations location: locations) {
+ Map<Integer, List<TScanRangeParams>> scanRanges =
+ findOrInsert(bucketSeqToScanRange, bucketSeq, new HashMap<Integer, List<TScanRangeParams>>());
+
+ List<TScanRangeParams> scanRangeParamsList =
+ findOrInsert(scanRanges, scanNode.getId().asInt(), new ArrayList<TScanRangeParams>());
+
+ // add scan range
+ TScanRangeParams scanRangeParams = new TScanRangeParams();
+ scanRangeParams.scan_range = location.scan_range;
+ scanRangeParamsList.add(scanRangeParams);
+ }
+ }
+ }
+
+ private void computeInstanceParam(PlanFragmentId fragmentId, int parallelExecInstanceNum, FragmentExecParams params) {
+ Map<Integer, TNetworkAddress> bucketSeqToAddress = fragmentIdToSeqToAddressMap.get(fragmentId);
+ BucketSeqToScanRange bucketSeqToScanRange = fragmentIdBucketSeqToScanRangeMap.get(fragmentId);
+
+ // 1. count how many tablets each host in this fragment should scan, and gather them into one list per host
+ Map<TNetworkAddress, List<Map.Entry<Integer, Map<Integer, List<TScanRangeParams>>>>> addressToScanRanges = Maps.newHashMap();
+ for (Map.Entry<Integer, Map<Integer, List<TScanRangeParams>>> scanRanges : bucketSeqToScanRange.entrySet()) {
+ TNetworkAddress address = bucketSeqToAddress.get(scanRanges.getKey());
+ Map<Integer, List<TScanRangeParams>> nodeScanRanges = scanRanges.getValue();
+
+ if (!addressToScanRanges.containsKey(address)) {
+ addressToScanRanges.put(address, Lists.newArrayList());
+ }
+ addressToScanRanges.get(address).add(scanRanges);
+ }
+
+ for (Map.Entry<TNetworkAddress, List<Map.Entry<Integer, Map<Integer, List<TScanRangeParams>>>>> addressScanRange : addressToScanRanges.entrySet()) {
+ List<Map.Entry<Integer, Map<Integer, List<TScanRangeParams>>>> scanRange = addressScanRange.getValue();
+ int expectedInstanceNum = 1;
+ if (parallelExecInstanceNum > 1) {
+ // the scan instance num should not be larger than the tablet num
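+ // e.g. 3 buckets on one host with parallelExecInstanceNum = 5 still yields only 3 instances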
+ expectedInstanceNum = Math.min(scanRange.size(), parallelExecInstanceNum);
+ }
+
+ // 2. split the scan ranges among the expected number of instances
+ List<List<Map.Entry<Integer, Map<Integer, List<TScanRangeParams>>>>> perInstanceScanRanges = ListUtil.splitBySize(scanRange,
+ expectedInstanceNum);
+
+ // 3. construct an instanceExecParam for each instance and add the scan ranges it should scan
+ for (List<Map.Entry<Integer, Map<Integer, List<TScanRangeParams>>>> perInstanceScanRange : perInstanceScanRanges) {
+ FInstanceExecParam instanceParam = new FInstanceExecParam(null, addressScanRange.getKey(), 0, params);
+
+ for (Map.Entry<Integer, Map<Integer, List<TScanRangeParams>>> nodeScanRangeMap : perInstanceScanRange) {
+ instanceParam.addBucketSeq(nodeScanRangeMap.getKey());
+ for (Map.Entry<Integer, List<TScanRangeParams>> nodeScanRange : nodeScanRangeMap.getValue().entrySet()) {
+ if (!instanceParam.perNodeScanRanges.containsKey(nodeScanRange.getKey())) {
+ instanceParam.perNodeScanRanges.put(nodeScanRange.getKey(), nodeScanRange.getValue());
+ } else {
+ instanceParam.perNodeScanRanges.get(nodeScanRange.getKey()).addAll(nodeScanRange.getValue());
+ }
+ }
+ }
+ params.instanceExecParams.add(instanceParam);
+ }
+ }
+ }
+ }
+
+ private BucketShuffleJoinController bucketShuffleJoinController = new BucketShuffleJoinController();
+
private BucketSeqToScanRange bucketSeqToScanRange = new BucketSeqToScanRange();
private Map<PlanFragmentId, Map<Integer, TNetworkAddress>> fragmentIdToSeqToAddressMap = Maps.newHashMap();
private Set<Integer> colocateFragmentIds = new HashSet<>();
@@ -1702,9 +1901,15 @@ public class Coordinator {
int perFragmentInstanceIdx;
int senderId;
+
+ Set<Integer> bucketSeqSet = Sets.newHashSet();
FragmentExecParams fragmentExecParams;
-
+
+ public void addBucketSeq(int bucketSeq) {
+ this.bucketSeqSet.add(bucketSeq);
+ }
+
public FInstanceExecParam(TUniqueId id, TNetworkAddress host,
int perFragmentInstanceIdx, FragmentExecParams fragmentExecParams) {
this.instanceId = id;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
index 6b61e1f..bc2e624 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
@@ -71,6 +71,7 @@ public class SessionVariable implements Serializable, Writable {
public static final String BATCH_SIZE = "batch_size";
public static final String DISABLE_STREAMING_PREAGGREGATIONS = "disable_streaming_preaggregations";
public static final String DISABLE_COLOCATE_JOIN = "disable_colocate_join";
+ public static final String ENABLE_BUCKET_SHUFFLE_JOIN = "enable_bucket_shuffle_join";
public static final String PARALLEL_FRAGMENT_EXEC_INSTANCE_NUM = "parallel_fragment_exec_instance_num";
public static final String ENABLE_INSERT_STRICT = "enable_insert_strict";
public static final String ENABLE_SPILLING = "enable_spilling";
@@ -215,6 +216,9 @@ public class SessionVariable implements Serializable, Writable {
@VariableMgr.VarAttr(name = DISABLE_COLOCATE_JOIN)
private boolean disableColocateJoin = false;
+ @VariableMgr.VarAttr(name = ENABLE_BUCKET_SHUFFLE_JOIN)
+ private boolean enableBucketShuffleJoin = false;
+
@VariableMgr.VarAttr(name = PREFER_JOIN_METHOD)
private String preferJoinMethod = "broadcast";
@@ -414,6 +418,10 @@ public class SessionVariable implements Serializable, Writable {
return disableColocateJoin;
}
+ public boolean isEnableBucketShuffleJoin() {
+ return enableBucketShuffleJoin;
+ }
+
public String getPreferJoinMethod() {return preferJoinMethod; }
public void setPreferJoinMethod(String preferJoinMethod) {this.preferJoinMethod = preferJoinMethod; }
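Note that the feature ships disabled (enableBucketShuffleJoin defaults to false), so a session has to opt in with "set enable_bucket_shuffle_join = true;" — which is what the tests below toggle through Deencapsulation.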
diff --git a/fe/fe-core/src/test/java/org/apache/doris/planner/QueryPlanTest.java b/fe/fe-core/src/test/java/org/apache/doris/planner/QueryPlanTest.java
index 970a82e..a946dc6 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/planner/QueryPlanTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/planner/QueryPlanTest.java
@@ -38,6 +38,7 @@ import org.apache.doris.catalog.Tablet;
import org.apache.doris.catalog.Type;
import org.apache.doris.common.Config;
import org.apache.doris.common.FeConstants;
+import org.apache.doris.common.jmockit.Deencapsulation;
import org.apache.doris.load.EtlJobType;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.QueryState.MysqlStateType;
@@ -68,6 +69,10 @@ public class QueryPlanTest {
// create connect context
connectContext = UtFrameUtils.createDefaultCtx();
+
+ // disable bucket shuffle join
+ Deencapsulation.setField(connectContext.getSessionVariable(), "enableBucketShuffleJoin", false);
+
// create database
String createDbStmtStr = "create database test;";
CreateDbStmt createDbStmt = (CreateDbStmt) UtFrameUtils.parseAndAnalyzeStmt(createDbStmtStr, connectContext);
@@ -288,6 +293,27 @@ public class QueryPlanTest {
"(k1 int, k2 int) distributed by hash(k1) buckets 1\n" +
"properties(\"replication_num\" = \"1\");");
+ createTable("create table test.bucket_shuffle1\n" +
+ "(k1 int, k2 int, k3 int) distributed by hash(k1, k2) buckets 5\n" +
+ "properties(\"replication_num\" = \"1\"" +
+ ");");
+
+ createTable("CREATE TABLE test.`bucket_shuffle2` (\n" +
+ " `k1` int NULL COMMENT \"\",\n" +
+ " `k2` smallint(6) NULL COMMENT \"\"\n" +
+ ") ENGINE=OLAP\n" +
+ "COMMENT \"OLAP\"\n" +
+ "PARTITION BY RANGE(`k1`)\n" +
+ "(PARTITION p1 VALUES [(\"-128\"), (\"-64\")),\n" +
+ "PARTITION p2 VALUES [(\"-64\"), (\"0\")),\n" +
+ "PARTITION p3 VALUES [(\"0\"), (\"64\")))\n" +
+ "DISTRIBUTED BY HASH(k1, k2) BUCKETS 5\n" +
+ "PROPERTIES (\n" +
+ "\"replication_num\" = \"1\",\n" +
+ "\"in_memory\" = \"false\",\n" +
+ "\"storage_format\" = \"DEFAULT\"\n" +
+ ");");
+
createTable("create table test.colocate1\n" +
"(k1 int, k2 int, k3 int) distributed by hash(k1, k2) buckets 1\n" +
"properties(\"replication_num\" = \"1\"," +
@@ -1017,6 +1043,66 @@ public class QueryPlanTest {
}
@Test
+ public void testBucketShuffleJoin() throws Exception {
+ FeConstants.runningUnitTest = true;
+ // enable bucket shuffle join
+ Deencapsulation.setField(connectContext.getSessionVariable(), "enableBucketShuffleJoin", true);
+
+ // set data size and row count for the olap table
+ Database db = Catalog.getCurrentCatalog().getDb("default_cluster:test");
+ OlapTable tbl = (OlapTable) db.getTable("bucket_shuffle1");
+ for (Partition partition : tbl.getPartitions()) {
+ partition.updateVisibleVersionAndVersionHash(2, 0);
+ for (MaterializedIndex mIndex : partition.getMaterializedIndices(IndexExtState.VISIBLE)) {
+ mIndex.setRowCount(10000);
+ for (Tablet tablet : mIndex.getTablets()) {
+ for (Replica replica : tablet.getReplicas()) {
+ replica.updateVersionInfo(2, 0, 200000, 10000);
+ }
+ }
+ }
+ }
+
+ db = Catalog.getCurrentCatalog().getDb("default_cluster:test");
+ tbl = (OlapTable) db.getTable("bucket_shuffle2");
+ for (Partition partition : tbl.getPartitions()) {
+ partition.updateVisibleVersionAndVersionHash(2, 0);
+ for (MaterializedIndex mIndex : partition.getMaterializedIndices(IndexExtState.VISIBLE)) {
+ mIndex.setRowCount(10000);
+ for (Tablet tablet : mIndex.getTablets()) {
+ for (Replica replica : tablet.getReplicas()) {
+ replica.updateVersionInfo(2, 0, 200000, 10000);
+ }
+ }
+ }
+ }
+
+ // single partition
+ String queryStr = "explain select * from test.jointest t1, test.bucket_shuffle1 t2 where t1.k1 = t2.k1 and t1.k1 = t2.k2";
+ String explainString = UtFrameUtils.getSQLPlanOrErrorMsg(connectContext, queryStr);
+ Assert.assertTrue(explainString.contains("BUCKET_SHFFULE"));
+ Assert.assertTrue(explainString.contains("BUCKET_SHFFULE_HASH_PARTITIONED: `t1`.`k1`, `t1`.`k1`"));
+
+ // not a bucket shuffle join: differing column types are not supported
+ queryStr = "explain select * from test.jointest t1, test.bucket_shuffle1 t2 where cast (t1.k1 as tinyint) = t2.k1 and t1.k1 = t2.k2";
+ explainString = UtFrameUtils.getSQLPlanOrErrorMsg(connectContext, queryStr);
+ Assert.assertTrue(!explainString.contains("BUCKET_SHFFULE"));
+
+ // left table distribution columns are not fully covered
+ queryStr = "explain select * from test.jointest t1, test.bucket_shuffle1 t2 where t1.k1 = t2.k1";
+ explainString = UtFrameUtils.getSQLPlanOrErrorMsg(connectContext, queryStr);
+ Assert.assertTrue(!explainString.contains("BUCKET_SHFFULE"));
+
+ // multiple partitions selected, so it should not be a bucket shuffle join
+ queryStr = "explain select * from test.jointest t1, test.bucket_shuffle2 t2 where t1.k1 = t2.k1 and t1.k1 = t2.k2";
+ explainString = UtFrameUtils.getSQLPlanOrErrorMsg(connectContext, queryStr);
+ Assert.assertTrue(!explainString.contains("BUCKET_SHFFULE"));
+
+ // disable bucket shuffle join again
+ Deencapsulation.setField(connectContext.getSessionVariable(), "enableBucketShuffleJoin", false);
+ }
+
+ @Test
public void testJoinWithMysqlTable() throws Exception {
connectContext.setDatabase("default_cluster:test");
diff --git a/fe/fe-core/src/test/java/org/apache/doris/qe/CoordinatorTest.java b/fe/fe-core/src/test/java/org/apache/doris/qe/CoordinatorTest.java
index 68d2675..11dc8d1 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/qe/CoordinatorTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/qe/CoordinatorTest.java
@@ -17,15 +17,40 @@
package org.apache.doris.qe;
+import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Maps;
import mockit.Mocked;
import org.apache.doris.analysis.Analyzer;
+import org.apache.doris.analysis.BinaryPredicate;
+import org.apache.doris.analysis.Expr;
+import org.apache.doris.analysis.TableRef;
+import org.apache.doris.analysis.TupleDescriptor;
+import org.apache.doris.analysis.TupleId;
import org.apache.doris.catalog.Catalog;
+import org.apache.doris.catalog.HashDistributionInfo;
+import org.apache.doris.catalog.OlapTable;
import org.apache.doris.common.jmockit.Deencapsulation;
import org.apache.doris.persist.EditLog;
+import org.apache.doris.planner.DataPartition;
+import org.apache.doris.planner.EmptySetNode;
+import org.apache.doris.planner.ExchangeNode;
+import org.apache.doris.planner.HashJoinNode;
+import org.apache.doris.planner.MysqlScanNode;
+import org.apache.doris.planner.OlapScanNode;
+import org.apache.doris.planner.PlanFragment;
import org.apache.doris.planner.PlanFragmentId;
+import org.apache.doris.planner.PlanNode;
+import org.apache.doris.planner.PlanNodeId;
import org.apache.doris.planner.Planner;
+import org.apache.doris.planner.ScanNode;
import org.apache.doris.service.FrontendOptions;
+import org.apache.doris.system.Backend;
import org.apache.doris.thrift.TNetworkAddress;
+import org.apache.doris.thrift.TPartitionType;
+import org.apache.doris.thrift.TPlanNode;
+import org.apache.doris.thrift.TScanRangeLocation;
+import org.apache.doris.thrift.TScanRangeLocations;
import org.apache.doris.thrift.TScanRangeParams;
import org.apache.doris.thrift.TUniqueId;
@@ -98,5 +123,237 @@ public class CoordinatorTest extends Coordinator {
Deencapsulation.invoke(coordinator, "computeColocateJoinInstanceParam", planFragmentId, 5, params);
Assert.assertEquals(3, params.instanceExecParams.size());
}
+
+ @Test
+ public void testIsBucketShuffleJoin() {
+ Coordinator.BucketShuffleJoinController bucketShuffleJoinController = new Coordinator.BucketShuffleJoinController();
+
+ PlanNodeId testPaloNodeId = new PlanNodeId(-1);
+ TupleId testTupleId = new TupleId(-1);
+ ArrayList<TupleId> tupleIdArrayList = new ArrayList<>();
+ tupleIdArrayList.add(testTupleId);
+
+ ArrayList<Expr> testJoinexprs = new ArrayList<>();
+ BinaryPredicate binaryPredicate = new BinaryPredicate();
+ testJoinexprs.add(binaryPredicate);
+
+ HashJoinNode hashJoinNode = new HashJoinNode(testPaloNodeId, new EmptySetNode(testPaloNodeId, tupleIdArrayList),
+ new EmptySetNode(testPaloNodeId, tupleIdArrayList) , new TableRef(), testJoinexprs, new ArrayList<>());
+ hashJoinNode.setFragment(new PlanFragment(new PlanFragmentId(-1), hashJoinNode,
+ new DataPartition(TPartitionType.BUCKET_SHFFULE_HASH_PARTITIONED, testJoinexprs)));
+
+ // hash join node is not bucket shuffle join
+ Assert.assertEquals(false,
+ Deencapsulation.invoke(bucketShuffleJoinController, "isBucketShuffleJoin", -1, hashJoinNode));
+
+ // the fragment id is different from the hash join node's
+ hashJoinNode.setFragment(new PlanFragment(new PlanFragmentId(-2), hashJoinNode,
+ new DataPartition(TPartitionType.BUCKET_SHFFULE_HASH_PARTITIONED, testJoinexprs)));
+ hashJoinNode.setDistributionMode(HashJoinNode.DistributionMode.BUCKET_SHUFFLE);
+ Assert.assertEquals(false,
+ Deencapsulation.invoke(bucketShuffleJoinController, "isBucketShuffleJoin", -1, hashJoinNode));
+
+ hashJoinNode.setFragment(new PlanFragment(new PlanFragmentId(-1), hashJoinNode,
+ new DataPartition(TPartitionType.BUCKET_SHFFULE_HASH_PARTITIONED, testJoinexprs)));
+ Assert.assertEquals(true,
+ Deencapsulation.invoke(bucketShuffleJoinController, "isBucketShuffleJoin", -1, hashJoinNode));
+
+ // the fragment id is cached, so the node is not checked again
+ Assert.assertEquals(true,
+ Deencapsulation.invoke(bucketShuffleJoinController, "isBucketShuffleJoin", -1));
+
+ }
+
+ @Test
+ public void testComputeScanRangeAssignmentByBucketq() {
+ Coordinator.BucketShuffleJoinController bucketShuffleJoinController = new Coordinator.BucketShuffleJoinController();
+
+ // init olap scan node of bucket shuffle join
+ TupleDescriptor tupleDescriptor = new TupleDescriptor(new TupleId(-1));
+ OlapTable olapTable = new OlapTable();
+ HashDistributionInfo hashDistributionInfo = new HashDistributionInfo(66, new ArrayList<>());
+ Deencapsulation.setField(olapTable, "defaultDistributionInfo", hashDistributionInfo);
+ tupleDescriptor.setTable(olapTable);
+
+ OlapScanNode olapScanNode = new OlapScanNode(new PlanNodeId(-1), tupleDescriptor, "test");
+ ArrayListMultimap<Integer, TScanRangeLocations> bucketseq2localtion = ArrayListMultimap.create();
+
+ // each OlapTable bucket has the same TScanRangeLocations; the BE ids are {0, 1, 2}
+ TScanRangeLocations tScanRangeLocations = new TScanRangeLocations();
+ TScanRangeLocation tScanRangeLocation0 = new TScanRangeLocation();
+ tScanRangeLocation0.backend_id = 0;
+ TScanRangeLocation tScanRangeLocation1 = new TScanRangeLocation();
+ tScanRangeLocation1.backend_id = 1;
+ TScanRangeLocation tScanRangeLocation2 = new TScanRangeLocation();
+ tScanRangeLocation2.backend_id = 2;
+
+ tScanRangeLocations.locations = new ArrayList<>();
+ tScanRangeLocations.locations.add(tScanRangeLocation0);
+ tScanRangeLocations.locations.add(tScanRangeLocation1);
+ tScanRangeLocations.locations.add(tScanRangeLocation2);
+ for (int i = 0; i < 66; i++) {
+ bucketseq2localtion.put(i, tScanRangeLocations);
+ }
+
+ Deencapsulation.setField(olapScanNode, "bucketSeq2locations", bucketseq2localtion);
+ olapScanNode.setFragment(new PlanFragment(new PlanFragmentId(1), olapScanNode,
+ new DataPartition(TPartitionType.UNPARTITIONED)));
+
+
+ // init all backend
+ Backend backend0 = new Backend();
+ backend0.setAlive(true);
+ Backend backend1 = new Backend();
+ backend1.setAlive(true);
+ Backend backend2 = new Backend();
+ backend2.setAlive(true);
+
+ // init all be network address
+ TNetworkAddress be0 = new TNetworkAddress("0.0.0.0", 1000);
+ TNetworkAddress be1 = new TNetworkAddress("0.0.0.1", 2000);
+ TNetworkAddress be2 = new TNetworkAddress("0.0.0.2", 3000);
+
+ HashMap<Long, Backend> idToBackend = new HashMap<>();
+ idToBackend.put(0l, backend0);
+ idToBackend.put(1l, backend1);
+ idToBackend.put(2l, backend2);
+
+ Map<TNetworkAddress, Long> addressToBackendID = new HashMap<>();
+ addressToBackendID.put(be0, 0l);
+ addressToBackendID.put(be1, 1l);
+ addressToBackendID.put(be2, 2l);
+
+ Deencapsulation.invoke(bucketShuffleJoinController, "computeScanRangeAssignmentByBucket",
+ olapScanNode, ImmutableMap.copyOf(idToBackend), addressToBackendID);
+
+ Assert.assertEquals(java.util.Optional.of(66).get(),
+ Deencapsulation.invoke(bucketShuffleJoinController, "getFragmentBucketNum", new PlanFragmentId(1)));
+
+ Map<PlanFragmentId, Map<Long, Integer>> fragmentIdToBuckendIdBucketCountMap =
+ Deencapsulation.getField(bucketShuffleJoinController, "fragmentIdToBuckendIdBucketCountMap");
+
+ long targetBeCount = fragmentIdToBuckendIdBucketCountMap.values().
+ stream().flatMap(buckend2BucketCountMap -> buckend2BucketCountMap.values().stream())
+ .filter(count -> count == 22).count();
+ Assert.assertEquals(targetBeCount, 3);
+ }
+
+ @Test
+ public void testComputeScanRangeAssignmentByBucket() {
+ Coordinator.BucketShuffleJoinController bucketShuffleJoinController = new Coordinator.BucketShuffleJoinController();
+
+ // init olap scan node of bucket shuffle join
+ TupleDescriptor tupleDescriptor = new TupleDescriptor(new TupleId(-1));
+ OlapTable olapTable = new OlapTable();
+ HashDistributionInfo hashDistributionInfo = new HashDistributionInfo(66, new ArrayList<>());
+ Deencapsulation.setField(olapTable, "defaultDistributionInfo", hashDistributionInfo);
+ tupleDescriptor.setTable(olapTable);
+
+ OlapScanNode olapScanNode = new OlapScanNode(new PlanNodeId(-1), tupleDescriptor, "test");
+ ArrayListMultimap<Integer, TScanRangeLocations> bucketseq2localtion = ArrayListMultimap.create();
+
+ // each OlapTable bucket has the same TScanRangeLocations; the BE ids are {0, 1, 2}
+ TScanRangeLocations tScanRangeLocations = new TScanRangeLocations();
+ TScanRangeLocation tScanRangeLocation0 = new TScanRangeLocation();
+ tScanRangeLocation0.backend_id = 0;
+ TScanRangeLocation tScanRangeLocation1 = new TScanRangeLocation();
+ tScanRangeLocation1.backend_id = 1;
+ TScanRangeLocation tScanRangeLocation2 = new TScanRangeLocation();
+ tScanRangeLocation2.backend_id = 2;
+
+ tScanRangeLocations.locations = new ArrayList<>();
+ tScanRangeLocations.locations.add(tScanRangeLocation0);
+ tScanRangeLocations.locations.add(tScanRangeLocation1);
+ tScanRangeLocations.locations.add(tScanRangeLocation2);
+ for (int i = 0; i < 66; i++) {
+ bucketseq2localtion.put(i, tScanRangeLocations);
+ }
+
+ Deencapsulation.setField(olapScanNode, "bucketSeq2locations", bucketseq2localtion);
+ olapScanNode.setFragment(new PlanFragment(new PlanFragmentId(1), olapScanNode,
+ new DataPartition(TPartitionType.UNPARTITIONED)));
+
+
+ // init all backend
+ Backend backend0 = new Backend();
+ backend0.setAlive(true);
+ Backend backend1 = new Backend();
+ backend1.setAlive(true);
+ Backend backend2 = new Backend();
+ backend2.setAlive(true);
+
+ // init all be network address
+ TNetworkAddress be0 = new TNetworkAddress("0.0.0.0", 1000);
+ TNetworkAddress be1 = new TNetworkAddress("0.0.0.1", 2000);
+ TNetworkAddress be2 = new TNetworkAddress("0.0.0.2", 3000);
+
+ HashMap<Long, Backend> idToBackend = new HashMap<>();
+ idToBackend.put(0l, backend0);
+ idToBackend.put(1l, backend1);
+ idToBackend.put(2l, backend2);
+
+ Map<TNetworkAddress, Long> addressToBackendID = new HashMap<>();
+ addressToBackendID.put(be0, 0l);
+ addressToBackendID.put(be1, 1l);
+ addressToBackendID.put(be2, 2l);
+
+ Deencapsulation.invoke(bucketShuffleJoinController, "computeScanRangeAssignmentByBucket",
+ olapScanNode, ImmutableMap.copyOf(idToBackend), addressToBackendID);
+
+ Assert.assertEquals(java.util.Optional.of(66).get(),
+ Deencapsulation.invoke(bucketShuffleJoinController, "getFragmentBucketNum", new PlanFragmentId(1)));
+
+ Map<PlanFragmentId, Map<Long, Integer>> fragmentIdToBuckendIdBucketCountMap =
+ Deencapsulation.getField(bucketShuffleJoinController, "fragmentIdToBuckendIdBucketCountMap");
+
+ long targetBeCount = fragmentIdToBuckendIdBucketCountMap.values()
+ .stream()
+ .flatMap(buckend2BucketCountMap -> buckend2BucketCountMap.values().stream())
+ .filter(count -> count == 22).count();
+ Assert.assertEquals(targetBeCount, 3);
+ }
+
+ @Test
+ public void testComputeBucketShuffleJoinInstanceParam() {
+ Coordinator.BucketShuffleJoinController bucketShuffleJoinController = new Coordinator.BucketShuffleJoinController();
+
+ // 1. set fragmentToBucketSeqToAddress in bucketShuffleJoinController
+ Map<Integer, TNetworkAddress> bucketSeqToAddress = new HashMap<>();
+ TNetworkAddress address = new TNetworkAddress();
+ for (int i = 0; i < 3; i++) {
+ bucketSeqToAddress.put(i, address);
+ }
+ PlanFragmentId planFragmentId = new PlanFragmentId(1);
+ Map<PlanFragmentId, Map<Integer, TNetworkAddress>> fragmentToBucketSeqToAddress = new HashMap<>();
+ fragmentToBucketSeqToAddress.put(planFragmentId, bucketSeqToAddress);
+ Deencapsulation.setField(bucketShuffleJoinController, "fragmentIdToSeqToAddressMap", fragmentToBucketSeqToAddress);
+
+ // 2. set bucketSeqToScanRange in bucketShuffleJoinController
+ Map<PlanFragmentId, BucketSeqToScanRange> fragmentIdBucketSeqToScanRangeMap = new HashMap<>();
+ BucketSeqToScanRange bucketSeqToScanRange = new BucketSeqToScanRange();
+ Map<Integer, List<TScanRangeParams>> ScanRangeMap = new HashMap<>();
+ ScanRangeMap.put(1, new ArrayList<>());
+ for (int i = 0; i < 3; i++) {
+ bucketSeqToScanRange.put(i, ScanRangeMap);
+ }
+ fragmentIdBucketSeqToScanRangeMap.put(planFragmentId, bucketSeqToScanRange);
+ Deencapsulation.setField(bucketShuffleJoinController, "fragmentIdBucketSeqToScanRangeMap", fragmentIdBucketSeqToScanRangeMap);
+
+ FragmentExecParams params = new FragmentExecParams(null);
+ Deencapsulation.invoke(bucketShuffleJoinController, "computeInstanceParam", planFragmentId, 1, params);
+ Assert.assertEquals(1, params.instanceExecParams.size());
+
+ params = new FragmentExecParams(null);
+ Deencapsulation.invoke(bucketShuffleJoinController, "computeInstanceParam", planFragmentId, 2, params);
+ Assert.assertEquals(2, params.instanceExecParams.size());
+
+ params = new FragmentExecParams(null);
+ Deencapsulation.invoke(bucketShuffleJoinController, "computeInstanceParam", planFragmentId, 3, params);
+ Assert.assertEquals(3, params.instanceExecParams.size());
+
+ params = new FragmentExecParams(null);
+ Deencapsulation.invoke(bucketShuffleJoinController, "computeInstanceParam", planFragmentId, 5, params);
+ Assert.assertEquals(3, params.instanceExecParams.size());
+ }
}
diff --git a/gensrc/thrift/Partitions.thrift b/gensrc/thrift/Partitions.thrift
index 64de8a3..9a3305b 100644
--- a/gensrc/thrift/Partitions.thrift
+++ b/gensrc/thrift/Partitions.thrift
@@ -33,7 +33,11 @@ enum TPartitionType {
// ordered partition on a list of exprs
// (partition bounds don't overlap)
- RANGE_PARTITIONED
+ RANGE_PARTITIONED,
+
+ // unordered partition on a set of exprs
+ // (only used in bucket shuffle join)
+ BUCKET_SHFFULE_HASH_PARTITIONED
}
enum TDistributionType {