You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by kh...@apache.org on 2023/04/01 14:11:08 UTC

[pinot] branch master updated: Do not serialize metrics in each Operator (#10473)

This is an automated email from the ASF dual-hosted git repository.

kharekartik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 941cbf84f9 Do not serialize metrics in each Operator (#10473)
941cbf84f9 is described below

commit 941cbf84f912d7cd47f86299c8f4652cd046e097
Author: Kartik Khare <kh...@gmail.com>
AuthorDate: Sat Apr 1 19:41:00 2023 +0530

    Do not serialize metrics in each Operator (#10473)
    
    * WIP: Do not serialize metrics
    
    * No need to pass stats between operators. Stats are only collected at the end, in the send operator
    
    * Use opchain stats to record operatorStats
    
    * No need to serialize metrics in receive operator
    
    * Remove attachStats method and create stats object inside context itself
    
    * Make stats thread safe
    
    * Add test for opchain stats
    
    * Ensure SendOperator stats are populated before serializing stats
    
    * Fix variable scope
    
    * Use operator stats map directly from opchain stats
    
    * Unify return statements outside the inner for loop in MailboxSendOperator
    
    ---------
    
    Co-authored-by: Kartik Khare <kh...@Kartiks-MacBook-Pro.local>
---
 .../apache/pinot/query/runtime/QueryRunner.java    |   6 +-
 .../runtime/executor/OpChainSchedulerService.java  |   1 -
 .../LeafStageTransferableBlockOperator.java        |   3 +-
 .../runtime/operator/MailboxReceiveOperator.java   |   7 +-
 .../runtime/operator/MailboxSendOperator.java      |  12 +-
 .../query/runtime/operator/MultiStageOperator.java |  42 ++-----
 .../pinot/query/runtime/operator/OpChain.java      |  11 +-
 .../pinot/query/runtime/operator/OpChainStats.java |  21 +++-
 .../query/runtime/operator/OperatorStats.java      |  14 +--
 .../runtime/operator/utils/OperatorUtils.java      |   4 +-
 .../runtime/plan/OpChainExecutionContext.java      |  14 +++
 .../query/service/dispatch/QueryDispatcher.java    |  25 +++--
 .../executor/OpChainSchedulerServiceTest.java      |   6 +-
 .../runtime/executor/RoundRobinSchedulerTest.java  |  34 ++++--
 .../runtime/operator/MailboxSendOperatorTest.java  |   2 +-
 .../pinot/query/runtime/operator/OpChainTest.java  | 124 +++++++++++++++++++++
 16 files changed, 237 insertions(+), 89 deletions(-)

diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java
index b0d0433f42..097b05b455 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java
@@ -52,6 +52,7 @@ import org.apache.pinot.query.runtime.executor.OpChainSchedulerService;
 import org.apache.pinot.query.runtime.executor.RoundRobinScheduler;
 import org.apache.pinot.query.runtime.operator.LeafStageTransferableBlockOperator;
 import org.apache.pinot.query.runtime.operator.MailboxSendOperator;
+import org.apache.pinot.query.runtime.operator.MultiStageOperator;
 import org.apache.pinot.query.runtime.operator.OpChain;
 import org.apache.pinot.query.runtime.plan.DistributedStagePlan;
 import org.apache.pinot.query.runtime.plan.OpChainExecutionContext;
@@ -213,8 +214,9 @@ public class QueryRunner {
       OpChainExecutionContext opChainExecutionContext =
           new OpChainExecutionContext(_mailboxService, requestId, sendNode.getStageId(), _rootServer, deadlineMs,
               deadlineMs, distributedStagePlan.getMetadataMap());
-      mailboxSendOperator = new MailboxSendOperator(opChainExecutionContext,
-          new LeafStageTransferableBlockOperator(opChainExecutionContext, serverQueryResults, sendNode.getDataSchema()),
+      MultiStageOperator leafStageOperator =
+          new LeafStageTransferableBlockOperator(opChainExecutionContext, serverQueryResults, sendNode.getDataSchema());
+      mailboxSendOperator = new MailboxSendOperator(opChainExecutionContext, leafStageOperator,
           sendNode.getExchangeType(), sendNode.getPartitionKeySelector(), sendNode.getCollationKeys(),
           sendNode.getCollationDirections(), sendNode.isSortOnSender(), sendNode.getStageId(),
           sendNode.getReceiverStageId());
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/OpChainSchedulerService.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/OpChainSchedulerService.java
index 3706627349..1f29584dcc 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/OpChainSchedulerService.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/OpChainSchedulerService.java
@@ -111,7 +111,6 @@ public class OpChainSchedulerService extends AbstractExecutionThreadService {
                 LOGGER.error("({}): Completed erroneously {} {}", operatorChain, operatorChain.getStats(),
                     result.getDataBlock().getExceptions());
               } else {
-                operatorChain.getStats().setOperatorStatsMap(result.getResultMetadata());
                 LOGGER.debug("({}): Completed {}", operatorChain, operatorChain.getStats());
               }
             }
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/LeafStageTransferableBlockOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/LeafStageTransferableBlockOperator.java
index 9af6e8819d..a6ed81a3d8 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/LeafStageTransferableBlockOperator.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/LeafStageTransferableBlockOperator.java
@@ -75,7 +75,8 @@ public class LeafStageTransferableBlockOperator extends MultiStageOperator {
     _errorBlock = baseResultBlock.stream().filter(e -> !e.getExceptions().isEmpty()).findFirst().orElse(null);
     _currentIndex = 0;
     for (InstanceResponseBlock instanceResponseBlock : baseResultBlock) {
-      _operatorStats.recordExecutionStats(instanceResponseBlock.getResponseMetadata());
+      OperatorStats operatorStats = _opChainStats.getOperatorStats(context, getOperatorId());
+      operatorStats.recordExecutionStats(instanceResponseBlock.getResponseMetadata());
     }
   }
 
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperator.java
index fe9782c4c6..f65f9b8d15 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperator.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperator.java
@@ -25,6 +25,7 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.LinkedList;
 import java.util.List;
+import java.util.Map;
 import java.util.PriorityQueue;
 import java.util.Set;
 import javax.annotation.Nullable;
@@ -215,8 +216,10 @@ public class MailboxReceiveOperator extends MultiStageOperator {
                 return block;
               }
             } else {
-              if (!block.getResultMetadata().isEmpty()) {
-                _operatorStatsMap.putAll(block.getResultMetadata());
+              if (_opChainStats != null && !block.getResultMetadata().isEmpty()) {
+                for (Map.Entry<String, OperatorStats> entry : block.getResultMetadata().entrySet()) {
+                  _opChainStats.getOperatorStatsMap().compute(entry.getKey(), (_key, _value) -> entry.getValue());
+                }
               }
               eosMailboxCount++;
             }
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java
index 3be593a70a..ad0c24abb7 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java
@@ -40,6 +40,7 @@ import org.apache.pinot.query.runtime.blocks.BlockSplitter;
 import org.apache.pinot.query.runtime.blocks.TransferableBlock;
 import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils;
 import org.apache.pinot.query.runtime.operator.exchange.BlockExchange;
+import org.apache.pinot.query.runtime.operator.utils.OperatorUtils;
 import org.apache.pinot.query.runtime.plan.OpChainExecutionContext;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -151,10 +152,19 @@ public class MailboxSendOperator extends MultiStageOperator {
     try {
       transferableBlock = _dataTableBlockBaseOperator.nextBlock();
       while (!transferableBlock.isNoOpBlock()) {
-        _exchange.send(transferableBlock);
         if (transferableBlock.isEndOfStreamBlock()) {
+          if (transferableBlock.isSuccessfulEndOfStreamBlock()) {
+            //Stats need to be populated here because the block is being sent to the mailbox
+            // and the receiving opChain will not be able to access the stats from the previous opChain
+            TransferableBlock eosBlockWithStats = TransferableBlockUtils.getEndOfStreamTransferableBlock(
+                OperatorUtils.getMetadataFromOperatorStats(_opChainStats.getOperatorStatsMap()));
+            _exchange.send(eosBlockWithStats);
+          } else {
+            _exchange.send(transferableBlock);
+          }
           return transferableBlock;
         }
+        _exchange.send(transferableBlock);
         transferableBlock = _dataTableBlockBaseOperator.nextBlock();
       }
     } catch (final Exception e) {
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MultiStageOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MultiStageOperator.java
index 160b3d0f58..881c499c0f 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MultiStageOperator.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MultiStageOperator.java
@@ -19,14 +19,9 @@
 package org.apache.pinot.query.runtime.operator;
 
 import com.google.common.base.Joiner;
-import java.util.HashMap;
 import java.util.List;
-import java.util.Map;
-import org.apache.pinot.common.datatable.DataTable;
 import org.apache.pinot.core.common.Operator;
 import org.apache.pinot.query.runtime.blocks.TransferableBlock;
-import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils;
-import org.apache.pinot.query.runtime.operator.utils.OperatorUtils;
 import org.apache.pinot.query.runtime.plan.OpChainExecutionContext;
 import org.apache.pinot.spi.exception.EarlyTerminationException;
 import org.apache.pinot.spi.trace.InvocationScope;
@@ -37,23 +32,15 @@ import org.slf4j.LoggerFactory;
 public abstract class MultiStageOperator implements Operator<TransferableBlock>, AutoCloseable {
   private static final org.slf4j.Logger LOGGER = LoggerFactory.getLogger(MultiStageOperator.class);
 
-  // TODO: Move to OperatorContext class.
-  protected final OperatorStats _operatorStats;
-  protected final Map<String, OperatorStats> _operatorStatsMap;
   private final String _operatorId;
   private final OpChainExecutionContext _context;
+  protected final OpChainStats _opChainStats;
 
   public MultiStageOperator(OpChainExecutionContext context) {
     _context = context;
-    _operatorStats =
-        new OperatorStats(_context, toExplainString());
-    _operatorStatsMap = new HashMap<>();
     _operatorId =
         Joiner.on("_").join(toExplainString(), _context.getRequestId(), _context.getStageId(), _context.getServer());
-  }
-
-  public Map<String, OperatorStats> getOperatorStatsMap() {
-    return _operatorStatsMap;
+    _opChainStats = _context.getStats();
   }
 
   @Override
@@ -62,28 +49,19 @@ public abstract class MultiStageOperator implements Operator<TransferableBlock>,
       throw new EarlyTerminationException("Interrupted while processing next block");
     }
     try (InvocationScope ignored = Tracing.getTracer().createScope(getClass())) {
-      _operatorStats.startTimer();
+      OperatorStats operatorStats = _opChainStats.getOperatorStats(_context, _operatorId);
+      operatorStats.startTimer();
       TransferableBlock nextBlock = getNextBlock();
-      _operatorStats.endTimer(nextBlock);
-
-      _operatorStats.recordRow(1, nextBlock.getNumRows());
-      if (nextBlock.isEndOfStreamBlock()) {
-        if (nextBlock.isSuccessfulEndOfStreamBlock()) {
-          for (MultiStageOperator op : getChildOperators()) {
-            _operatorStatsMap.putAll(op.getOperatorStatsMap());
-          }
-          if (!_operatorStats.getExecutionStats().isEmpty()) {
-            _operatorStats.recordSingleStat(DataTable.MetadataKey.OPERATOR_ID.getName(), _operatorId);
-            _operatorStatsMap.put(_operatorId, _operatorStats);
-          }
-          return TransferableBlockUtils.getEndOfStreamTransferableBlock(
-              OperatorUtils.getMetadataFromOperatorStats(_operatorStatsMap));
-        }
-      }
+      operatorStats.recordRow(1, nextBlock.getNumRows());
+      operatorStats.endTimer(nextBlock);
       return nextBlock;
     }
   }
 
+  public String getOperatorId() {
+    return _operatorId;
+  }
+
   // Make it protected because we should always call nextBlock()
   protected abstract TransferableBlock getNextBlock();
 
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/OpChain.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/OpChain.java
index 84fac943c2..53176787ce 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/OpChain.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/OpChain.java
@@ -37,16 +37,11 @@ public class OpChain implements AutoCloseable {
   private final OpChainStats _stats;
   private final OpChainId _id;
 
-  public OpChain(MultiStageOperator root, List<MailboxIdentifier> receivingMailboxes, int virtualServerId,
-      long requestId, int stageId) {
+  public OpChain(OpChainExecutionContext context, MultiStageOperator root, List<MailboxIdentifier> receivingMailboxes) {
     _root = root;
     _receivingMailbox = new HashSet<>(receivingMailboxes);
-    _id = new OpChainId(requestId, virtualServerId, stageId);
-    _stats = new OpChainStats(_id.toString());
-  }
-
-  public OpChain(OpChainExecutionContext context, MultiStageOperator root, List<MailboxIdentifier> receivingMailboxes) {
-    this(root, receivingMailboxes, context.getServer().virtualId(), context.getRequestId(), context.getStageId());
+    _id = context.getId();
+    _stats = context.getStats();
   }
 
   public Operator<TransferableBlock> getRoot() {
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/OpChainStats.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/OpChainStats.java
index 60dba3a0de..5b2cc2a065 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/OpChainStats.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/OpChainStats.java
@@ -21,12 +21,13 @@ package org.apache.pinot.query.runtime.operator;
 
 import com.google.common.base.Stopwatch;
 import com.google.common.base.Suppliers;
-import java.util.HashMap;
-import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.Supplier;
 import javax.annotation.concurrent.NotThreadSafe;
+import org.apache.pinot.common.datatable.DataTable;
+import org.apache.pinot.query.runtime.plan.OpChainExecutionContext;
 import org.apache.pinot.spi.accounting.ThreadResourceUsageProvider;
 
 
@@ -50,7 +51,7 @@ public class OpChainStats {
   private final AtomicLong _queuedCount = new AtomicLong();
 
   private final String _id;
-  private Map<String, OperatorStats> _operatorStatsMap = new HashMap<>();
+  private final ConcurrentHashMap<String, OperatorStats> _operatorStatsMap = new ConcurrentHashMap<>();
 
   public OpChainStats(String id) {
     _id = id;
@@ -73,12 +74,16 @@ public class OpChainStats {
     }
   }
 
-  public Map<String, OperatorStats> getOperatorStatsMap() {
+  public ConcurrentHashMap<String, OperatorStats> getOperatorStatsMap() {
     return _operatorStatsMap;
   }
 
-  public void setOperatorStatsMap(Map<String, OperatorStats> operatorStatsMap) {
-    _operatorStatsMap = operatorStatsMap;
+  public OperatorStats getOperatorStats(OpChainExecutionContext context, String operatorId) {
+    return _operatorStatsMap.computeIfAbsent(operatorId, (id) -> {
+       OperatorStats operatorStats = new OperatorStats(context);
+       operatorStats.recordSingleStat(DataTable.MetadataKey.OPERATOR_ID.getName(), operatorId);
+       return operatorStats;
+     });
   }
 
   private void startExecutionTimer() {
@@ -89,6 +94,10 @@ public class OpChainStats {
     }
   }
 
+  public long getExecutionTime() {
+    return _executeStopwatch.elapsed(TimeUnit.MILLISECONDS);
+  }
+
   @Override
   public String toString() {
     return String.format("(%s) Queued Count: %s, Executing Time: %sms, Queued Time: %sms", _id, _queuedCount.get(),
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/OperatorStats.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/OperatorStats.java
index 52ca89a3ee..2655a5c286 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/OperatorStats.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/OperatorStats.java
@@ -35,9 +35,8 @@ public class OperatorStats {
   // TODO: add a operatorId for better tracking purpose.
   private final int _stageId;
   private final long _requestId;
-  private final VirtualServerAddress _serverAddress;
 
-  private final String _operatorType;
+  private final VirtualServerAddress _serverAddress;
 
   private int _numBlock = 0;
   private int _numRows = 0;
@@ -45,16 +44,15 @@ public class OperatorStats {
   private final Map<String, String> _executionStats;
   private boolean _processingStarted = false;
 
-  public OperatorStats(OpChainExecutionContext context, String operatorType) {
-    this(context.getRequestId(), context.getStageId(), context.getServer(), operatorType);
+  public OperatorStats(OpChainExecutionContext context) {
+    this(context.getRequestId(), context.getStageId(), context.getServer());
   }
 
   //TODO: remove this constructor after the context constructor can be used in serialization and deserialization
-  public OperatorStats(long requestId, int stageId, VirtualServerAddress serverAddress, String operatorType) {
+  public OperatorStats(long requestId, int stageId, VirtualServerAddress serverAddress) {
     _stageId = stageId;
     _requestId = requestId;
     _serverAddress = serverAddress;
-    _operatorType = operatorType;
     _executionStats = new HashMap<>();
   }
 
@@ -115,10 +113,6 @@ public class OperatorStats {
     return _serverAddress;
   }
 
-  public String getOperatorType() {
-    return _operatorType;
-  }
-
   @Override
   public String toString() {
     return OperatorUtils.operatorStatsToJson(this);
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/utils/OperatorUtils.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/utils/OperatorUtils.java
index ab79339891..601f169cb3 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/utils/OperatorUtils.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/utils/OperatorUtils.java
@@ -81,7 +81,6 @@ public class OperatorUtils {
       jsonOut.put("requestId", operatorStats.getRequestId());
       jsonOut.put("stageId", operatorStats.getStageId());
       jsonOut.put("serverAddress", operatorStats.getServerAddress().toString());
-      jsonOut.put("operatorType", operatorStats.getOperatorType());
       jsonOut.put("executionStats", operatorStats.getExecutionStats());
       return JsonUtils.objectToString(jsonOut);
     } catch (Exception e) {
@@ -97,10 +96,9 @@ public class OperatorUtils {
       int stageId = operatorStatsNode.get("stageId").asInt();
       String serverAddressStr = operatorStatsNode.get("serverAddress").asText();
       VirtualServerAddress serverAddress = VirtualServerAddress.parse(serverAddressStr);
-      String operatorType = operatorStatsNode.get("operatorType").asText();
 
       OperatorStats operatorStats =
-          new OperatorStats(requestId, stageId, serverAddress, operatorType);
+          new OperatorStats(requestId, stageId, serverAddress);
       operatorStats.recordExecutionStats(
           JsonUtils.jsonNodeToObject(operatorStatsNode.get("executionStats"), new TypeReference<Map<String, String>>() {
           }));
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/OpChainExecutionContext.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/OpChainExecutionContext.java
index 1f500ce8b2..64141a024b 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/OpChainExecutionContext.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/OpChainExecutionContext.java
@@ -23,6 +23,8 @@ import org.apache.pinot.query.mailbox.MailboxService;
 import org.apache.pinot.query.planner.StageMetadata;
 import org.apache.pinot.query.routing.VirtualServerAddress;
 import org.apache.pinot.query.runtime.blocks.TransferableBlock;
+import org.apache.pinot.query.runtime.operator.OpChainId;
+import org.apache.pinot.query.runtime.operator.OpChainStats;
 
 
 /**
@@ -38,6 +40,8 @@ public class OpChainExecutionContext {
   private final long _timeoutMs;
   private final long _deadlineMs;
   private final Map<Integer, StageMetadata> _metadataMap;
+  private final OpChainId _id;
+  private final OpChainStats _stats;
 
   public OpChainExecutionContext(MailboxService<TransferableBlock> mailboxService, long requestId, int stageId,
       VirtualServerAddress server, long timeoutMs, long deadlineMs, Map<Integer, StageMetadata> metadataMap) {
@@ -48,6 +52,8 @@ public class OpChainExecutionContext {
     _timeoutMs = timeoutMs;
     _deadlineMs = deadlineMs;
     _metadataMap = metadataMap;
+    _id = new OpChainId(requestId, server.virtualId(), stageId);
+    _stats = new OpChainStats(_id.toString());
   }
 
   public OpChainExecutionContext(PlanRequestContext planRequestContext) {
@@ -83,4 +89,12 @@ public class OpChainExecutionContext {
   public Map<Integer, StageMetadata> getMetadataMap() {
     return _metadataMap;
   }
+
+  public OpChainId getId() {
+    return _id;
+  }
+
+  public OpChainStats getStats() {
+    return _stats;
+  }
 }
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/QueryDispatcher.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/QueryDispatcher.java
index 4d01c67105..36c93a5827 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/QueryDispatcher.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/QueryDispatcher.java
@@ -55,6 +55,7 @@ import org.apache.pinot.query.routing.VirtualServerAddress;
 import org.apache.pinot.query.runtime.blocks.TransferableBlock;
 import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils;
 import org.apache.pinot.query.runtime.operator.MailboxReceiveOperator;
+import org.apache.pinot.query.runtime.operator.OpChainStats;
 import org.apache.pinot.query.runtime.operator.OperatorStats;
 import org.apache.pinot.query.runtime.operator.utils.OperatorUtils;
 import org.apache.pinot.query.runtime.plan.DistributedStagePlan;
@@ -175,12 +176,16 @@ public class QueryDispatcher {
   public static ResultTable runReducer(long requestId, QueryPlan queryPlan, int reduceStageId, long timeoutMs,
       MailboxService<TransferableBlock> mailboxService, Map<Integer, ExecutionStatsAggregator> statsAggregatorMap) {
     MailboxReceiveNode reduceNode = (MailboxReceiveNode) queryPlan.getQueryStageMap().get(reduceStageId);
+    VirtualServerAddress server =
+        new VirtualServerAddress(mailboxService.getHostname(), mailboxService.getMailboxPort(), 0);
+    OpChainExecutionContext context =
+        new OpChainExecutionContext(mailboxService, requestId, reduceStageId, server, timeoutMs, timeoutMs,
+            queryPlan.getStageMetadataMap());
     MailboxReceiveOperator mailboxReceiveOperator =
-        createReduceStageOperator(mailboxService, queryPlan.getStageMetadataMap(), requestId,
-            reduceNode.getSenderStageId(), reduceStageId, reduceNode.getDataSchema(),
-            new VirtualServerAddress(mailboxService.getHostname(), mailboxService.getMailboxPort(), 0), timeoutMs);
+        createReduceStageOperator(
+            reduceNode.getSenderStageId(), reduceStageId, reduceNode.getDataSchema(), context);
     List<DataBlock> resultDataBlocks =
-        reduceMailboxReceive(mailboxReceiveOperator, timeoutMs, statsAggregatorMap, queryPlan);
+        reduceMailboxReceive(mailboxReceiveOperator, timeoutMs, statsAggregatorMap, queryPlan, context.getStats());
     return toResultTable(resultDataBlocks, queryPlan.getQueryResultFields(),
         queryPlan.getQueryStageMap().get(0).getDataSchema());
   }
@@ -193,7 +198,8 @@ public class QueryDispatcher {
   }
 
   private static List<DataBlock> reduceMailboxReceive(MailboxReceiveOperator mailboxReceiveOperator, long timeoutMs,
-      @Nullable Map<Integer, ExecutionStatsAggregator> executionStatsAggregatorMap, QueryPlan queryPlan) {
+      @Nullable Map<Integer, ExecutionStatsAggregator> executionStatsAggregatorMap, QueryPlan queryPlan,
+      OpChainStats stats) {
     List<DataBlock> resultDataBlocks = new ArrayList<>();
     TransferableBlock transferableBlock;
     long timeoutWatermark = System.nanoTime() + timeoutMs * 1_000_000L;
@@ -209,7 +215,7 @@ public class QueryDispatcher {
         continue;
       } else if (transferableBlock.isEndOfStreamBlock()) {
         if (executionStatsAggregatorMap != null) {
-          for (Map.Entry<String, OperatorStats> entry : transferableBlock.getResultMetadata().entrySet()) {
+          for (Map.Entry<String, OperatorStats> entry : stats.getOperatorStatsMap().entrySet()) {
             LOGGER.info("Broker Query Execution Stats - OperatorId: {}, OperatorStats: {}", entry.getKey(),
                 OperatorUtils.operatorStatsToJson(entry.getValue()));
             OperatorStats operatorStats = entry.getValue();
@@ -276,11 +282,8 @@ public class QueryDispatcher {
     return new DataSchema(colNames, colTypes);
   }
 
-  private static MailboxReceiveOperator createReduceStageOperator(MailboxService<TransferableBlock> mailboxService,
-      Map<Integer, StageMetadata> stageMetadataMap, long jobId, int stageId, int reducerStageId, DataSchema dataSchema,
-      VirtualServerAddress server, long timeoutMs) {
-    OpChainExecutionContext context =
-        new OpChainExecutionContext(mailboxService, jobId, stageId, server, timeoutMs, timeoutMs, stageMetadataMap);
+  private static MailboxReceiveOperator createReduceStageOperator(int stageId, int reducerStageId,
+      DataSchema dataSchema, OpChainExecutionContext context) {
     // timeout is set for reduce stage
     MailboxReceiveOperator mailboxReceiveOperator =
         new MailboxReceiveOperator(context, RelDistribution.Type.RANDOM_DISTRIBUTED, Collections.emptyList(),
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/executor/OpChainSchedulerServiceTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/executor/OpChainSchedulerServiceTest.java
index 345b4ca51f..a30adf04f4 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/executor/OpChainSchedulerServiceTest.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/executor/OpChainSchedulerServiceTest.java
@@ -24,9 +24,11 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.pinot.query.routing.VirtualServerAddress;
 import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils;
 import org.apache.pinot.query.runtime.operator.MultiStageOperator;
 import org.apache.pinot.query.runtime.operator.OpChain;
+import org.apache.pinot.query.runtime.plan.OpChainExecutionContext;
 import org.mockito.Mockito;
 import org.mockito.MockitoAnnotations;
 import org.mockito.stubbing.Answer;
@@ -71,7 +73,9 @@ public class OpChainSchedulerServiceTest {
   }
 
   private OpChain getChain(MultiStageOperator operator) {
-    return new OpChain(operator, ImmutableList.of(), 1, 123, 1);
+    VirtualServerAddress address = new VirtualServerAddress("localhost", 1234, 1);
+    OpChainExecutionContext context = new OpChainExecutionContext(null, 123L, 1, address, 0, 0, null);
+    return new OpChain(context, operator, ImmutableList.of());
   }
 
   @Test
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/executor/RoundRobinSchedulerTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/executor/RoundRobinSchedulerTest.java
index 88974b1cb3..34b39a697b 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/executor/RoundRobinSchedulerTest.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/executor/RoundRobinSchedulerTest.java
@@ -22,8 +22,10 @@ import com.google.common.collect.ImmutableList;
 import java.util.concurrent.TimeUnit;
 import org.apache.pinot.query.mailbox.JsonMailboxIdentifier;
 import org.apache.pinot.query.mailbox.MailboxIdentifier;
+import org.apache.pinot.query.routing.VirtualServerAddress;
 import org.apache.pinot.query.runtime.operator.MultiStageOperator;
 import org.apache.pinot.query.runtime.operator.OpChain;
+import org.apache.pinot.query.runtime.plan.OpChainExecutionContext;
 import org.mockito.Mock;
 import org.mockito.MockitoAnnotations;
 import org.testng.Assert;
@@ -76,8 +78,9 @@ public class RoundRobinSchedulerTest {
   @Test
   public void testSchedulerHappyPath()
       throws InterruptedException {
-    OpChain chain = new OpChain(_operator, ImmutableList.of(MAILBOX_1), DEFAULT_VIRTUAL_SERVER_ID,
-        123, DEFAULT_RECEIVER_STAGE_ID);
+    OpChain chain =
+        new OpChain(getOpChainExecutionContext(DEFAULT_RECEIVER_STAGE_ID, 123, DEFAULT_VIRTUAL_SERVER_ID), _operator,
+            ImmutableList.of(MAILBOX_1));
     _scheduler = new RoundRobinScheduler(DEFAULT_RELEASE_TIMEOUT_MS);
     _scheduler.register(chain);
 
@@ -102,8 +105,9 @@ public class RoundRobinSchedulerTest {
   @Test
   public void testSchedulerWhenSenderDies()
       throws InterruptedException {
-    OpChain chain = new OpChain(_operator, ImmutableList.of(MAILBOX_1), DEFAULT_VIRTUAL_SERVER_ID,
-        DEFAULT_REQUEST_ID, DEFAULT_RECEIVER_STAGE_ID);
+    OpChain chain = new OpChain(
+        getOpChainExecutionContext(DEFAULT_REQUEST_ID, DEFAULT_RECEIVER_STAGE_ID, DEFAULT_VIRTUAL_SERVER_ID), _operator,
+        ImmutableList.of(MAILBOX_1));
     _scheduler = new RoundRobinScheduler(DEFAULT_RELEASE_TIMEOUT_MS);
     _scheduler.register(chain);
 
@@ -132,12 +136,17 @@ public class RoundRobinSchedulerTest {
     // When parallelism is > 1, multiple OpChains with the same requestId and stageId would be registered in the same
     // scheduler. Data received on a given mailbox should wake up exactly 1 OpChain corresponding to the virtual
     // server-id determined by the Mailbox.
-    OpChain chain1 = new OpChain(_operator, ImmutableList.of(MAILBOX_1), MAILBOX_1.getToHost().virtualId(),
-        DEFAULT_REQUEST_ID, DEFAULT_RECEIVER_STAGE_ID);
-    OpChain chain2 = new OpChain(_operator, ImmutableList.of(MAILBOX_2), MAILBOX_2.getToHost().virtualId(),
-        DEFAULT_REQUEST_ID, DEFAULT_RECEIVER_STAGE_ID);
-    OpChain chain3 = new OpChain(_operator, ImmutableList.of(MAILBOX_3), MAILBOX_3.getToHost().virtualId(),
-        DEFAULT_REQUEST_ID, DEFAULT_RECEIVER_STAGE_ID);
+    // Each chain's context uses the virtual server-id of the mailbox that chain receives from.
+    OpChain chain1 = new OpChain(
+        getOpChainExecutionContext(DEFAULT_REQUEST_ID, DEFAULT_RECEIVER_STAGE_ID, MAILBOX_1.getToHost().virtualId()),
+        _operator, ImmutableList.of(MAILBOX_1));
+    OpChain chain2 = new OpChain(
+        getOpChainExecutionContext(DEFAULT_REQUEST_ID, DEFAULT_RECEIVER_STAGE_ID, MAILBOX_2.getToHost().virtualId()),
+        _operator, ImmutableList.of(MAILBOX_2));
+    OpChain chain3 = new OpChain(
+        getOpChainExecutionContext(DEFAULT_REQUEST_ID, DEFAULT_RECEIVER_STAGE_ID, MAILBOX_3.getToHost().virtualId()),
+        _operator, ImmutableList.of(MAILBOX_3));
+
     // Register 3 OpChains. Keep release timeout high to avoid unintended OpChain wake-ups.
     _scheduler = new RoundRobinScheduler(10_000);
     _scheduler.register(chain1);
@@ -172,4 +181,14 @@ public class RoundRobinSchedulerTest {
     Assert.assertEquals(0,
         _scheduler.aliveChainsSize() + _scheduler.readySize() + _scheduler.seenMailSize() + _scheduler.availableSize());
   }
+
+  /**
+   * Builds an {@link OpChainExecutionContext} for the given request id, stage id and virtual server id, addressed
+   * to {@code localhost:1234}. All other constructor arguments are {@code null} or {@code 0}; presumably the
+   * scheduler under test only looks at the ids - confirm if new tests depend on the remaining fields.
+   */
+  private OpChainExecutionContext getOpChainExecutionContext(long requestId, int stageId, int virtualServerId) {
+    VirtualServerAddress serverAddress = new VirtualServerAddress("localhost", 1234, virtualServerId);
+    return new OpChainExecutionContext(null, requestId, stageId, serverAddress, 0, 0, null);
+  }
 }
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MailboxSendOperatorTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MailboxSendOperatorTest.java
index 72d5ba66b8..25b307b22b 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MailboxSendOperatorTest.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MailboxSendOperatorTest.java
@@ -169,7 +169,7 @@ public class MailboxSendOperatorTest {
 
     // Then:
     Assert.assertTrue(block.isEndOfStreamBlock(), "expected EOS block to propagate");
-    Mockito.verify(_exchange).send(eosBlock);
+    Assert.assertEquals(block, eosBlock); // stronger than the check above: must equal eosBlock, not just any EOS block
   }
 
   @Test
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/OpChainTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/OpChainTest.java
new file mode 100644
index 0000000000..6f83f9ac77
--- /dev/null
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/OpChainTest.java
@@ -0,0 +1,143 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.runtime.operator;
+
+import java.util.ArrayList;
+import java.util.Map;
+import javax.annotation.Nullable;
+import org.apache.pinot.common.datatable.DataTable;
+import org.apache.pinot.query.runtime.blocks.TransferableBlock;
+import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils;
+import org.apache.pinot.query.runtime.plan.OpChainExecutionContext;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.MockitoAnnotations;
+import org.testng.Assert;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+
+public class OpChainTest {
+  private AutoCloseable _mocks;
+  @Mock
+  private MultiStageOperator _upstreamOperator;
+
+  @BeforeMethod
+  public void setUp() {
+    _mocks = MockitoAnnotations.openMocks(this);
+  }
+
+  @AfterMethod
+  public void tearDown()
+      throws Exception {
+    _mocks.close();
+  }
+
+  /**
+   * Runs two fresh OpChains whose upstream operator sleeps for 1000ms and 20ms respectively, and checks that each
+   * chain's execution timer reflects only its own run.
+   */
+  @Test
+  public void testExecutionTimerStats() {
+    Mockito.when(_upstreamOperator.nextBlock()).then(x -> {
+      Thread.sleep(1000);
+      return TransferableBlockUtils.getEndOfStreamTransferableBlock();
+    });
+
+    OpChain opChain = new OpChain(OperatorTestUtil.getDefaultContext(), _upstreamOperator, new ArrayList<>());
+    runOnce(opChain);
+
+    Assert.assertTrue(opChain.getStats().getExecutionTime() >= 1000);
+
+    Mockito.when(_upstreamOperator.nextBlock()).then(x -> {
+      Thread.sleep(20);
+      return TransferableBlockUtils.getEndOfStreamTransferableBlock();
+    });
+
+    opChain = new OpChain(OperatorTestUtil.getDefaultContext(), _upstreamOperator, new ArrayList<>());
+    runOnce(opChain);
+
+    long executionTimeMs = opChain.getStats().getExecutionTime();
+    Assert.assertTrue(executionTimeMs >= 20);
+    // Keep the upper bound generous: sleep overshoot on a loaded machine can exceed 80ms, while 500ms still
+    // fails if the first chain's 1000ms had leaked into this chain's timer.
+    Assert.assertTrue(executionTimeMs < 500, "expected a per-OpChain timer, got " + executionTimeMs + "ms");
+  }
+
+  /**
+   * Checks that running an OpChain records execution stats, keyed by operator id, for the operator it runs.
+   */
+  @Test
+  public void testStatsCollection() {
+    OpChainExecutionContext context = OperatorTestUtil.getDefaultContext();
+    DummyMultiStageOperator dummyMultiStageOperator = new DummyMultiStageOperator(context);
+
+    OpChain opChain = new OpChain(context, dummyMultiStageOperator, new ArrayList<>());
+    runOnce(opChain);
+
+    Assert.assertTrue(opChain.getStats().getExecutionTime() >= 1000);
+    Assert.assertEquals(opChain.getStats().getOperatorStatsMap().size(), 1);
+    Assert.assertTrue(opChain.getStats().getOperatorStatsMap().containsKey(dummyMultiStageOperator.getOperatorId()));
+
+    Map<String, String> executionStats =
+        opChain.getStats().getOperatorStatsMap().get(dummyMultiStageOperator.getOperatorId()).getExecutionStats();
+    long operatorExecutionTimeMs =
+        Long.parseLong(executionStats.get(DataTable.MetadataKey.OPERATOR_EXECUTION_TIME_MS.getName()));
+    Assert.assertTrue(operatorExecutionTimeMs >= 1000);
+    Assert.assertTrue(operatorExecutionTimeMs <= 2000);
+  }
+
+  /**
+   * Marks {@code opChain} as executing, pulls a single block from its root operator, then marks it queued again so
+   * the time spent in {@code nextBlock()} falls between the two state transitions.
+   */
+  private static void runOnce(OpChain opChain) {
+    opChain.getStats().executing();
+    opChain.getRoot().nextBlock();
+    opChain.getStats().queued();
+  }
+
+  /**
+   * Root operator for {@link #testStatsCollection()} that sleeps for one second and then returns an end-of-stream
+   * block, so its recorded execution time is measurably non-zero.
+   */
+  static class DummyMultiStageOperator extends MultiStageOperator {
+    public DummyMultiStageOperator(OpChainExecutionContext context) {
+      super(context);
+    }
+
+    @Override
+    protected TransferableBlock getNextBlock() {
+      try {
+        Thread.sleep(1000);
+      } catch (InterruptedException e) {
+        // Restore the interrupt flag so the caller can still observe the interruption.
+        Thread.currentThread().interrupt();
+      }
+      return TransferableBlockUtils.getEndOfStreamTransferableBlock();
+    }
+
+    @Nullable
+    @Override
+    public String toExplainString() {
+      return "DUMMY";
+    }
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org