You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ja...@apache.org on 2023/07/29 02:51:12 UTC
[pinot] branch master updated: Enhance hash exchange (#11217)
This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 86a5c2af8b Enhance hash exchange (#11217)
86a5c2af8b is described below
commit 86a5c2af8b50cc168f5890fe9731725b9b59bdb6
Author: Xiaotian (Jackie) Jiang <17...@users.noreply.github.com>
AuthorDate: Fri Jul 28 19:51:07 2023 -0700
Enhance hash exchange (#11217)
---
.../runtime/operator/exchange/HashExchange.java | 20 ++++++++++++++------
1 file changed, 14 insertions(+), 6 deletions(-)
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/exchange/HashExchange.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/exchange/HashExchange.java
index 2f14d1445a..1203d0cb92 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/exchange/HashExchange.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/exchange/HashExchange.java
@@ -47,16 +47,24 @@ class HashExchange extends BlockExchange {
@Override
protected void route(List<SendingMailbox> destinations, TransferableBlock block)
throws Exception {
- List<Object[]>[] destIdxToRows = new List[destinations.size()];
+ int numMailboxes = destinations.size();
+ if (numMailboxes == 1) {
+ sendBlock(destinations.get(0), block);
+ return;
+ }
+
+ List<Object[]>[] destIdxToRows = new List[numMailboxes];
List<Object[]> container = block.getContainer();
for (Object[] row : container) {
- int partition = _keySelector.computeHash(row) % destinations.size();
- if (destIdxToRows[partition] == null) {
- destIdxToRows[partition] = new ArrayList<>(container.size());
+ int index = _keySelector.computeHash(row) % numMailboxes;
+ List<Object[]> rows = destIdxToRows[index];
+ if (rows == null) {
+ rows = new ArrayList<>();
+ destIdxToRows[index] = rows;
}
- destIdxToRows[partition].add(row);
+ rows.add(row);
}
- for (int i = 0; i < destinations.size(); i++) {
+ for (int i = 0; i < numMailboxes; i++) {
if (destIdxToRows[i] != null) {
sendBlock(destinations.get(i), new TransferableBlock(destIdxToRows[i], block.getDataSchema(), block.getType()));
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org