You are viewing a plain-text version of this content. (The link to the canonical version was not preserved in this plain-text copy.)
Posted to commits@pinot.apache.org by ja...@apache.org on 2023/07/19 06:18:58 UTC

[pinot] branch master updated: V2 allocation optimizations (#11112)

This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new ad26013ae7 V2 allocation optimizations (#11112)
ad26013ae7 is described below

commit ad26013ae7f894bfeaa1ea21119ea88766df4142
Author: Gonzalo Ortiz Jaureguizar <go...@users.noreply.github.com>
AuthorDate: Wed Jul 19 08:18:53 2023 +0200

    V2 allocation optimizations (#11112)
---
 .../core/operator/query/SelectionOnlyOperator.java |   3 +-
 .../query/runtime/operator/HashJoinOperator.java   | 136 ++++++++++++++-------
 .../query/runtime/operator/TransformOperator.java  |   2 +-
 .../runtime/operator/exchange/HashExchange.java    |   5 +-
 4 files changed, 96 insertions(+), 50 deletions(-)

diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/query/SelectionOnlyOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/query/SelectionOnlyOperator.java
index 613512fb1d..77ba15b6c2 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/operator/query/SelectionOnlyOperator.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/query/SelectionOnlyOperator.java
@@ -49,7 +49,7 @@ public class SelectionOnlyOperator extends BaseOperator<SelectionResultsBlock> {
   private final BlockValSet[] _blockValSets;
   private final DataSchema _dataSchema;
   private final int _numRowsToKeep;
-  private final List<Object[]> _rows;
+  private final ArrayList<Object[]> _rows;
   private final RoaringBitmap[] _nullBitmaps;
 
   private int _numDocsScanned = 0;
@@ -102,6 +102,7 @@ public class SelectionOnlyOperator extends BaseOperator<SelectionResultsBlock> {
       RowBasedBlockValueFetcher blockValueFetcher = new RowBasedBlockValueFetcher(_blockValSets);
 
       int numDocsToAdd = Math.min(_numRowsToKeep - _rows.size(), valueBlock.getNumDocs());
+      _rows.ensureCapacity(_rows.size() + numDocsToAdd);
       _numDocsScanned += numDocsToAdd;
       if (_nullHandlingEnabled) {
         for (int i = 0; i < numExpressions; i++) {
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/HashJoinOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/HashJoinOperator.java
index 8d5e5b1a06..f9ade5a99c 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/HashJoinOperator.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/HashJoinOperator.java
@@ -59,12 +59,13 @@ import org.slf4j.LoggerFactory;
 // TODO: Move inequi out of hashjoin. (https://github.com/apache/pinot/issues/9728)
 public class HashJoinOperator extends MultiStageOperator {
   private static final String EXPLAIN_NAME = "HASH_JOIN";
+  private static final int INITIAL_HEURISTIC_SIZE = 16;
   private static final Logger LOGGER = LoggerFactory.getLogger(AggregateOperator.class);
 
   private static final Set<JoinRelType> SUPPORTED_JOIN_TYPES = ImmutableSet.of(
       JoinRelType.INNER, JoinRelType.LEFT, JoinRelType.RIGHT, JoinRelType.FULL, JoinRelType.SEMI, JoinRelType.ANTI);
 
-  private final HashMap<Key, List<Object[]>> _broadcastRightTable;
+  private final HashMap<Key, ArrayList<Object[]>> _broadcastRightTable;
 
   // Used to track matched right rows.
   // Only used for right join and full join to output non-matched right rows.
@@ -170,8 +171,12 @@ public class HashJoinOperator extends MultiStageOperator {
       List<Object[]> container = rightBlock.getContainer();
       // put all the rows into corresponding hash collections keyed by the key selector function.
       for (Object[] row : container) {
-        List<Object[]> hashCollection =
-            _broadcastRightTable.computeIfAbsent(new Key(_rightKeySelector.getKey(row)), k -> new ArrayList<>());
+        ArrayList<Object[]> hashCollection = _broadcastRightTable.computeIfAbsent(
+            new Key(_rightKeySelector.getKey(row)), k -> new ArrayList<>(INITIAL_HEURISTIC_SIZE));
+        int size = hashCollection.size();
+        if ((size & size - 1) == 0 && size < Integer.MAX_VALUE / 2) { // is power of 2
+          hashCollection.ensureCapacity(size << 1);
+        }
         hashCollection.add(row);
       }
       rightBlock = _rightTableOperator.nextBlock();
@@ -198,7 +203,7 @@ public class HashJoinOperator extends MultiStageOperator {
     if (leftBlock.isSuccessfulEndOfStreamBlock() && needUnmatchedRightRows()) {
       // Return remaining non-matched rows for non-inner join.
       List<Object[]> returnRows = new ArrayList<>();
-      for (Map.Entry<Key, List<Object[]>> entry : _broadcastRightTable.entrySet()) {
+      for (Map.Entry<Key, ArrayList<Object[]>> entry : _broadcastRightTable.entrySet()) {
         Set<Integer> matchedIdx = _matchedRightRows.getOrDefault(entry.getKey(), new HashSet<>());
         List<Object[]> rightRows = entry.getValue();
         if (rightRows.size() == matchedIdx.size()) {
@@ -213,57 +218,96 @@ public class HashJoinOperator extends MultiStageOperator {
       _isTerminated = true;
       return new TransferableBlock(returnRows, _resultSchema, DataBlock.Type.ROW);
     }
-    List<Object[]> rows = new ArrayList<>();
-    List<Object[]> container = leftBlock.isEndOfStreamBlock() ? new ArrayList<>() : leftBlock.getContainer();
-    for (Object[] leftRow : container) {
-      Key key = new Key(_leftKeySelector.getKey(leftRow));
+    List<Object[]> rows;
+    if (leftBlock.isEndOfStreamBlock()) {
+      rows = new ArrayList<>();
+    } else {
       switch (_joinType) {
-        case SEMI:
-          // SEMI-JOIN only checks existence of the key
-          if (_broadcastRightTable.containsKey(key)) {
-            rows.add(joinRow(leftRow, null));
-          }
+        case SEMI: {
+          rows = buildJoinedDataBlockSemi(leftBlock);
           break;
-        case ANTI:
-          // ANTI-JOIN only checks non-existence of the key
-          if (!_broadcastRightTable.containsKey(key)) {
-            rows.add(joinRow(leftRow, null));
-          }
+        }
+        case ANTI: {
+          rows = buildJoinedDataBlockAnti(leftBlock);
           break;
-        default: // INNER, LEFT, RIGHT, FULL
-          // NOTE: Empty key selector will always give same hash code.
-          List<Object[]> matchedRightRows = _broadcastRightTable.getOrDefault(key, null);
-          if (matchedRightRows == null) {
-            if (needUnmatchedLeftRows()) {
-              rows.add(joinRow(leftRow, null));
-            }
-            continue;
-          }
-          boolean hasMatchForLeftRow = false;
-          for (int i = 0; i < matchedRightRows.size(); i++) {
-            Object[] rightRow = matchedRightRows.get(i);
-            // TODO: Optimize this to avoid unnecessary object copy.
-            Object[] resultRow = joinRow(leftRow, rightRow);
-            if (_joinClauseEvaluators.isEmpty() || _joinClauseEvaluators.stream().allMatch(
-                evaluator -> (Boolean) TypeUtils.convert(evaluator.apply(resultRow),
-                    DataSchema.ColumnDataType.BOOLEAN))) {
-              rows.add(resultRow);
-              hasMatchForLeftRow = true;
-              if (_matchedRightRows != null) {
-                HashSet<Integer> matchedRows = _matchedRightRows.computeIfAbsent(key, k -> new HashSet<>());
-                matchedRows.add(i);
-              }
-            }
-          }
-          if (!hasMatchForLeftRow && needUnmatchedLeftRows()) {
-            rows.add(joinRow(leftRow, null));
-          }
+        }
+        default: { // INNER, LEFT, RIGHT, FULL
+          rows = buildJoinedDataBlockDefault(leftBlock);
           break;
+        }
       }
     }
     return new TransferableBlock(rows, _resultSchema, DataBlock.Type.ROW);
   }
 
+  private List<Object[]> buildJoinedDataBlockSemi(TransferableBlock leftBlock) {
+    List<Object[]> container = leftBlock.getContainer();
+    List<Object[]> rows = new ArrayList<>(container.size());
+
+    for (Object[] leftRow : container) {
+      Key key = new Key(_leftKeySelector.getKey(leftRow));
+      // SEMI-JOIN only checks existence of the key
+      if (_broadcastRightTable.containsKey(key)) {
+        rows.add(joinRow(leftRow, null));
+      }
+    }
+
+    return rows;
+  }
+
+  private List<Object[]> buildJoinedDataBlockDefault(TransferableBlock leftBlock) {
+    List<Object[]> container = leftBlock.getContainer();
+    ArrayList<Object[]> rows = new ArrayList<>(container.size());
+
+    for (Object[] leftRow : container) {
+      Key key = new Key(_leftKeySelector.getKey(leftRow));
+      // NOTE: Empty key selector will always give same hash code.
+      List<Object[]> matchedRightRows = _broadcastRightTable.getOrDefault(key, null);
+      if (matchedRightRows == null) {
+        if (needUnmatchedLeftRows()) {
+          rows.add(joinRow(leftRow, null));
+        }
+        continue;
+      }
+      boolean hasMatchForLeftRow = false;
+      rows.ensureCapacity(rows.size() + matchedRightRows.size());
+      for (int i = 0; i < matchedRightRows.size(); i++) {
+        Object[] rightRow = matchedRightRows.get(i);
+        // TODO: Optimize this to avoid unnecessary object copy.
+        Object[] resultRow = joinRow(leftRow, rightRow);
+        if (_joinClauseEvaluators.isEmpty() || _joinClauseEvaluators.stream().allMatch(
+            evaluator -> (Boolean) TypeUtils.convert(evaluator.apply(resultRow),
+                DataSchema.ColumnDataType.BOOLEAN))) {
+          rows.add(resultRow);
+          hasMatchForLeftRow = true;
+          if (_matchedRightRows != null) {
+            HashSet<Integer> matchedRows = _matchedRightRows.computeIfAbsent(key, k -> new HashSet<>());
+            matchedRows.add(i);
+          }
+        }
+      }
+      if (!hasMatchForLeftRow && needUnmatchedLeftRows()) {
+        rows.add(joinRow(leftRow, null));
+      }
+    }
+
+    return rows;
+  }
+
+  private List<Object[]> buildJoinedDataBlockAnti(TransferableBlock leftBlock) {
+    List<Object[]> container = leftBlock.getContainer();
+    List<Object[]> rows = new ArrayList<>(container.size());
+
+    for (Object[] leftRow : container) {
+      Key key = new Key(_leftKeySelector.getKey(leftRow));
+      // ANTI-JOIN only checks non-existence of the key
+      if (!_broadcastRightTable.containsKey(key)) {
+        rows.add(joinRow(leftRow, null));
+      }
+    }
+    return rows;
+  }
+
   private Object[] joinRow(@Nullable Object[] leftRow, @Nullable Object[] rightRow) {
     Object[] resultRow = new Object[_resultColumnSize];
     int idx = 0;
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/TransformOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/TransformOperator.java
index a913d95295..4aff16138f 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/TransformOperator.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/TransformOperator.java
@@ -108,8 +108,8 @@ public class TransformOperator extends MultiStageOperator {
       return block;
     }
 
-    List<Object[]> resultRows = new ArrayList<>();
     List<Object[]> container = block.getContainer();
+    List<Object[]> resultRows = new ArrayList<>(container.size());
     for (Object[] row : container) {
       Object[] resultRow = new Object[_resultColumnSize];
       for (int i = 0; i < _resultColumnSize; i++) {
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/exchange/HashExchange.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/exchange/HashExchange.java
index b5c0857107..2f14d1445a 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/exchange/HashExchange.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/exchange/HashExchange.java
@@ -48,10 +48,11 @@ class HashExchange extends BlockExchange {
   protected void route(List<SendingMailbox> destinations, TransferableBlock block)
       throws Exception {
     List<Object[]>[] destIdxToRows = new List[destinations.size()];
-    for (Object[] row : block.getContainer()) {
+    List<Object[]> container = block.getContainer();
+    for (Object[] row : container) {
       int partition = _keySelector.computeHash(row) % destinations.size();
       if (destIdxToRows[partition] == null) {
-        destIdxToRows[partition] = new ArrayList<>();
+        destIdxToRows[partition] = new ArrayList<>(container.size());
       }
       destIdxToRows[partition].add(row);
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org