You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ja...@apache.org on 2023/07/29 02:51:12 UTC

[pinot] branch master updated: Enhance hash exchange (#11217)

This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 86a5c2af8b Enhance hash exchange (#11217)
86a5c2af8b is described below

commit 86a5c2af8b50cc168f5890fe9731725b9b59bdb6
Author: Xiaotian (Jackie) Jiang <17...@users.noreply.github.com>
AuthorDate: Fri Jul 28 19:51:07 2023 -0700

    Enhance hash exchange (#11217)
---
 .../runtime/operator/exchange/HashExchange.java      | 20 ++++++++++++++------
 1 file changed, 14 insertions(+), 6 deletions(-)

diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/exchange/HashExchange.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/exchange/HashExchange.java
index 2f14d1445a..1203d0cb92 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/exchange/HashExchange.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/exchange/HashExchange.java
@@ -47,16 +47,24 @@ class HashExchange extends BlockExchange {
   @Override
   protected void route(List<SendingMailbox> destinations, TransferableBlock block)
       throws Exception {
-    List<Object[]>[] destIdxToRows = new List[destinations.size()];
+    int numMailboxes = destinations.size();
+    if (numMailboxes == 1) {
+      sendBlock(destinations.get(0), block);
+      return;
+    }
+
+    List<Object[]>[] destIdxToRows = new List[numMailboxes];
     List<Object[]> container = block.getContainer();
     for (Object[] row : container) {
-      int partition = _keySelector.computeHash(row) % destinations.size();
-      if (destIdxToRows[partition] == null) {
-        destIdxToRows[partition] = new ArrayList<>(container.size());
+      int index = _keySelector.computeHash(row) % numMailboxes;
+      List<Object[]> rows = destIdxToRows[index];
+      if (rows == null) {
+        rows = new ArrayList<>();
+        destIdxToRows[index] = rows;
       }
-      destIdxToRows[partition].add(row);
+      rows.add(row);
     }
-    for (int i = 0; i < destinations.size(); i++) {
+    for (int i = 0; i < numMailboxes; i++) {
       if (destIdxToRows[i] != null) {
         sendBlock(destinations.get(i), new TransferableBlock(destIdxToRows[i], block.getDataSchema(), block.getType()));
       }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org