You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ro...@apache.org on 2023/02/07 15:27:12 UTC
[pinot] branch master updated: [multistage] consolidate operator stats (#10235)
This is an automated email from the ASF dual-hosted git repository.
rongr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new c6b7a9ef95 [multistage] consolidate operator stats (#10235)
c6b7a9ef95 is described below
commit c6b7a9ef95c188369defa8f2abfceb177be3bf06
Author: Rong Rong <ro...@apache.org>
AuthorDate: Tue Feb 7 07:27:03 2023 -0800
[multistage] consolidate operator stats (#10235)
Details:
- All logging is done in the MultiStageOperator base class, wrapped in a new nextBlock() method; stats are logged once per operator per server instance.
- Removed separate input/output row stats: an operator's input rows are simply the output rows of its upstream operators.
- Removed the start/stop timer calls inserted in the individual operator implementations, so the recorded time now includes the targeted operator and its upstream operators.
- Made it much easier to sort and find individual request/stage stats, as they are now lumped together.
Follow up:
- Split time between the upstream and current operator: we no longer track input vs. output and there is no split between operators, so everything is chained.
- Do all reporting at the broker level and consolidate it: right now stats are logged all over the server, which makes them hard to track.
- Default to opChain-level reporting instead of operator-level reporting: operator-level reporting should only be enabled via Trace.
---------
Co-authored-by: Rong Rong <ro...@startree.ai>
---
.../query/runtime/operator/AggregateOperator.java | 17 +---
.../query/runtime/operator/FilterOperator.java | 14 +--
.../query/runtime/operator/HashJoinOperator.java | 20 +----
.../LeafStageTransferableBlockOperator.java | 45 ++++------
.../runtime/operator/LiteralValueOperator.java | 22 ++---
.../runtime/operator/MailboxReceiveOperator.java | 99 ++++++++++------------
.../runtime/operator/MailboxSendOperator.java | 15 +---
.../query/runtime/operator/MultiStageOperator.java | 35 +++++++-
.../query/runtime/operator/OperatorStats.java | 25 ++----
.../pinot/query/runtime/operator/SortOperator.java | 15 +---
.../query/runtime/operator/TransformOperator.java | 12 +--
11 files changed, 113 insertions(+), 206 deletions(-)
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/AggregateOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/AggregateOperator.java
index 90e00a943a..7dc377b21b 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/AggregateOperator.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/AggregateOperator.java
@@ -76,9 +76,6 @@ public class AggregateOperator extends MultiStageOperator {
private boolean _readyToConstruct;
private boolean _hasReturnedAggregateBlock;
- // TODO: Move to OperatorContext class.
- private OperatorStats _operatorStats;
-
// TODO: refactor Pinot Reducer code to support the intermediate stage agg operator.
// aggCalls has to be a list of FunctionCall and cannot be null
// groupSet has to be a list of InputRef and cannot be null
@@ -93,6 +90,7 @@ public class AggregateOperator extends MultiStageOperator {
AggregateOperator(MultiStageOperator inputOperator, DataSchema dataSchema, List<RexExpression> aggCalls,
List<RexExpression> groupSet, DataSchema inputSchema,
Map<String, Function<DataSchema.ColumnDataType, Merger>> mergers, long requestId, int stageId) {
+ super(requestId, stageId);
_inputOperator = inputOperator;
_groupSet = groupSet;
_upstreamErrorBlock = null;
@@ -114,7 +112,6 @@ public class AggregateOperator extends MultiStageOperator {
_resultSchema = dataSchema;
_readyToConstruct = false;
_hasReturnedAggregateBlock = false;
- _operatorStats = new OperatorStats(requestId, stageId, EXPLAIN_NAME);
}
@Override
@@ -125,15 +122,11 @@ public class AggregateOperator extends MultiStageOperator {
@Nullable
@Override
public String toExplainString() {
- // TODO: move to close call;
- _inputOperator.toExplainString();
- LOGGER.debug(_operatorStats.toString());
return EXPLAIN_NAME;
}
@Override
protected TransferableBlock getNextBlock() {
- _operatorStats.startTimer();
try {
if (!_readyToConstruct && !consumeInputBlocks()) {
return TransferableBlockUtils.getNoOpTransferableBlock();
@@ -151,8 +144,6 @@ public class AggregateOperator extends MultiStageOperator {
}
} catch (Exception e) {
return TransferableBlockUtils.getErrorTransferableBlock(e);
- } finally {
- _operatorStats.endTimer();
}
}
@@ -175,7 +166,6 @@ public class AggregateOperator extends MultiStageOperator {
return TransferableBlockUtils.getEndOfStreamTransferableBlock();
}
} else {
- _operatorStats.recordOutput(1, rows.size());
return new TransferableBlock(rows, _resultSchema, DataBlock.Type.ROW);
}
}
@@ -195,9 +185,7 @@ public class AggregateOperator extends MultiStageOperator {
* @return whether or not the operator is ready to move on (EOS or ERROR)
*/
private boolean consumeInputBlocks() {
- _operatorStats.endTimer();
TransferableBlock block = _inputOperator.nextBlock();
- _operatorStats.startTimer();
while (!block.isNoOpBlock()) {
// setting upstream error block
if (block.isErrorBlock()) {
@@ -216,10 +204,7 @@ public class AggregateOperator extends MultiStageOperator {
_accumulators[i].accumulate(key, row);
}
}
- _operatorStats.recordInput(1, container.size());
- _operatorStats.endTimer();
block = _inputOperator.nextBlock();
- _operatorStats.startTimer();
}
return false;
}
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/FilterOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/FilterOperator.java
index 6f57ece6df..ba6f64f19e 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/FilterOperator.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/FilterOperator.java
@@ -54,16 +54,13 @@ public class FilterOperator extends MultiStageOperator {
private final DataSchema _dataSchema;
private TransferableBlock _upstreamErrorBlock;
- // TODO: Move to OperatorContext class.
- private OperatorStats _operatorStats;
-
public FilterOperator(MultiStageOperator upstreamOperator, DataSchema dataSchema, RexExpression filter,
long requestId, int stageId) {
+ super(requestId, stageId);
_upstreamOperator = upstreamOperator;
_dataSchema = dataSchema;
_filterOperand = TransformOperand.toTransformOperand(filter, dataSchema);
_upstreamErrorBlock = null;
- _operatorStats = new OperatorStats(requestId, stageId, EXPLAIN_NAME);
}
@Override
@@ -74,23 +71,16 @@ public class FilterOperator extends MultiStageOperator {
@Nullable
@Override
public String toExplainString() {
- _upstreamOperator.toExplainString();
- LOGGER.debug(_operatorStats.toString());
return EXPLAIN_NAME;
}
@Override
protected TransferableBlock getNextBlock() {
- _operatorStats.startTimer();
try {
- _operatorStats.endTimer();
TransferableBlock block = _upstreamOperator.nextBlock();
- _operatorStats.startTimer();
return transform(block);
} catch (Exception e) {
return TransferableBlockUtils.getErrorTransferableBlock(e);
- } finally {
- _operatorStats.endTimer();
}
}
@@ -113,8 +103,6 @@ public class FilterOperator extends MultiStageOperator {
resultRows.add(row);
}
}
- _operatorStats.recordInput(1, container.size());
- _operatorStats.recordOutput(1, resultRows.size());
return new TransferableBlock(resultRows, _dataSchema, DataBlock.Type.ROW);
}
}
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/HashJoinOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/HashJoinOperator.java
index b4e88965cb..e5cb99c275 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/HashJoinOperator.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/HashJoinOperator.java
@@ -87,10 +87,9 @@ public class HashJoinOperator extends MultiStageOperator {
private KeySelector<Object[], Object[]> _leftKeySelector;
private KeySelector<Object[], Object[]> _rightKeySelector;
- private OperatorStats _operatorStats;
-
public HashJoinOperator(MultiStageOperator leftTableOperator, MultiStageOperator rightTableOperator,
DataSchema leftSchema, JoinNode node, long requestId, int stageId) {
+ super(requestId, stageId);
Preconditions.checkState(SUPPORTED_JOIN_TYPES.contains(node.getJoinRelType()),
"Join type: " + node.getJoinRelType() + " is not supported!");
_joinType = node.getJoinRelType();
@@ -118,7 +117,6 @@ public class HashJoinOperator extends MultiStageOperator {
_matchedRightRows = null;
}
_upstreamErrorBlock = null;
- _operatorStats = new OperatorStats(requestId, stageId, EXPLAIN_NAME);
}
// TODO: Separate left and right table operator.
@@ -130,15 +128,11 @@ public class HashJoinOperator extends MultiStageOperator {
@Nullable
@Override
public String toExplainString() {
- _leftTableOperator.toExplainString();
- _rightTableOperator.toExplainString();
- LOGGER.debug(_operatorStats.toString());
return EXPLAIN_NAME;
}
@Override
protected TransferableBlock getNextBlock() {
- _operatorStats.startTimer();
try {
if (_isTerminated) {
return TransferableBlockUtils.getEndOfStreamTransferableBlock();
@@ -152,22 +146,16 @@ public class HashJoinOperator extends MultiStageOperator {
} else if (!_isHashTableBuilt) {
return TransferableBlockUtils.getNoOpTransferableBlock();
}
- _operatorStats.endTimer();
TransferableBlock leftBlock = _leftTableOperator.nextBlock();
- _operatorStats.startTimer();
// JOIN each left block with the right block.
return buildJoinedDataBlock(leftBlock);
} catch (Exception e) {
return TransferableBlockUtils.getErrorTransferableBlock(e);
- } finally {
- _operatorStats.endTimer();
}
}
private void buildBroadcastHashTable() {
- _operatorStats.endTimer();
TransferableBlock rightBlock = _rightTableOperator.nextBlock();
- _operatorStats.startTimer();
while (!rightBlock.isNoOpBlock()) {
if (rightBlock.isErrorBlock()) {
_upstreamErrorBlock = rightBlock;
@@ -184,10 +172,7 @@ public class HashJoinOperator extends MultiStageOperator {
_broadcastRightTable.computeIfAbsent(new Key(_rightKeySelector.getKey(row)), k -> new ArrayList<>());
hashCollection.add(row);
}
- _operatorStats.recordInput(1, container.size());
- _operatorStats.endTimer();
rightBlock = _rightTableOperator.nextBlock();
- _operatorStats.startTimer();
}
}
@@ -217,7 +202,6 @@ public class HashJoinOperator extends MultiStageOperator {
}
}
_isTerminated = true;
- _operatorStats.recordOutput(1, returnRows.size());
return new TransferableBlock(returnRows, _resultSchema, DataBlock.Type.ROW);
}
List<Object[]> rows = new ArrayList<>();
@@ -252,8 +236,6 @@ public class HashJoinOperator extends MultiStageOperator {
rows.add(joinRow(leftRow, null));
}
}
- _operatorStats.recordInput(1, container.size());
- _operatorStats.recordOutput(1, rows.size());
return new TransferableBlock(rows, _resultSchema, DataBlock.Type.ROW);
}
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/LeafStageTransferableBlockOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/LeafStageTransferableBlockOperator.java
index baf7373a07..43b0eaccd9 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/LeafStageTransferableBlockOperator.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/LeafStageTransferableBlockOperator.java
@@ -66,16 +66,13 @@ public class LeafStageTransferableBlockOperator extends MultiStageOperator {
private final DataSchema _desiredDataSchema;
private int _currentIndex;
- // TODO: Move to OperatorContext class.
- private OperatorStats _operatorStats;
-
public LeafStageTransferableBlockOperator(List<InstanceResponseBlock> baseResultBlock, DataSchema dataSchema,
long requestId, int stageId) {
+ super(requestId, stageId);
_baseResultBlock = baseResultBlock;
_desiredDataSchema = dataSchema;
_errorBlock = baseResultBlock.stream().filter(e -> !e.getExceptions().isEmpty()).findFirst().orElse(null);
_currentIndex = 0;
- _operatorStats = new OperatorStats(requestId, stageId, EXPLAIN_NAME);
}
@Override
@@ -86,39 +83,29 @@ public class LeafStageTransferableBlockOperator extends MultiStageOperator {
@Nullable
@Override
public String toExplainString() {
- LOGGER.debug(_operatorStats.toString());
return EXPLAIN_NAME;
}
@Override
protected TransferableBlock getNextBlock() {
- try {
- _operatorStats.startTimer();
- if (_currentIndex < 0) {
- throw new RuntimeException("Leaf transfer terminated. next block should no longer be called.");
- }
- if (_errorBlock != null) {
- _currentIndex = -1;
- return new TransferableBlock(DataBlockUtils.getErrorDataBlock(_errorBlock.getExceptions()));
- } else {
- if (_currentIndex < _baseResultBlock.size()) {
- InstanceResponseBlock responseBlock = _baseResultBlock.get(_currentIndex++);
- if (responseBlock.getResultsBlock() != null && responseBlock.getResultsBlock().getNumRows() > 0) {
- _operatorStats.recordInput(1, responseBlock.getResultsBlock().getNumRows());
- _operatorStats.recordOutput(1, responseBlock.getResultsBlock().getNumRows());
- return composeTransferableBlock(responseBlock, _desiredDataSchema);
- } else {
- _operatorStats.recordInput(1, responseBlock.getResultsBlock().getNumRows());
- _operatorStats.recordOutput(1, responseBlock.getResultsBlock().getNumRows());
- return new TransferableBlock(Collections.emptyList(), _desiredDataSchema, DataBlock.Type.ROW);
- }
+ if (_currentIndex < 0) {
+ throw new RuntimeException("Leaf transfer terminated. next block should no longer be called.");
+ }
+ if (_errorBlock != null) {
+ _currentIndex = -1;
+ return new TransferableBlock(DataBlockUtils.getErrorDataBlock(_errorBlock.getExceptions()));
+ } else {
+ if (_currentIndex < _baseResultBlock.size()) {
+ InstanceResponseBlock responseBlock = _baseResultBlock.get(_currentIndex++);
+ if (responseBlock.getResultsBlock() != null && responseBlock.getResultsBlock().getNumRows() > 0) {
+ return composeTransferableBlock(responseBlock, _desiredDataSchema);
} else {
- _currentIndex = -1;
- return new TransferableBlock(DataBlockUtils.getEndOfStreamDataBlock());
+ return new TransferableBlock(Collections.emptyList(), _desiredDataSchema, DataBlock.Type.ROW);
}
+ } else {
+ _currentIndex = -1;
+ return new TransferableBlock(DataBlockUtils.getEndOfStreamDataBlock());
}
- } finally {
- _operatorStats.endTimer();
}
}
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/LiteralValueOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/LiteralValueOperator.java
index 8fc160f205..6e123dd6b1 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/LiteralValueOperator.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/LiteralValueOperator.java
@@ -39,12 +39,10 @@ public class LiteralValueOperator extends MultiStageOperator {
private final TransferableBlock _rexLiteralBlock;
private boolean _isLiteralBlockReturned;
- private OperatorStats _operatorStats;
-
public LiteralValueOperator(DataSchema dataSchema, List<List<RexExpression>> rexLiteralRows,
long requestId, int stageId) {
+ super(requestId, stageId);
_dataSchema = dataSchema;
- _operatorStats = new OperatorStats(requestId, stageId, EXPLAIN_NAME);
_rexLiteralBlock = constructBlock(rexLiteralRows);
_isLiteralBlockReturned = false;
}
@@ -57,22 +55,16 @@ public class LiteralValueOperator extends MultiStageOperator {
@Nullable
@Override
public String toExplainString() {
- LOGGER.debug(_operatorStats.toString());
return EXPLAIN_NAME;
}
@Override
protected TransferableBlock getNextBlock() {
- try {
- _operatorStats.startTimer();
- if (!_isLiteralBlockReturned) {
- _isLiteralBlockReturned = true;
- return _rexLiteralBlock;
- } else {
- return TransferableBlockUtils.getEndOfStreamTransferableBlock();
- }
- } finally {
- _operatorStats.endTimer();
+ if (!_isLiteralBlockReturned) {
+ _isLiteralBlockReturned = true;
+ return _rexLiteralBlock;
+ } else {
+ return TransferableBlockUtils.getEndOfStreamTransferableBlock();
}
}
@@ -85,8 +77,6 @@ public class LiteralValueOperator extends MultiStageOperator {
}
blockContent.add(row);
}
- _operatorStats.recordInput(1, blockContent.size());
- _operatorStats.recordOutput(1, blockContent.size());
return new TransferableBlock(blockContent, _dataSchema, DataBlock.Type.ROW);
}
}
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperator.java
index ad0dcdbde5..2423c187c6 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperator.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperator.java
@@ -66,7 +66,6 @@ public class MailboxReceiveOperator extends MultiStageOperator {
private final long _deadlineTimestampNano;
private int _serverIdx;
private TransferableBlock _upstreamErrorBlock;
- private OperatorStats _operatorStats;
private static MailboxIdentifier toMailboxId(VirtualServer sender, long jobId, long stageId,
VirtualServerAddress receiver) {
@@ -80,6 +79,7 @@ public class MailboxReceiveOperator extends MultiStageOperator {
public MailboxReceiveOperator(MailboxService<TransferableBlock> mailboxService,
List<VirtualServer> sendingStageInstances, RelDistribution.Type exchangeType, VirtualServerAddress receiver,
long jobId, int stageId, Long timeoutMs) {
+ super(jobId, stageId);
_mailboxService = mailboxService;
Preconditions.checkState(SUPPORTED_EXCHANGE_TYPES.contains(exchangeType),
"Exchange/Distribution type: " + exchangeType + " is not supported!");
@@ -113,7 +113,6 @@ public class MailboxReceiveOperator extends MultiStageOperator {
}
_upstreamErrorBlock = null;
_serverIdx = 0;
- _operatorStats = new OperatorStats(jobId, stageId, EXPLAIN_NAME);
}
public List<MailboxIdentifier> getSendingMailbox() {
@@ -128,68 +127,60 @@ public class MailboxReceiveOperator extends MultiStageOperator {
@Nullable
@Override
public String toExplainString() {
- LOGGER.debug(_operatorStats.toString());
return EXPLAIN_NAME;
}
@Override
protected TransferableBlock getNextBlock() {
- try {
- _operatorStats.startTimer();
- if (_upstreamErrorBlock != null) {
- return _upstreamErrorBlock;
- } else if (System.nanoTime() >= _deadlineTimestampNano) {
- return TransferableBlockUtils.getErrorTransferableBlock(QueryException.EXECUTION_TIMEOUT_ERROR);
- }
+ if (_upstreamErrorBlock != null) {
+ return _upstreamErrorBlock;
+ } else if (System.nanoTime() >= _deadlineTimestampNano) {
+ return TransferableBlockUtils.getErrorTransferableBlock(QueryException.EXECUTION_TIMEOUT_ERROR);
+ }
- int startingIdx = _serverIdx;
- int openMailboxCount = 0;
- int eosMailboxCount = 0;
-
- // For all non-singleton distribution, we poll from every instance to check mailbox content.
- // TODO: Fix wasted CPU cycles on waiting for servers that are not supposed to give content.
- for (int i = 0; i < _sendingMailbox.size(); i++) {
- // this implements a round-robin mailbox iterator, so we don't starve any mailboxes
- _serverIdx = (startingIdx + i) % _sendingMailbox.size();
- MailboxIdentifier mailboxId = _sendingMailbox.get(_serverIdx);
- try {
- ReceivingMailbox<TransferableBlock> mailbox = _mailboxService.getReceivingMailbox(mailboxId);
- if (!mailbox.isClosed()) {
- openMailboxCount++;
- TransferableBlock block = mailbox.receive();
- // Get null block when pulling times out from mailbox.
- if (block != null) {
- if (block.isErrorBlock()) {
- _upstreamErrorBlock =
- TransferableBlockUtils.getErrorTransferableBlock(block.getDataBlock().getExceptions());
- return _upstreamErrorBlock;
- }
- if (!block.isEndOfStreamBlock()) {
- _operatorStats.recordInput(1, block.getNumRows());
- _operatorStats.recordOutput(1, block.getNumRows());
- return block;
- } else {
- eosMailboxCount++;
- }
+ int startingIdx = _serverIdx;
+ int openMailboxCount = 0;
+ int eosMailboxCount = 0;
+
+ // For all non-singleton distribution, we poll from every instance to check mailbox content.
+ // TODO: Fix wasted CPU cycles on waiting for servers that are not supposed to give content.
+ for (int i = 0; i < _sendingMailbox.size(); i++) {
+ // this implements a round-robin mailbox iterator, so we don't starve any mailboxes
+ _serverIdx = (startingIdx + i) % _sendingMailbox.size();
+ MailboxIdentifier mailboxId = _sendingMailbox.get(_serverIdx);
+ try {
+ ReceivingMailbox<TransferableBlock> mailbox = _mailboxService.getReceivingMailbox(mailboxId);
+ if (!mailbox.isClosed()) {
+ openMailboxCount++;
+ TransferableBlock block = mailbox.receive();
+ // Get null block when pulling times out from mailbox.
+ if (block != null) {
+ if (block.isErrorBlock()) {
+ _upstreamErrorBlock =
+ TransferableBlockUtils.getErrorTransferableBlock(block.getDataBlock().getExceptions());
+ return _upstreamErrorBlock;
+ }
+ if (!block.isEndOfStreamBlock()) {
+ return block;
+ } else {
+ eosMailboxCount++;
}
}
- } catch (Exception e) {
- return TransferableBlockUtils.getErrorTransferableBlock(
- new RuntimeException(String.format("Error polling mailbox=%s", mailboxId), e));
}
+ } catch (Exception e) {
+ return TransferableBlockUtils.getErrorTransferableBlock(
+ new RuntimeException(String.format("Error polling mailbox=%s", mailboxId), e));
}
-
- // there are two conditions in which we should return EOS: (1) there were
- // no mailboxes to open (this shouldn't happen because the second condition
- // should be hit first, but is defensive) (2) every mailbox that was opened
- // returned an EOS block. in every other scenario, there are mailboxes that
- // are not yet exhausted and we should wait for more data to be available
- TransferableBlock block =
- openMailboxCount > 0 && openMailboxCount > eosMailboxCount ? TransferableBlockUtils.getNoOpTransferableBlock()
- : TransferableBlockUtils.getEndOfStreamTransferableBlock();
- return block;
- } finally {
- _operatorStats.endTimer();
}
+
+ // there are two conditions in which we should return EOS: (1) there were
+ // no mailboxes to open (this shouldn't happen because the second condition
+ // should be hit first, but is defensive) (2) every mailbox that was opened
+ // returned an EOS block. in every other scenario, there are mailboxes that
+ // are not yet exhausted and we should wait for more data to be available
+ TransferableBlock block =
+ openMailboxCount > 0 && openMailboxCount > eosMailboxCount ? TransferableBlockUtils.getNoOpTransferableBlock()
+ : TransferableBlockUtils.getEndOfStreamTransferableBlock();
+ return block;
}
}
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java
index 177926e0f3..828da0375b 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java
@@ -55,7 +55,6 @@ public class MailboxSendOperator extends MultiStageOperator {
private final MultiStageOperator _dataTableBlockBaseOperator;
private final BlockExchange _exchange;
- private OperatorStats _operatorStats;
@VisibleForTesting
interface BlockExchangeFactory {
@@ -81,6 +80,7 @@ public class MailboxSendOperator extends MultiStageOperator {
MultiStageOperator dataTableBlockBaseOperator, List<VirtualServer> receivingStageInstances,
RelDistribution.Type exchangeType, KeySelector<Object[], Object[]> keySelector,
MailboxIdGenerator mailboxIdGenerator, BlockExchangeFactory blockExchangeFactory, long jobId, int stageId) {
+ super(jobId, stageId);
_dataTableBlockBaseOperator = dataTableBlockBaseOperator;
List<MailboxIdentifier> receivingMailboxes;
@@ -113,7 +113,6 @@ public class MailboxSendOperator extends MultiStageOperator {
Preconditions.checkState(SUPPORTED_EXCHANGE_TYPE.contains(exchangeType),
String.format("Exchange type '%s' is not supported yet", exchangeType));
- _operatorStats = new OperatorStats(jobId, stageId, EXPLAIN_NAME);
}
@Override
@@ -124,30 +123,20 @@ public class MailboxSendOperator extends MultiStageOperator {
@Nullable
@Override
public String toExplainString() {
- _dataTableBlockBaseOperator.toExplainString();
- LOGGER.debug(_operatorStats.toString());
return EXPLAIN_NAME;
}
@Override
protected TransferableBlock getNextBlock() {
- _operatorStats.startTimer();
TransferableBlock transferableBlock;
try {
- _operatorStats.endTimer();
transferableBlock = _dataTableBlockBaseOperator.nextBlock();
- _operatorStats.startTimer();
while (!transferableBlock.isNoOpBlock()) {
_exchange.send(transferableBlock);
- _operatorStats.recordInput(1, transferableBlock.getNumRows());
- // The # of output block is not accurate because we may do a split in exchange send.
- _operatorStats.recordOutput(1, transferableBlock.getNumRows());
if (transferableBlock.isEndOfStreamBlock()) {
return transferableBlock;
}
- _operatorStats.endTimer();
transferableBlock = _dataTableBlockBaseOperator.nextBlock();
- _operatorStats.startTimer();
}
} catch (final Exception e) {
// ideally, MailboxSendOperator doesn't ever throw an exception because
@@ -159,8 +148,6 @@ public class MailboxSendOperator extends MultiStageOperator {
} catch (Exception e2) {
LOGGER.error("Exception while sending block to mailbox.", e2);
}
- } finally {
- _operatorStats.endTimer();
}
return transferableBlock;
}
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MultiStageOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MultiStageOperator.java
index 37e954e6ae..e48d39e788 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MultiStageOperator.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MultiStageOperator.java
@@ -19,14 +19,45 @@
package org.apache.pinot.query.runtime.operator;
import java.util.List;
-import org.apache.pinot.core.operator.BaseOperator;
+import org.apache.pinot.core.common.Operator;
import org.apache.pinot.query.runtime.blocks.TransferableBlock;
+import org.apache.pinot.spi.exception.EarlyTerminationException;
+import org.apache.pinot.spi.trace.InvocationScope;
+import org.apache.pinot.spi.trace.Tracing;
import org.slf4j.LoggerFactory;
-public abstract class MultiStageOperator extends BaseOperator<TransferableBlock> implements AutoCloseable {
+public abstract class MultiStageOperator implements Operator<TransferableBlock>, AutoCloseable {
private static final org.slf4j.Logger LOGGER = LoggerFactory.getLogger(MultiStageOperator.class);
+ // TODO: Move to OperatorContext class.
+ private final OperatorStats _operatorStats;
+
+ public MultiStageOperator(long requestId, int stageId) {
+ _operatorStats = new OperatorStats(requestId, stageId, toExplainString());
+ }
+
+ @Override
+ public TransferableBlock nextBlock() {
+ if (Tracing.ThreadAccountantOps.isInterrupted()) {
+ throw new EarlyTerminationException("Interrupted while processing next block");
+ }
+ try (InvocationScope ignored = Tracing.getTracer().createScope(getClass())) {
+ _operatorStats.startTimer();
+ TransferableBlock nextBlock = getNextBlock();
+ _operatorStats.recordRow(1, nextBlock.getNumRows());
+ _operatorStats.endTimer();
+ // TODO: move this to centralized reporting in broker
+ if (nextBlock.isEndOfStreamBlock()) {
+ LOGGER.info("Recorded operator stats: " + _operatorStats);
+ }
+ return nextBlock;
+ }
+ }
+
+ // Make it protected because we should always call nextBlock()
+ protected abstract TransferableBlock getNextBlock();
+
@Override
public List<MultiStageOperator> getChildOperators() {
throw new UnsupportedOperationException();
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/OperatorStats.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/OperatorStats.java
index c40b96b3c8..7fbcc5b035 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/OperatorStats.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/OperatorStats.java
@@ -31,12 +31,8 @@ public class OperatorStats {
private final String _operatorType;
- private int _numInputBlock = 0;
- private int _numInputRows = 0;
-
- private int _numOutputBlock = 0;
-
- private int _numOutputRows = 0;
+ private int _numBlock = 0;
+ private int _numRows = 0;
public OperatorStats(long requestId, int stageId, String operatorType) {
_stageId = stageId;
@@ -56,23 +52,16 @@ public class OperatorStats {
}
}
- public void recordInput(int numBlock, int numRows) {
- _numInputBlock += numBlock;
- _numInputRows += numRows;
- }
-
- public void recordOutput(int numBlock, int numRows) {
- _numOutputBlock += numBlock;
- _numOutputRows += numRows;
+ public void recordRow(int numBlock, int numRows) {
+ _numBlock += numBlock;
+ _numRows += numRows;
}
// TODO: Return the string as a JSON string.
@Override
public String toString() {
return String.format(
- "OperatorStats[type: %s, requestId: %s, stageId %s] ExecutionWallTime: %sms, InputRows: %s, InputBlock: "
- + "%s, OutputRows: %s, OutputBlock: %s", _operatorType, _requestId, _stageId,
- _executeStopwatch.elapsed(TimeUnit.MILLISECONDS), _numInputRows, _numInputBlock, _numOutputRows,
- _numOutputBlock);
+ "OperatorStats[requestId: %s, stageId %s, type: %s] ExecutionWallTime: %sms, No. Rows: %s, No. Block: %s",
+ _requestId, _stageId, _operatorType, _executeStopwatch.elapsed(TimeUnit.MILLISECONDS), _numRows, _numBlock);
}
}
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/SortOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/SortOperator.java
index 9494280e0b..e24195948a 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/SortOperator.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/SortOperator.java
@@ -50,7 +50,6 @@ public class SortOperator extends MultiStageOperator {
private boolean _readyToConstruct;
private boolean _isSortedBlockConstructed;
private TransferableBlock _upstreamErrorBlock;
- private OperatorStats _operatorStats;
public SortOperator(MultiStageOperator upstreamOperator, List<RexExpression> collationKeys,
List<RelFieldCollation.Direction> collationDirections, int fetch, int offset, DataSchema dataSchema,
@@ -63,6 +62,7 @@ public class SortOperator extends MultiStageOperator {
SortOperator(MultiStageOperator upstreamOperator, List<RexExpression> collationKeys,
List<RelFieldCollation.Direction> collationDirections, int fetch, int offset, DataSchema dataSchema,
int defaultHolderCapacity, long requestId, int stageId) {
+ super(requestId, stageId);
_upstreamOperator = upstreamOperator;
_fetch = fetch;
_offset = Math.max(offset, 0);
@@ -72,7 +72,6 @@ public class SortOperator extends MultiStageOperator {
_numRowsToKeep = _fetch > 0 ? _fetch + _offset : defaultHolderCapacity;
_rows = new PriorityQueue<>(_numRowsToKeep,
new SortComparator(collationKeys, collationDirections, dataSchema, false));
- _operatorStats = new OperatorStats(requestId, stageId, EXPLAIN_NAME);
}
@Override
@@ -87,27 +86,21 @@ public class SortOperator extends MultiStageOperator {
@Nullable
@Override
public String toExplainString() {
- _upstreamOperator.toExplainString();
- LOGGER.debug(_operatorStats.toString());
return EXPLAIN_NAME;
}
@Override
protected TransferableBlock getNextBlock() {
- _operatorStats.startTimer();
try {
consumeInputBlocks();
return produceSortedBlock();
} catch (Exception e) {
return TransferableBlockUtils.getErrorTransferableBlock(e);
- } finally {
- _operatorStats.endTimer();
}
}
private TransferableBlock produceSortedBlock() {
if (_upstreamErrorBlock != null) {
- LOGGER.error("OperatorStats:" + _operatorStats);
return _upstreamErrorBlock;
} else if (!_readyToConstruct) {
return TransferableBlockUtils.getNoOpTransferableBlock();
@@ -119,7 +112,6 @@ public class SortOperator extends MultiStageOperator {
Object[] row = _rows.poll();
rows.addFirst(row);
}
- _operatorStats.recordOutput(1, rows.size());
_isSortedBlockConstructed = true;
if (rows.size() == 0) {
return TransferableBlockUtils.getEndOfStreamTransferableBlock();
@@ -133,9 +125,7 @@ public class SortOperator extends MultiStageOperator {
private void consumeInputBlocks() {
if (!_isSortedBlockConstructed) {
- _operatorStats.endTimer();
TransferableBlock block = _upstreamOperator.nextBlock();
- _operatorStats.startTimer();
while (!block.isNoOpBlock()) {
// setting upstream error block
if (block.isErrorBlock()) {
@@ -150,10 +140,7 @@ public class SortOperator extends MultiStageOperator {
for (Object[] row : container) {
SelectionOperatorUtils.addToPriorityQueue(row, _rows, _numRowsToKeep);
}
- _operatorStats.endTimer();
block = _upstreamOperator.nextBlock();
- _operatorStats.startTimer();
- _operatorStats.recordInput(1, container.size());
}
}
}
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/TransformOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/TransformOperator.java
index a75ad6b017..82f804a285 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/TransformOperator.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/TransformOperator.java
@@ -53,10 +53,10 @@ public class TransformOperator extends MultiStageOperator {
// TODO: Check type matching between resultSchema and the actual result.
private final DataSchema _resultSchema;
private TransferableBlock _upstreamErrorBlock;
- private OperatorStats _operatorStats;
public TransformOperator(MultiStageOperator upstreamOperator, DataSchema resultSchema,
List<RexExpression> transforms, DataSchema upstreamDataSchema, long requestId, int stageId) {
+ super(requestId, stageId);
Preconditions.checkState(!transforms.isEmpty(), "transform operand should not be empty.");
Preconditions.checkState(resultSchema.size() == transforms.size(),
"result schema size:" + resultSchema.size() + " doesn't match transform operand size:" + transforms.size());
@@ -67,7 +67,6 @@ public class TransformOperator extends MultiStageOperator {
_transformOperandsList.add(TransformOperand.toTransformOperand(rexExpression, upstreamDataSchema));
}
_resultSchema = resultSchema;
- _operatorStats = new OperatorStats(requestId, stageId, EXPLAIN_NAME);
}
@Override
@@ -78,23 +77,16 @@ public class TransformOperator extends MultiStageOperator {
@Nullable
@Override
public String toExplainString() {
- _upstreamOperator.toExplainString();
- LOGGER.debug(_operatorStats.toString());
return EXPLAIN_NAME;
}
@Override
protected TransferableBlock getNextBlock() {
- _operatorStats.startTimer();
try {
- _operatorStats.endTimer();
TransferableBlock block = _upstreamOperator.nextBlock();
- _operatorStats.startTimer();
return transform(block);
} catch (Exception e) {
return TransferableBlockUtils.getErrorTransferableBlock(e);
- } finally {
- _operatorStats.endTimer();
}
}
@@ -121,8 +113,6 @@ public class TransformOperator extends MultiStageOperator {
}
resultRows.add(resultRow);
}
- _operatorStats.recordInput(1, container.size());
- _operatorStats.recordOutput(1, resultRows.size());
return new TransferableBlock(resultRows, _resultSchema, DataBlock.Type.ROW);
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org