You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ro...@apache.org on 2023/11/09 02:54:57 UTC
(pinot) branch master updated: [multistage][bugfix] fix operator eos pull (#11970)
This is an automated email from the ASF dual-hosted git repository.
rongr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 73d82ec666 [multistage][bugfix] fix operator eos pull (#11970)
73d82ec666 is described below
commit 73d82ec6660dd5195bf4b2f07c06fbd1a9c5e96c
Author: Rong Rong <ro...@apache.org>
AuthorDate: Wed Nov 8 18:54:50 2023 -0800
[multistage][bugfix] fix operator eos pull (#11970)
* clean up single input stop-the-world operator
* clean up 2-input stop-the-world on right, stream on left operator
* clean up single-input stream operator
* refactor try-catch block into base class; and fix set op right-side error handling
---------
Co-authored-by: Rong Rong <ro...@startree.ai>
---
.../query/runtime/operator/AggregateOperator.java | 27 ++----
.../query/runtime/operator/HashJoinOperator.java | 31 +++---
.../LeafStageTransferableBlockOperator.java | 37 ++++---
.../query/runtime/operator/MultiStageOperator.java | 16 ++-
.../query/runtime/operator/OperatorStats.java | 2 -
.../pinot/query/runtime/operator/SetOperator.java | 18 +++-
.../pinot/query/runtime/operator/SortOperator.java | 107 +++++++++------------
.../query/runtime/operator/TransformOperator.java | 24 +----
.../runtime/operator/WindowAggregateOperator.java | 21 ++--
9 files changed, 127 insertions(+), 156 deletions(-)
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/AggregateOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/AggregateOperator.java
index 5cb825c149..39577ba408 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/AggregateOperator.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/AggregateOperator.java
@@ -70,7 +70,7 @@ public class AggregateOperator extends MultiStageOperator {
private final MultistageAggregationExecutor _aggregationExecutor;
private final MultistageGroupByExecutor _groupByExecutor;
- private boolean _hasReturnedAggregateBlock;
+ private boolean _hasConstructedAggregateBlock;
public AggregateOperator(OpChainExecutionContext context, MultiStageOperator inputOperator, DataSchema resultSchema,
List<RexExpression> aggCalls, List<RexExpression> groupSet, AggType aggType, List<Integer> filterArgIndices,
@@ -131,26 +131,19 @@ public class AggregateOperator extends MultiStageOperator {
@Override
protected TransferableBlock getNextBlock() {
- try {
- TransferableBlock finalBlock = _aggregationExecutor != null ? consumeAggregation() : consumeGroupBy();
-
- // setting upstream error block
- if (finalBlock.isErrorBlock()) {
- return finalBlock;
- }
-
- if (!_hasReturnedAggregateBlock) {
- return produceAggregatedBlock();
- } else {
- return TransferableBlockUtils.getEndOfStreamTransferableBlock();
- }
- } catch (Exception e) {
- return TransferableBlockUtils.getErrorTransferableBlock(e);
+ if (_hasConstructedAggregateBlock) {
+ return TransferableBlockUtils.getEndOfStreamTransferableBlock();
+ }
+ TransferableBlock finalBlock = _aggregationExecutor != null ? consumeAggregation() : consumeGroupBy();
+ // Return the upstream error block unchanged if finalBlock is an error block.
+ if (finalBlock.isErrorBlock()) {
+ return finalBlock;
}
+ return produceAggregatedBlock();
}
private TransferableBlock produceAggregatedBlock() {
- _hasReturnedAggregateBlock = true;
+ _hasConstructedAggregateBlock = true;
if (_aggregationExecutor != null) {
return new TransferableBlock(_aggregationExecutor.getResult(), _resultSchema, DataBlock.Type.ROW);
} else {
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/HashJoinOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/HashJoinOperator.java
index c508b373c5..96645c8882 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/HashJoinOperator.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/HashJoinOperator.java
@@ -188,24 +188,21 @@ public class HashJoinOperator extends MultiStageOperator {
}
@Override
- protected TransferableBlock getNextBlock() {
- try {
- if (_isTerminated) {
- return TransferableBlockUtils.getEndOfStreamTransferableBlock();
- }
- if (!_isHashTableBuilt) {
- // Build JOIN hash table
- buildBroadcastHashTable();
- }
- if (_upstreamErrorBlock != null) {
- return _upstreamErrorBlock;
- }
- TransferableBlock leftBlock = _leftTableOperator.nextBlock();
- // JOIN each left block with the right block.
- return buildJoinedDataBlock(leftBlock);
- } catch (Exception e) {
- return TransferableBlockUtils.getErrorTransferableBlock(e);
+ protected TransferableBlock getNextBlock()
+ throws ProcessingException {
+ if (_isTerminated) {
+ return TransferableBlockUtils.getEndOfStreamTransferableBlock();
+ }
+ if (!_isHashTableBuilt) {
+ // Build JOIN hash table
+ buildBroadcastHashTable();
+ }
+ if (_upstreamErrorBlock != null) {
+ return _upstreamErrorBlock;
}
+ TransferableBlock leftBlock = _leftTableOperator.nextBlock();
+ // JOIN each left block with the constructed right hash table.
+ return buildJoinedDataBlock(leftBlock);
}
private void buildBroadcastHashTable()
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/LeafStageTransferableBlockOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/LeafStageTransferableBlockOperator.java
index 1a109358ae..ea5a47df98 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/LeafStageTransferableBlockOperator.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/LeafStageTransferableBlockOperator.java
@@ -112,29 +112,26 @@ public class LeafStageTransferableBlockOperator extends MultiStageOperator {
}
@Override
- protected TransferableBlock getNextBlock() {
+ protected TransferableBlock getNextBlock()
+ throws InterruptedException, TimeoutException {
if (_executionFuture == null) {
_executionFuture = startExecution();
}
- try {
- BaseResultsBlock resultsBlock =
- _blockingQueue.poll(_context.getDeadlineMs() - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
- if (resultsBlock == null) {
- throw new TimeoutException("Timed out waiting for results block");
- }
- // Terminate when receiving exception block
- Map<Integer, String> exceptions = _exceptions;
- if (exceptions != null) {
- return TransferableBlockUtils.getErrorTransferableBlock(exceptions);
- }
- if (_isEarlyTerminated || resultsBlock == LAST_RESULTS_BLOCK) {
- return constructMetadataBlock();
- } else {
- // Regular data block
- return composeTransferableBlock(resultsBlock, _dataSchema);
- }
- } catch (Exception e) {
- return TransferableBlockUtils.getErrorTransferableBlock(e);
+ BaseResultsBlock resultsBlock =
+ _blockingQueue.poll(_context.getDeadlineMs() - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
+ if (resultsBlock == null) {
+ throw new TimeoutException("Timed out waiting for results block");
+ }
+ // Terminate when receiving exception block
+ Map<Integer, String> exceptions = _exceptions;
+ if (exceptions != null) {
+ return TransferableBlockUtils.getErrorTransferableBlock(exceptions);
+ }
+ if (_isEarlyTerminated || resultsBlock == LAST_RESULTS_BLOCK) {
+ return constructMetadataBlock();
+ } else {
+ // Regular data block
+ return composeTransferableBlock(resultsBlock, _dataSchema);
}
}
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MultiStageOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MultiStageOperator.java
index ade326ea20..581e2f8f10 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MultiStageOperator.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MultiStageOperator.java
@@ -22,6 +22,7 @@ import com.google.common.base.Joiner;
import java.util.List;
import org.apache.pinot.core.common.Operator;
import org.apache.pinot.query.runtime.blocks.TransferableBlock;
+import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils;
import org.apache.pinot.query.runtime.plan.OpChainExecutionContext;
import org.apache.pinot.spi.exception.EarlyTerminationException;
import org.apache.pinot.spi.trace.InvocationScope;
@@ -55,11 +56,19 @@ public abstract class MultiStageOperator implements Operator<TransferableBlock>,
if (shouldCollectStats()) {
OperatorStats operatorStats = _opChainStats.getOperatorStats(_context, _operatorId);
operatorStats.startTimer();
- nextBlock = getNextBlock();
+ try {
+ nextBlock = getNextBlock();
+ } catch (Exception e) {
+ nextBlock = TransferableBlockUtils.getErrorTransferableBlock(e);
+ }
operatorStats.recordRow(1, nextBlock.getNumRows());
operatorStats.endTimer(nextBlock);
} else {
- nextBlock = getNextBlock();
+ try {
+ nextBlock = getNextBlock();
+ } catch (Exception e) {
+ nextBlock = TransferableBlockUtils.getErrorTransferableBlock(e);
+ }
}
return nextBlock;
}
@@ -70,7 +79,8 @@ public abstract class MultiStageOperator implements Operator<TransferableBlock>,
}
// Make it protected because we should always call nextBlock()
- protected abstract TransferableBlock getNextBlock();
+ protected abstract TransferableBlock getNextBlock()
+ throws Exception;
protected void earlyTerminate() {
_isEarlyTerminated = true;
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/OperatorStats.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/OperatorStats.java
index 1aacb03290..32fa9f140f 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/OperatorStats.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/OperatorStats.java
@@ -43,7 +43,6 @@ public class OperatorStats {
private long _startTimeMs = -1;
private long _endTimeMs = -1;
private final Map<String, String> _executionStats;
- private boolean _processingStarted = false;
public OperatorStats(OpChainExecutionContext context) {
this(context.getRequestId(), context.getStageId(), context.getServer());
@@ -69,7 +68,6 @@ public class OperatorStats {
_executeStopwatch.stop();
_endTimeMs = System.currentTimeMillis();
}
- _processingStarted = true;
}
public void recordRow(int numBlock, int numRows) {
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/SetOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/SetOperator.java
index 2c45382672..edf8416f02 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/SetOperator.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/SetOperator.java
@@ -50,6 +50,7 @@ public abstract class SetOperator extends MultiStageOperator {
private final DataSchema _dataSchema;
private boolean _isRightSetBuilt;
+ private boolean _isTerminated;
private TransferableBlock _upstreamErrorBlock;
public SetOperator(OpChainExecutionContext opChainExecutionContext, List<MultiStageOperator> upstreamOperators,
@@ -60,6 +61,8 @@ public abstract class SetOperator extends MultiStageOperator {
_leftChildOperator = getChildOperators().get(0);
_rightChildOperator = getChildOperators().get(1);
_rightRowSet = new HashSet<>();
+ _isRightSetBuilt = false;
+ _isTerminated = false;
}
@Override
@@ -89,10 +92,17 @@ public abstract class SetOperator extends MultiStageOperator {
@Override
protected TransferableBlock getNextBlock() {
- // A blocking call to construct a set with all the right side rows.
+ if (_isTerminated) {
+ return TransferableBlockUtils.getEndOfStreamTransferableBlock();
+ }
if (!_isRightSetBuilt) {
+ // construct a SET with all the right side rows.
constructRightBlockSet();
}
+ if (_upstreamErrorBlock != null) {
+ return _upstreamErrorBlock;
+ }
+ // Process each left block against the constructed right row set.
TransferableBlock leftBlock = _leftChildOperator.nextBlock();
return constructResultBlockSet(leftBlock);
}
@@ -107,7 +117,11 @@ public abstract class SetOperator extends MultiStageOperator {
}
block = _rightChildOperator.nextBlock();
}
- _isRightSetBuilt = true;
+ if (block.isErrorBlock()) {
+ _upstreamErrorBlock = block;
+ } else {
+ _isRightSetBuilt = true;
+ }
}
protected TransferableBlock constructResultBlockSet(TransferableBlock leftBlock) {
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/SortOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/SortOperator.java
index 64f0926a63..ce4ddf130f 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/SortOperator.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/SortOperator.java
@@ -51,8 +51,7 @@ public class SortOperator extends MultiStageOperator {
private final ArrayList<Object[]> _rows;
private final int _numRowsToKeep;
- private boolean _hasReturnedSortedBlock;
- private TransferableBlock _upstreamErrorBlock;
+ private boolean _hasConstructedSortedBlock;
public SortOperator(OpChainExecutionContext context, MultiStageOperator upstreamOperator,
List<RexExpression> collationKeys, List<RelFieldCollation.Direction> collationDirections,
@@ -73,8 +72,7 @@ public class SortOperator extends MultiStageOperator {
_fetch = fetch;
_offset = Math.max(offset, 0);
_dataSchema = dataSchema;
- _upstreamErrorBlock = null;
- _hasReturnedSortedBlock = false;
+ _hasConstructedSortedBlock = false;
// Setting numRowsToKeep as default maximum on Broker if limit not set.
// TODO: make this default behavior configurable.
_numRowsToKeep = _fetch > 0 ? _fetch + _offset : defaultResponseLimit;
@@ -110,74 +108,65 @@ public class SortOperator extends MultiStageOperator {
@Override
protected TransferableBlock getNextBlock() {
- try {
- consumeInputBlocks();
- return produceSortedBlock();
- } catch (Exception e) {
- return TransferableBlockUtils.getErrorTransferableBlock(e);
+ if (_hasConstructedSortedBlock) {
+ return TransferableBlockUtils.getEndOfStreamTransferableBlock();
+ }
+ TransferableBlock finalBlock = consumeInputBlocks();
+ // Return the upstream error block unchanged if finalBlock is an error block.
+ if (finalBlock.isErrorBlock()) {
+ return finalBlock;
}
+ return produceSortedBlock();
}
private TransferableBlock produceSortedBlock() {
- if (_upstreamErrorBlock != null) {
- return _upstreamErrorBlock;
- }
-
- if (!_hasReturnedSortedBlock) {
- _hasReturnedSortedBlock = true;
- if (_priorityQueue == null) {
- if (_rows.size() > _offset) {
- List<Object[]> row = _rows.subList(_offset, _rows.size());
- return new TransferableBlock(row, _dataSchema, DataBlock.Type.ROW);
- } else {
- return TransferableBlockUtils.getEndOfStreamTransferableBlock();
- }
+ _hasConstructedSortedBlock = true;
+ if (_priorityQueue == null) {
+ if (_rows.size() > _offset) {
+ List<Object[]> row = _rows.subList(_offset, _rows.size());
+ return new TransferableBlock(row, _dataSchema, DataBlock.Type.ROW);
} else {
- LinkedList<Object[]> rows = new LinkedList<>();
- while (_priorityQueue.size() > _offset) {
- Object[] row = _priorityQueue.poll();
- rows.addFirst(row);
- }
- if (rows.size() == 0) {
- return TransferableBlockUtils.getEndOfStreamTransferableBlock();
- } else {
- return new TransferableBlock(rows, _dataSchema, DataBlock.Type.ROW);
- }
+ return TransferableBlockUtils.getEndOfStreamTransferableBlock();
}
} else {
- return TransferableBlockUtils.getEndOfStreamTransferableBlock();
+ LinkedList<Object[]> rows = new LinkedList<>();
+ while (_priorityQueue.size() > _offset) {
+ Object[] row = _priorityQueue.poll();
+ rows.addFirst(row);
+ }
+ if (rows.size() == 0) {
+ return TransferableBlockUtils.getEndOfStreamTransferableBlock();
+ } else {
+ return new TransferableBlock(rows, _dataSchema, DataBlock.Type.ROW);
+ }
}
}
- private void consumeInputBlocks() {
- if (!_hasReturnedSortedBlock) {
- TransferableBlock block = _upstreamOperator.nextBlock();
- while (block.isDataBlock()) {
- List<Object[]> container = block.getContainer();
- if (_priorityQueue == null) {
- // TODO: when push-down properly, we shouldn't get more than _numRowsToKeep
- int numRows = _rows.size();
- if (numRows < _numRowsToKeep) {
- if (numRows + container.size() < _numRowsToKeep) {
- _rows.addAll(container);
- } else {
- _rows.addAll(container.subList(0, _numRowsToKeep - numRows));
- LOGGER.debug("Early terminate at SortOperator - operatorId={}, opChainId={}", _operatorId,
- _context.getId());
- // setting operator to be early terminated and awaits EOS block next.
- earlyTerminate();
- }
- }
- } else {
- for (Object[] row : container) {
- SelectionOperatorUtils.addToPriorityQueue(row, _priorityQueue, _numRowsToKeep);
+ private TransferableBlock consumeInputBlocks() {
+ TransferableBlock block = _upstreamOperator.nextBlock();
+ while (block.isDataBlock()) {
+ List<Object[]> container = block.getContainer();
+ if (_priorityQueue == null) {
+ // TODO: when push-down properly, we shouldn't get more than _numRowsToKeep
+ int numRows = _rows.size();
+ if (numRows < _numRowsToKeep) {
+ if (numRows + container.size() < _numRowsToKeep) {
+ _rows.addAll(container);
+ } else {
+ _rows.addAll(container.subList(0, _numRowsToKeep - numRows));
+ LOGGER.debug("Early terminate at SortOperator - operatorId={}, opChainId={}", _operatorId,
+ _context.getId());
+ // setting operator to be early terminated and awaits EOS block next.
+ earlyTerminate();
}
}
- block = _upstreamOperator.nextBlock();
- }
- if (block.isErrorBlock()) {
- _upstreamErrorBlock = block;
+ } else {
+ for (Object[] row : container) {
+ SelectionOperatorUtils.addToPriorityQueue(row, _priorityQueue, _numRowsToKeep);
+ }
}
+ block = _upstreamOperator.nextBlock();
}
+ return block;
}
}
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/TransformOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/TransformOperator.java
index f769ceef5a..234f32fbcd 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/TransformOperator.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/TransformOperator.java
@@ -27,7 +27,6 @@ import org.apache.pinot.common.datablock.DataBlock;
import org.apache.pinot.common.utils.DataSchema;
import org.apache.pinot.query.planner.logical.RexExpression;
import org.apache.pinot.query.runtime.blocks.TransferableBlock;
-import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils;
import org.apache.pinot.query.runtime.operator.operands.TransformOperand;
import org.apache.pinot.query.runtime.operator.operands.TransformOperandFactory;
import org.apache.pinot.query.runtime.plan.OpChainExecutionContext;
@@ -51,7 +50,6 @@ public class TransformOperator extends MultiStageOperator {
private final int _resultColumnSize;
// TODO: Check type matching between resultSchema and the actual result.
private final DataSchema _resultSchema;
- private TransferableBlock _upstreamErrorBlock;
public TransformOperator(OpChainExecutionContext context, MultiStageOperator upstreamOperator,
DataSchema resultSchema, List<RexExpression> transforms, DataSchema upstreamDataSchema) {
@@ -81,28 +79,10 @@ public class TransformOperator extends MultiStageOperator {
@Override
protected TransferableBlock getNextBlock() {
- try {
- TransferableBlock block = _upstreamOperator.nextBlock();
- return transform(block);
- } catch (RuntimeException e) {
- return TransferableBlockUtils.getErrorTransferableBlock(e);
- }
- }
-
- private TransferableBlock transform(TransferableBlock block) {
- // TODO: Other operators keep the first erroneous block, while this keep the last.
- // We should decide what is what we want to do and be consistent with that.
- if (block.isErrorBlock()) {
- _upstreamErrorBlock = block;
- }
- if (_upstreamErrorBlock != null) {
- return _upstreamErrorBlock;
- }
-
- if (TransferableBlockUtils.isEndOfStream(block)) {
+ TransferableBlock block = _upstreamOperator.nextBlock();
+ if (block.isEndOfStreamBlock()) {
return block;
}
-
List<Object[]> container = block.getContainer();
List<Object[]> resultRows = new ArrayList<>(container.size());
for (Object[] row : container) {
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/WindowAggregateOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/WindowAggregateOperator.java
index ea9e065416..259abaea1b 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/WindowAggregateOperator.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/WindowAggregateOperator.java
@@ -163,21 +163,14 @@ public class WindowAggregateOperator extends MultiStageOperator {
@Override
protected TransferableBlock getNextBlock() {
- try {
- TransferableBlock finalBlock = consumeInputBlocks();
- if (finalBlock.isErrorBlock()) {
- return finalBlock;
- }
-
- if (!_hasReturnedWindowAggregateBlock) {
- return produceWindowAggregatedBlock();
- } else {
- return TransferableBlockUtils.getEndOfStreamTransferableBlock();
- }
- } catch (Exception e) {
- LOGGER.error("Caught exception while executing WindowAggregationOperator, returning an error block", e);
- return TransferableBlockUtils.getErrorTransferableBlock(e);
+ if (_hasReturnedWindowAggregateBlock) {
+ return TransferableBlockUtils.getEndOfStreamTransferableBlock();
+ }
+ TransferableBlock finalBlock = consumeInputBlocks();
+ if (finalBlock.isErrorBlock()) {
+ return finalBlock;
}
+ return produceWindowAggregatedBlock();
}
private void validateAggregationCalls(String functionName,
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org