You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by kh...@apache.org on 2023/04/11 04:38:51 UTC

[pinot] branch master updated: Do not record operator stats when tracing is enabled (#10447)

This is an automated email from the ASF dual-hosted git repository.

kharekartik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new fe7a72b9d2 Do not record operator stats when tracing is enabled (#10447)
fe7a72b9d2 is described below

commit fe7a72b9d27d8764670778474595ebe18d101e16
Author: Kartik Khare <kh...@gmail.com>
AuthorDate: Tue Apr 11 10:08:45 2023 +0530

    Do not record operator stats when tracing is enabled (#10447)
    
    * Add tracing parameter to opChainContext
    
    * Do not record stats when tracing is disabled
    
    * Do not collect stats at end of leafstage
    
    * Do not fill stage stats when tracing is disabled
    
    * Fix tests
    
    * Record stageWallTime even when tracing is disabled
    
    * Add javadocs for operators
    
    * Add more operators in the chain
    
    * Bug fix: End time pointing to serialization time and not operator end time
    
    * Change test to chain actual operators instead of mock
    
    * Bug fix: Record stage block and row stats even when tracing is disabled
    
    * Fix tests after rebasing with SortedReceivedOperator PR
    
    * Table names should always be populated in the stats
    
    * Fix tests
    
    ---------
    
    Co-authored-by: Kartik Khare <kh...@Kartiks-MacBook-Pro.local>
    Co-authored-by: Kartik Khare <kh...@kartiks-macbook-pro.tail8a064.ts.net>
---
 .../MultiStageBrokerRequestHandler.java            |   4 +-
 .../query/reduce/ExecutionStatsAggregator.java     |  15 +-
 .../apache/pinot/query/runtime/QueryRunner.java    |   9 +-
 .../LeafStageTransferableBlockOperator.java        |   9 +
 .../runtime/operator/MailboxSendOperator.java      |   9 +
 .../query/runtime/operator/MultiStageOperator.java |  19 +-
 .../pinot/query/runtime/operator/OpChainStats.java |  12 +-
 .../query/runtime/operator/OperatorStats.java      |  15 +-
 .../runtime/plan/OpChainExecutionContext.java      |  11 +-
 .../query/runtime/plan/PlanRequestContext.java     |   9 +-
 .../runtime/plan/ServerRequestPlanVisitor.java     |   3 +-
 .../plan/server/ServerPlanRequestContext.java      |   4 +-
 .../query/service/dispatch/QueryDispatcher.java    |  22 +-
 .../pinot/query/runtime/QueryRunnerTest.java       |   3 +-
 .../pinot/query/runtime/QueryRunnerTestBase.java   |  16 +-
 .../executor/OpChainSchedulerServiceTest.java      |   2 +-
 .../runtime/executor/RoundRobinSchedulerTest.java  |   2 +-
 .../operator/MailboxReceiveOperatorTest.java       |  32 +--
 .../runtime/operator/MailboxSendOperatorTest.java  |   7 +-
 .../pinot/query/runtime/operator/OpChainTest.java  | 257 ++++++++++++++++++++-
 .../query/runtime/operator/OperatorTestUtil.java   |  10 +-
 .../operator/SortedMailboxReceiveOperatorTest.java |  38 +--
 .../runtime/queries/ResourceBasedQueriesTest.java  |   5 +-
 .../service/dispatch/QueryDispatcherTest.java      |   4 +-
 24 files changed, 425 insertions(+), 92 deletions(-)

diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java
index ebb7b7dc3e..9e74dcf400 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java
@@ -209,14 +209,14 @@ public class MultiStageBrokerRequestHandler extends BaseBrokerRequestHandler {
 
     ResultTable queryResults;
     Map<Integer, ExecutionStatsAggregator> stageIdStatsMap = new HashMap<>();
-    for (Integer stageId: queryPlan.getStageMetadataMap().keySet()) {
+    for (Integer stageId : queryPlan.getStageMetadataMap().keySet()) {
       stageIdStatsMap.put(stageId, new ExecutionStatsAggregator(traceEnabled));
     }
 
     long executionStartTimeNs = System.nanoTime();
     try {
       queryResults = _queryDispatcher.submitAndReduce(requestId, queryPlan, _mailboxService, queryTimeoutMs,
-          sqlNodeAndOptions.getOptions(), stageIdStatsMap);
+          sqlNodeAndOptions.getOptions(), stageIdStatsMap, traceEnabled);
     } catch (Exception e) {
       LOGGER.info("query execution failed", e);
       return new BrokerResponseNative(QueryException.getException(QueryException.QUERY_EXECUTION_ERROR, e));
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/ExecutionStatsAggregator.java b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/ExecutionStatsAggregator.java
index c8ef48af4a..bc8612c551 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/ExecutionStatsAggregator.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/ExecutionStatsAggregator.java
@@ -20,7 +20,6 @@ package org.apache.pinot.core.query.reduce;
 
 import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -363,19 +362,19 @@ public class ExecutionStatsAggregator {
 
   public void setStageLevelStats(@Nullable String rawTableName, BrokerResponseStats brokerResponseStats,
       @Nullable BrokerMetrics brokerMetrics) {
-    setStats(rawTableName, brokerResponseStats, brokerMetrics);
+    if (_enableTrace) {
+      setStats(rawTableName, brokerResponseStats, brokerMetrics);
+      brokerResponseStats.setOperatorStats(_operatorStats);
+    }
 
     brokerResponseStats.setNumBlocks(_numBlocks);
     brokerResponseStats.setNumRows(_numRows);
     brokerResponseStats.setStageExecutionTimeMs(_stageExecutionTimeMs);
-    brokerResponseStats.setStageExecWallTimeMs(_stageExecEndTimeMs - _stageExecStartTimeMs);
     brokerResponseStats.setStageExecutionUnit(_stageExecutionUnit);
-    if (_enableTrace) {
-      brokerResponseStats.setOperatorStats(_operatorStats);
-    } else {
-      brokerResponseStats.setOperatorStats(Collections.emptyMap());
-    }
     brokerResponseStats.setTableNames(new ArrayList<>(_tableNames));
+    if (_stageExecStartTimeMs >= 0 && _stageExecEndTimeMs >= 0) {
+      brokerResponseStats.setStageExecWallTimeMs(_stageExecEndTimeMs - _stageExecStartTimeMs);
+    }
   }
 
   private void withNotNullLongMetadata(Map<String, String> metadata, DataTable.MetadataKey key, LongConsumer consumer) {
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java
index 097b05b455..a1cfafd8ee 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java
@@ -164,6 +164,8 @@ public class QueryRunner {
   public void processQuery(DistributedStagePlan distributedStagePlan, Map<String, String> requestMetadataMap) {
     long requestId = Long.parseLong(requestMetadataMap.get(QueryConfig.KEY_OF_BROKER_REQUEST_ID));
     long timeoutMs = Long.parseLong(requestMetadataMap.get(QueryConfig.KEY_OF_BROKER_REQUEST_TIMEOUT_MS));
+    boolean isTraceEnabled =
+        Boolean.parseBoolean(requestMetadataMap.getOrDefault(CommonConstants.Broker.Request.TRACE, "false"));
     long deadlineMs = System.currentTimeMillis() + timeoutMs;
     if (isLeafStage(distributedStagePlan)) {
       runLeafStage(distributedStagePlan, requestMetadataMap, deadlineMs, requestId);
@@ -171,7 +173,8 @@ public class QueryRunner {
       StageNode stageRoot = distributedStagePlan.getStageRoot();
       OpChain rootOperator = PhysicalPlanVisitor.build(stageRoot,
           new PlanRequestContext(_mailboxService, requestId, stageRoot.getStageId(), timeoutMs, deadlineMs,
-              new VirtualServerAddress(distributedStagePlan.getServer()), distributedStagePlan.getMetadataMap()));
+              new VirtualServerAddress(distributedStagePlan.getServer()), distributedStagePlan.getMetadataMap(),
+              isTraceEnabled));
       _scheduler.register(rootOperator);
     }
   }
@@ -195,6 +198,8 @@ public class QueryRunner {
     // server executor.
     MailboxSendOperator mailboxSendOperator = null;
     try {
+      boolean isTraceEnabled =
+          Boolean.parseBoolean(requestMetadataMap.getOrDefault(CommonConstants.Broker.Request.TRACE, "false"));
       long leafStageStartMillis = System.currentTimeMillis();
       List<ServerPlanRequestContext> serverQueryRequests =
           constructServerQueryRequests(distributedStagePlan, requestMetadataMap, _helixPropertyStore, _mailboxService,
@@ -213,7 +218,7 @@ public class QueryRunner {
       MailboxSendNode sendNode = (MailboxSendNode) distributedStagePlan.getStageRoot();
       OpChainExecutionContext opChainExecutionContext =
           new OpChainExecutionContext(_mailboxService, requestId, sendNode.getStageId(), _rootServer, deadlineMs,
-              deadlineMs, distributedStagePlan.getMetadataMap());
+              deadlineMs, distributedStagePlan.getMetadataMap(), isTraceEnabled);
       MultiStageOperator leafStageOperator =
           new LeafStageTransferableBlockOperator(opChainExecutionContext, serverQueryResults, sendNode.getDataSchema());
       mailboxSendOperator = new MailboxSendOperator(opChainExecutionContext, leafStageOperator,
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/LeafStageTransferableBlockOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/LeafStageTransferableBlockOperator.java
index a6ed81a3d8..5656baebe8 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/LeafStageTransferableBlockOperator.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/LeafStageTransferableBlockOperator.java
@@ -114,6 +114,15 @@ public class LeafStageTransferableBlockOperator extends MultiStageOperator {
     }
   }
 
+  /**
+   * Leaf stage operators should always collect stats for the tables used in queries;
+   * otherwise the Broker response will just contain zeros for every stat value.
+   */
+  @Override
+  protected boolean shouldCollectStats() {
+    return true;
+  }
+
   /**
    * this is data transfer block compose method is here to ensure that V1 results match what the expected projection
    * schema in the calcite logical operator.
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java
index ad0c24abb7..bb807cb22c 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java
@@ -178,6 +178,15 @@ public class MailboxSendOperator extends MultiStageOperator {
     return transferableBlock;
   }
 
+  /**
+   * This method is overridden to return true because this operator is the last in the chain and needs to collect
+   * execution time stats.
+   */
+  @Override
+  protected boolean shouldCollectStats() {
+    return true;
+  }
+
   @Override
   public void close() {
     super.close();
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MultiStageOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MultiStageOperator.java
index 881c499c0f..6b9f630eb5 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MultiStageOperator.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MultiStageOperator.java
@@ -49,11 +49,16 @@ public abstract class MultiStageOperator implements Operator<TransferableBlock>,
       throw new EarlyTerminationException("Interrupted while processing next block");
     }
     try (InvocationScope ignored = Tracing.getTracer().createScope(getClass())) {
-      OperatorStats operatorStats = _opChainStats.getOperatorStats(_context, _operatorId);
-      operatorStats.startTimer();
-      TransferableBlock nextBlock = getNextBlock();
-      operatorStats.recordRow(1, nextBlock.getNumRows());
-      operatorStats.endTimer(nextBlock);
+      TransferableBlock nextBlock;
+      if (shouldCollectStats()) {
+        OperatorStats operatorStats = _opChainStats.getOperatorStats(_context, _operatorId);
+        operatorStats.startTimer();
+        nextBlock = getNextBlock();
+        operatorStats.recordRow(1, nextBlock.getNumRows());
+        operatorStats.endTimer(nextBlock);
+      } else {
+        nextBlock = getNextBlock();
+      }
       return nextBlock;
     }
   }
@@ -65,6 +70,10 @@ public abstract class MultiStageOperator implements Operator<TransferableBlock>,
   // Make it protected because we should always call nextBlock()
   protected abstract TransferableBlock getNextBlock();
 
+  protected boolean shouldCollectStats() {
+    return _context.isTraceEnabled();
+  }
+
   @Override
   public List<MultiStageOperator> getChildOperators() {
     throw new UnsupportedOperationException();
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/OpChainStats.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/OpChainStats.java
index 5b2cc2a065..6c259eaff2 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/OpChainStats.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/OpChainStats.java
@@ -79,11 +79,13 @@ public class OpChainStats {
   }
 
   public OperatorStats getOperatorStats(OpChainExecutionContext context, String operatorId) {
-    return _operatorStatsMap.computeIfAbsent(operatorId, (id) -> {
-       OperatorStats operatorStats = new OperatorStats(context);
-       operatorStats.recordSingleStat(DataTable.MetadataKey.OPERATOR_ID.getName(), operatorId);
-       return operatorStats;
-     });
+      return _operatorStatsMap.computeIfAbsent(operatorId, (id) -> {
+        OperatorStats operatorStats = new OperatorStats(context);
+        if (context.isTraceEnabled()) {
+          operatorStats.recordSingleStat(DataTable.MetadataKey.OPERATOR_ID.getName(), operatorId);
+        }
+        return operatorStats;
+      });
   }
 
   private void startExecutionTimer() {
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/OperatorStats.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/OperatorStats.java
index 2655a5c286..5985e23b13 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/OperatorStats.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/OperatorStats.java
@@ -41,6 +41,7 @@ public class OperatorStats {
   private int _numBlock = 0;
   private int _numRows = 0;
   private long _startTimeMs = -1;
+  private long _endTimeMs = -1;
   private final Map<String, String> _executionStats;
   private boolean _processingStarted = false;
 
@@ -66,9 +67,11 @@ public class OperatorStats {
   public void endTimer(TransferableBlock block) {
     if (_executeStopwatch.isRunning()) {
       _executeStopwatch.stop();
+      _endTimeMs = System.currentTimeMillis();
     }
     if (!_processingStarted && block.isNoOpBlock()) {
       _startTimeMs = -1;
+      _endTimeMs = -1;
       _executeStopwatch.reset();
     } else {
       _processingStarted = true;
@@ -94,10 +97,14 @@ public class OperatorStats {
     _executionStats.putIfAbsent(DataTable.MetadataKey.OPERATOR_EXECUTION_TIME_MS.getName(),
         String.valueOf(_executeStopwatch.elapsed(TimeUnit.MILLISECONDS)));
     // wall time are recorded slightly longer than actual execution but it is OK.
-    _executionStats.putIfAbsent(DataTable.MetadataKey.OPERATOR_EXEC_START_TIME_MS.getName(),
-        String.valueOf(_startTimeMs));
-    _executionStats.putIfAbsent(DataTable.MetadataKey.OPERATOR_EXEC_END_TIME_MS.getName(),
-        String.valueOf(System.currentTimeMillis()));
+
+    if (_startTimeMs != -1) {
+      _executionStats.putIfAbsent(DataTable.MetadataKey.OPERATOR_EXEC_START_TIME_MS.getName(),
+          String.valueOf(_startTimeMs));
+      long endTimeMs = _endTimeMs == -1 ? System.currentTimeMillis() : _endTimeMs;
+      _executionStats.putIfAbsent(DataTable.MetadataKey.OPERATOR_EXEC_END_TIME_MS.getName(),
+          String.valueOf(endTimeMs));
+    }
     return _executionStats;
   }
 
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/OpChainExecutionContext.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/OpChainExecutionContext.java
index 64141a024b..68d8d4b4ae 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/OpChainExecutionContext.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/OpChainExecutionContext.java
@@ -42,9 +42,11 @@ public class OpChainExecutionContext {
   private final Map<Integer, StageMetadata> _metadataMap;
   private final OpChainId _id;
   private final OpChainStats _stats;
+  private final boolean _traceEnabled;
 
   public OpChainExecutionContext(MailboxService<TransferableBlock> mailboxService, long requestId, int stageId,
-      VirtualServerAddress server, long timeoutMs, long deadlineMs, Map<Integer, StageMetadata> metadataMap) {
+      VirtualServerAddress server, long timeoutMs, long deadlineMs, Map<Integer, StageMetadata> metadataMap,
+      boolean traceEnabled) {
     _mailboxService = mailboxService;
     _requestId = requestId;
     _stageId = stageId;
@@ -54,12 +56,13 @@ public class OpChainExecutionContext {
     _metadataMap = metadataMap;
     _id = new OpChainId(requestId, server.virtualId(), stageId);
     _stats = new OpChainStats(_id.toString());
+    _traceEnabled = traceEnabled;
   }
 
   public OpChainExecutionContext(PlanRequestContext planRequestContext) {
     this(planRequestContext.getMailboxService(), planRequestContext.getRequestId(), planRequestContext.getStageId(),
         planRequestContext.getServer(), planRequestContext.getTimeoutMs(), planRequestContext.getDeadlineMs(),
-        planRequestContext.getMetadataMap());
+        planRequestContext.getMetadataMap(), planRequestContext.isTraceEnabled());
   }
 
   public MailboxService<TransferableBlock> getMailboxService() {
@@ -97,4 +100,8 @@ public class OpChainExecutionContext {
   public OpChainStats getStats() {
     return _stats;
   }
+
+  public boolean isTraceEnabled() {
+    return _traceEnabled;
+  }
 }
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/PlanRequestContext.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/PlanRequestContext.java
index fb610ebb46..21899d7d34 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/PlanRequestContext.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/PlanRequestContext.java
@@ -40,9 +40,11 @@ public class PlanRequestContext {
   protected final Map<Integer, StageMetadata> _metadataMap;
   protected final List<MailboxIdentifier> _receivingMailboxes = new ArrayList<>();
   private final OpChainExecutionContext _opChainExecutionContext;
+  private final boolean _traceEnabled;
 
   public PlanRequestContext(MailboxService<TransferableBlock> mailboxService, long requestId, int stageId,
-      long timeoutMs, long deadlineMs, VirtualServerAddress server, Map<Integer, StageMetadata> metadataMap) {
+      long timeoutMs, long deadlineMs, VirtualServerAddress server, Map<Integer, StageMetadata> metadataMap,
+      boolean traceEnabled) {
     _mailboxService = mailboxService;
     _requestId = requestId;
     _stageId = stageId;
@@ -50,6 +52,7 @@ public class PlanRequestContext {
     _deadlineMs = deadlineMs;
     _server = server;
     _metadataMap = metadataMap;
+    _traceEnabled = traceEnabled;
     _opChainExecutionContext = new OpChainExecutionContext(this);
   }
 
@@ -92,4 +95,8 @@ public class PlanRequestContext {
   public OpChainExecutionContext getOpChainExecutionContext() {
     return _opChainExecutionContext;
   }
+
+  public boolean isTraceEnabled() {
+    return _traceEnabled;
+  }
 }
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/ServerRequestPlanVisitor.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/ServerRequestPlanVisitor.java
index b6194bd15b..6255a99ddc 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/ServerRequestPlanVisitor.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/ServerRequestPlanVisitor.java
@@ -97,6 +97,7 @@ public class ServerRequestPlanVisitor implements StageNodeVisitor<Void, ServerPl
     long requestId = (Long.parseLong(requestMetadataMap.get(QueryConfig.KEY_OF_BROKER_REQUEST_ID)) << 16)
         + (stagePlan.getStageId() << 8) + (tableType == TableType.REALTIME ? 1 : 0);
     long timeoutMs = Long.parseLong(requestMetadataMap.get(QueryConfig.KEY_OF_BROKER_REQUEST_TIMEOUT_MS));
+    boolean traceEnabled = Boolean.parseBoolean(requestMetadataMap.get(CommonConstants.Broker.Request.TRACE));
     PinotQuery pinotQuery = new PinotQuery();
     Integer leafNodeLimit = QueryOptionsUtils.getMultiStageLeafLimit(requestMetadataMap);
     if (leafNodeLimit != null) {
@@ -109,7 +110,7 @@ public class ServerRequestPlanVisitor implements StageNodeVisitor<Void, ServerPl
     ServerPlanRequestContext context =
         new ServerPlanRequestContext(mailboxService, requestId, stagePlan.getStageId(), timeoutMs, deadlineMs,
             new VirtualServerAddress(stagePlan.getServer()), stagePlan.getMetadataMap(), pinotQuery, tableType,
-            timeBoundaryInfo);
+            timeBoundaryInfo, traceEnabled);
 
     // visit the plan and create query physical plan.
     ServerRequestPlanVisitor.walkStageNode(stagePlan.getStageRoot(), context);
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/server/ServerPlanRequestContext.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/server/ServerPlanRequestContext.java
index 4403d35e6d..c9ce748101 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/server/ServerPlanRequestContext.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/server/ServerPlanRequestContext.java
@@ -43,8 +43,8 @@ public class ServerPlanRequestContext extends PlanRequestContext {
 
   public ServerPlanRequestContext(MailboxService<TransferableBlock> mailboxService, long requestId, int stageId,
       long timeoutMs, long deadlineMs, VirtualServerAddress server, Map<Integer, StageMetadata> metadataMap,
-      PinotQuery pinotQuery, TableType tableType, TimeBoundaryInfo timeBoundaryInfo) {
-    super(mailboxService, requestId, stageId, timeoutMs, deadlineMs, server, metadataMap);
+      PinotQuery pinotQuery, TableType tableType, TimeBoundaryInfo timeBoundaryInfo, boolean traceEnabled) {
+    super(mailboxService, requestId, stageId, timeoutMs, deadlineMs, server, metadataMap, traceEnabled);
     _pinotQuery = pinotQuery;
     _tableType = tableType;
     _timeBoundaryInfo = timeBoundaryInfo;
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/QueryDispatcher.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/QueryDispatcher.java
index 011fed8472..6d113c9fa2 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/QueryDispatcher.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/QueryDispatcher.java
@@ -85,13 +85,14 @@ public class QueryDispatcher {
 
   public ResultTable submitAndReduce(long requestId, QueryPlan queryPlan,
       MailboxService<TransferableBlock> mailboxService, long timeoutMs, Map<String, String> queryOptions,
-      Map<Integer, ExecutionStatsAggregator> executionStatsAggregator)
+      Map<Integer, ExecutionStatsAggregator> executionStatsAggregator, boolean traceEnabled)
       throws Exception {
     try {
       // submit all the distributed stages.
       int reduceStageId = submit(requestId, queryPlan, timeoutMs, queryOptions);
       // run reduce stage and return result.
-      return runReducer(requestId, queryPlan, reduceStageId, timeoutMs, mailboxService, executionStatsAggregator);
+      return runReducer(requestId, queryPlan, reduceStageId, timeoutMs, mailboxService, executionStatsAggregator,
+          traceEnabled);
     } catch (Exception e) {
       cancel(requestId, queryPlan);
       throw new RuntimeException("Error executing query: " + ExplainPlanStageVisitor.explain(queryPlan), e);
@@ -175,13 +176,14 @@ public class QueryDispatcher {
 
   @VisibleForTesting
   public static ResultTable runReducer(long requestId, QueryPlan queryPlan, int reduceStageId, long timeoutMs,
-      MailboxService<TransferableBlock> mailboxService, Map<Integer, ExecutionStatsAggregator> statsAggregatorMap) {
+      MailboxService<TransferableBlock> mailboxService, Map<Integer, ExecutionStatsAggregator> statsAggregatorMap,
+      boolean traceEnabled) {
     MailboxReceiveNode reduceNode = (MailboxReceiveNode) queryPlan.getQueryStageMap().get(reduceStageId);
     VirtualServerAddress server =
         new VirtualServerAddress(mailboxService.getHostname(), mailboxService.getMailboxPort(), 0);
     OpChainExecutionContext context =
         new OpChainExecutionContext(mailboxService, requestId, reduceStageId, server, timeoutMs, timeoutMs,
-            queryPlan.getStageMetadataMap());
+            queryPlan.getStageMetadataMap(), traceEnabled);
     MailboxReceiveOperator mailboxReceiveOperator =
         createReduceStageOperator(reduceNode.getSenderStageId(), reduceStageId, reduceNode.getDataSchema(), context);
     List<DataBlock> resultDataBlocks =
@@ -221,12 +223,14 @@ public class QueryDispatcher {
             OperatorStats operatorStats = entry.getValue();
             ExecutionStatsAggregator rootStatsAggregator = executionStatsAggregatorMap.get(0);
             ExecutionStatsAggregator stageStatsAggregator = executionStatsAggregatorMap.get(operatorStats.getStageId());
-            if (queryPlan != null) {
-              StageMetadata operatorStageMetadata = queryPlan.getStageMetadataMap().get(operatorStats.getStageId());
-              OperatorUtils.recordTableName(operatorStats, operatorStageMetadata);
-            }
             rootStatsAggregator.aggregate(null, entry.getValue().getExecutionStats(), new HashMap<>());
-            stageStatsAggregator.aggregate(null, entry.getValue().getExecutionStats(), new HashMap<>());
+            if (stageStatsAggregator != null) {
+              if (queryPlan != null) {
+                StageMetadata operatorStageMetadata = queryPlan.getStageMetadataMap().get(operatorStats.getStageId());
+                OperatorUtils.recordTableName(operatorStats, operatorStageMetadata);
+              }
+              stageStatsAggregator.aggregate(null, entry.getValue().getExecutionStats(), new HashMap<>());
+            }
           }
         }
         return resultDataBlocks;
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTest.java
index 649a2bee05..e1e2e17ab1 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTest.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTest.java
@@ -196,7 +196,8 @@ public class QueryRunnerTest extends QueryRunnerTestBase {
 
     try {
       QueryDispatcher.runReducer(requestId, queryPlan, reducerStageId,
-          Long.parseLong(requestMetadataMap.get(QueryConfig.KEY_OF_BROKER_REQUEST_TIMEOUT_MS)), _mailboxService, null);
+          Long.parseLong(requestMetadataMap.get(QueryConfig.KEY_OF_BROKER_REQUEST_TIMEOUT_MS)), _mailboxService, null,
+          false);
     } catch (RuntimeException rte) {
       Assert.assertTrue(rte.getMessage().contains("Received error query execution result block"));
       Assert.assertTrue(rte.getMessage().contains(exceptionMsg), "Exception should contain: " + exceptionMsg
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTestBase.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTestBase.java
index 3bd3156ce6..affc0317a0 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTestBase.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTestBase.java
@@ -21,7 +21,6 @@ package org.apache.pinot.query.runtime;
 import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.google.common.base.Preconditions;
-import com.google.common.collect.ImmutableMap;
 import com.google.common.math.DoubleMath;
 import java.math.BigDecimal;
 import java.sql.Connection;
@@ -88,10 +87,15 @@ public abstract class QueryRunnerTestBase extends QueryTestSet {
   protected List<Object[]> queryRunner(String sql, Map<Integer, ExecutionStatsAggregator> executionStatsAggregatorMap) {
     QueryPlan queryPlan = _queryEnvironment.planQuery(sql);
     long requestId = RANDOM_REQUEST_ID_GEN.nextLong();
-    Map<String, String> requestMetadataMap =
-        ImmutableMap.of(QueryConfig.KEY_OF_BROKER_REQUEST_ID, String.valueOf(requestId),
-            QueryConfig.KEY_OF_BROKER_REQUEST_TIMEOUT_MS,
-            String.valueOf(CommonConstants.Broker.DEFAULT_BROKER_TIMEOUT_MS));
+    Map<String, String> requestMetadataMap = new HashMap<>();
+    requestMetadataMap.put(QueryConfig.KEY_OF_BROKER_REQUEST_ID, String.valueOf(requestId));
+    requestMetadataMap.put(QueryConfig.KEY_OF_BROKER_REQUEST_TIMEOUT_MS,
+        String.valueOf(CommonConstants.Broker.DEFAULT_BROKER_TIMEOUT_MS));
+
+    if (executionStatsAggregatorMap != null) {
+      requestMetadataMap.put(CommonConstants.Broker.Request.TRACE, "true");
+    }
+
     int reducerStageId = -1;
     for (int stageId : queryPlan.getStageMetadataMap().keySet()) {
       if (queryPlan.getQueryStageMap().get(stageId) instanceof MailboxReceiveNode) {
@@ -110,7 +114,7 @@ public abstract class QueryRunnerTestBase extends QueryTestSet {
     Preconditions.checkState(reducerStageId != -1);
     ResultTable resultTable = QueryDispatcher.runReducer(requestId, queryPlan, reducerStageId,
         Long.parseLong(requestMetadataMap.get(QueryConfig.KEY_OF_BROKER_REQUEST_TIMEOUT_MS)), _mailboxService,
-        executionStatsAggregatorMap);
+        executionStatsAggregatorMap, true);
     return resultTable.getRows();
   }
 
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/executor/OpChainSchedulerServiceTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/executor/OpChainSchedulerServiceTest.java
index a30adf04f4..fbbab9e4d9 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/executor/OpChainSchedulerServiceTest.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/executor/OpChainSchedulerServiceTest.java
@@ -74,7 +74,7 @@ public class OpChainSchedulerServiceTest {
 
   private OpChain getChain(MultiStageOperator operator) {
     VirtualServerAddress address = new VirtualServerAddress("localhost", 1234, 1);
-    OpChainExecutionContext context = new OpChainExecutionContext(null, 123L, 1, address, 0, 0, null);
+    OpChainExecutionContext context = new OpChainExecutionContext(null, 123L, 1, address, 0, 0, null, true);
     return new OpChain(context, operator, ImmutableList.of());
   }
 
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/executor/RoundRobinSchedulerTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/executor/RoundRobinSchedulerTest.java
index 34b39a697b..1f8eb9e254 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/executor/RoundRobinSchedulerTest.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/executor/RoundRobinSchedulerTest.java
@@ -184,6 +184,6 @@ public class RoundRobinSchedulerTest {
 
   private OpChainExecutionContext getOpChainExecutionContext(long requestId, int stageId, int virtualServerId) {
     return new OpChainExecutionContext(null, requestId, stageId,
-        new VirtualServerAddress("localhost", 1234, virtualServerId), 0, 0, null);
+        new VirtualServerAddress("localhost", 1234, virtualServerId), 0, 0, null, true);
   }
 }
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperatorTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperatorTest.java
index 7e932067e1..cb67b22127 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperatorTest.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperatorTest.java
@@ -86,7 +86,7 @@ public class MailboxReceiveOperatorTest {
     // shorter timeoutMs should result in error.
     OpChainExecutionContext context =
         new OpChainExecutionContext(_mailboxService, 1, DEFAULT_RECEIVER_STAGE_ID, _testAddr, 10L, 10L,
-            new HashMap<>());
+            new HashMap<>(), false);
     MailboxReceiveOperator receiveOp =
         new MailboxReceiveOperator(context, new ArrayList<>(), RelDistribution.Type.SINGLETON, 456, 789, 10L);
     Thread.sleep(200L);
@@ -97,13 +97,13 @@ public class MailboxReceiveOperatorTest {
 
     // longer timeout or default timeout (10s) doesn't result in error.
     context = new OpChainExecutionContext(_mailboxService, 1, DEFAULT_RECEIVER_STAGE_ID, _testAddr, 2000L, 2000L,
-        new HashMap<>());
+        new HashMap<>(), false);
     receiveOp = new MailboxReceiveOperator(context, new ArrayList<>(), RelDistribution.Type.SINGLETON, 456, 789, 2000L);
     Thread.sleep(200L);
     mailbox = receiveOp.nextBlock();
     Assert.assertFalse(mailbox.isErrorBlock());
     context = new OpChainExecutionContext(_mailboxService, 1, DEFAULT_RECEIVER_STAGE_ID, _testAddr, Long.MAX_VALUE,
-        Long.MAX_VALUE, new HashMap<>());
+        Long.MAX_VALUE, new HashMap<>(), false);
     receiveOp = new MailboxReceiveOperator(context, new ArrayList<>(), RelDistribution.Type.SINGLETON, 456, 789, null);
     Thread.sleep(200L);
     mailbox = receiveOp.nextBlock();
@@ -125,7 +125,7 @@ public class MailboxReceiveOperatorTest {
 
     OpChainExecutionContext context =
         new OpChainExecutionContext(_mailboxService, 1, DEFAULT_RECEIVER_STAGE_ID, _testAddr, Long.MAX_VALUE,
-            Long.MAX_VALUE, new HashMap<>());
+            Long.MAX_VALUE, new HashMap<>(), false);
     MailboxReceiveOperator receiveOp =
         new MailboxReceiveOperator(context, ImmutableList.of(_server1, _server2), RelDistribution.Type.SINGLETON,
             456, 789, null);
@@ -144,7 +144,7 @@ public class MailboxReceiveOperatorTest {
 
     OpChainExecutionContext context =
         new OpChainExecutionContext(_mailboxService, 1, DEFAULT_RECEIVER_STAGE_ID, _testAddr, Long.MAX_VALUE,
-            Long.MAX_VALUE, new HashMap<>());
+            Long.MAX_VALUE, new HashMap<>(), false);
     MailboxReceiveOperator receiveOp = new MailboxReceiveOperator(context, ImmutableList.of(_server1, _server2),
         RelDistribution.Type.RANGE_DISTRIBUTED, 456, 789, null);
   }
@@ -172,7 +172,7 @@ public class MailboxReceiveOperatorTest {
 
     OpChainExecutionContext context =
         new OpChainExecutionContext(_mailboxService, jobId, DEFAULT_RECEIVER_STAGE_ID, toAddress, Long.MAX_VALUE,
-            Long.MAX_VALUE, new HashMap<>());
+            Long.MAX_VALUE, new HashMap<>(), false);
 
     MailboxReceiveOperator receiveOp =
         new MailboxReceiveOperator(context, ImmutableList.of(_server1, _server2), RelDistribution.Type.SINGLETON,
@@ -210,7 +210,7 @@ public class MailboxReceiveOperatorTest {
 
     OpChainExecutionContext context =
         new OpChainExecutionContext(_mailboxService, jobId, DEFAULT_RECEIVER_STAGE_ID, toAddress, Long.MAX_VALUE,
-            Long.MAX_VALUE, new HashMap<>());
+            Long.MAX_VALUE, new HashMap<>(), false);
 
     MailboxReceiveOperator receiveOp =
         new MailboxReceiveOperator(context, ImmutableList.of(_server1, _server2), RelDistribution.Type.SINGLETON,
@@ -251,7 +251,7 @@ public class MailboxReceiveOperatorTest {
 
     OpChainExecutionContext context =
         new OpChainExecutionContext(_mailboxService, jobId, DEFAULT_RECEIVER_STAGE_ID, toAddress, Long.MAX_VALUE,
-            Long.MAX_VALUE, new HashMap<>());
+            Long.MAX_VALUE, new HashMap<>(), false);
 
     MailboxReceiveOperator receiveOp =
         new MailboxReceiveOperator(context, ImmutableList.of(_server1, _server2), RelDistribution.Type.SINGLETON,
@@ -290,7 +290,7 @@ public class MailboxReceiveOperatorTest {
 
     OpChainExecutionContext context =
         new OpChainExecutionContext(_mailboxService, jobId, DEFAULT_RECEIVER_STAGE_ID, toAddress, Long.MAX_VALUE,
-            Long.MAX_VALUE, new HashMap<>());
+            Long.MAX_VALUE, new HashMap<>(), false);
 
     MailboxReceiveOperator receiveOp =
         new MailboxReceiveOperator(context, ImmutableList.of(_server1, _server2), RelDistribution.Type.SINGLETON,
@@ -332,7 +332,7 @@ public class MailboxReceiveOperatorTest {
 
     OpChainExecutionContext context =
         new OpChainExecutionContext(_mailboxService, jobId, DEFAULT_RECEIVER_STAGE_ID, toAddress, Long.MAX_VALUE,
-            Long.MAX_VALUE, new HashMap<>());
+            Long.MAX_VALUE, new HashMap<>(), false);
 
     MailboxReceiveOperator receiveOp =
         new MailboxReceiveOperator(context, ImmutableList.of(_server1, _server2), RelDistribution.Type.SINGLETON,
@@ -378,7 +378,7 @@ public class MailboxReceiveOperatorTest {
 
     OpChainExecutionContext context =
         new OpChainExecutionContext(_mailboxService, jobId, DEFAULT_RECEIVER_STAGE_ID, toAddress, Long.MAX_VALUE,
-            Long.MAX_VALUE, new HashMap<>());
+            Long.MAX_VALUE, new HashMap<>(), false);
 
     MailboxReceiveOperator receiveOp =
         new MailboxReceiveOperator(context, ImmutableList.of(_server1, _server2), RelDistribution.Type.SINGLETON,
@@ -424,7 +424,7 @@ public class MailboxReceiveOperatorTest {
 
     OpChainExecutionContext context =
         new OpChainExecutionContext(_mailboxService, jobId, DEFAULT_RECEIVER_STAGE_ID, toAddress, Long.MAX_VALUE,
-            Long.MAX_VALUE, new HashMap<>());
+            Long.MAX_VALUE, new HashMap<>(), false);
 
     MailboxReceiveOperator receiveOp =
         new MailboxReceiveOperator(context, ImmutableList.of(_server1, _server2), RelDistribution.Type.HASH_DISTRIBUTED,
@@ -474,7 +474,7 @@ public class MailboxReceiveOperatorTest {
 
     OpChainExecutionContext context =
         new OpChainExecutionContext(_mailboxService, jobId, DEFAULT_RECEIVER_STAGE_ID, toAddress, Long.MAX_VALUE,
-            Long.MAX_VALUE, new HashMap<>());
+            Long.MAX_VALUE, new HashMap<>(), false);
 
     MailboxReceiveOperator receiveOp =
         new MailboxReceiveOperator(context, ImmutableList.of(_server1, _server2), RelDistribution.Type.HASH_DISTRIBUTED,
@@ -526,7 +526,7 @@ public class MailboxReceiveOperatorTest {
     Mockito.when(_mailbox2.receive()).thenReturn(OperatorTestUtil.block(inSchema, expRow3));
     OpChainExecutionContext context =
         new OpChainExecutionContext(_mailboxService, jobId, DEFAULT_RECEIVER_STAGE_ID, toAddress, Long.MAX_VALUE,
-            Long.MAX_VALUE, new HashMap<>());
+            Long.MAX_VALUE, new HashMap<>(), false);
 
     MailboxReceiveOperator receiveOp =
         new MailboxReceiveOperator(context, ImmutableList.of(_server1, _server2), RelDistribution.Type.HASH_DISTRIBUTED,
@@ -585,7 +585,7 @@ public class MailboxReceiveOperatorTest {
 
     OpChainExecutionContext context =
         new OpChainExecutionContext(_mailboxService, jobId, DEFAULT_RECEIVER_STAGE_ID, toAddress, Long.MAX_VALUE,
-            Long.MAX_VALUE, new HashMap<>());
+            Long.MAX_VALUE, new HashMap<>(), false);
 
     MailboxReceiveOperator receiveOp =
         new MailboxReceiveOperator(context, ImmutableList.of(_server1, _server2), RelDistribution.Type.HASH_DISTRIBUTED,
@@ -632,7 +632,7 @@ public class MailboxReceiveOperatorTest {
 
     OpChainExecutionContext context =
         new OpChainExecutionContext(_mailboxService, jobId, DEFAULT_RECEIVER_STAGE_ID, toAddress, Long.MAX_VALUE,
-            Long.MAX_VALUE, new HashMap<>());
+            Long.MAX_VALUE, new HashMap<>(), false);
 
     MailboxReceiveOperator receiveOp =
         new MailboxReceiveOperator(context, ImmutableList.of(_server1, _server2), RelDistribution.Type.HASH_DISTRIBUTED,
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MailboxSendOperatorTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MailboxSendOperatorTest.java
index 25b307b22b..fe363d9e8b 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MailboxSendOperatorTest.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MailboxSendOperatorTest.java
@@ -194,6 +194,11 @@ public class MailboxSendOperatorTest {
     ArgumentCaptor<TransferableBlock> captor = ArgumentCaptor.forClass(TransferableBlock.class);
     Mockito.verify(_exchange).send(captor.capture());
     Assert.assertSame(captor.getValue().getType(), DataBlock.Type.ROW, "expected data block to propagate");
+
+    // EOS block should contain statistics
+    Assert.assertFalse(context.getStats().getOperatorStatsMap().isEmpty());
+    Assert.assertEquals(context.getStats().getOperatorStatsMap().size(), 1);
+    Assert.assertTrue(context.getStats().getOperatorStatsMap().containsKey(operator.getOperatorId()));
   }
 
   private static TransferableBlock block(DataSchema schema, Object[]... rows) {
@@ -206,7 +211,7 @@ public class MailboxSendOperatorTest {
     Map<Integer, StageMetadata> stageMetadataMap = Collections.singletonMap(DEFAULT_RECEIVER_STAGE_ID, stageMetadata);
     OpChainExecutionContext context =
         new OpChainExecutionContext(_mailboxService, 1, DEFAULT_SENDER_STAGE_ID, new VirtualServerAddress(_server),
-            deadlineMs, deadlineMs, stageMetadataMap);
+            deadlineMs, deadlineMs, stageMetadataMap, false);
     return context;
   }
 }
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/OpChainTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/OpChainTest.java
index 6f83f9ac77..56ff2b36e3 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/OpChainTest.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/OpChainTest.java
@@ -18,13 +18,34 @@
  */
 package org.apache.pinot.query.runtime.operator;
 
+import com.google.common.collect.ImmutableList;
 import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
 import java.util.Map;
+import java.util.Stack;
 import javax.annotation.Nullable;
+import org.apache.calcite.rel.RelDistribution;
 import org.apache.pinot.common.datatable.DataTable;
+import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.core.operator.blocks.InstanceResponseBlock;
+import org.apache.pinot.core.operator.blocks.results.SelectionResultsBlock;
+import org.apache.pinot.core.query.request.context.QueryContext;
+import org.apache.pinot.core.query.request.context.utils.QueryContextConverterUtils;
+import org.apache.pinot.query.mailbox.JsonMailboxIdentifier;
+import org.apache.pinot.query.mailbox.MailboxService;
+import org.apache.pinot.query.mailbox.ReceivingMailbox;
+import org.apache.pinot.query.planner.StageMetadata;
+import org.apache.pinot.query.planner.logical.RexExpression;
+import org.apache.pinot.query.planner.partitioning.KeySelector;
+import org.apache.pinot.query.routing.VirtualServer;
+import org.apache.pinot.query.routing.VirtualServerAddress;
 import org.apache.pinot.query.runtime.blocks.TransferableBlock;
 import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils;
+import org.apache.pinot.query.runtime.operator.exchange.BlockExchange;
 import org.apache.pinot.query.runtime.plan.OpChainExecutionContext;
+import org.apache.pinot.spi.data.FieldSpec;
 import org.mockito.Mock;
 import org.mockito.Mockito;
 import org.mockito.MockitoAnnotations;
@@ -39,9 +60,62 @@ public class OpChainTest {
   @Mock
   private MultiStageOperator _upstreamOperator;
 
+  private static int _numOperatorsInitialized = 0;
+  private final List<TransferableBlock> _blockList = new ArrayList<>();
+
+  @Mock
+  private MailboxService<TransferableBlock> _mailboxService;
+  @Mock
+  private VirtualServer _server;
+  @Mock
+  private KeySelector<Object[], Object[]> _selector;
+  @Mock
+  private MailboxSendOperator.BlockExchangeFactory _exchangeFactory;
+  @Mock
+  private BlockExchange _exchange;
+
+  @Mock
+  private ReceivingMailbox<TransferableBlock> _mailbox;
+
+
+  @Mock
+  private MailboxService<TransferableBlock> _mailboxService2;
+  @Mock
+  private ReceivingMailbox<TransferableBlock> _mailbox2;
+
+
   @BeforeMethod
   public void setUp() {
     _mocks = MockitoAnnotations.openMocks(this);
+    // Every exchange built through the factory is the shared _exchange mock.
+    Mockito.when(_exchangeFactory.build(Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any(),
+        Mockito.anyLong())).thenReturn(_exchange);
+
+    Mockito.when(_server.getHostname()).thenReturn("mock");
+    Mockito.when(_server.getQueryMailboxPort()).thenReturn(0);
+    Mockito.when(_server.getVirtualId()).thenReturn(0);
+
+    Mockito.when(_mailboxService.getReceivingMailbox(Mockito.any())).thenReturn(_mailbox);
+    Mockito.when(_mailboxService2.getReceivingMailbox(Mockito.any())).thenReturn(_mailbox2);
+    // Blocks sent to _exchange are queued in _blockList and handed back by _mailbox2.receive().
+    try {
+      Mockito.doAnswer(invocation -> {
+        TransferableBlock arg = invocation.getArgument(0);
+        _blockList.add(arg);
+        return null;
+      }).when(_exchange).send(Mockito.any(TransferableBlock.class));
+      // When nothing is queued, receive() returns a no-op block.
+      Mockito.when(_mailbox2.receive()).then(x -> {
+        if (_blockList.isEmpty()) {
+          return TransferableBlockUtils.getNoOpTransferableBlock();
+        }
+        return _blockList.remove(0);
+      });
+
+      Mockito.when(_mailbox2.isClosed()).thenReturn(false);
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
   }
 
   @AfterMethod
@@ -79,7 +153,7 @@ public class OpChainTest {
   }
 
   @Test
-  public void testStatsCollection() {
+  public void testStatsCollectionTracingEnabled() {
     OpChainExecutionContext context = OperatorTestUtil.getDefaultContext();
     DummyMultiStageOperator dummyMultiStageOperator = new DummyMultiStageOperator(context);
 
@@ -100,7 +174,159 @@ public class OpChainTest {
         Long.parseLong(executionStats.get(DataTable.MetadataKey.OPERATOR_EXECUTION_TIME_MS.getName())) <= 2000);
   }
 
+  @Test
+  public void testStatsCollectionTracingDisabled() {
+    OpChainExecutionContext context = OperatorTestUtil.getDefaultContextWithTracingDisabled();
+    DummyMultiStageOperator dummyMultiStageOperator = new DummyMultiStageOperator(context);
+    // Pull a single block from the root between the executing() and queued() calls on the chain stats.
+    OpChain opChain = new OpChain(context, dummyMultiStageOperator, new ArrayList<>());
+    opChain.getStats().executing();
+    opChain.getRoot().nextBlock();
+    opChain.getStats().queued();
+    // Execution time is still reported, but the per-operator stats map is expected to stay empty.
+    Assert.assertTrue(opChain.getStats().getExecutionTime() >= 1000);
+    Assert.assertEquals(opChain.getStats().getOperatorStatsMap().size(), 0);
+  }
+
+  @Test
+  public void testStatsCollectionTracingEnabledMultipleOperators() {
+    long dummyOperatorWaitTime = 1000L;
+    // The context is created for the sender stage; stage metadata is registered for the receiving stage only.
+    int receivedStageId = 2;
+    int senderStageId = 1;
+    StageMetadata stageMetadata = new StageMetadata();
+    stageMetadata.setServerInstances(ImmutableList.of(_server));
+    Map<Integer, StageMetadata> stageMetadataMap = Collections.singletonMap(receivedStageId, stageMetadata);
+    OpChainExecutionContext context =
+        new OpChainExecutionContext(_mailboxService, 1, senderStageId, new VirtualServerAddress(_server), 1000, 1000,
+            stageMetadataMap, true);
+
+    Stack<MultiStageOperator> operators =
+        getFullOpchain(receivedStageId, senderStageId, context, dummyOperatorWaitTime);
+    // operators.peek() is the last operator pushed by getFullOpchain (the send operator); it is the chain root.
+    OpChain opChain = new OpChain(context, operators.peek(), new ArrayList<>());
+    opChain.getStats().executing();
+    while (!opChain.getRoot().nextBlock().isEndOfStreamBlock()) {
+      // Drain the opchain
+    }
+    opChain.getStats().queued();
+    // Second context backed by _mailboxService2, whose mailbox hands back the blocks captured from _exchange.
+    OpChainExecutionContext secondStageContext =
+        new OpChainExecutionContext(_mailboxService2, 1, senderStageId + 1, new VirtualServerAddress(_server), 1000,
+            1000, stageMetadataMap, true);
+
+    MailboxReceiveOperator secondStageReceiveOp =
+        new MailboxReceiveOperator(secondStageContext, ImmutableList.of(_server),
+            RelDistribution.Type.BROADCAST_DISTRIBUTED, senderStageId, receivedStageId + 1, null);
+    // With tracing enabled every operator of the chain must have an entry in the stats map.
+    Assert.assertTrue(opChain.getStats().getExecutionTime() >= dummyOperatorWaitTime);
+    int numOperators = operators.size();
+    Assert.assertEquals(opChain.getStats().getOperatorStatsMap().size(), numOperators);
+    while (!operators.isEmpty()) {
+      Assert.assertTrue(opChain.getStats().getOperatorStatsMap().containsKey(operators.pop().getOperatorId()));
+    }
+    // The receiving context should end up with one more entry (presumably the receive operator's own).
+    while (!secondStageReceiveOp.nextBlock().isEndOfStreamBlock()) {
+      // Drain the mailbox
+    }
+    Assert.assertEquals(secondStageContext.getStats().getOperatorStatsMap().size(), numOperators + 1);
+  }
+
+  @Test
+  public void testStatsCollectionTracingDisableMultipleOperators() {
+    long dummyOperatorWaitTime = 1000L;
+
+    int receivedStageId = 2;
+    int senderStageId = 1;
+    StageMetadata stageMetadata = new StageMetadata();
+    stageMetadata.setServerInstances(ImmutableList.of(_server));
+    Map<Integer, StageMetadata> stageMetadataMap = Collections.singletonMap(receivedStageId, stageMetadata);
+    OpChainExecutionContext context =
+        new OpChainExecutionContext(_mailboxService, 1, senderStageId, new VirtualServerAddress(_server), 1000, 1000,
+            stageMetadataMap, false);
+
+    Stack<MultiStageOperator> operators =
+        getFullOpchain(receivedStageId, senderStageId, context, dummyOperatorWaitTime);
+    // Only a single block is pulled from the root here; the chain is not drained in a loop.
+    OpChain opChain = new OpChain(context, operators.peek(), new ArrayList<>());
+    opChain.getStats().executing();
+    opChain.getRoot().nextBlock();
+    opChain.getStats().queued();
+
+    OpChainExecutionContext secondStageContext =
+        new OpChainExecutionContext(_mailboxService2, 1, senderStageId + 1, new VirtualServerAddress(_server), 1000,
+            1000, stageMetadataMap, false);
+
+    MailboxReceiveOperator secondStageReceiveOp =
+        new MailboxReceiveOperator(secondStageContext, ImmutableList.of(_server),
+            RelDistribution.Type.BROADCAST_DISTRIBUTED, senderStageId, receivedStageId + 1, null);
+    // Tracing disabled: exactly two entries are expected, one of them for the root (send) operator.
+    Assert.assertTrue(opChain.getStats().getExecutionTime() >= dummyOperatorWaitTime);
+    Assert.assertEquals(opChain.getStats().getOperatorStatsMap().size(), 2);
+    Assert.assertTrue(opChain.getStats().getOperatorStatsMap().containsKey(operators.pop().getOperatorId()));
+
+    while (!secondStageReceiveOp.nextBlock().isEndOfStreamBlock()) {
+      // Drain the mailbox
+    }
+    // Of the remaining operators, only those whose explain string contains SEND or LEAF must have stats.
+    while (!operators.isEmpty()) {
+      MultiStageOperator operator = operators.pop();
+      if (operator.toExplainString().contains("SEND") || operator.toExplainString().contains("LEAF")) {
+        Assert.assertTrue(opChain.getStats().getOperatorStatsMap().containsKey(operator.getOperatorId()));
+      }
+    }
+    Assert.assertEquals(secondStageContext.getStats().getOperatorStatsMap().size(), 2);
+  }
+
+  private Stack<MultiStageOperator> getFullOpchain(int receivedStageId, int senderStageId,
+      OpChainExecutionContext context, long waitTimeInMillis) {
+    Stack<MultiStageOperator> operators = new Stack<>();
+    DataSchema upStreamSchema =
+        new DataSchema(new String[]{"intCol"}, new DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.INT});
+    // Stub _mailbox.receive(): one data block, then EOS. NOTE(review): no receive operator is built here.
+    try {
+      Mockito.when(_mailbox.receive()).thenReturn(OperatorTestUtil.block(upStreamSchema, new Object[]{1}),
+          TransferableBlockUtils.getEndOfStreamTransferableBlock());
+    } catch (Exception e) {
+      Assert.fail("Exception while mocking mailbox receive: " + e.getMessage());
+    }
+    // Leaf operator over a pre-built selection result holding two rows
+    QueryContext queryContext = QueryContextConverterUtils.getQueryContext("SELECT intCol FROM tbl");
+    List<InstanceResponseBlock> resultsBlockList = Collections.singletonList(new InstanceResponseBlock(
+        new SelectionResultsBlock(upStreamSchema, Arrays.asList(new Object[]{1}, new Object[]{2})), queryContext));
+    LeafStageTransferableBlockOperator leafOp =
+        new LeafStageTransferableBlockOperator(context, resultsBlockList, upStreamSchema);
+
+    // Transform operator that selects input column 0; input and output schema are the same
+    RexExpression.InputRef ref0 = new RexExpression.InputRef(0);
+    TransformOperator transformOp =
+        new TransformOperator(context, leafOp, upStreamSchema, ImmutableList.of(ref0), upStreamSchema);
+
+    // Filter operator whose condition is the constant boolean literal TRUE
+    RexExpression booleanLiteral = new RexExpression.Literal(FieldSpec.DataType.BOOLEAN, true);
+    FilterOperator filterOp = new FilterOperator(context, transformOp, upStreamSchema, booleanLiteral);
+
+    // Dummy operator that sleeps for waitTimeInMillis on each getNextBlock() call
+    MultiStageOperator dummyWaitOperator = new DummyMultiStageCallableOperator(context, filterOp, waitTimeInMillis);
+
+    // Mailbox send operator (top of the chain), wired to the mocked exchange factory
+    MailboxSendOperator sendOperator =
+        new MailboxSendOperator(context, dummyWaitOperator, RelDistribution.Type.HASH_DISTRIBUTED, _selector, null,
+            null, false,
+            server -> new JsonMailboxIdentifier("123", "0@from:1", "0@to:2", senderStageId, receivedStageId),
+            _exchangeFactory, receivedStageId);
+    // Push leaf first and send last, so peek()/pop() yield the send operator first.
+    operators.push(leafOp);
+    operators.push(transformOp);
+    operators.push(filterOp);
+    operators.push(dummyWaitOperator);
+    operators.push(sendOperator);
+    return operators;
+  }
+
+
   static class DummyMultiStageOperator extends MultiStageOperator {
+
     public DummyMultiStageOperator(OpChainExecutionContext context) {
       super(context);
     }
@@ -121,4 +347,33 @@ public class OpChainTest {
       return "DUMMY";
     }
   }
+  /** Test operator: sleeps, pulls one upstream block and discards it, then always returns end-of-stream. */
+  static class DummyMultiStageCallableOperator extends MultiStageOperator {
+    private final MultiStageOperator _upstream;
+    private final long _sleepTimeInMillis;
+
+    public DummyMultiStageCallableOperator(OpChainExecutionContext context, MultiStageOperator upstream,
+        long sleepTimeInMillis) {
+      super(context);
+      _upstream = upstream;
+      _sleepTimeInMillis = sleepTimeInMillis;
+    }
+
+    @Override
+    protected TransferableBlock getNextBlock() {
+      try {
+        Thread.sleep(_sleepTimeInMillis);
+        _upstream.nextBlock();
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt(); // restore the flag instead of swallowing the interruption
+      }
+      return TransferableBlockUtils.getEndOfStreamTransferableBlock();
+    }
+
+    @Nullable
+    @Override
+    public String toExplainString() {
+      return "DUMMY_" + _numOperatorsInitialized++; // each call bumps the shared static counter
+    }
+  }
 }
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/OperatorTestUtil.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/OperatorTestUtil.java
index cd1f3ffe8d..575df48d30 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/OperatorTestUtil.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/OperatorTestUtil.java
@@ -65,12 +65,18 @@ public class OperatorTestUtil {
   public static OpChainExecutionContext getDefaultContext() {
     VirtualServerAddress virtualServerAddress = new VirtualServerAddress("mock", 80, 0);
     return new OpChainExecutionContext(null, 1, 2, virtualServerAddress, Long.MAX_VALUE, Long.MAX_VALUE,
-        new HashMap<>());
+        new HashMap<>(), true);
+  }
+
+  public static OpChainExecutionContext getDefaultContextWithTracingDisabled() {
+    VirtualServerAddress virtualServerAddress = new VirtualServerAddress("mock", 80, 0);
+    return new OpChainExecutionContext(null, 1, 2, virtualServerAddress, Long.MAX_VALUE, Long.MAX_VALUE,
+        new HashMap<>(), false);
   }
 
   public static OpChainExecutionContext getContext(long requestId, int stageId,
       VirtualServerAddress virtualServerAddress) {
     return new OpChainExecutionContext(null, requestId, stageId, virtualServerAddress, Long.MAX_VALUE, Long.MAX_VALUE,
-        new HashMap<>());
+        new HashMap<>(), true);
   }
 }
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/SortedMailboxReceiveOperatorTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/SortedMailboxReceiveOperatorTest.java
index 659f7e8860..e306875cf3 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/SortedMailboxReceiveOperatorTest.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/SortedMailboxReceiveOperatorTest.java
@@ -97,7 +97,7 @@ public class SortedMailboxReceiveOperatorTest {
     DataSchema inSchema = new DataSchema(new String[]{"col1", "col2"}, new DataSchema.ColumnDataType[]{INT, INT});
     OpChainExecutionContext context =
         new OpChainExecutionContext(_mailboxService, 1, DEFAULT_RECEIVER_STAGE_ID, _testAddr, 10L, 10L,
-            new HashMap<>());
+            new HashMap<>(), false);
     SortedMailboxReceiveOperator receiveOp =
         new SortedMailboxReceiveOperator(context, new ArrayList<>(), RelDistribution.Type.SINGLETON, collationKeys,
             collationDirections, false, true, inSchema, 456, 789, 10L);
@@ -109,14 +109,14 @@ public class SortedMailboxReceiveOperatorTest {
 
     // longer timeout or default timeout (10s) doesn't result in error.
     context = new OpChainExecutionContext(_mailboxService, 1, DEFAULT_RECEIVER_STAGE_ID, _testAddr, 2000L, 2000L,
-        new HashMap<>());
+        new HashMap<>(), false);
     receiveOp = new SortedMailboxReceiveOperator(context, new ArrayList<>(), RelDistribution.Type.SINGLETON,
         collationKeys, collationDirections, false, true, inSchema, 456, 789, 2000L);
     Thread.sleep(200L);
     mailbox = receiveOp.nextBlock();
     Assert.assertFalse(mailbox.isErrorBlock());
     context = new OpChainExecutionContext(_mailboxService, 1, DEFAULT_RECEIVER_STAGE_ID, _testAddr, Long.MAX_VALUE,
-        Long.MAX_VALUE, new HashMap<>());
+        Long.MAX_VALUE, new HashMap<>(), false);
     receiveOp = new SortedMailboxReceiveOperator(context, new ArrayList<>(), RelDistribution.Type.SINGLETON,
         collationKeys, collationDirections, false, true, inSchema, 456, 789, null);
     Thread.sleep(200L);
@@ -146,7 +146,7 @@ public class SortedMailboxReceiveOperatorTest {
 
     OpChainExecutionContext context =
         new OpChainExecutionContext(_mailboxService, 1, DEFAULT_RECEIVER_STAGE_ID, _testAddr, Long.MAX_VALUE,
-            Long.MAX_VALUE, new HashMap<>());
+            Long.MAX_VALUE, new HashMap<>(), false);
     SortedMailboxReceiveOperator receiveOp =
         new SortedMailboxReceiveOperator(context, ImmutableList.of(_server1, _server2), RelDistribution.Type.SINGLETON,
             collationKeys, collationDirections, false, true, inSchema, 456, 789, null);
@@ -172,7 +172,7 @@ public class SortedMailboxReceiveOperatorTest {
 
     OpChainExecutionContext context =
         new OpChainExecutionContext(_mailboxService, 1, DEFAULT_RECEIVER_STAGE_ID, _testAddr, Long.MAX_VALUE,
-            Long.MAX_VALUE, new HashMap<>());
+            Long.MAX_VALUE, new HashMap<>(), false);
     SortedMailboxReceiveOperator receiveOp = new SortedMailboxReceiveOperator(context,
         ImmutableList.of(_server1, _server2), RelDistribution.Type.RANGE_DISTRIBUTED, collationKeys,
         collationDirections, false, true, inSchema, 456, 789, null);
@@ -208,7 +208,7 @@ public class SortedMailboxReceiveOperatorTest {
 
     OpChainExecutionContext context =
         new OpChainExecutionContext(_mailboxService, jobId, DEFAULT_RECEIVER_STAGE_ID, toAddress, Long.MAX_VALUE,
-            Long.MAX_VALUE, new HashMap<>());
+            Long.MAX_VALUE, new HashMap<>(), false);
 
     SortedMailboxReceiveOperator receiveOp =
         new SortedMailboxReceiveOperator(context, ImmutableList.of(_server1, _server2), RelDistribution.Type.SINGLETON,
@@ -254,7 +254,7 @@ public class SortedMailboxReceiveOperatorTest {
 
     OpChainExecutionContext context =
         new OpChainExecutionContext(_mailboxService, jobId, DEFAULT_RECEIVER_STAGE_ID, toAddress, Long.MAX_VALUE,
-            Long.MAX_VALUE, new HashMap<>());
+            Long.MAX_VALUE, new HashMap<>(), false);
 
     SortedMailboxReceiveOperator receiveOp =
         new SortedMailboxReceiveOperator(context, ImmutableList.of(_server1, _server2), RelDistribution.Type.SINGLETON,
@@ -303,7 +303,7 @@ public class SortedMailboxReceiveOperatorTest {
 
     OpChainExecutionContext context =
         new OpChainExecutionContext(_mailboxService, jobId, DEFAULT_RECEIVER_STAGE_ID, toAddress, Long.MAX_VALUE,
-            Long.MAX_VALUE, new HashMap<>());
+            Long.MAX_VALUE, new HashMap<>(), false);
 
     SortedMailboxReceiveOperator receiveOp =
         new SortedMailboxReceiveOperator(context, ImmutableList.of(_server1, _server2), RelDistribution.Type.SINGLETON,
@@ -350,7 +350,7 @@ public class SortedMailboxReceiveOperatorTest {
 
     OpChainExecutionContext context =
         new OpChainExecutionContext(_mailboxService, jobId, DEFAULT_RECEIVER_STAGE_ID, toAddress, Long.MAX_VALUE,
-            Long.MAX_VALUE, new HashMap<>());
+            Long.MAX_VALUE, new HashMap<>(), false);
 
     SortedMailboxReceiveOperator receiveOp =
         new SortedMailboxReceiveOperator(context, ImmutableList.of(_server1, _server2), RelDistribution.Type.SINGLETON,
@@ -398,7 +398,7 @@ public class SortedMailboxReceiveOperatorTest {
 
     OpChainExecutionContext context =
         new OpChainExecutionContext(_mailboxService, jobId, DEFAULT_RECEIVER_STAGE_ID, toAddress, Long.MAX_VALUE,
-            Long.MAX_VALUE, new HashMap<>());
+            Long.MAX_VALUE, new HashMap<>(), false);
 
     SortedMailboxReceiveOperator receiveOp =
         new SortedMailboxReceiveOperator(context, ImmutableList.of(_server1, _server2), RelDistribution.Type.SINGLETON,
@@ -450,7 +450,7 @@ public class SortedMailboxReceiveOperatorTest {
 
     OpChainExecutionContext context =
         new OpChainExecutionContext(_mailboxService, jobId, DEFAULT_RECEIVER_STAGE_ID, toAddress, Long.MAX_VALUE,
-            Long.MAX_VALUE, new HashMap<>());
+            Long.MAX_VALUE, new HashMap<>(), false);
 
     SortedMailboxReceiveOperator receiveOp =
         new SortedMailboxReceiveOperator(context, ImmutableList.of(_server1, _server2), RelDistribution.Type.SINGLETON,
@@ -489,7 +489,7 @@ public class SortedMailboxReceiveOperatorTest {
 
     OpChainExecutionContext context =
         new OpChainExecutionContext(_mailboxService, jobId, DEFAULT_RECEIVER_STAGE_ID, toAddress, Long.MAX_VALUE,
-            Long.MAX_VALUE, new HashMap<>());
+            Long.MAX_VALUE, new HashMap<>(), false);
 
     SortedMailboxReceiveOperator receiveOp =
         new SortedMailboxReceiveOperator(context, ImmutableList.of(_server1, _server2),
@@ -526,7 +526,7 @@ public class SortedMailboxReceiveOperatorTest {
 
     OpChainExecutionContext context =
         new OpChainExecutionContext(_mailboxService, jobId, DEFAULT_RECEIVER_STAGE_ID, toAddress, Long.MAX_VALUE,
-            Long.MAX_VALUE, new HashMap<>());
+            Long.MAX_VALUE, new HashMap<>(), false);
 
     SortedMailboxReceiveOperator receiveOp =
         new SortedMailboxReceiveOperator(context, ImmutableList.of(_server1, _server2),
@@ -574,7 +574,7 @@ public class SortedMailboxReceiveOperatorTest {
 
     OpChainExecutionContext context =
         new OpChainExecutionContext(_mailboxService, jobId, DEFAULT_RECEIVER_STAGE_ID, toAddress, Long.MAX_VALUE,
-            Long.MAX_VALUE, new HashMap<>());
+            Long.MAX_VALUE, new HashMap<>(), false);
 
     SortedMailboxReceiveOperator receiveOp =
         new SortedMailboxReceiveOperator(context, ImmutableList.of(_server1, _server2),
@@ -630,7 +630,7 @@ public class SortedMailboxReceiveOperatorTest {
 
     OpChainExecutionContext context =
         new OpChainExecutionContext(_mailboxService, jobId, DEFAULT_RECEIVER_STAGE_ID, toAddress, Long.MAX_VALUE,
-            Long.MAX_VALUE, new HashMap<>());
+            Long.MAX_VALUE, new HashMap<>(), false);
 
     SortedMailboxReceiveOperator receiveOp =
         new SortedMailboxReceiveOperator(context, ImmutableList.of(_server1, _server2),
@@ -686,7 +686,7 @@ public class SortedMailboxReceiveOperatorTest {
 
     OpChainExecutionContext context =
         new OpChainExecutionContext(_mailboxService, jobId, DEFAULT_RECEIVER_STAGE_ID, toAddress, Long.MAX_VALUE,
-            Long.MAX_VALUE, new HashMap<>());
+            Long.MAX_VALUE, new HashMap<>(), false);
 
     SortedMailboxReceiveOperator receiveOp =
         new SortedMailboxReceiveOperator(context, ImmutableList.of(_server1, _server2),
@@ -739,7 +739,7 @@ public class SortedMailboxReceiveOperatorTest {
 
     OpChainExecutionContext context =
         new OpChainExecutionContext(_mailboxService, jobId, DEFAULT_RECEIVER_STAGE_ID, toAddress, Long.MAX_VALUE,
-            Long.MAX_VALUE, new HashMap<>());
+            Long.MAX_VALUE, new HashMap<>(), false);
 
     SortedMailboxReceiveOperator receiveOp =
         new SortedMailboxReceiveOperator(context, ImmutableList.of(_server1, _server2),
@@ -798,7 +798,7 @@ public class SortedMailboxReceiveOperatorTest {
 
     OpChainExecutionContext context =
         new OpChainExecutionContext(_mailboxService, jobId, DEFAULT_RECEIVER_STAGE_ID, toAddress, Long.MAX_VALUE,
-            Long.MAX_VALUE, new HashMap<>());
+            Long.MAX_VALUE, new HashMap<>(), false);
 
     SortedMailboxReceiveOperator receiveOp = new SortedMailboxReceiveOperator(context,
         ImmutableList.of(_server1, _server2), RelDistribution.Type.HASH_DISTRIBUTED, collationKeys, collationDirection,
@@ -882,7 +882,7 @@ public class SortedMailboxReceiveOperatorTest {
 
     OpChainExecutionContext context =
         new OpChainExecutionContext(_mailboxService, jobId, DEFAULT_RECEIVER_STAGE_ID, toAddress, Long.MAX_VALUE,
-            Long.MAX_VALUE, new HashMap<>());
+            Long.MAX_VALUE, new HashMap<>(), false);
 
     SortedMailboxReceiveOperator receiveOp = new SortedMailboxReceiveOperator(context,
         ImmutableList.of(_server1, _server2), RelDistribution.Type.HASH_DISTRIBUTED, collationKeys, collationDirection,
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/queries/ResourceBasedQueriesTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/queries/ResourceBasedQueriesTest.java
index 4911f3b57f..614f983c66 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/queries/ResourceBasedQueriesTest.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/queries/ResourceBasedQueriesTest.java
@@ -246,6 +246,7 @@ public class ResourceBasedQueriesTest extends QueryRunnerTestBase {
       Assert.assertEquals(brokerResponseNative.getNumSegmentsQueried(), numSegments);
 
       Map<Integer, BrokerResponseStats> stageIdStats = brokerResponseNative.getStageIdStats();
+      int numTables = 0;
       for (Integer stageId : stageIdStats.keySet()) {
         // check stats only for leaf stage
         BrokerResponseStats brokerResponseStats = stageIdStats.get(stageId);
@@ -256,7 +257,7 @@ public class ResourceBasedQueriesTest extends QueryRunnerTestBase {
 
         String tableName = brokerResponseStats.getTableNames().get(0);
         Assert.assertEquals(brokerResponseStats.getTableNames().size(), 1);
-
+        numTables++;
         TableType tableType = TableNameBuilder.getTableTypeFromTableName(tableName);
         if (tableType == null) {
           tableName = TableNameBuilder.OFFLINE.tableNameWithType(tableName);
@@ -275,6 +276,8 @@ public class ResourceBasedQueriesTest extends QueryRunnerTestBase {
           }
         }
       }
+
+      Assert.assertTrue(numTables > 0);
     });
   }
 
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/service/dispatch/QueryDispatcherTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/service/dispatch/QueryDispatcherTest.java
index 9adf5fe627..af98b32d12 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/service/dispatch/QueryDispatcherTest.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/service/dispatch/QueryDispatcherTest.java
@@ -138,7 +138,7 @@ public class QueryDispatcherTest extends QueryTestSet {
     QueryDispatcher dispatcher = new QueryDispatcher();
     long requestId = RANDOM_REQUEST_ID_GEN.nextLong();
     try {
-      dispatcher.submitAndReduce(requestId, queryPlan, null, 10_000L, new HashMap<>(), null);
+      dispatcher.submitAndReduce(requestId, queryPlan, null, 10_000L, new HashMap<>(), null, false);
       Assert.fail("Method call above should have failed");
     } catch (Exception e) {
       Assert.assertTrue(e.getMessage().contains("Error executing query"));
@@ -161,7 +161,7 @@ public class QueryDispatcherTest extends QueryTestSet {
     long requestId = RANDOM_REQUEST_ID_GEN.nextLong();
     try {
       // will throw b/c mailboxService is null
-      dispatcher.submitAndReduce(requestId, queryPlan, null, 10_000L, new HashMap<>(), null);
+      dispatcher.submitAndReduce(requestId, queryPlan, null, 10_000L, new HashMap<>(), null, false);
       Assert.fail("Method call above should have failed");
     } catch (Exception e) {
       Assert.assertTrue(e.getMessage().contains("Error executing query"));


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org