You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ro...@apache.org on 2023/02/07 15:27:12 UTC

[pinot] branch master updated: [multistage] consolidate operator stats (#10235)

This is an automated email from the ASF dual-hosted git repository.

rongr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new c6b7a9ef95 [multistage] consolidate operator stats (#10235)
c6b7a9ef95 is described below

commit c6b7a9ef95c188369defa8f2abfceb177be3bf06
Author: Rong Rong <ro...@apache.org>
AuthorDate: Tue Feb 7 07:27:03 2023 -0800

    [multistage] consolidate operator stats (#10235)
    
    Details:
    - all logging is done in the MultiStageOperator base class, wrapped in a new nextBlock() method; stats are logged once per operator per server instance.
    - removed the separate input/output row counts; an operator's input rows are just the output rows of its upstream operators
    - removed the inserted start/stop calls in the actual operator impls, so the recorded time will include the targeted operator and its upstreams
    - made it much easier to sort and find individual request/stage stats, as they are lumped together.
    
    Follow up:
    - split time between upstream and current operator: we don't have input and output and no split between operators, so everything is chained together
    - do reporting all at the broker level and consolidate: right now it logs everywhere on the servers, which is not easy to track
    - do default opChain level reporting instead of operator level: operator level reporting should be only enabled via Trace
    
    ---------
    
    Co-authored-by: Rong Rong <ro...@startree.ai>
---
 .../query/runtime/operator/AggregateOperator.java  | 17 +---
 .../query/runtime/operator/FilterOperator.java     | 14 +--
 .../query/runtime/operator/HashJoinOperator.java   | 20 +----
 .../LeafStageTransferableBlockOperator.java        | 45 ++++------
 .../runtime/operator/LiteralValueOperator.java     | 22 ++---
 .../runtime/operator/MailboxReceiveOperator.java   | 99 ++++++++++------------
 .../runtime/operator/MailboxSendOperator.java      | 15 +---
 .../query/runtime/operator/MultiStageOperator.java | 35 +++++++-
 .../query/runtime/operator/OperatorStats.java      | 25 ++----
 .../pinot/query/runtime/operator/SortOperator.java | 15 +---
 .../query/runtime/operator/TransformOperator.java  | 12 +--
 11 files changed, 113 insertions(+), 206 deletions(-)

diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/AggregateOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/AggregateOperator.java
index 90e00a943a..7dc377b21b 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/AggregateOperator.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/AggregateOperator.java
@@ -76,9 +76,6 @@ public class AggregateOperator extends MultiStageOperator {
   private boolean _readyToConstruct;
   private boolean _hasReturnedAggregateBlock;
 
-  // TODO: Move to OperatorContext class.
-  private OperatorStats _operatorStats;
-
   // TODO: refactor Pinot Reducer code to support the intermediate stage agg operator.
   // aggCalls has to be a list of FunctionCall and cannot be null
   // groupSet has to be a list of InputRef and cannot be null
@@ -93,6 +90,7 @@ public class AggregateOperator extends MultiStageOperator {
   AggregateOperator(MultiStageOperator inputOperator, DataSchema dataSchema, List<RexExpression> aggCalls,
       List<RexExpression> groupSet, DataSchema inputSchema,
       Map<String, Function<DataSchema.ColumnDataType, Merger>> mergers, long requestId, int stageId) {
+    super(requestId, stageId);
     _inputOperator = inputOperator;
     _groupSet = groupSet;
     _upstreamErrorBlock = null;
@@ -114,7 +112,6 @@ public class AggregateOperator extends MultiStageOperator {
     _resultSchema = dataSchema;
     _readyToConstruct = false;
     _hasReturnedAggregateBlock = false;
-    _operatorStats = new OperatorStats(requestId, stageId, EXPLAIN_NAME);
   }
 
   @Override
@@ -125,15 +122,11 @@ public class AggregateOperator extends MultiStageOperator {
   @Nullable
   @Override
   public String toExplainString() {
-    // TODO: move to close call;
-    _inputOperator.toExplainString();
-    LOGGER.debug(_operatorStats.toString());
     return EXPLAIN_NAME;
   }
 
   @Override
   protected TransferableBlock getNextBlock() {
-    _operatorStats.startTimer();
     try {
       if (!_readyToConstruct && !consumeInputBlocks()) {
         return TransferableBlockUtils.getNoOpTransferableBlock();
@@ -151,8 +144,6 @@ public class AggregateOperator extends MultiStageOperator {
       }
     } catch (Exception e) {
       return TransferableBlockUtils.getErrorTransferableBlock(e);
-    } finally {
-      _operatorStats.endTimer();
     }
   }
 
@@ -175,7 +166,6 @@ public class AggregateOperator extends MultiStageOperator {
         return TransferableBlockUtils.getEndOfStreamTransferableBlock();
       }
     } else {
-      _operatorStats.recordOutput(1, rows.size());
       return new TransferableBlock(rows, _resultSchema, DataBlock.Type.ROW);
     }
   }
@@ -195,9 +185,7 @@ public class AggregateOperator extends MultiStageOperator {
    * @return whether or not the operator is ready to move on (EOS or ERROR)
    */
   private boolean consumeInputBlocks() {
-    _operatorStats.endTimer();
     TransferableBlock block = _inputOperator.nextBlock();
-    _operatorStats.startTimer();
     while (!block.isNoOpBlock()) {
       // setting upstream error block
       if (block.isErrorBlock()) {
@@ -216,10 +204,7 @@ public class AggregateOperator extends MultiStageOperator {
           _accumulators[i].accumulate(key, row);
         }
       }
-      _operatorStats.recordInput(1, container.size());
-      _operatorStats.endTimer();
       block = _inputOperator.nextBlock();
-      _operatorStats.startTimer();
     }
     return false;
   }
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/FilterOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/FilterOperator.java
index 6f57ece6df..ba6f64f19e 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/FilterOperator.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/FilterOperator.java
@@ -54,16 +54,13 @@ public class FilterOperator extends MultiStageOperator {
   private final DataSchema _dataSchema;
   private TransferableBlock _upstreamErrorBlock;
 
-  // TODO: Move to OperatorContext class.
-  private OperatorStats _operatorStats;
-
   public FilterOperator(MultiStageOperator upstreamOperator, DataSchema dataSchema, RexExpression filter,
       long requestId, int stageId) {
+    super(requestId, stageId);
     _upstreamOperator = upstreamOperator;
     _dataSchema = dataSchema;
     _filterOperand = TransformOperand.toTransformOperand(filter, dataSchema);
     _upstreamErrorBlock = null;
-    _operatorStats = new OperatorStats(requestId, stageId, EXPLAIN_NAME);
   }
 
   @Override
@@ -74,23 +71,16 @@ public class FilterOperator extends MultiStageOperator {
   @Nullable
   @Override
   public String toExplainString() {
-    _upstreamOperator.toExplainString();
-    LOGGER.debug(_operatorStats.toString());
     return EXPLAIN_NAME;
   }
 
   @Override
   protected TransferableBlock getNextBlock() {
-    _operatorStats.startTimer();
     try {
-      _operatorStats.endTimer();
       TransferableBlock block = _upstreamOperator.nextBlock();
-      _operatorStats.startTimer();
       return transform(block);
     } catch (Exception e) {
       return TransferableBlockUtils.getErrorTransferableBlock(e);
-    } finally {
-      _operatorStats.endTimer();
     }
   }
 
@@ -113,8 +103,6 @@ public class FilterOperator extends MultiStageOperator {
         resultRows.add(row);
       }
     }
-    _operatorStats.recordInput(1, container.size());
-    _operatorStats.recordOutput(1, resultRows.size());
     return new TransferableBlock(resultRows, _dataSchema, DataBlock.Type.ROW);
   }
 }
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/HashJoinOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/HashJoinOperator.java
index b4e88965cb..e5cb99c275 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/HashJoinOperator.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/HashJoinOperator.java
@@ -87,10 +87,9 @@ public class HashJoinOperator extends MultiStageOperator {
   private KeySelector<Object[], Object[]> _leftKeySelector;
   private KeySelector<Object[], Object[]> _rightKeySelector;
 
-  private OperatorStats _operatorStats;
-
   public HashJoinOperator(MultiStageOperator leftTableOperator, MultiStageOperator rightTableOperator,
       DataSchema leftSchema, JoinNode node, long requestId, int stageId) {
+    super(requestId, stageId);
     Preconditions.checkState(SUPPORTED_JOIN_TYPES.contains(node.getJoinRelType()),
         "Join type: " + node.getJoinRelType() + " is not supported!");
     _joinType = node.getJoinRelType();
@@ -118,7 +117,6 @@ public class HashJoinOperator extends MultiStageOperator {
       _matchedRightRows = null;
     }
     _upstreamErrorBlock = null;
-    _operatorStats = new OperatorStats(requestId, stageId, EXPLAIN_NAME);
   }
 
   // TODO: Separate left and right table operator.
@@ -130,15 +128,11 @@ public class HashJoinOperator extends MultiStageOperator {
   @Nullable
   @Override
   public String toExplainString() {
-    _leftTableOperator.toExplainString();
-    _rightTableOperator.toExplainString();
-    LOGGER.debug(_operatorStats.toString());
     return EXPLAIN_NAME;
   }
 
   @Override
   protected TransferableBlock getNextBlock() {
-    _operatorStats.startTimer();
     try {
       if (_isTerminated) {
         return TransferableBlockUtils.getEndOfStreamTransferableBlock();
@@ -152,22 +146,16 @@ public class HashJoinOperator extends MultiStageOperator {
       } else if (!_isHashTableBuilt) {
         return TransferableBlockUtils.getNoOpTransferableBlock();
       }
-      _operatorStats.endTimer();
       TransferableBlock leftBlock = _leftTableOperator.nextBlock();
-      _operatorStats.startTimer();
       // JOIN each left block with the right block.
       return buildJoinedDataBlock(leftBlock);
     } catch (Exception e) {
       return TransferableBlockUtils.getErrorTransferableBlock(e);
-    } finally {
-      _operatorStats.endTimer();
     }
   }
 
   private void buildBroadcastHashTable() {
-    _operatorStats.endTimer();
     TransferableBlock rightBlock = _rightTableOperator.nextBlock();
-    _operatorStats.startTimer();
     while (!rightBlock.isNoOpBlock()) {
       if (rightBlock.isErrorBlock()) {
         _upstreamErrorBlock = rightBlock;
@@ -184,10 +172,7 @@ public class HashJoinOperator extends MultiStageOperator {
             _broadcastRightTable.computeIfAbsent(new Key(_rightKeySelector.getKey(row)), k -> new ArrayList<>());
         hashCollection.add(row);
       }
-      _operatorStats.recordInput(1, container.size());
-      _operatorStats.endTimer();
       rightBlock = _rightTableOperator.nextBlock();
-      _operatorStats.startTimer();
     }
   }
 
@@ -217,7 +202,6 @@ public class HashJoinOperator extends MultiStageOperator {
         }
       }
       _isTerminated = true;
-      _operatorStats.recordOutput(1, returnRows.size());
       return new TransferableBlock(returnRows, _resultSchema, DataBlock.Type.ROW);
     }
     List<Object[]> rows = new ArrayList<>();
@@ -252,8 +236,6 @@ public class HashJoinOperator extends MultiStageOperator {
         rows.add(joinRow(leftRow, null));
       }
     }
-    _operatorStats.recordInput(1, container.size());
-    _operatorStats.recordOutput(1, rows.size());
     return new TransferableBlock(rows, _resultSchema, DataBlock.Type.ROW);
   }
 
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/LeafStageTransferableBlockOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/LeafStageTransferableBlockOperator.java
index baf7373a07..43b0eaccd9 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/LeafStageTransferableBlockOperator.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/LeafStageTransferableBlockOperator.java
@@ -66,16 +66,13 @@ public class LeafStageTransferableBlockOperator extends MultiStageOperator {
   private final DataSchema _desiredDataSchema;
   private int _currentIndex;
 
-  // TODO: Move to OperatorContext class.
-  private OperatorStats _operatorStats;
-
   public LeafStageTransferableBlockOperator(List<InstanceResponseBlock> baseResultBlock, DataSchema dataSchema,
       long requestId, int stageId) {
+    super(requestId, stageId);
     _baseResultBlock = baseResultBlock;
     _desiredDataSchema = dataSchema;
     _errorBlock = baseResultBlock.stream().filter(e -> !e.getExceptions().isEmpty()).findFirst().orElse(null);
     _currentIndex = 0;
-    _operatorStats = new OperatorStats(requestId, stageId, EXPLAIN_NAME);
   }
 
   @Override
@@ -86,39 +83,29 @@ public class LeafStageTransferableBlockOperator extends MultiStageOperator {
   @Nullable
   @Override
   public String toExplainString() {
-    LOGGER.debug(_operatorStats.toString());
     return EXPLAIN_NAME;
   }
 
   @Override
   protected TransferableBlock getNextBlock() {
-    try {
-      _operatorStats.startTimer();
-      if (_currentIndex < 0) {
-        throw new RuntimeException("Leaf transfer terminated. next block should no longer be called.");
-      }
-      if (_errorBlock != null) {
-        _currentIndex = -1;
-        return new TransferableBlock(DataBlockUtils.getErrorDataBlock(_errorBlock.getExceptions()));
-      } else {
-        if (_currentIndex < _baseResultBlock.size()) {
-          InstanceResponseBlock responseBlock = _baseResultBlock.get(_currentIndex++);
-          if (responseBlock.getResultsBlock() != null && responseBlock.getResultsBlock().getNumRows() > 0) {
-            _operatorStats.recordInput(1, responseBlock.getResultsBlock().getNumRows());
-            _operatorStats.recordOutput(1, responseBlock.getResultsBlock().getNumRows());
-            return composeTransferableBlock(responseBlock, _desiredDataSchema);
-          } else {
-            _operatorStats.recordInput(1, responseBlock.getResultsBlock().getNumRows());
-            _operatorStats.recordOutput(1, responseBlock.getResultsBlock().getNumRows());
-            return new TransferableBlock(Collections.emptyList(), _desiredDataSchema, DataBlock.Type.ROW);
-          }
+    if (_currentIndex < 0) {
+      throw new RuntimeException("Leaf transfer terminated. next block should no longer be called.");
+    }
+    if (_errorBlock != null) {
+      _currentIndex = -1;
+      return new TransferableBlock(DataBlockUtils.getErrorDataBlock(_errorBlock.getExceptions()));
+    } else {
+      if (_currentIndex < _baseResultBlock.size()) {
+        InstanceResponseBlock responseBlock = _baseResultBlock.get(_currentIndex++);
+        if (responseBlock.getResultsBlock() != null && responseBlock.getResultsBlock().getNumRows() > 0) {
+          return composeTransferableBlock(responseBlock, _desiredDataSchema);
         } else {
-          _currentIndex = -1;
-          return new TransferableBlock(DataBlockUtils.getEndOfStreamDataBlock());
+          return new TransferableBlock(Collections.emptyList(), _desiredDataSchema, DataBlock.Type.ROW);
         }
+      } else {
+        _currentIndex = -1;
+        return new TransferableBlock(DataBlockUtils.getEndOfStreamDataBlock());
       }
-    } finally {
-      _operatorStats.endTimer();
     }
   }
 
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/LiteralValueOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/LiteralValueOperator.java
index 8fc160f205..6e123dd6b1 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/LiteralValueOperator.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/LiteralValueOperator.java
@@ -39,12 +39,10 @@ public class LiteralValueOperator extends MultiStageOperator {
   private final TransferableBlock _rexLiteralBlock;
   private boolean _isLiteralBlockReturned;
 
-  private OperatorStats _operatorStats;
-
   public LiteralValueOperator(DataSchema dataSchema, List<List<RexExpression>> rexLiteralRows,
       long requestId, int stageId) {
+    super(requestId, stageId);
     _dataSchema = dataSchema;
-    _operatorStats = new OperatorStats(requestId, stageId, EXPLAIN_NAME);
     _rexLiteralBlock = constructBlock(rexLiteralRows);
     _isLiteralBlockReturned = false;
   }
@@ -57,22 +55,16 @@ public class LiteralValueOperator extends MultiStageOperator {
   @Nullable
   @Override
   public String toExplainString() {
-    LOGGER.debug(_operatorStats.toString());
     return EXPLAIN_NAME;
   }
 
   @Override
   protected TransferableBlock getNextBlock() {
-    try {
-      _operatorStats.startTimer();
-      if (!_isLiteralBlockReturned) {
-        _isLiteralBlockReturned = true;
-        return _rexLiteralBlock;
-      } else {
-        return TransferableBlockUtils.getEndOfStreamTransferableBlock();
-      }
-    } finally {
-      _operatorStats.endTimer();
+    if (!_isLiteralBlockReturned) {
+      _isLiteralBlockReturned = true;
+      return _rexLiteralBlock;
+    } else {
+      return TransferableBlockUtils.getEndOfStreamTransferableBlock();
     }
   }
 
@@ -85,8 +77,6 @@ public class LiteralValueOperator extends MultiStageOperator {
       }
       blockContent.add(row);
     }
-    _operatorStats.recordInput(1, blockContent.size());
-    _operatorStats.recordOutput(1, blockContent.size());
     return new TransferableBlock(blockContent, _dataSchema, DataBlock.Type.ROW);
   }
 }
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperator.java
index ad0dcdbde5..2423c187c6 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperator.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperator.java
@@ -66,7 +66,6 @@ public class MailboxReceiveOperator extends MultiStageOperator {
   private final long _deadlineTimestampNano;
   private int _serverIdx;
   private TransferableBlock _upstreamErrorBlock;
-  private OperatorStats _operatorStats;
 
   private static MailboxIdentifier toMailboxId(VirtualServer sender, long jobId, long stageId,
       VirtualServerAddress receiver) {
@@ -80,6 +79,7 @@ public class MailboxReceiveOperator extends MultiStageOperator {
   public MailboxReceiveOperator(MailboxService<TransferableBlock> mailboxService,
       List<VirtualServer> sendingStageInstances, RelDistribution.Type exchangeType, VirtualServerAddress receiver,
       long jobId, int stageId, Long timeoutMs) {
+    super(jobId, stageId);
     _mailboxService = mailboxService;
     Preconditions.checkState(SUPPORTED_EXCHANGE_TYPES.contains(exchangeType),
         "Exchange/Distribution type: " + exchangeType + " is not supported!");
@@ -113,7 +113,6 @@ public class MailboxReceiveOperator extends MultiStageOperator {
     }
     _upstreamErrorBlock = null;
     _serverIdx = 0;
-    _operatorStats = new OperatorStats(jobId, stageId, EXPLAIN_NAME);
   }
 
   public List<MailboxIdentifier> getSendingMailbox() {
@@ -128,68 +127,60 @@ public class MailboxReceiveOperator extends MultiStageOperator {
   @Nullable
   @Override
   public String toExplainString() {
-    LOGGER.debug(_operatorStats.toString());
     return EXPLAIN_NAME;
   }
 
   @Override
   protected TransferableBlock getNextBlock() {
-    try {
-      _operatorStats.startTimer();
-      if (_upstreamErrorBlock != null) {
-        return _upstreamErrorBlock;
-      } else if (System.nanoTime() >= _deadlineTimestampNano) {
-        return TransferableBlockUtils.getErrorTransferableBlock(QueryException.EXECUTION_TIMEOUT_ERROR);
-      }
+    if (_upstreamErrorBlock != null) {
+      return _upstreamErrorBlock;
+    } else if (System.nanoTime() >= _deadlineTimestampNano) {
+      return TransferableBlockUtils.getErrorTransferableBlock(QueryException.EXECUTION_TIMEOUT_ERROR);
+    }
 
-      int startingIdx = _serverIdx;
-      int openMailboxCount = 0;
-      int eosMailboxCount = 0;
-
-      // For all non-singleton distribution, we poll from every instance to check mailbox content.
-      // TODO: Fix wasted CPU cycles on waiting for servers that are not supposed to give content.
-      for (int i = 0; i < _sendingMailbox.size(); i++) {
-        // this implements a round-robin mailbox iterator, so we don't starve any mailboxes
-        _serverIdx = (startingIdx + i) % _sendingMailbox.size();
-        MailboxIdentifier mailboxId = _sendingMailbox.get(_serverIdx);
-        try {
-          ReceivingMailbox<TransferableBlock> mailbox = _mailboxService.getReceivingMailbox(mailboxId);
-          if (!mailbox.isClosed()) {
-            openMailboxCount++;
-            TransferableBlock block = mailbox.receive();
-            // Get null block when pulling times out from mailbox.
-            if (block != null) {
-              if (block.isErrorBlock()) {
-                _upstreamErrorBlock =
-                    TransferableBlockUtils.getErrorTransferableBlock(block.getDataBlock().getExceptions());
-                return _upstreamErrorBlock;
-              }
-              if (!block.isEndOfStreamBlock()) {
-                _operatorStats.recordInput(1, block.getNumRows());
-                _operatorStats.recordOutput(1, block.getNumRows());
-                return block;
-              } else {
-                eosMailboxCount++;
-              }
+    int startingIdx = _serverIdx;
+    int openMailboxCount = 0;
+    int eosMailboxCount = 0;
+
+    // For all non-singleton distribution, we poll from every instance to check mailbox content.
+    // TODO: Fix wasted CPU cycles on waiting for servers that are not supposed to give content.
+    for (int i = 0; i < _sendingMailbox.size(); i++) {
+      // this implements a round-robin mailbox iterator, so we don't starve any mailboxes
+      _serverIdx = (startingIdx + i) % _sendingMailbox.size();
+      MailboxIdentifier mailboxId = _sendingMailbox.get(_serverIdx);
+      try {
+        ReceivingMailbox<TransferableBlock> mailbox = _mailboxService.getReceivingMailbox(mailboxId);
+        if (!mailbox.isClosed()) {
+          openMailboxCount++;
+          TransferableBlock block = mailbox.receive();
+          // Get null block when pulling times out from mailbox.
+          if (block != null) {
+            if (block.isErrorBlock()) {
+              _upstreamErrorBlock =
+                  TransferableBlockUtils.getErrorTransferableBlock(block.getDataBlock().getExceptions());
+              return _upstreamErrorBlock;
+            }
+            if (!block.isEndOfStreamBlock()) {
+              return block;
+            } else {
+              eosMailboxCount++;
             }
           }
-        } catch (Exception e) {
-          return TransferableBlockUtils.getErrorTransferableBlock(
-              new RuntimeException(String.format("Error polling mailbox=%s", mailboxId), e));
         }
+      } catch (Exception e) {
+        return TransferableBlockUtils.getErrorTransferableBlock(
+            new RuntimeException(String.format("Error polling mailbox=%s", mailboxId), e));
       }
-
-      // there are two conditions in which we should return EOS: (1) there were
-      // no mailboxes to open (this shouldn't happen because the second condition
-      // should be hit first, but is defensive) (2) every mailbox that was opened
-      // returned an EOS block. in every other scenario, there are mailboxes that
-      // are not yet exhausted and we should wait for more data to be available
-      TransferableBlock block =
-          openMailboxCount > 0 && openMailboxCount > eosMailboxCount ? TransferableBlockUtils.getNoOpTransferableBlock()
-              : TransferableBlockUtils.getEndOfStreamTransferableBlock();
-      return block;
-    } finally {
-      _operatorStats.endTimer();
     }
+
+    // there are two conditions in which we should return EOS: (1) there were
+    // no mailboxes to open (this shouldn't happen because the second condition
+    // should be hit first, but is defensive) (2) every mailbox that was opened
+    // returned an EOS block. in every other scenario, there are mailboxes that
+    // are not yet exhausted and we should wait for more data to be available
+    TransferableBlock block =
+        openMailboxCount > 0 && openMailboxCount > eosMailboxCount ? TransferableBlockUtils.getNoOpTransferableBlock()
+            : TransferableBlockUtils.getEndOfStreamTransferableBlock();
+    return block;
   }
 }
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java
index 177926e0f3..828da0375b 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java
@@ -55,7 +55,6 @@ public class MailboxSendOperator extends MultiStageOperator {
 
   private final MultiStageOperator _dataTableBlockBaseOperator;
   private final BlockExchange _exchange;
-  private OperatorStats _operatorStats;
 
   @VisibleForTesting
   interface BlockExchangeFactory {
@@ -81,6 +80,7 @@ public class MailboxSendOperator extends MultiStageOperator {
       MultiStageOperator dataTableBlockBaseOperator, List<VirtualServer> receivingStageInstances,
       RelDistribution.Type exchangeType, KeySelector<Object[], Object[]> keySelector,
       MailboxIdGenerator mailboxIdGenerator, BlockExchangeFactory blockExchangeFactory, long jobId, int stageId) {
+    super(jobId, stageId);
     _dataTableBlockBaseOperator = dataTableBlockBaseOperator;
 
     List<MailboxIdentifier> receivingMailboxes;
@@ -113,7 +113,6 @@ public class MailboxSendOperator extends MultiStageOperator {
 
     Preconditions.checkState(SUPPORTED_EXCHANGE_TYPE.contains(exchangeType),
         String.format("Exchange type '%s' is not supported yet", exchangeType));
-    _operatorStats = new OperatorStats(jobId, stageId, EXPLAIN_NAME);
   }
 
   @Override
@@ -124,30 +123,20 @@ public class MailboxSendOperator extends MultiStageOperator {
   @Nullable
   @Override
   public String toExplainString() {
-    _dataTableBlockBaseOperator.toExplainString();
-    LOGGER.debug(_operatorStats.toString());
     return EXPLAIN_NAME;
   }
 
   @Override
   protected TransferableBlock getNextBlock() {
-    _operatorStats.startTimer();
     TransferableBlock transferableBlock;
     try {
-      _operatorStats.endTimer();
       transferableBlock = _dataTableBlockBaseOperator.nextBlock();
-      _operatorStats.startTimer();
       while (!transferableBlock.isNoOpBlock()) {
         _exchange.send(transferableBlock);
-        _operatorStats.recordInput(1, transferableBlock.getNumRows());
-        // The # of output block is not accurate because we may do a split in exchange send.
-        _operatorStats.recordOutput(1, transferableBlock.getNumRows());
         if (transferableBlock.isEndOfStreamBlock()) {
           return transferableBlock;
         }
-        _operatorStats.endTimer();
         transferableBlock = _dataTableBlockBaseOperator.nextBlock();
-        _operatorStats.startTimer();
       }
     } catch (final Exception e) {
       // ideally, MailboxSendOperator doesn't ever throw an exception because
@@ -159,8 +148,6 @@ public class MailboxSendOperator extends MultiStageOperator {
       } catch (Exception e2) {
         LOGGER.error("Exception while sending block to mailbox.", e2);
       }
-    } finally {
-      _operatorStats.endTimer();
     }
     return transferableBlock;
   }
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MultiStageOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MultiStageOperator.java
index 37e954e6ae..e48d39e788 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MultiStageOperator.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MultiStageOperator.java
@@ -19,14 +19,45 @@
 package org.apache.pinot.query.runtime.operator;
 
 import java.util.List;
-import org.apache.pinot.core.operator.BaseOperator;
+import org.apache.pinot.core.common.Operator;
 import org.apache.pinot.query.runtime.blocks.TransferableBlock;
+import org.apache.pinot.spi.exception.EarlyTerminationException;
+import org.apache.pinot.spi.trace.InvocationScope;
+import org.apache.pinot.spi.trace.Tracing;
 import org.slf4j.LoggerFactory;
 
 
-public abstract class MultiStageOperator extends BaseOperator<TransferableBlock> implements AutoCloseable {
+public abstract class MultiStageOperator implements Operator<TransferableBlock>, AutoCloseable {
   private static final org.slf4j.Logger LOGGER = LoggerFactory.getLogger(MultiStageOperator.class);
 
+  // TODO: Move to OperatorContext class.
+  private final OperatorStats _operatorStats;
+
+  public MultiStageOperator(long requestId, int stageId) {
+    _operatorStats = new OperatorStats(requestId, stageId, toExplainString());
+  }
+
+  @Override
+  public TransferableBlock nextBlock() {
+    if (Tracing.ThreadAccountantOps.isInterrupted()) {
+      throw new EarlyTerminationException("Interrupted while processing next block");
+    }
+    try (InvocationScope ignored = Tracing.getTracer().createScope(getClass())) {
+      _operatorStats.startTimer();
+      TransferableBlock nextBlock = getNextBlock();
+      _operatorStats.recordRow(1, nextBlock.getNumRows());
+      _operatorStats.endTimer();
+      // TODO: move this to centralized reporting in broker
+      if (nextBlock.isEndOfStreamBlock()) {
+        LOGGER.info("Recorded operator stats: " + _operatorStats);
+      }
+      return nextBlock;
+    }
+  }
+
+  // Make it protected because we should always call nextBlock()
+  protected abstract TransferableBlock getNextBlock();
+
   @Override
   public List<MultiStageOperator> getChildOperators() {
     throw new UnsupportedOperationException();
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/OperatorStats.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/OperatorStats.java
index c40b96b3c8..7fbcc5b035 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/OperatorStats.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/OperatorStats.java
@@ -31,12 +31,8 @@ public class OperatorStats {
 
   private final String _operatorType;
 
-  private int _numInputBlock = 0;
-  private int _numInputRows = 0;
-
-  private int _numOutputBlock = 0;
-
-  private int _numOutputRows = 0;
+  private int _numBlock = 0;
+  private int _numRows = 0;
 
   public OperatorStats(long requestId, int stageId, String operatorType) {
     _stageId = stageId;
@@ -56,23 +52,16 @@ public class OperatorStats {
     }
   }
 
-  public void recordInput(int numBlock, int numRows) {
-    _numInputBlock += numBlock;
-    _numInputRows += numRows;
-  }
-
-  public void recordOutput(int numBlock, int numRows) {
-    _numOutputBlock += numBlock;
-    _numOutputRows += numRows;
+  public void recordRow(int numBlock, int numRows) {
+    _numBlock += numBlock;
+    _numRows += numRows;
   }
 
   // TODO: Return the string as a JSON string.
   @Override
   public String toString() {
     return String.format(
-        "OperatorStats[type: %s, requestId: %s, stageId %s] ExecutionWallTime: %sms, InputRows: %s, InputBlock: "
-            + "%s, OutputRows: %s, OutputBlock: %s", _operatorType, _requestId, _stageId,
-        _executeStopwatch.elapsed(TimeUnit.MILLISECONDS), _numInputRows, _numInputBlock, _numOutputRows,
-        _numOutputBlock);
+        "OperatorStats[requestId: %s, stageId %s, type: %s] ExecutionWallTime: %sms, No. Rows: %s, No. Block: %s",
+        _requestId, _stageId, _operatorType, _executeStopwatch.elapsed(TimeUnit.MILLISECONDS), _numRows, _numBlock);
   }
 }
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/SortOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/SortOperator.java
index 9494280e0b..e24195948a 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/SortOperator.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/SortOperator.java
@@ -50,7 +50,6 @@ public class SortOperator extends MultiStageOperator {
   private boolean _readyToConstruct;
   private boolean _isSortedBlockConstructed;
   private TransferableBlock _upstreamErrorBlock;
-  private OperatorStats _operatorStats;
 
   public SortOperator(MultiStageOperator upstreamOperator, List<RexExpression> collationKeys,
       List<RelFieldCollation.Direction> collationDirections, int fetch, int offset, DataSchema dataSchema,
@@ -63,6 +62,7 @@ public class SortOperator extends MultiStageOperator {
   SortOperator(MultiStageOperator upstreamOperator, List<RexExpression> collationKeys,
       List<RelFieldCollation.Direction> collationDirections, int fetch, int offset, DataSchema dataSchema,
       int defaultHolderCapacity, long requestId, int stageId) {
+    super(requestId, stageId);
     _upstreamOperator = upstreamOperator;
     _fetch = fetch;
     _offset = Math.max(offset, 0);
@@ -72,7 +72,6 @@ public class SortOperator extends MultiStageOperator {
     _numRowsToKeep = _fetch > 0 ? _fetch + _offset : defaultHolderCapacity;
     _rows = new PriorityQueue<>(_numRowsToKeep,
         new SortComparator(collationKeys, collationDirections, dataSchema, false));
-    _operatorStats = new OperatorStats(requestId, stageId, EXPLAIN_NAME);
   }
 
   @Override
@@ -87,27 +86,21 @@ public class SortOperator extends MultiStageOperator {
   @Nullable
   @Override
   public String toExplainString() {
-    _upstreamOperator.toExplainString();
-    LOGGER.debug(_operatorStats.toString());
     return EXPLAIN_NAME;
   }
 
   @Override
   protected TransferableBlock getNextBlock() {
-    _operatorStats.startTimer();
     try {
       consumeInputBlocks();
       return produceSortedBlock();
     } catch (Exception e) {
       return TransferableBlockUtils.getErrorTransferableBlock(e);
-    } finally {
-      _operatorStats.endTimer();
     }
   }
 
   private TransferableBlock produceSortedBlock() {
     if (_upstreamErrorBlock != null) {
-      LOGGER.error("OperatorStats:" + _operatorStats);
       return _upstreamErrorBlock;
     } else if (!_readyToConstruct) {
       return TransferableBlockUtils.getNoOpTransferableBlock();
@@ -119,7 +112,6 @@ public class SortOperator extends MultiStageOperator {
         Object[] row = _rows.poll();
         rows.addFirst(row);
       }
-      _operatorStats.recordOutput(1, rows.size());
       _isSortedBlockConstructed = true;
       if (rows.size() == 0) {
         return TransferableBlockUtils.getEndOfStreamTransferableBlock();
@@ -133,9 +125,7 @@ public class SortOperator extends MultiStageOperator {
 
   private void consumeInputBlocks() {
     if (!_isSortedBlockConstructed) {
-      _operatorStats.endTimer();
       TransferableBlock block = _upstreamOperator.nextBlock();
-      _operatorStats.startTimer();
       while (!block.isNoOpBlock()) {
         // setting upstream error block
         if (block.isErrorBlock()) {
@@ -150,10 +140,7 @@ public class SortOperator extends MultiStageOperator {
         for (Object[] row : container) {
           SelectionOperatorUtils.addToPriorityQueue(row, _rows, _numRowsToKeep);
         }
-        _operatorStats.endTimer();
         block = _upstreamOperator.nextBlock();
-        _operatorStats.startTimer();
-        _operatorStats.recordInput(1, container.size());
       }
     }
   }
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/TransformOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/TransformOperator.java
index a75ad6b017..82f804a285 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/TransformOperator.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/TransformOperator.java
@@ -53,10 +53,10 @@ public class TransformOperator extends MultiStageOperator {
   // TODO: Check type matching between resultSchema and the actual result.
   private final DataSchema _resultSchema;
   private TransferableBlock _upstreamErrorBlock;
-  private OperatorStats _operatorStats;
 
   public TransformOperator(MultiStageOperator upstreamOperator, DataSchema resultSchema,
       List<RexExpression> transforms, DataSchema upstreamDataSchema, long requestId, int stageId) {
+    super(requestId, stageId);
     Preconditions.checkState(!transforms.isEmpty(), "transform operand should not be empty.");
     Preconditions.checkState(resultSchema.size() == transforms.size(),
         "result schema size:" + resultSchema.size() + " doesn't match transform operand size:" + transforms.size());
@@ -67,7 +67,6 @@ public class TransformOperator extends MultiStageOperator {
       _transformOperandsList.add(TransformOperand.toTransformOperand(rexExpression, upstreamDataSchema));
     }
     _resultSchema = resultSchema;
-    _operatorStats = new OperatorStats(requestId, stageId, EXPLAIN_NAME);
   }
 
   @Override
@@ -78,23 +77,16 @@ public class TransformOperator extends MultiStageOperator {
   @Nullable
   @Override
   public String toExplainString() {
-    _upstreamOperator.toExplainString();
-    LOGGER.debug(_operatorStats.toString());
     return EXPLAIN_NAME;
   }
 
   @Override
   protected TransferableBlock getNextBlock() {
-    _operatorStats.startTimer();
     try {
-      _operatorStats.endTimer();
       TransferableBlock block = _upstreamOperator.nextBlock();
-      _operatorStats.startTimer();
       return transform(block);
     } catch (Exception e) {
       return TransferableBlockUtils.getErrorTransferableBlock(e);
-    } finally {
-      _operatorStats.endTimer();
     }
   }
 
@@ -121,8 +113,6 @@ public class TransformOperator extends MultiStageOperator {
       }
       resultRows.add(resultRow);
     }
-    _operatorStats.recordInput(1, container.size());
-    _operatorStats.recordOutput(1, resultRows.size());
     return new TransferableBlock(resultRows, _resultSchema, DataBlock.Type.ROW);
   }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org