You are viewing a plain-text version of this content. (The canonical link to the original message was a hyperlink and is not preserved in this plain-text copy.)
Posted to commits@pinot.apache.org by ja...@apache.org on 2023/07/19 06:18:58 UTC
[pinot] branch master updated: V2 allocation optimizations (#11112)
This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new ad26013ae7 V2 allocation optimizations (#11112)
ad26013ae7 is described below
commit ad26013ae7f894bfeaa1ea21119ea88766df4142
Author: Gonzalo Ortiz Jaureguizar <go...@users.noreply.github.com>
AuthorDate: Wed Jul 19 08:18:53 2023 +0200
V2 allocation optimizations (#11112)
---
.../core/operator/query/SelectionOnlyOperator.java | 3 +-
.../query/runtime/operator/HashJoinOperator.java | 136 ++++++++++++++-------
.../query/runtime/operator/TransformOperator.java | 2 +-
.../runtime/operator/exchange/HashExchange.java | 5 +-
4 files changed, 96 insertions(+), 50 deletions(-)
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/query/SelectionOnlyOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/query/SelectionOnlyOperator.java
index 613512fb1d..77ba15b6c2 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/operator/query/SelectionOnlyOperator.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/query/SelectionOnlyOperator.java
@@ -49,7 +49,7 @@ public class SelectionOnlyOperator extends BaseOperator<SelectionResultsBlock> {
private final BlockValSet[] _blockValSets;
private final DataSchema _dataSchema;
private final int _numRowsToKeep;
- private final List<Object[]> _rows;
+ private final ArrayList<Object[]> _rows;
private final RoaringBitmap[] _nullBitmaps;
private int _numDocsScanned = 0;
@@ -102,6 +102,7 @@ public class SelectionOnlyOperator extends BaseOperator<SelectionResultsBlock> {
RowBasedBlockValueFetcher blockValueFetcher = new RowBasedBlockValueFetcher(_blockValSets);
int numDocsToAdd = Math.min(_numRowsToKeep - _rows.size(), valueBlock.getNumDocs());
+ _rows.ensureCapacity(_rows.size() + numDocsToAdd);
_numDocsScanned += numDocsToAdd;
if (_nullHandlingEnabled) {
for (int i = 0; i < numExpressions; i++) {
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/HashJoinOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/HashJoinOperator.java
index 8d5e5b1a06..f9ade5a99c 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/HashJoinOperator.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/HashJoinOperator.java
@@ -59,12 +59,13 @@ import org.slf4j.LoggerFactory;
// TODO: Move inequi out of hashjoin. (https://github.com/apache/pinot/issues/9728)
public class HashJoinOperator extends MultiStageOperator {
private static final String EXPLAIN_NAME = "HASH_JOIN";
+ private static final int INITIAL_HEURISTIC_SIZE = 16;
private static final Logger LOGGER = LoggerFactory.getLogger(AggregateOperator.class);
private static final Set<JoinRelType> SUPPORTED_JOIN_TYPES = ImmutableSet.of(
JoinRelType.INNER, JoinRelType.LEFT, JoinRelType.RIGHT, JoinRelType.FULL, JoinRelType.SEMI, JoinRelType.ANTI);
- private final HashMap<Key, List<Object[]>> _broadcastRightTable;
+ private final HashMap<Key, ArrayList<Object[]>> _broadcastRightTable;
// Used to track matched right rows.
// Only used for right join and full join to output non-matched right rows.
@@ -170,8 +171,12 @@ public class HashJoinOperator extends MultiStageOperator {
List<Object[]> container = rightBlock.getContainer();
// put all the rows into corresponding hash collections keyed by the key selector function.
for (Object[] row : container) {
- List<Object[]> hashCollection =
- _broadcastRightTable.computeIfAbsent(new Key(_rightKeySelector.getKey(row)), k -> new ArrayList<>());
+ ArrayList<Object[]> hashCollection = _broadcastRightTable.computeIfAbsent(
+ new Key(_rightKeySelector.getKey(row)), k -> new ArrayList<>(INITIAL_HEURISTIC_SIZE));
+ int size = hashCollection.size();
+ if ((size & size - 1) == 0 && size < Integer.MAX_VALUE / 2) { // is power of 2
+ hashCollection.ensureCapacity(size << 1);
+ }
hashCollection.add(row);
}
rightBlock = _rightTableOperator.nextBlock();
@@ -198,7 +203,7 @@ public class HashJoinOperator extends MultiStageOperator {
if (leftBlock.isSuccessfulEndOfStreamBlock() && needUnmatchedRightRows()) {
// Return remaining non-matched rows for non-inner join.
List<Object[]> returnRows = new ArrayList<>();
- for (Map.Entry<Key, List<Object[]>> entry : _broadcastRightTable.entrySet()) {
+ for (Map.Entry<Key, ArrayList<Object[]>> entry : _broadcastRightTable.entrySet()) {
Set<Integer> matchedIdx = _matchedRightRows.getOrDefault(entry.getKey(), new HashSet<>());
List<Object[]> rightRows = entry.getValue();
if (rightRows.size() == matchedIdx.size()) {
@@ -213,57 +218,96 @@ public class HashJoinOperator extends MultiStageOperator {
_isTerminated = true;
return new TransferableBlock(returnRows, _resultSchema, DataBlock.Type.ROW);
}
- List<Object[]> rows = new ArrayList<>();
- List<Object[]> container = leftBlock.isEndOfStreamBlock() ? new ArrayList<>() : leftBlock.getContainer();
- for (Object[] leftRow : container) {
- Key key = new Key(_leftKeySelector.getKey(leftRow));
+ List<Object[]> rows;
+ if (leftBlock.isEndOfStreamBlock()) {
+ rows = new ArrayList<>();
+ } else {
switch (_joinType) {
- case SEMI:
- // SEMI-JOIN only checks existence of the key
- if (_broadcastRightTable.containsKey(key)) {
- rows.add(joinRow(leftRow, null));
- }
+ case SEMI: {
+ rows = buildJoinedDataBlockSemi(leftBlock);
break;
- case ANTI:
- // ANTI-JOIN only checks non-existence of the key
- if (!_broadcastRightTable.containsKey(key)) {
- rows.add(joinRow(leftRow, null));
- }
+ }
+ case ANTI: {
+ rows = buildJoinedDataBlockAnti(leftBlock);
break;
- default: // INNER, LEFT, RIGHT, FULL
- // NOTE: Empty key selector will always give same hash code.
- List<Object[]> matchedRightRows = _broadcastRightTable.getOrDefault(key, null);
- if (matchedRightRows == null) {
- if (needUnmatchedLeftRows()) {
- rows.add(joinRow(leftRow, null));
- }
- continue;
- }
- boolean hasMatchForLeftRow = false;
- for (int i = 0; i < matchedRightRows.size(); i++) {
- Object[] rightRow = matchedRightRows.get(i);
- // TODO: Optimize this to avoid unnecessary object copy.
- Object[] resultRow = joinRow(leftRow, rightRow);
- if (_joinClauseEvaluators.isEmpty() || _joinClauseEvaluators.stream().allMatch(
- evaluator -> (Boolean) TypeUtils.convert(evaluator.apply(resultRow),
- DataSchema.ColumnDataType.BOOLEAN))) {
- rows.add(resultRow);
- hasMatchForLeftRow = true;
- if (_matchedRightRows != null) {
- HashSet<Integer> matchedRows = _matchedRightRows.computeIfAbsent(key, k -> new HashSet<>());
- matchedRows.add(i);
- }
- }
- }
- if (!hasMatchForLeftRow && needUnmatchedLeftRows()) {
- rows.add(joinRow(leftRow, null));
- }
+ }
+ default: { // INNER, LEFT, RIGHT, FULL
+ rows = buildJoinedDataBlockDefault(leftBlock);
break;
+ }
}
}
return new TransferableBlock(rows, _resultSchema, DataBlock.Type.ROW);
}
+ private List<Object[]> buildJoinedDataBlockSemi(TransferableBlock leftBlock) {
+ List<Object[]> container = leftBlock.getContainer();
+ List<Object[]> rows = new ArrayList<>(container.size());
+
+ for (Object[] leftRow : container) {
+ Key key = new Key(_leftKeySelector.getKey(leftRow));
+ // SEMI-JOIN only checks existence of the key
+ if (_broadcastRightTable.containsKey(key)) {
+ rows.add(joinRow(leftRow, null));
+ }
+ }
+
+ return rows;
+ }
+
+ private List<Object[]> buildJoinedDataBlockDefault(TransferableBlock leftBlock) {
+ List<Object[]> container = leftBlock.getContainer();
+ ArrayList<Object[]> rows = new ArrayList<>(container.size());
+
+ for (Object[] leftRow : container) {
+ Key key = new Key(_leftKeySelector.getKey(leftRow));
+ // NOTE: Empty key selector will always give same hash code.
+ List<Object[]> matchedRightRows = _broadcastRightTable.getOrDefault(key, null);
+ if (matchedRightRows == null) {
+ if (needUnmatchedLeftRows()) {
+ rows.add(joinRow(leftRow, null));
+ }
+ continue;
+ }
+ boolean hasMatchForLeftRow = false;
+ rows.ensureCapacity(rows.size() + matchedRightRows.size());
+ for (int i = 0; i < matchedRightRows.size(); i++) {
+ Object[] rightRow = matchedRightRows.get(i);
+ // TODO: Optimize this to avoid unnecessary object copy.
+ Object[] resultRow = joinRow(leftRow, rightRow);
+ if (_joinClauseEvaluators.isEmpty() || _joinClauseEvaluators.stream().allMatch(
+ evaluator -> (Boolean) TypeUtils.convert(evaluator.apply(resultRow),
+ DataSchema.ColumnDataType.BOOLEAN))) {
+ rows.add(resultRow);
+ hasMatchForLeftRow = true;
+ if (_matchedRightRows != null) {
+ HashSet<Integer> matchedRows = _matchedRightRows.computeIfAbsent(key, k -> new HashSet<>());
+ matchedRows.add(i);
+ }
+ }
+ }
+ if (!hasMatchForLeftRow && needUnmatchedLeftRows()) {
+ rows.add(joinRow(leftRow, null));
+ }
+ }
+
+ return rows;
+ }
+
+ private List<Object[]> buildJoinedDataBlockAnti(TransferableBlock leftBlock) {
+ List<Object[]> container = leftBlock.getContainer();
+ List<Object[]> rows = new ArrayList<>(container.size());
+
+ for (Object[] leftRow : container) {
+ Key key = new Key(_leftKeySelector.getKey(leftRow));
+ // ANTI-JOIN only checks non-existence of the key
+ if (!_broadcastRightTable.containsKey(key)) {
+ rows.add(joinRow(leftRow, null));
+ }
+ }
+ return rows;
+ }
+
private Object[] joinRow(@Nullable Object[] leftRow, @Nullable Object[] rightRow) {
Object[] resultRow = new Object[_resultColumnSize];
int idx = 0;
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/TransformOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/TransformOperator.java
index a913d95295..4aff16138f 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/TransformOperator.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/TransformOperator.java
@@ -108,8 +108,8 @@ public class TransformOperator extends MultiStageOperator {
return block;
}
- List<Object[]> resultRows = new ArrayList<>();
List<Object[]> container = block.getContainer();
+ List<Object[]> resultRows = new ArrayList<>(container.size());
for (Object[] row : container) {
Object[] resultRow = new Object[_resultColumnSize];
for (int i = 0; i < _resultColumnSize; i++) {
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/exchange/HashExchange.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/exchange/HashExchange.java
index b5c0857107..2f14d1445a 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/exchange/HashExchange.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/exchange/HashExchange.java
@@ -48,10 +48,11 @@ class HashExchange extends BlockExchange {
protected void route(List<SendingMailbox> destinations, TransferableBlock block)
throws Exception {
List<Object[]>[] destIdxToRows = new List[destinations.size()];
- for (Object[] row : block.getContainer()) {
+ List<Object[]> container = block.getContainer();
+ for (Object[] row : container) {
int partition = _keySelector.computeHash(row) % destinations.size();
if (destIdxToRows[partition] == null) {
- destIdxToRows[partition] = new ArrayList<>();
+ destIdxToRows[partition] = new ArrayList<>(container.size());
}
destIdxToRows[partition].add(row);
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org