You are viewing a plain text version of this content. The canonical link for it was a hyperlink that is not preserved in this plain-text copy.
Posted to commits@pinot.apache.org by ro...@apache.org on 2023/06/24 02:57:22 UTC

[pinot] branch master updated: [multistage][feature] add pipeline breaker stats collector (#10958)

This is an automated email from the ASF dual-hosted git repository.

rongr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 6a47311c03 [multistage][feature] add pipeline breaker stats collector (#10958)
6a47311c03 is described below

commit 6a47311c03fdc020a4e05a3acf5cacb7843e3307
Author: Rong Rong <ro...@apache.org>
AuthorDate: Fri Jun 23 19:57:16 2023 -0700

    [multistage][feature] add pipeline breaker stats collector (#10958)
    
    - [multistage][pb][stats] add stats support for pipeline breaker results
    - add stats test collector
    - refactor the test setup format to be easier to understand
    
    ---------
    
    Co-authored-by: Rong Rong <ro...@startree.ai>
---
 .../runtime/plan/OpChainExecutionContext.java      | 11 +++-
 .../plan/pipeline/PipelineBreakerExecutor.java     | 17 +++---
 .../plan/pipeline/PipelineBreakerOperator.java     |  1 -
 .../plan/pipeline/PipelineBreakerResult.java       | 12 +++-
 .../executor/OpChainSchedulerServiceTest.java      |  2 +-
 .../runtime/executor/RoundRobinSchedulerTest.java  |  2 +-
 .../operator/MailboxReceiveOperatorTest.java       | 58 +++++++++++---------
 .../runtime/operator/MailboxSendOperatorTest.java  |  2 +-
 .../pinot/query/runtime/operator/OpChainTest.java  |  8 +--
 .../query/runtime/operator/OperatorTestUtil.java   | 23 +++++++-
 .../operator/SortedMailboxReceiveOperatorTest.java | 64 ++++++++++++----------
 .../plan/pipeline/PipelineBreakerExecutorTest.java | 23 ++++++--
 12 files changed, 139 insertions(+), 84 deletions(-)

diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/OpChainExecutionContext.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/OpChainExecutionContext.java
index b23f41400a..b179900e0c 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/OpChainExecutionContext.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/OpChainExecutionContext.java
@@ -24,6 +24,7 @@ import org.apache.pinot.query.mailbox.MailboxService;
 import org.apache.pinot.query.routing.VirtualServerAddress;
 import org.apache.pinot.query.runtime.operator.OpChainId;
 import org.apache.pinot.query.runtime.operator.OpChainStats;
+import org.apache.pinot.query.runtime.plan.pipeline.PipelineBreakerResult;
 
 
 /**
@@ -45,7 +46,7 @@ public class OpChainExecutionContext {
   @VisibleForTesting
   public OpChainExecutionContext(MailboxService mailboxService, long requestId, int stageId,
       VirtualServerAddress server, long deadlineMs, StageMetadata stageMetadata,
-      boolean traceEnabled) {
+      PipelineBreakerResult pipelineBreakerResult, boolean traceEnabled) {
     _mailboxService = mailboxService;
     _requestId = requestId;
     _stageId = stageId;
@@ -54,13 +55,17 @@ public class OpChainExecutionContext {
     _stageMetadata = stageMetadata;
     _id = new OpChainId(requestId, server.workerId(), stageId);
     _stats = new OpChainStats(_id.toString());
+    if (pipelineBreakerResult != null && pipelineBreakerResult.getOpChainStats() != null) {
+      _stats.getOperatorStatsMap().putAll(
+          pipelineBreakerResult.getOpChainStats().getOperatorStatsMap());
+    }
     _traceEnabled = traceEnabled;
   }
 
   public OpChainExecutionContext(PhysicalPlanContext physicalPlanContext) {
     this(physicalPlanContext.getMailboxService(), physicalPlanContext.getRequestId(), physicalPlanContext.getStageId(),
-        physicalPlanContext.getServer(), physicalPlanContext.getDeadlineMs(),
-        physicalPlanContext.getStageMetadata(), physicalPlanContext.isTraceEnabled());
+        physicalPlanContext.getServer(), physicalPlanContext.getDeadlineMs(), physicalPlanContext.getStageMetadata(),
+        physicalPlanContext.getPipelineBreakerResult(), physicalPlanContext.isTraceEnabled());
   }
 
   public MailboxService getMailboxService() {
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/pipeline/PipelineBreakerExecutor.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/pipeline/PipelineBreakerExecutor.java
index fa33192a64..c148b37b0c 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/pipeline/PipelineBreakerExecutor.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/pipeline/PipelineBreakerExecutor.java
@@ -77,9 +77,7 @@ public class PipelineBreakerExecutor {
       PhysicalPlanContext physicalPlanContext =
           new PhysicalPlanContext(mailboxService, requestId, stageRoot.getPlanFragmentId(), deadlineMs,
               distributedStagePlan.getServer(), distributedStagePlan.getStageMetadata(), null, isTraceEnabled);
-      Map<Integer, List<TransferableBlock>> resultMap =
-          PipelineBreakerExecutor.execute(scheduler, pipelineBreakerContext, physicalPlanContext);
-      return new PipelineBreakerResult(pipelineBreakerContext.getNodeIdMap(), resultMap);
+      return PipelineBreakerExecutor.execute(scheduler, pipelineBreakerContext, physicalPlanContext);
       } catch (Exception e) {
         LOGGER.error("Unable to create pipeline breaker results for Req: " + requestId + ", Stage: "
             + distributedStagePlan.getStageId(), e);
@@ -91,14 +89,14 @@ public class PipelineBreakerExecutor {
             resultMap.put(key, Collections.singletonList(errorBlock));
           }
         }
-        return new PipelineBreakerResult(pipelineBreakerContext.getNodeIdMap(), resultMap);
+        return new PipelineBreakerResult(pipelineBreakerContext.getNodeIdMap(), resultMap, null);
       }
     } else {
       return null;
     }
   }
 
-  private static Map<Integer, List<TransferableBlock>> execute(OpChainSchedulerService scheduler,
+  private static PipelineBreakerResult execute(OpChainSchedulerService scheduler,
       PipelineBreakerContext context, PhysicalPlanContext physicalPlanContext)
       throws Exception {
     Map<Integer, Operator<TransferableBlock>> pipelineWorkerMap = new HashMap<>();
@@ -111,11 +109,11 @@ public class PipelineBreakerExecutor {
       OpChain tempOpChain = PhysicalPlanVisitor.walkPlanNode(planNode, physicalPlanContext);
       pipelineWorkerMap.put(key, tempOpChain.getRoot());
     }
-    return runMailboxReceivePipelineBreaker(scheduler, pipelineWorkerMap, physicalPlanContext);
+    return runMailboxReceivePipelineBreaker(scheduler, context, pipelineWorkerMap, physicalPlanContext);
   }
 
-  private static Map<Integer, List<TransferableBlock>> runMailboxReceivePipelineBreaker(
-      OpChainSchedulerService scheduler, Map<Integer, Operator<TransferableBlock>> pipelineWorkerMap,
+  private static PipelineBreakerResult runMailboxReceivePipelineBreaker(OpChainSchedulerService scheduler,
+      PipelineBreakerContext context, Map<Integer, Operator<TransferableBlock>> pipelineWorkerMap,
       PhysicalPlanContext physicalPlanContext)
       throws Exception {
     PipelineBreakerOperator pipelineBreakerOperator = new PipelineBreakerOperator(
@@ -126,7 +124,8 @@ public class PipelineBreakerExecutor {
     scheduler.register(pipelineBreakerOpChain);
     long timeoutMs = physicalPlanContext.getDeadlineMs() - System.currentTimeMillis();
     if (latch.await(timeoutMs, TimeUnit.MILLISECONDS)) {
-      return pipelineBreakerOperator.getResultMap();
+      return new PipelineBreakerResult(context.getNodeIdMap(), pipelineBreakerOperator.getResultMap(),
+          pipelineBreakerOpChain.getStats());
     } else {
       throw new IOException("Exception occur when awaiting breaker results!");
     }
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/pipeline/PipelineBreakerOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/pipeline/PipelineBreakerOperator.java
index 438ce15a37..c9d4d58682 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/pipeline/PipelineBreakerOperator.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/pipeline/PipelineBreakerOperator.java
@@ -93,7 +93,6 @@ class PipelineBreakerOperator extends MultiStageOperator {
           return _errorBlock;
         }
         List<TransferableBlock> blockList = _resultMap.computeIfAbsent(worker.getKey(), (k) -> new ArrayList<>());
-        // TODO: only data block is handled, we also need to handle metadata block from upstream in the future.
         if (!block.isEndOfStreamBlock()) {
           blockList.add(block);
         }
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/pipeline/PipelineBreakerResult.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/pipeline/PipelineBreakerResult.java
index 8388337ff1..fe0f2cba61 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/pipeline/PipelineBreakerResult.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/pipeline/PipelineBreakerResult.java
@@ -20,8 +20,10 @@ package org.apache.pinot.query.runtime.plan.pipeline;
 
 import java.util.List;
 import java.util.Map;
+import javax.annotation.Nullable;
 import org.apache.pinot.query.planner.plannode.PlanNode;
 import org.apache.pinot.query.runtime.blocks.TransferableBlock;
+import org.apache.pinot.query.runtime.operator.OpChainStats;
 
 
 /**
@@ -30,10 +32,13 @@ import org.apache.pinot.query.runtime.blocks.TransferableBlock;
 public class PipelineBreakerResult {
   private final Map<PlanNode, Integer> _nodeIdMap;
   private final Map<Integer, List<TransferableBlock>> _resultMap;
+  private final OpChainStats _opChainStats;
 
-  public PipelineBreakerResult(Map<PlanNode, Integer> nodeIdMap, Map<Integer, List<TransferableBlock>> resultMap) {
+  public PipelineBreakerResult(Map<PlanNode, Integer> nodeIdMap, Map<Integer, List<TransferableBlock>> resultMap,
+      OpChainStats opChainStats) {
     _nodeIdMap = nodeIdMap;
     _resultMap = resultMap;
+    _opChainStats = opChainStats;
   }
 
   public Map<PlanNode, Integer> getNodeIdMap() {
@@ -43,4 +48,9 @@ public class PipelineBreakerResult {
   public Map<Integer, List<TransferableBlock>> getResultMap() {
     return _resultMap;
   }
+
+  @Nullable
+  public OpChainStats getOpChainStats() {
+    return _opChainStats;
+  }
 }
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/executor/OpChainSchedulerServiceTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/executor/OpChainSchedulerServiceTest.java
index 332ceaaaab..5088decbbe 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/executor/OpChainSchedulerServiceTest.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/executor/OpChainSchedulerServiceTest.java
@@ -74,7 +74,7 @@ public class OpChainSchedulerServiceTest {
 
   private OpChain getChain(MultiStageOperator operator) {
     VirtualServerAddress address = new VirtualServerAddress("localhost", 1234, 1);
-    OpChainExecutionContext context = new OpChainExecutionContext(null, 123L, 1, address, 0, null, true);
+    OpChainExecutionContext context = new OpChainExecutionContext(null, 123L, 1, address, 0, null, null, true);
     return new OpChain(context, operator, ImmutableList.of());
   }
 
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/executor/RoundRobinSchedulerTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/executor/RoundRobinSchedulerTest.java
index 798eb534ba..1c400c6e0f 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/executor/RoundRobinSchedulerTest.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/executor/RoundRobinSchedulerTest.java
@@ -180,6 +180,6 @@ public class RoundRobinSchedulerTest {
 
   private OpChainExecutionContext getOpChainExecutionContext(long requestId, int stageId, int virtualServerId) {
     return new OpChainExecutionContext(null, requestId, stageId,
-        new VirtualServerAddress("localhost", 1234, virtualServerId), 0, null, true);
+        new VirtualServerAddress("localhost", 1234, virtualServerId), 0, null, null, true);
   }
 }
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperatorTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperatorTest.java
index fed5ce422e..48ace11a3f 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperatorTest.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperatorTest.java
@@ -75,20 +75,25 @@ public class MailboxReceiveOperatorTest {
     VirtualServerAddress server1 = new VirtualServerAddress("localhost", 123, 0);
     VirtualServerAddress server2 = new VirtualServerAddress("localhost", 123, 1);
     _stageMetadataBoth = new StageMetadata.Builder().setWorkerMetadataList(Stream.of(server1, server2).map(
-        s -> new WorkerMetadata.Builder().setVirtualServerAddress(s).addMailBoxInfoMap(0, new MailboxMetadata(
-            ImmutableList.of(org.apache.pinot.query.planner.physical.MailboxIdUtils.toPlanMailboxId(1, 0, 0, 0),
-                org.apache.pinot.query.planner.physical.MailboxIdUtils.toPlanMailboxId(1, 1, 0, 0)),
-            ImmutableList.of(server1, server2), ImmutableMap.of())).addMailBoxInfoMap(1, new MailboxMetadata(
-            ImmutableList.of(org.apache.pinot.query.planner.physical.MailboxIdUtils.toPlanMailboxId(1, 0, 0, 0),
-                org.apache.pinot.query.planner.physical.MailboxIdUtils.toPlanMailboxId(1, 1, 0, 0)),
-            ImmutableList.of(server1, server2), ImmutableMap.of())).build()).collect(Collectors.toList())).build();
-    // sending stage is 0, receiving stage is 1
+        s -> new WorkerMetadata.Builder().setVirtualServerAddress(s)
+            .addMailBoxInfoMap(0, new MailboxMetadata(
+                ImmutableList.of(org.apache.pinot.query.planner.physical.MailboxIdUtils.toPlanMailboxId(1, 0, 0, 0),
+                    org.apache.pinot.query.planner.physical.MailboxIdUtils.toPlanMailboxId(1, 1, 0, 0)),
+                ImmutableList.of(server1, server2), ImmutableMap.of()))
+            .addMailBoxInfoMap(1, new MailboxMetadata(
+                ImmutableList.of(org.apache.pinot.query.planner.physical.MailboxIdUtils.toPlanMailboxId(1, 0, 0, 0),
+                    org.apache.pinot.query.planner.physical.MailboxIdUtils.toPlanMailboxId(1, 1, 0, 0)),
+                ImmutableList.of(server1, server2), ImmutableMap.of()))
+            .build()).collect(Collectors.toList())).build();
     _stageMetadata1 = new StageMetadata.Builder().setWorkerMetadataList(Stream.of(server1).map(
-        s -> new WorkerMetadata.Builder().setVirtualServerAddress(s).addMailBoxInfoMap(0, new MailboxMetadata(
-            ImmutableList.of(org.apache.pinot.query.planner.physical.MailboxIdUtils.toPlanMailboxId(1, 0, 0, 0)),
-            ImmutableList.of(server1), ImmutableMap.of())).addMailBoxInfoMap(1, new MailboxMetadata(
-            ImmutableList.of(org.apache.pinot.query.planner.physical.MailboxIdUtils.toPlanMailboxId(1, 0, 0, 0)),
-            ImmutableList.of(server1), ImmutableMap.of())).build()).collect(Collectors.toList())).build();
+        s -> new WorkerMetadata.Builder().setVirtualServerAddress(s)
+            .addMailBoxInfoMap(0, new MailboxMetadata(
+                ImmutableList.of(org.apache.pinot.query.planner.physical.MailboxIdUtils.toPlanMailboxId(1, 0, 0, 0)),
+                ImmutableList.of(server1), ImmutableMap.of()))
+            .addMailBoxInfoMap(1, new MailboxMetadata(
+                ImmutableList.of(org.apache.pinot.query.planner.physical.MailboxIdUtils.toPlanMailboxId(1, 0, 0, 0)),
+                ImmutableList.of(server1), ImmutableMap.of()))
+            .build()).collect(Collectors.toList())).build();
   }
 
   @AfterMethod
@@ -105,7 +110,7 @@ public class MailboxReceiveOperatorTest {
         Stream.of(server1, server2).map(s -> new WorkerMetadata.Builder().setVirtualServerAddress(s).build())
             .collect(Collectors.toList())).build();
     OpChainExecutionContext context =
-        new OpChainExecutionContext(_mailboxService, 0, 0, RECEIVER_ADDRESS, Long.MAX_VALUE, stageMetadata, false);
+        OperatorTestUtil.getOpChainContext(_mailboxService, RECEIVER_ADDRESS, Long.MAX_VALUE, stageMetadata);
     //noinspection resource
     new MailboxReceiveOperator(context, RelDistribution.Type.SINGLETON, 1);
   }
@@ -113,7 +118,7 @@ public class MailboxReceiveOperatorTest {
   @Test(expectedExceptions = IllegalStateException.class, expectedExceptionsMessageRegExp = ".*RANGE_DISTRIBUTED.*")
   public void shouldThrowRangeDistributionNotSupported() {
     OpChainExecutionContext context =
-        new OpChainExecutionContext(_mailboxService, 0, 0, RECEIVER_ADDRESS, Long.MAX_VALUE, null, false);
+        OperatorTestUtil.getOpChainContext(_mailboxService, RECEIVER_ADDRESS, Long.MAX_VALUE, null);
     //noinspection resource
     new MailboxReceiveOperator(context, RelDistribution.Type.RANGE_DISTRIBUTED, 1);
   }
@@ -125,8 +130,8 @@ public class MailboxReceiveOperatorTest {
 
     // Short timeoutMs should result in timeout
     OpChainExecutionContext context =
-        new OpChainExecutionContext(_mailboxService, 0, 0, RECEIVER_ADDRESS, System.currentTimeMillis() + 10L,
-            _stageMetadata1, false);
+        OperatorTestUtil.getOpChainContext(_mailboxService, RECEIVER_ADDRESS, System.currentTimeMillis() + 10L,
+            _stageMetadata1);
     try (MailboxReceiveOperator receiveOp = new MailboxReceiveOperator(context, RelDistribution.Type.SINGLETON, 1)) {
       Thread.sleep(100L);
       TransferableBlock mailbox = receiveOp.nextBlock();
@@ -136,8 +141,9 @@ public class MailboxReceiveOperatorTest {
     }
 
     // Longer timeout or default timeout (10s) doesn't result in timeout
-    context = new OpChainExecutionContext(_mailboxService, 0, 0, RECEIVER_ADDRESS, System.currentTimeMillis() + 10_000L,
-        _stageMetadata1, false);
+    context =
+        OperatorTestUtil.getOpChainContext(_mailboxService, RECEIVER_ADDRESS, System.currentTimeMillis() + 10_000L,
+            _stageMetadata1);
     try (MailboxReceiveOperator receiveOp = new MailboxReceiveOperator(context, RelDistribution.Type.SINGLETON, 1)) {
       Thread.sleep(100L);
       TransferableBlock mailbox = receiveOp.nextBlock();
@@ -150,7 +156,7 @@ public class MailboxReceiveOperatorTest {
     when(_mailboxService.getReceivingMailbox(MAILBOX_ID_1)).thenReturn(_mailbox1);
 
     OpChainExecutionContext context =
-        new OpChainExecutionContext(_mailboxService, 0, 0, RECEIVER_ADDRESS, Long.MAX_VALUE, _stageMetadata1, false);
+        OperatorTestUtil.getOpChainContext(_mailboxService, RECEIVER_ADDRESS, Long.MAX_VALUE, _stageMetadata1);
     try (MailboxReceiveOperator receiveOp = new MailboxReceiveOperator(context, RelDistribution.Type.SINGLETON, 1)) {
       assertTrue(receiveOp.nextBlock().isNoOpBlock());
     }
@@ -162,7 +168,7 @@ public class MailboxReceiveOperatorTest {
     when(_mailbox1.poll()).thenReturn(TransferableBlockUtils.getEndOfStreamTransferableBlock());
 
     OpChainExecutionContext context =
-        new OpChainExecutionContext(_mailboxService, 0, 0, RECEIVER_ADDRESS, Long.MAX_VALUE, _stageMetadata1, false);
+        OperatorTestUtil.getOpChainContext(_mailboxService, RECEIVER_ADDRESS, Long.MAX_VALUE, _stageMetadata1);
     try (MailboxReceiveOperator receiveOp = new MailboxReceiveOperator(context, RelDistribution.Type.SINGLETON, 1)) {
       assertTrue(receiveOp.nextBlock().isEndOfStreamBlock());
     }
@@ -176,7 +182,7 @@ public class MailboxReceiveOperatorTest {
         TransferableBlockUtils.getEndOfStreamTransferableBlock());
 
     OpChainExecutionContext context =
-        new OpChainExecutionContext(_mailboxService, 0, 0, RECEIVER_ADDRESS, Long.MAX_VALUE, _stageMetadata1, false);
+        OperatorTestUtil.getOpChainContext(_mailboxService, RECEIVER_ADDRESS, Long.MAX_VALUE, _stageMetadata1);
     try (MailboxReceiveOperator receiveOp = new MailboxReceiveOperator(context, RelDistribution.Type.SINGLETON, 1)) {
       List<Object[]> actualRows = receiveOp.nextBlock().getContainer();
       assertEquals(actualRows.size(), 1);
@@ -193,7 +199,7 @@ public class MailboxReceiveOperatorTest {
         TransferableBlockUtils.getErrorTransferableBlock(new RuntimeException(errorMessage)));
 
     OpChainExecutionContext context =
-        new OpChainExecutionContext(_mailboxService, 0, 0, RECEIVER_ADDRESS, Long.MAX_VALUE, _stageMetadata1, false);
+        OperatorTestUtil.getOpChainContext(_mailboxService, RECEIVER_ADDRESS, Long.MAX_VALUE, _stageMetadata1);
     try (MailboxReceiveOperator receiveOp = new MailboxReceiveOperator(context, RelDistribution.Type.SINGLETON, 1)) {
       TransferableBlock block = receiveOp.nextBlock();
       assertTrue(block.isErrorBlock());
@@ -211,7 +217,7 @@ public class MailboxReceiveOperatorTest {
         TransferableBlockUtils.getEndOfStreamTransferableBlock());
 
     OpChainExecutionContext context =
-        new OpChainExecutionContext(_mailboxService, 0, 0, RECEIVER_ADDRESS, Long.MAX_VALUE, _stageMetadataBoth, false);
+        OperatorTestUtil.getOpChainContext(_mailboxService, RECEIVER_ADDRESS, Long.MAX_VALUE, _stageMetadataBoth);
     try (MailboxReceiveOperator receiveOp = new MailboxReceiveOperator(context, RelDistribution.Type.HASH_DISTRIBUTED,
         1)) {
       List<Object[]> actualRows = receiveOp.nextBlock().getContainer();
@@ -234,7 +240,7 @@ public class MailboxReceiveOperatorTest {
         TransferableBlockUtils.getEndOfStreamTransferableBlock());
 
     OpChainExecutionContext context =
-        new OpChainExecutionContext(_mailboxService, 0, 0, RECEIVER_ADDRESS, Long.MAX_VALUE, _stageMetadataBoth, false);
+        OperatorTestUtil.getOpChainContext(_mailboxService, RECEIVER_ADDRESS, Long.MAX_VALUE, _stageMetadataBoth);
     try (MailboxReceiveOperator receiveOp = new MailboxReceiveOperator(context, RelDistribution.Type.HASH_DISTRIBUTED,
         1)) {
       // Receive first block from server1
@@ -259,7 +265,7 @@ public class MailboxReceiveOperatorTest {
         TransferableBlockUtils.getEndOfStreamTransferableBlock());
 
     OpChainExecutionContext context =
-        new OpChainExecutionContext(_mailboxService, 0, 0, RECEIVER_ADDRESS, Long.MAX_VALUE, _stageMetadataBoth, false);
+        OperatorTestUtil.getOpChainContext(_mailboxService, RECEIVER_ADDRESS, Long.MAX_VALUE, _stageMetadataBoth);
     try (MailboxReceiveOperator receiveOp = new MailboxReceiveOperator(context, RelDistribution.Type.HASH_DISTRIBUTED,
         1)) {
       TransferableBlock block = receiveOp.nextBlock();
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MailboxSendOperatorTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MailboxSendOperatorTest.java
index 2b74bb727a..c35714aa16 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MailboxSendOperatorTest.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MailboxSendOperatorTest.java
@@ -184,7 +184,7 @@ public class MailboxSendOperatorTest {
             new WorkerMetadata.Builder().setVirtualServerAddress(_server).build())).build();
     OpChainExecutionContext context =
         new OpChainExecutionContext(_mailboxService, 0, SENDER_STAGE_ID, _server, Long.MAX_VALUE,
-            stageMetadata, false);
+            stageMetadata, null, false);
     return new MailboxSendOperator(context, _sourceOperator, _exchange, null, null, false);
   }
 }
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/OpChainTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/OpChainTest.java
index 910cc38523..e7fdb70f3b 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/OpChainTest.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/OpChainTest.java
@@ -192,7 +192,7 @@ public class OpChainTest {
     int receivedStageId = 2;
     int senderStageId = 1;
     OpChainExecutionContext context = new OpChainExecutionContext(_mailboxService1, 1, senderStageId, _serverAddress,
-        System.currentTimeMillis() + 1000, _receivingStageMetadata, true);
+        System.currentTimeMillis() + 1000, _receivingStageMetadata, null, true);
 
     Stack<MultiStageOperator> operators =
         getFullOpchain(receivedStageId, senderStageId, context, dummyOperatorWaitTime);
@@ -206,7 +206,7 @@ public class OpChainTest {
 
     OpChainExecutionContext secondStageContext =
         new OpChainExecutionContext(_mailboxService2, 1, senderStageId + 1, _serverAddress,
-            System.currentTimeMillis() + 1000, _receivingStageMetadata, true);
+            System.currentTimeMillis() + 1000, _receivingStageMetadata, null, true);
 
     MailboxReceiveOperator secondStageReceiveOp =
         new MailboxReceiveOperator(secondStageContext, RelDistribution.Type.BROADCAST_DISTRIBUTED, senderStageId + 1);
@@ -231,7 +231,7 @@ public class OpChainTest {
     int receivedStageId = 2;
     int senderStageId = 1;
     OpChainExecutionContext context = new OpChainExecutionContext(_mailboxService1, 1, senderStageId, _serverAddress,
-        System.currentTimeMillis() + 1000, _receivingStageMetadata, false);
+        System.currentTimeMillis() + 1000, _receivingStageMetadata, null, false);
 
     Stack<MultiStageOperator> operators =
         getFullOpchain(receivedStageId, senderStageId, context, dummyOperatorWaitTime);
@@ -243,7 +243,7 @@ public class OpChainTest {
 
     OpChainExecutionContext secondStageContext =
         new OpChainExecutionContext(_mailboxService2, 1, senderStageId + 1, _serverAddress,
-            System.currentTimeMillis() + 1000, _receivingStageMetadata, false);
+            System.currentTimeMillis() + 1000, _receivingStageMetadata, null, false);
     MailboxReceiveOperator secondStageReceiveOp =
         new MailboxReceiveOperator(secondStageContext, RelDistribution.Type.BROADCAST_DISTRIBUTED, senderStageId);
 
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/OperatorTestUtil.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/OperatorTestUtil.java
index 7ecd344ab3..55878658f7 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/OperatorTestUtil.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/OperatorTestUtil.java
@@ -18,13 +18,18 @@
  */
 package org.apache.pinot.query.runtime.operator;
 
+import com.google.common.collect.ImmutableMap;
 import java.util.Arrays;
 import java.util.List;
+import java.util.Map;
 import org.apache.pinot.common.datablock.DataBlock;
 import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.query.mailbox.MailboxService;
 import org.apache.pinot.query.routing.VirtualServerAddress;
 import org.apache.pinot.query.runtime.blocks.TransferableBlock;
+import org.apache.pinot.query.runtime.operator.utils.OperatorUtils;
 import org.apache.pinot.query.runtime.plan.OpChainExecutionContext;
+import org.apache.pinot.query.runtime.plan.StageMetadata;
 import org.apache.pinot.query.testutils.MockDataBlockOperatorFactory;
 
 
@@ -41,6 +46,12 @@ public class OperatorTestUtil {
   public static final String OP_1 = "op1";
   public static final String OP_2 = "op2";
 
+  public static Map<String, String> getDummyStats(long requestId, int stageId, VirtualServerAddress serverAddress) {
+    OperatorStats operatorStats = new OperatorStats(requestId, stageId, serverAddress);
+    String statsId = new OpChainId(requestId, serverAddress.workerId(), stageId).toString();
+    return OperatorUtils.getMetadataFromOperatorStats(ImmutableMap.of(statsId, operatorStats));
+  }
+
   static {
     MOCK_OPERATOR_FACTORY = new MockDataBlockOperatorFactory().registerOperator(OP_1, SIMPLE_KV_DATA_SCHEMA)
         .registerOperator(OP_2, SIMPLE_KV_DATA_SCHEMA).addRows(OP_1, SIMPLE_KV_DATA_ROWS.get(0))
@@ -62,18 +73,24 @@ public class OperatorTestUtil {
     return new TransferableBlock(Arrays.asList(rows), schema, DataBlock.Type.ROW);
   }
 
+  public static OpChainExecutionContext getOpChainContext(MailboxService mailboxService,
+      VirtualServerAddress receiverAddress, long deadlineMs, StageMetadata stageMetadata) {
+    return new OpChainExecutionContext(mailboxService, 0, 0, receiverAddress, deadlineMs, stageMetadata, null, false);
+  }
+
   public static OpChainExecutionContext getDefaultContext() {
     VirtualServerAddress virtualServerAddress = new VirtualServerAddress("mock", 80, 0);
-    return new OpChainExecutionContext(null, 1, 2, virtualServerAddress, Long.MAX_VALUE, null, true);
+    return new OpChainExecutionContext(null, 1, 2, virtualServerAddress, Long.MAX_VALUE, null, null, true);
   }
 
   public static OpChainExecutionContext getDefaultContextWithTracingDisabled() {
     VirtualServerAddress virtualServerAddress = new VirtualServerAddress("mock", 80, 0);
-    return new OpChainExecutionContext(null, 1, 2, virtualServerAddress, Long.MAX_VALUE, null, false);
+    return new OpChainExecutionContext(null, 1, 2, virtualServerAddress, Long.MAX_VALUE, null, null, false);
   }
 
   public static OpChainExecutionContext getContext(long requestId, int stageId,
       VirtualServerAddress virtualServerAddress) {
-    return new OpChainExecutionContext(null, requestId, stageId, virtualServerAddress, Long.MAX_VALUE, null, true);
+    return new OpChainExecutionContext(null, requestId, stageId, virtualServerAddress, Long.MAX_VALUE, null, null,
+        true);
   }
 }
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/SortedMailboxReceiveOperatorTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/SortedMailboxReceiveOperatorTest.java
index 25ef09f279..80cf7ed1c4 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/SortedMailboxReceiveOperatorTest.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/SortedMailboxReceiveOperatorTest.java
@@ -85,20 +85,25 @@ public class SortedMailboxReceiveOperatorTest {
     VirtualServerAddress server1 = new VirtualServerAddress("localhost", 123, 0);
     VirtualServerAddress server2 = new VirtualServerAddress("localhost", 123, 1);
     _stageMetadataBoth = new StageMetadata.Builder().setWorkerMetadataList(Stream.of(server1, server2).map(
-        s -> new WorkerMetadata.Builder().setVirtualServerAddress(s).addMailBoxInfoMap(0, new MailboxMetadata(
-            ImmutableList.of(org.apache.pinot.query.planner.physical.MailboxIdUtils.toPlanMailboxId(1, 0, 0, 0),
-                org.apache.pinot.query.planner.physical.MailboxIdUtils.toPlanMailboxId(1, 1, 0, 0)),
-            ImmutableList.of(server1, server2), ImmutableMap.of())).addMailBoxInfoMap(1, new MailboxMetadata(
-            ImmutableList.of(org.apache.pinot.query.planner.physical.MailboxIdUtils.toPlanMailboxId(1, 0, 0, 0),
-                org.apache.pinot.query.planner.physical.MailboxIdUtils.toPlanMailboxId(1, 1, 0, 0)),
-            ImmutableList.of(server1, server2), ImmutableMap.of())).build()).collect(Collectors.toList())).build();
-    // sending stage is 0, receiving stage is 1
+        s -> new WorkerMetadata.Builder().setVirtualServerAddress(s)
+            .addMailBoxInfoMap(0, new MailboxMetadata(
+                ImmutableList.of(org.apache.pinot.query.planner.physical.MailboxIdUtils.toPlanMailboxId(1, 0, 0, 0),
+                    org.apache.pinot.query.planner.physical.MailboxIdUtils.toPlanMailboxId(1, 1, 0, 0)),
+                ImmutableList.of(server1, server2), ImmutableMap.of()))
+            .addMailBoxInfoMap(1, new MailboxMetadata(
+                ImmutableList.of(org.apache.pinot.query.planner.physical.MailboxIdUtils.toPlanMailboxId(1, 0, 0, 0),
+                    org.apache.pinot.query.planner.physical.MailboxIdUtils.toPlanMailboxId(1, 1, 0, 0)),
+                ImmutableList.of(server1, server2), ImmutableMap.of()))
+            .build()).collect(Collectors.toList())).build();
     _stageMetadata1 = new StageMetadata.Builder().setWorkerMetadataList(Stream.of(server1).map(
-        s -> new WorkerMetadata.Builder().setVirtualServerAddress(s).addMailBoxInfoMap(0, new MailboxMetadata(
-            ImmutableList.of(org.apache.pinot.query.planner.physical.MailboxIdUtils.toPlanMailboxId(1, 0, 0, 0)),
-            ImmutableList.of(server1), ImmutableMap.of())).addMailBoxInfoMap(1, new MailboxMetadata(
-            ImmutableList.of(org.apache.pinot.query.planner.physical.MailboxIdUtils.toPlanMailboxId(1, 0, 0, 0)),
-            ImmutableList.of(server1), ImmutableMap.of())).build()).collect(Collectors.toList())).build();
+        s -> new WorkerMetadata.Builder().setVirtualServerAddress(s)
+            .addMailBoxInfoMap(0, new MailboxMetadata(
+                ImmutableList.of(org.apache.pinot.query.planner.physical.MailboxIdUtils.toPlanMailboxId(1, 0, 0, 0)),
+                ImmutableList.of(server1), ImmutableMap.of()))
+            .addMailBoxInfoMap(1, new MailboxMetadata(
+                ImmutableList.of(org.apache.pinot.query.planner.physical.MailboxIdUtils.toPlanMailboxId(1, 0, 0, 0)),
+                ImmutableList.of(server1), ImmutableMap.of()))
+            .build()).collect(Collectors.toList())).build();
   }
 
   @AfterMethod
@@ -115,7 +120,7 @@ public class SortedMailboxReceiveOperatorTest {
         Stream.of(server1, server2).map(s -> new WorkerMetadata.Builder().setVirtualServerAddress(s).build())
             .collect(Collectors.toList())).build();
     OpChainExecutionContext context =
-        new OpChainExecutionContext(_mailboxService, 0, 0, RECEIVER_ADDRESS, Long.MAX_VALUE, stageMetadata, false);
+        OperatorTestUtil.getOpChainContext(_mailboxService, RECEIVER_ADDRESS, Long.MAX_VALUE, stageMetadata);
     //noinspection resource
     new SortedMailboxReceiveOperator(context, RelDistribution.Type.SINGLETON, DATA_SCHEMA, COLLATION_KEYS,
         COLLATION_DIRECTIONS, COLLATION_NULL_DIRECTIONS, false, 1);
@@ -124,7 +129,7 @@ public class SortedMailboxReceiveOperatorTest {
   @Test(expectedExceptions = IllegalStateException.class, expectedExceptionsMessageRegExp = ".*RANGE_DISTRIBUTED.*")
   public void shouldThrowRangeDistributionNotSupported() {
     OpChainExecutionContext context =
-        new OpChainExecutionContext(_mailboxService, 0, 0, RECEIVER_ADDRESS, Long.MAX_VALUE, null, false);
+        OperatorTestUtil.getOpChainContext(_mailboxService, RECEIVER_ADDRESS, Long.MAX_VALUE, null);
     //noinspection resource
     new SortedMailboxReceiveOperator(context, RelDistribution.Type.RANGE_DISTRIBUTED, DATA_SCHEMA, COLLATION_KEYS,
         COLLATION_DIRECTIONS, COLLATION_NULL_DIRECTIONS, false, 1);
@@ -134,8 +139,8 @@ public class SortedMailboxReceiveOperatorTest {
   public void shouldThrowOnEmptyCollationKey() {
     when(_mailboxService.getReceivingMailbox(MAILBOX_ID_1)).thenReturn(_mailbox1);
     OpChainExecutionContext context =
-        new OpChainExecutionContext(_mailboxService, 0, 0, RECEIVER_ADDRESS, System.currentTimeMillis() + 10L,
-            _stageMetadata1, false);
+        OperatorTestUtil.getOpChainContext(_mailboxService, RECEIVER_ADDRESS, System.currentTimeMillis() + 10L,
+            _stageMetadata1);
     //noinspection resource
     new SortedMailboxReceiveOperator(context, RelDistribution.Type.SINGLETON, DATA_SCHEMA, Collections.emptyList(),
         Collections.emptyList(), Collections.emptyList(), false, 1);
@@ -147,8 +152,8 @@ public class SortedMailboxReceiveOperatorTest {
     when(_mailboxService.getReceivingMailbox(MAILBOX_ID_1)).thenReturn(_mailbox1);
     // Short timeoutMs should result in timeout
     OpChainExecutionContext context =
-        new OpChainExecutionContext(_mailboxService, 0, 0, RECEIVER_ADDRESS, System.currentTimeMillis() + 10L,
-            _stageMetadata1, false);
+        OperatorTestUtil.getOpChainContext(_mailboxService, RECEIVER_ADDRESS, System.currentTimeMillis() + 10L,
+            _stageMetadata1);
     try (SortedMailboxReceiveOperator receiveOp = new SortedMailboxReceiveOperator(context,
         RelDistribution.Type.SINGLETON, DATA_SCHEMA, COLLATION_KEYS, COLLATION_DIRECTIONS, COLLATION_NULL_DIRECTIONS,
         false, 1)) {
@@ -160,8 +165,9 @@ public class SortedMailboxReceiveOperatorTest {
     }
 
     // Longer timeout or default timeout (10s) doesn't result in timeout
-    context = new OpChainExecutionContext(_mailboxService, 0, 0, RECEIVER_ADDRESS, System.currentTimeMillis() + 10_000L,
-        _stageMetadata1, false);
+    context =
+        OperatorTestUtil.getOpChainContext(_mailboxService, RECEIVER_ADDRESS, System.currentTimeMillis() + 10_000L,
+            _stageMetadata1);
     try (SortedMailboxReceiveOperator receiveOp = new SortedMailboxReceiveOperator(context,
         RelDistribution.Type.SINGLETON, DATA_SCHEMA, COLLATION_KEYS, COLLATION_DIRECTIONS, COLLATION_NULL_DIRECTIONS,
         false, 1)) {
@@ -175,7 +181,7 @@ public class SortedMailboxReceiveOperatorTest {
   public void shouldReceiveSingletonNullMailbox() {
     when(_mailboxService.getReceivingMailbox(MAILBOX_ID_1)).thenReturn(_mailbox1);
     OpChainExecutionContext context =
-        new OpChainExecutionContext(_mailboxService, 0, 0, RECEIVER_ADDRESS, Long.MAX_VALUE, _stageMetadata1, false);
+        OperatorTestUtil.getOpChainContext(_mailboxService, RECEIVER_ADDRESS, Long.MAX_VALUE, _stageMetadata1);
     try (SortedMailboxReceiveOperator receiveOp = new SortedMailboxReceiveOperator(context,
         RelDistribution.Type.SINGLETON, DATA_SCHEMA, COLLATION_KEYS, COLLATION_DIRECTIONS, COLLATION_NULL_DIRECTIONS,
         false, 1)) {
@@ -188,7 +194,7 @@ public class SortedMailboxReceiveOperatorTest {
     when(_mailboxService.getReceivingMailbox(MAILBOX_ID_1)).thenReturn(_mailbox1);
     when(_mailbox1.poll()).thenReturn(TransferableBlockUtils.getEndOfStreamTransferableBlock());
     OpChainExecutionContext context =
-        new OpChainExecutionContext(_mailboxService, 0, 0, RECEIVER_ADDRESS, Long.MAX_VALUE, _stageMetadata1, false);
+        OperatorTestUtil.getOpChainContext(_mailboxService, RECEIVER_ADDRESS, Long.MAX_VALUE, _stageMetadata1);
     try (SortedMailboxReceiveOperator receiveOp = new SortedMailboxReceiveOperator(context,
         RelDistribution.Type.SINGLETON, DATA_SCHEMA, COLLATION_KEYS, COLLATION_DIRECTIONS, COLLATION_NULL_DIRECTIONS,
         false, 1)) {
@@ -203,7 +209,7 @@ public class SortedMailboxReceiveOperatorTest {
     when(_mailbox1.poll()).thenReturn(OperatorTestUtil.block(DATA_SCHEMA, row),
         TransferableBlockUtils.getEndOfStreamTransferableBlock());
     OpChainExecutionContext context =
-        new OpChainExecutionContext(_mailboxService, 0, 0, RECEIVER_ADDRESS, Long.MAX_VALUE, _stageMetadata1, false);
+        OperatorTestUtil.getOpChainContext(_mailboxService, RECEIVER_ADDRESS, Long.MAX_VALUE, _stageMetadata1);
     try (SortedMailboxReceiveOperator receiveOp = new SortedMailboxReceiveOperator(context,
         RelDistribution.Type.SINGLETON, DATA_SCHEMA, COLLATION_KEYS, COLLATION_DIRECTIONS, COLLATION_NULL_DIRECTIONS,
         false, 1)) {
@@ -221,7 +227,7 @@ public class SortedMailboxReceiveOperatorTest {
     when(_mailbox1.poll()).thenReturn(
         TransferableBlockUtils.getErrorTransferableBlock(new RuntimeException(errorMessage)));
     OpChainExecutionContext context =
-        new OpChainExecutionContext(_mailboxService, 0, 0, RECEIVER_ADDRESS, Long.MAX_VALUE, _stageMetadata1, false);
+        OperatorTestUtil.getOpChainContext(_mailboxService, RECEIVER_ADDRESS, Long.MAX_VALUE, _stageMetadata1);
     try (SortedMailboxReceiveOperator receiveOp = new SortedMailboxReceiveOperator(context,
         RelDistribution.Type.SINGLETON, DATA_SCHEMA, COLLATION_KEYS, COLLATION_DIRECTIONS, COLLATION_NULL_DIRECTIONS,
         false, 1)) {
@@ -240,7 +246,7 @@ public class SortedMailboxReceiveOperatorTest {
     when(_mailbox2.poll()).thenReturn(OperatorTestUtil.block(DATA_SCHEMA, row),
         TransferableBlockUtils.getEndOfStreamTransferableBlock());
     OpChainExecutionContext context =
-        new OpChainExecutionContext(_mailboxService, 0, 0, RECEIVER_ADDRESS, Long.MAX_VALUE, _stageMetadataBoth, false);
+        OperatorTestUtil.getOpChainContext(_mailboxService, RECEIVER_ADDRESS, Long.MAX_VALUE, _stageMetadataBoth);
     try (SortedMailboxReceiveOperator receiveOp = new SortedMailboxReceiveOperator(context,
         RelDistribution.Type.HASH_DISTRIBUTED, DATA_SCHEMA, COLLATION_KEYS, COLLATION_DIRECTIONS,
         COLLATION_NULL_DIRECTIONS, false, 1)) {
@@ -263,7 +269,7 @@ public class SortedMailboxReceiveOperatorTest {
     when(_mailbox2.poll()).thenReturn(OperatorTestUtil.block(DATA_SCHEMA, row),
         TransferableBlockUtils.getEndOfStreamTransferableBlock());
     OpChainExecutionContext context =
-        new OpChainExecutionContext(_mailboxService, 0, 0, RECEIVER_ADDRESS, Long.MAX_VALUE, _stageMetadataBoth, false);
+        OperatorTestUtil.getOpChainContext(_mailboxService, RECEIVER_ADDRESS, Long.MAX_VALUE, _stageMetadataBoth);
     try (SortedMailboxReceiveOperator receiveOp = new SortedMailboxReceiveOperator(context,
         RelDistribution.Type.HASH_DISTRIBUTED, DATA_SCHEMA, COLLATION_KEYS, COLLATION_DIRECTIONS,
         COLLATION_NULL_DIRECTIONS, false, 1)) {
@@ -288,7 +294,7 @@ public class SortedMailboxReceiveOperatorTest {
         OperatorTestUtil.block(DATA_SCHEMA, row4), OperatorTestUtil.block(DATA_SCHEMA, row5),
         TransferableBlockUtils.getEndOfStreamTransferableBlock());
     OpChainExecutionContext context =
-        new OpChainExecutionContext(_mailboxService, 0, 0, RECEIVER_ADDRESS, Long.MAX_VALUE, _stageMetadataBoth, false);
+        OperatorTestUtil.getOpChainContext(_mailboxService, RECEIVER_ADDRESS, Long.MAX_VALUE, _stageMetadataBoth);
     try (SortedMailboxReceiveOperator receiveOp = new SortedMailboxReceiveOperator(context,
         RelDistribution.Type.HASH_DISTRIBUTED, DATA_SCHEMA, COLLATION_KEYS, COLLATION_DIRECTIONS,
         COLLATION_NULL_DIRECTIONS, false, 1)) {
@@ -319,7 +325,7 @@ public class SortedMailboxReceiveOperatorTest {
         TransferableBlockUtils.getEndOfStreamTransferableBlock());
 
     OpChainExecutionContext context =
-        new OpChainExecutionContext(_mailboxService, 0, 0, RECEIVER_ADDRESS, Long.MAX_VALUE, _stageMetadataBoth, false);
+        OperatorTestUtil.getOpChainContext(_mailboxService, RECEIVER_ADDRESS, Long.MAX_VALUE, _stageMetadataBoth);
     try (SortedMailboxReceiveOperator receiveOp = new SortedMailboxReceiveOperator(context,
         RelDistribution.Type.HASH_DISTRIBUTED, dataSchema, collationKeys, collationDirections, collationNullDirections,
         false, 1)) {
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/plan/pipeline/PipelineBreakerExecutorTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/plan/pipeline/PipelineBreakerExecutorTest.java
index 56d78effe4..6680b56942 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/plan/pipeline/PipelineBreakerExecutorTest.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/plan/pipeline/PipelineBreakerExecutorTest.java
@@ -87,8 +87,7 @@ public class PipelineBreakerExecutorTest {
           .addMailBoxInfoMap(2, new MailboxMetadata(
               ImmutableList.of(org.apache.pinot.query.planner.physical.MailboxIdUtils.toPlanMailboxId(2, 0, 0, 0)),
               ImmutableList.of(_server), ImmutableMap.of()))
-          .build())
-      .collect(Collectors.toList())).build();
+          .build()).collect(Collectors.toList())).build();
 
   @BeforeClass
   public void setUpClass() {
@@ -127,7 +126,7 @@ public class PipelineBreakerExecutorTest {
     Object[] row2 = new Object[]{2, 3};
     when(_mailbox1.poll()).thenReturn(OperatorTestUtil.block(DATA_SCHEMA, row1),
         OperatorTestUtil.block(DATA_SCHEMA, row2),
-        TransferableBlockUtils.getEndOfStreamTransferableBlock());
+        TransferableBlockUtils.getEndOfStreamTransferableBlock(OperatorTestUtil.getDummyStats(0, 1, _server)));
 
     PipelineBreakerResult pipelineBreakerResult =
         PipelineBreakerExecutor.executePipelineBreakers(_scheduler, _mailboxService, distributedStagePlan,
@@ -138,6 +137,10 @@ public class PipelineBreakerExecutorTest {
     Assert.assertNotNull(pipelineBreakerResult);
     Assert.assertEquals(pipelineBreakerResult.getResultMap().size(), 1);
     Assert.assertEquals(pipelineBreakerResult.getResultMap().values().iterator().next().size(), 2);
+
+    // should collect stats from previous stage here
+    Assert.assertNotNull(pipelineBreakerResult.getOpChainStats());
+    Assert.assertEquals(pipelineBreakerResult.getOpChainStats().getOperatorStatsMap().size(), 1);
   }
 
   @Test
@@ -160,9 +163,9 @@ public class PipelineBreakerExecutorTest {
     Object[] row1 = new Object[]{1, 1};
     Object[] row2 = new Object[]{2, 3};
     when(_mailbox1.poll()).thenReturn(OperatorTestUtil.block(DATA_SCHEMA, row1),
-        TransferableBlockUtils.getEndOfStreamTransferableBlock());
+        TransferableBlockUtils.getEndOfStreamTransferableBlock(OperatorTestUtil.getDummyStats(0, 1, _server)));
     when(_mailbox2.poll()).thenReturn(OperatorTestUtil.block(DATA_SCHEMA, row2),
-        TransferableBlockUtils.getEndOfStreamTransferableBlock());
+        TransferableBlockUtils.getEndOfStreamTransferableBlock(OperatorTestUtil.getDummyStats(0, 2, _server)));
 
     PipelineBreakerResult pipelineBreakerResult =
         PipelineBreakerExecutor.executePipelineBreakers(_scheduler, _mailboxService, distributedStagePlan,
@@ -176,6 +179,10 @@ public class PipelineBreakerExecutorTest {
     Assert.assertEquals(it.next().size(), 1);
     Assert.assertEquals(it.next().size(), 1);
     Assert.assertFalse(it.hasNext());
+
+    // should collect stats from previous stage here
+    Assert.assertNotNull(pipelineBreakerResult.getOpChainStats());
+    Assert.assertEquals(pipelineBreakerResult.getOpChainStats().getOperatorStatsMap().size(), 2);
   }
 
   @Test
@@ -199,6 +206,9 @@ public class PipelineBreakerExecutorTest {
     Assert.assertEquals(resultBlocks.size(), 1);
     Assert.assertTrue(resultBlocks.get(0).isEndOfStreamBlock());
     Assert.assertFalse(resultBlocks.get(0).isSuccessfulEndOfStreamBlock());
+
+    // should have null stats from previous stage here
+    Assert.assertNull(pipelineBreakerResult.getOpChainStats());
   }
 
   @Test
@@ -268,6 +278,9 @@ public class PipelineBreakerExecutorTest {
       Assert.assertTrue(resultBlocks.get(0).isEndOfStreamBlock());
       Assert.assertFalse(resultBlocks.get(0).isSuccessfulEndOfStreamBlock());
     }
+
+    // should have null stats from previous stage here
+    Assert.assertNull(pipelineBreakerResult.getOpChainStats());
   }
 
   @Test


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org