You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by kh...@apache.org on 2023/02/22 04:50:28 UTC

[pinot] branch master updated: Remove duplicate stats aggregator from V2 query metrics (#10299)

This is an automated email from the ASF dual-hosted git repository.

kharekartik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 0722bf077b Remove duplicate stats aggregator from V2 query metrics (#10299)
0722bf077b is described below

commit 0722bf077b2d23783ea463a4f615847057e7dbbf
Author: Kartik Khare <kh...@gmail.com>
AuthorDate: Wed Feb 22 10:20:22 2023 +0530

    Remove duplicate stats aggregator from V2 query metrics (#10299)
    
    * Remove duplicate stats aggregator from V2 query metrics
    
    * Add multistage specific operator stats to metadata
    
    * add test for metadata
    Add tests for metadata, refactor the Operator interfaces to accept serverId
    
    * Rename wallTime to executionTime for correct nomenclature in metrics
    
    * Add serverAddress as a separate param in operatorStats as well
    
    * Remove redundant constructor without server addresses
    
    * Merge metadata test into resource based queries test
    
    * fix broken SortOperatorTest
    
    ---------
    
    Co-authored-by: Kartik Khare <kh...@Kartiks-MacBook-Pro.local>
---
 .../MultiStageBrokerRequestHandler.java            |  53 +---
 .../pinot/core/query/reduce/BaseReduceService.java | 249 ------------------
 .../query/reduce/ExecutionStatsAggregator.java     | 289 +++++++++++++++++++++
 .../query/reduce/StreamingReduceServiceTest.java   |   4 +-
 .../apache/pinot/query/runtime/QueryRunner.java    |   5 +-
 .../query/runtime/operator/AggregateOperator.java  |  11 +-
 .../query/runtime/operator/FilterOperator.java     |   5 +-
 .../query/runtime/operator/HashJoinOperator.java   |   5 +-
 .../LeafStageTransferableBlockOperator.java        |   7 +-
 .../runtime/operator/LiteralValueOperator.java     |   5 +-
 .../runtime/operator/MailboxReceiveOperator.java   |   3 +-
 .../runtime/operator/MailboxSendOperator.java      |   6 +-
 .../query/runtime/operator/MultiStageOperator.java |  13 +-
 .../query/runtime/operator/OperatorStats.java      |  34 ++-
 .../pinot/query/runtime/operator/SortOperator.java |   9 +-
 .../query/runtime/operator/TransformOperator.java  |   7 +-
 .../runtime/operator/utils/OperatorUtils.java      |  11 +-
 .../runtime/operator/utils/StatsAggregator.java    | 170 ------------
 .../query/runtime/plan/PhysicalPlanVisitor.java    |  16 +-
 .../pinot/query/service/QueryDispatcher.java       |  48 +---
 .../pinot/query/runtime/QueryRunnerTest.java       |   4 +-
 .../pinot/query/runtime/QueryRunnerTestBase.java   |   6 +-
 .../runtime/operator/AggregateOperatorTest.java    |  24 +-
 .../query/runtime/operator/FilterOperatorTest.java |  33 ++-
 .../runtime/operator/HashJoinOperatorTest.java     |  46 ++--
 .../LeafStageTransferableBlockOperatorTest.java    |  48 +++-
 .../runtime/operator/LiteralValueOperatorTest.java |  19 +-
 .../runtime/operator/MailboxSendOperatorTest.java  |  56 ++--
 .../query/runtime/operator/SortOperatorTest.java   |  68 +++--
 .../runtime/operator/TransformOperatorTest.java    |  24 +-
 .../runtime/queries/ResourceBasedQueriesTest.java  |  93 ++++++-
 31 files changed, 666 insertions(+), 705 deletions(-)

diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java
index 7e21bab7ab..63c1e8f9ad 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java
@@ -20,7 +20,6 @@ package org.apache.pinot.broker.requesthandler;
 
 import com.fasterxml.jackson.databind.JsonNode;
 import java.util.ArrayList;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
@@ -32,7 +31,6 @@ import org.apache.pinot.broker.broker.AccessControlFactory;
 import org.apache.pinot.broker.queryquota.QueryQuotaManager;
 import org.apache.pinot.broker.routing.BrokerRoutingManager;
 import org.apache.pinot.common.config.provider.TableCache;
-import org.apache.pinot.common.datatable.DataTable;
 import org.apache.pinot.common.exception.QueryException;
 import org.apache.pinot.common.metrics.BrokerMeter;
 import org.apache.pinot.common.metrics.BrokerMetrics;
@@ -43,6 +41,7 @@ import org.apache.pinot.common.response.broker.ResultTable;
 import org.apache.pinot.common.utils.DataSchema;
 import org.apache.pinot.common.utils.config.QueryOptionsUtils;
 import org.apache.pinot.common.utils.request.RequestUtils;
+import org.apache.pinot.core.query.reduce.ExecutionStatsAggregator;
 import org.apache.pinot.core.transport.ServerInstance;
 import org.apache.pinot.query.QueryEnvironment;
 import org.apache.pinot.query.catalog.PinotCatalog;
@@ -167,10 +166,10 @@ public class MultiStageBrokerRequestHandler extends BaseBrokerRequestHandler {
     }
 
     ResultTable queryResults;
-    Map<String, String> metadata = new HashMap<>();
+    ExecutionStatsAggregator executionStatsAggregator = new ExecutionStatsAggregator(false);
     try {
       queryResults = _queryDispatcher.submitAndReduce(requestId, queryPlan, _mailboxService, queryTimeoutMs,
-          sqlNodeAndOptions.getOptions(), metadata);
+          sqlNodeAndOptions.getOptions(), executionStatsAggregator);
     } catch (Exception e) {
       LOGGER.info("query execution failed", e);
       return new BrokerResponseNative(QueryException.getException(QueryException.QUERY_EXECUTION_ERROR, e));
@@ -185,57 +184,13 @@ public class MultiStageBrokerRequestHandler extends BaseBrokerRequestHandler {
     brokerResponse.setTimeUsedMs(totalTimeMs);
     brokerResponse.setResultTable(queryResults);
 
-    attachMetadataToResponse(metadata, brokerResponse);
+    executionStatsAggregator.setStats(brokerResponse);
 
     requestContext.setQueryProcessingTime(totalTimeMs);
     augmentStatistics(requestContext, brokerResponse);
     return brokerResponse;
   }
 
-  //TODO: Remove this duplicate method, use the implementation from V1 engine
-  private void attachMetadataToResponse(Map<String, String> stats, BrokerResponseNative brokerResponse) {
-    brokerResponse.setNumDocsScanned(
-        Long.parseLong(stats.getOrDefault(DataTable.MetadataKey.NUM_DOCS_SCANNED.getName(), "0")));
-    brokerResponse.setNumEntriesScannedInFilter(
-        Long.parseLong(stats.getOrDefault(DataTable.MetadataKey.NUM_ENTRIES_SCANNED_IN_FILTER.getName(), "0")));
-    brokerResponse.setNumEntriesScannedPostFilter(
-        Long.parseLong(stats.getOrDefault(DataTable.MetadataKey.NUM_ENTRIES_SCANNED_POST_FILTER.getName(), "0")));
-    brokerResponse.setTotalDocs(
-        Long.parseLong(stats.getOrDefault(DataTable.MetadataKey.TOTAL_DOCS.getName(), "0")));
-    brokerResponse.setNumSegmentsQueried(
-        Long.parseLong(stats.getOrDefault(DataTable.MetadataKey.NUM_SEGMENTS_QUERIED.getName(), "0")));
-    brokerResponse.setNumSegmentsProcessed(
-        Long.parseLong(stats.getOrDefault(DataTable.MetadataKey.NUM_SEGMENTS_PROCESSED.getName(), "0")));
-    brokerResponse.setNumSegmentsMatched(
-        Long.parseLong(stats.getOrDefault(DataTable.MetadataKey.NUM_SEGMENTS_MATCHED.getName(), "0")));
-    brokerResponse.setNumConsumingSegmentsQueried(
-        Long.parseLong(stats.getOrDefault(DataTable.MetadataKey.NUM_CONSUMING_SEGMENTS_QUERIED.getName(), "0")));
-
-    brokerResponse.setNumSegmentsPrunedByServer(
-        Long.parseLong(stats.getOrDefault(DataTable.MetadataKey.NUM_SEGMENTS_PRUNED_BY_SERVER.getName(), "0")));
-    brokerResponse.setNumSegmentsPrunedInvalid(
-        Long.parseLong(stats.getOrDefault(DataTable.MetadataKey.NUM_SEGMENTS_PRUNED_INVALID.getName(), "0")));
-    brokerResponse.setNumSegmentsPrunedByLimit(
-        Long.parseLong(stats.getOrDefault(DataTable.MetadataKey.NUM_SEGMENTS_PRUNED_BY_LIMIT.getName(), "0")));
-    brokerResponse.setNumSegmentsPrunedByValue(
-        Long.parseLong(stats.getOrDefault(DataTable.MetadataKey.NUM_SEGMENTS_PRUNED_BY_VALUE.getName(), "0")));
-    brokerResponse.setExplainPlanNumEmptyFilterSegments(
-        Long.parseLong(stats.getOrDefault(DataTable.MetadataKey.EXPLAIN_PLAN_NUM_EMPTY_FILTER_SEGMENTS.getName(),
-            "0")));
-    brokerResponse.setExplainPlanNumMatchAllFilterSegments(Long.parseLong(
-        stats.getOrDefault(DataTable.MetadataKey.EXPLAIN_PLAN_NUM_MATCH_ALL_FILTER_SEGMENTS.getName(), "0")));
-    brokerResponse.setNumConsumingSegmentsQueried(
-        Long.parseLong(stats.getOrDefault(DataTable.MetadataKey.NUM_CONSUMING_SEGMENTS_QUERIED.getName(), "0")));
-    brokerResponse.setMinConsumingFreshnessTimeMs(
-        Long.parseLong(stats.getOrDefault(DataTable.MetadataKey.MIN_CONSUMING_FRESHNESS_TIME_MS.getName(), "0")));
-    brokerResponse.setNumConsumingSegmentsProcessed(
-        Long.parseLong(stats.getOrDefault(DataTable.MetadataKey.NUM_CONSUMING_SEGMENTS_PROCESSED.getName(), "0")));
-    brokerResponse.setNumConsumingSegmentsMatched(
-        Long.parseLong(stats.getOrDefault(DataTable.MetadataKey.NUM_CONSUMING_SEGMENTS_MATCHED.getName(), "0")));
-    brokerResponse.setNumGroupsLimitReached(
-        Boolean.parseBoolean(stats.getOrDefault(DataTable.MetadataKey.NUM_GROUPS_LIMIT_REACHED.getName(), "0")));
-  }
-
   private BrokerResponseNative constructMultistageExplainPlan(String sql, String plan) {
     BrokerResponseNative brokerResponse = BrokerResponseNative.empty();
     List<Object[]> rows = new ArrayList<>();
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/BaseReduceService.java b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/BaseReduceService.java
index 9efac85c93..6de874dbda 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/BaseReduceService.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/BaseReduceService.java
@@ -19,28 +19,15 @@
 package org.apache.pinot.core.query.reduce;
 
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
-import java.util.ArrayList;
-import java.util.HashMap;
 import java.util.List;
-import java.util.Map;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.TimeUnit;
-import java.util.function.LongConsumer;
 import javax.annotation.concurrent.ThreadSafe;
-import org.apache.pinot.common.datatable.DataTable;
-import org.apache.pinot.common.datatable.DataTable.MetadataKey;
-import org.apache.pinot.common.metrics.BrokerMeter;
-import org.apache.pinot.common.metrics.BrokerMetrics;
-import org.apache.pinot.common.metrics.BrokerTimer;
 import org.apache.pinot.common.request.context.ExpressionContext;
 import org.apache.pinot.common.response.broker.BrokerResponseNative;
-import org.apache.pinot.common.response.broker.QueryProcessingException;
 import org.apache.pinot.common.response.broker.ResultTable;
 import org.apache.pinot.core.query.request.context.QueryContext;
-import org.apache.pinot.core.transport.ServerRoutingInstance;
-import org.apache.pinot.spi.config.table.TableType;
 import org.apache.pinot.spi.env.PinotConfiguration;
 import org.apache.pinot.spi.utils.CommonConstants;
 import org.slf4j.Logger;
@@ -120,240 +107,4 @@ public abstract class BaseReduceService {
   protected void shutDown() {
     _reduceExecutorService.shutdownNow();
   }
-
-  protected static class ExecutionStatsAggregator {
-    private final List<QueryProcessingException> _processingExceptions = new ArrayList<>();
-    private final Map<String, String> _traceInfo = new HashMap<>();
-    private final boolean _enableTrace;
-
-    private long _numDocsScanned = 0L;
-    private long _numEntriesScannedInFilter = 0L;
-    private long _numEntriesScannedPostFilter = 0L;
-    private long _numSegmentsQueried = 0L;
-    private long _numSegmentsProcessed = 0L;
-    private long _numSegmentsMatched = 0L;
-    private long _numConsumingSegmentsQueried = 0L;
-    private long _numConsumingSegmentsProcessed = 0L;
-    private long _numConsumingSegmentsMatched = 0L;
-    private long _minConsumingFreshnessTimeMs = Long.MAX_VALUE;
-    private long _numTotalDocs = 0L;
-    private long _offlineThreadCpuTimeNs = 0L;
-    private long _realtimeThreadCpuTimeNs = 0L;
-    private long _offlineSystemActivitiesCpuTimeNs = 0L;
-    private long _realtimeSystemActivitiesCpuTimeNs = 0L;
-    private long _offlineResponseSerializationCpuTimeNs = 0L;
-    private long _realtimeResponseSerializationCpuTimeNs = 0L;
-    private long _offlineTotalCpuTimeNs = 0L;
-    private long _realtimeTotalCpuTimeNs = 0L;
-    private long _numSegmentsPrunedByServer = 0L;
-    private long _numSegmentsPrunedInvalid = 0L;
-    private long _numSegmentsPrunedByLimit = 0L;
-    private long _numSegmentsPrunedByValue = 0L;
-    private long _explainPlanNumEmptyFilterSegments = 0L;
-    private long _explainPlanNumMatchAllFilterSegments = 0L;
-    private boolean _numGroupsLimitReached = false;
-
-    protected ExecutionStatsAggregator(boolean enableTrace) {
-      _enableTrace = enableTrace;
-    }
-
-    protected synchronized void aggregate(ServerRoutingInstance routingInstance, DataTable dataTable) {
-      Map<String, String> metadata = dataTable.getMetadata();
-      // Reduce on trace info.
-      if (_enableTrace) {
-        _traceInfo.put(routingInstance.getShortName(), metadata.get(MetadataKey.TRACE_INFO.getName()));
-      }
-
-      // Reduce on exceptions.
-      Map<Integer, String> exceptions = dataTable.getExceptions();
-      for (int key : exceptions.keySet()) {
-        _processingExceptions.add(new QueryProcessingException(key, exceptions.get(key)));
-      }
-
-      // Reduce on execution statistics.
-      String numDocsScannedString = metadata.get(MetadataKey.NUM_DOCS_SCANNED.getName());
-      if (numDocsScannedString != null) {
-        _numDocsScanned += Long.parseLong(numDocsScannedString);
-      }
-      String numEntriesScannedInFilterString = metadata.get(MetadataKey.NUM_ENTRIES_SCANNED_IN_FILTER.getName());
-      if (numEntriesScannedInFilterString != null) {
-        _numEntriesScannedInFilter += Long.parseLong(numEntriesScannedInFilterString);
-      }
-      String numEntriesScannedPostFilterString = metadata.get(MetadataKey.NUM_ENTRIES_SCANNED_POST_FILTER.getName());
-      if (numEntriesScannedPostFilterString != null) {
-        _numEntriesScannedPostFilter += Long.parseLong(numEntriesScannedPostFilterString);
-      }
-      String numSegmentsQueriedString = metadata.get(MetadataKey.NUM_SEGMENTS_QUERIED.getName());
-      if (numSegmentsQueriedString != null) {
-        _numSegmentsQueried += Long.parseLong(numSegmentsQueriedString);
-      }
-
-      String numSegmentsProcessedString = metadata.get(MetadataKey.NUM_SEGMENTS_PROCESSED.getName());
-      if (numSegmentsProcessedString != null) {
-        _numSegmentsProcessed += Long.parseLong(numSegmentsProcessedString);
-      }
-      String numSegmentsMatchedString = metadata.get(MetadataKey.NUM_SEGMENTS_MATCHED.getName());
-      if (numSegmentsMatchedString != null) {
-        _numSegmentsMatched += Long.parseLong(numSegmentsMatchedString);
-      }
-
-      String numConsumingSegmentsQueriedString = metadata.get(MetadataKey.NUM_CONSUMING_SEGMENTS_QUERIED.getName());
-      if (numConsumingSegmentsQueriedString != null) {
-        _numConsumingSegmentsQueried += Long.parseLong(numConsumingSegmentsQueriedString);
-      }
-
-      String numConsumingSegmentsProcessed = metadata.get(MetadataKey.NUM_CONSUMING_SEGMENTS_PROCESSED.getName());
-      if (numConsumingSegmentsProcessed != null) {
-        _numConsumingSegmentsProcessed += Long.parseLong(numConsumingSegmentsProcessed);
-      }
-
-      String numConsumingSegmentsMatched = metadata.get(MetadataKey.NUM_CONSUMING_SEGMENTS_MATCHED.getName());
-      if (numConsumingSegmentsMatched != null) {
-        _numConsumingSegmentsMatched += Long.parseLong(numConsumingSegmentsMatched);
-      }
-
-      String minConsumingFreshnessTimeMsString = metadata.get(MetadataKey.MIN_CONSUMING_FRESHNESS_TIME_MS.getName());
-      if (minConsumingFreshnessTimeMsString != null) {
-        _minConsumingFreshnessTimeMs =
-            Math.min(Long.parseLong(minConsumingFreshnessTimeMsString), _minConsumingFreshnessTimeMs);
-      }
-
-      String threadCpuTimeNsString = metadata.get(MetadataKey.THREAD_CPU_TIME_NS.getName());
-      if (threadCpuTimeNsString != null) {
-        if (routingInstance.getTableType() == TableType.OFFLINE) {
-          _offlineThreadCpuTimeNs += Long.parseLong(threadCpuTimeNsString);
-        } else {
-          _realtimeThreadCpuTimeNs += Long.parseLong(threadCpuTimeNsString);
-        }
-      }
-
-      String systemActivitiesCpuTimeNsString = metadata.get(MetadataKey.SYSTEM_ACTIVITIES_CPU_TIME_NS.getName());
-      if (systemActivitiesCpuTimeNsString != null) {
-        if (routingInstance.getTableType() == TableType.OFFLINE) {
-          _offlineSystemActivitiesCpuTimeNs += Long.parseLong(systemActivitiesCpuTimeNsString);
-        } else {
-          _realtimeSystemActivitiesCpuTimeNs += Long.parseLong(systemActivitiesCpuTimeNsString);
-        }
-      }
-
-      String responseSerializationCpuTimeNsString = metadata.get(MetadataKey.RESPONSE_SER_CPU_TIME_NS.getName());
-      if (responseSerializationCpuTimeNsString != null) {
-        if (routingInstance.getTableType() == TableType.OFFLINE) {
-          _offlineResponseSerializationCpuTimeNs += Long.parseLong(responseSerializationCpuTimeNsString);
-        } else {
-          _realtimeResponseSerializationCpuTimeNs += Long.parseLong(responseSerializationCpuTimeNsString);
-        }
-      }
-      _offlineTotalCpuTimeNs =
-          _offlineThreadCpuTimeNs + _offlineSystemActivitiesCpuTimeNs + _offlineResponseSerializationCpuTimeNs;
-      _realtimeTotalCpuTimeNs =
-          _realtimeThreadCpuTimeNs + _realtimeSystemActivitiesCpuTimeNs + _realtimeResponseSerializationCpuTimeNs;
-
-      withNotNullLongMetadata(metadata, MetadataKey.NUM_SEGMENTS_PRUNED_BY_SERVER,
-          l -> _numSegmentsPrunedByServer += l);
-      withNotNullLongMetadata(metadata, MetadataKey.NUM_SEGMENTS_PRUNED_INVALID, l -> _numSegmentsPrunedInvalid += l);
-      withNotNullLongMetadata(metadata, MetadataKey.NUM_SEGMENTS_PRUNED_BY_LIMIT, l -> _numSegmentsPrunedByLimit += l);
-      withNotNullLongMetadata(metadata, MetadataKey.NUM_SEGMENTS_PRUNED_BY_VALUE, l -> _numSegmentsPrunedByValue += l);
-
-      String explainPlanNumEmptyFilterSegments =
-          metadata.get(MetadataKey.EXPLAIN_PLAN_NUM_EMPTY_FILTER_SEGMENTS.getName());
-      if (explainPlanNumEmptyFilterSegments != null) {
-        _explainPlanNumEmptyFilterSegments += Long.parseLong(explainPlanNumEmptyFilterSegments);
-      }
-
-      String explainPlanNumMatchAllFilterSegments =
-          metadata.get(MetadataKey.EXPLAIN_PLAN_NUM_MATCH_ALL_FILTER_SEGMENTS.getName());
-      if (explainPlanNumMatchAllFilterSegments != null) {
-        _explainPlanNumMatchAllFilterSegments += Long.parseLong(explainPlanNumMatchAllFilterSegments);
-      }
-
-      String numTotalDocsString = metadata.get(MetadataKey.TOTAL_DOCS.getName());
-      if (numTotalDocsString != null) {
-        _numTotalDocs += Long.parseLong(numTotalDocsString);
-      }
-      _numGroupsLimitReached |= Boolean.parseBoolean(metadata.get(MetadataKey.NUM_GROUPS_LIMIT_REACHED.getName()));
-    }
-
-    protected void setStats(String rawTableName, BrokerResponseNative brokerResponseNative,
-        BrokerMetrics brokerMetrics) {
-      // set exception
-      List<QueryProcessingException> processingExceptions = brokerResponseNative.getProcessingExceptions();
-      processingExceptions.addAll(_processingExceptions);
-
-      // add all trace.
-      if (_enableTrace) {
-        brokerResponseNative.getTraceInfo().putAll(_traceInfo);
-      }
-
-      // Set execution statistics.
-      brokerResponseNative.setNumDocsScanned(_numDocsScanned);
-      brokerResponseNative.setNumEntriesScannedInFilter(_numEntriesScannedInFilter);
-      brokerResponseNative.setNumEntriesScannedPostFilter(_numEntriesScannedPostFilter);
-      brokerResponseNative.setNumSegmentsQueried(_numSegmentsQueried);
-      brokerResponseNative.setNumSegmentsProcessed(_numSegmentsProcessed);
-      brokerResponseNative.setNumSegmentsMatched(_numSegmentsMatched);
-      brokerResponseNative.setTotalDocs(_numTotalDocs);
-      brokerResponseNative.setNumGroupsLimitReached(_numGroupsLimitReached);
-      brokerResponseNative.setOfflineThreadCpuTimeNs(_offlineThreadCpuTimeNs);
-      brokerResponseNative.setRealtimeThreadCpuTimeNs(_realtimeThreadCpuTimeNs);
-      brokerResponseNative.setOfflineSystemActivitiesCpuTimeNs(_offlineSystemActivitiesCpuTimeNs);
-      brokerResponseNative.setRealtimeSystemActivitiesCpuTimeNs(_realtimeSystemActivitiesCpuTimeNs);
-      brokerResponseNative.setOfflineResponseSerializationCpuTimeNs(_offlineResponseSerializationCpuTimeNs);
-      brokerResponseNative.setRealtimeResponseSerializationCpuTimeNs(_realtimeResponseSerializationCpuTimeNs);
-      brokerResponseNative.setOfflineTotalCpuTimeNs(_offlineTotalCpuTimeNs);
-      brokerResponseNative.setRealtimeTotalCpuTimeNs(_realtimeTotalCpuTimeNs);
-      brokerResponseNative.setNumSegmentsPrunedByServer(_numSegmentsPrunedByServer);
-      brokerResponseNative.setNumSegmentsPrunedInvalid(_numSegmentsPrunedInvalid);
-      brokerResponseNative.setNumSegmentsPrunedByLimit(_numSegmentsPrunedByLimit);
-      brokerResponseNative.setNumSegmentsPrunedByValue(_numSegmentsPrunedByValue);
-      brokerResponseNative.setExplainPlanNumEmptyFilterSegments(_explainPlanNumEmptyFilterSegments);
-      brokerResponseNative.setExplainPlanNumMatchAllFilterSegments(_explainPlanNumMatchAllFilterSegments);
-      if (_numConsumingSegmentsQueried > 0) {
-        brokerResponseNative.setNumConsumingSegmentsQueried(_numConsumingSegmentsQueried);
-      }
-      if (_minConsumingFreshnessTimeMs != Long.MAX_VALUE) {
-        brokerResponseNative.setMinConsumingFreshnessTimeMs(_minConsumingFreshnessTimeMs);
-      }
-      brokerResponseNative.setNumConsumingSegmentsProcessed(_numConsumingSegmentsProcessed);
-      brokerResponseNative.setNumConsumingSegmentsMatched(_numConsumingSegmentsMatched);
-
-      // Update broker metrics.
-      if (brokerMetrics != null) {
-        brokerMetrics.addMeteredTableValue(rawTableName, BrokerMeter.DOCUMENTS_SCANNED, _numDocsScanned);
-        brokerMetrics
-            .addMeteredTableValue(rawTableName, BrokerMeter.ENTRIES_SCANNED_IN_FILTER, _numEntriesScannedInFilter);
-        brokerMetrics
-            .addMeteredTableValue(rawTableName, BrokerMeter.ENTRIES_SCANNED_POST_FILTER, _numEntriesScannedPostFilter);
-        brokerMetrics.addTimedTableValue(rawTableName, BrokerTimer.OFFLINE_THREAD_CPU_TIME_NS, _offlineThreadCpuTimeNs,
-            TimeUnit.NANOSECONDS);
-        brokerMetrics.addTimedTableValue(rawTableName, BrokerTimer.REALTIME_THREAD_CPU_TIME_NS,
-            _realtimeThreadCpuTimeNs,
-            TimeUnit.NANOSECONDS);
-        brokerMetrics.addTimedTableValue(rawTableName, BrokerTimer.OFFLINE_SYSTEM_ACTIVITIES_CPU_TIME_NS,
-            _offlineSystemActivitiesCpuTimeNs, TimeUnit.NANOSECONDS);
-        brokerMetrics.addTimedTableValue(rawTableName, BrokerTimer.REALTIME_SYSTEM_ACTIVITIES_CPU_TIME_NS,
-            _realtimeSystemActivitiesCpuTimeNs, TimeUnit.NANOSECONDS);
-        brokerMetrics.addTimedTableValue(rawTableName, BrokerTimer.OFFLINE_RESPONSE_SER_CPU_TIME_NS,
-            _offlineResponseSerializationCpuTimeNs, TimeUnit.NANOSECONDS);
-        brokerMetrics.addTimedTableValue(rawTableName, BrokerTimer.REALTIME_RESPONSE_SER_CPU_TIME_NS,
-            _realtimeResponseSerializationCpuTimeNs, TimeUnit.NANOSECONDS);
-        brokerMetrics.addTimedTableValue(rawTableName, BrokerTimer.OFFLINE_TOTAL_CPU_TIME_NS, _offlineTotalCpuTimeNs,
-            TimeUnit.NANOSECONDS);
-        brokerMetrics.addTimedTableValue(rawTableName, BrokerTimer.REALTIME_TOTAL_CPU_TIME_NS, _realtimeTotalCpuTimeNs,
-            TimeUnit.NANOSECONDS);
-
-        if (_minConsumingFreshnessTimeMs != Long.MAX_VALUE) {
-          brokerMetrics.addTimedTableValue(rawTableName, BrokerTimer.FRESHNESS_LAG_MS,
-              System.currentTimeMillis() - _minConsumingFreshnessTimeMs, TimeUnit.MILLISECONDS);
-        }
-      }
-    }
-
-    private void withNotNullLongMetadata(Map<String, String> metadata, MetadataKey key, LongConsumer consumer) {
-      String strValue = metadata.get(key.getName());
-      if (strValue != null) {
-        consumer.accept(Long.parseLong(strValue));
-      }
-    }
-  }
 }
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/ExecutionStatsAggregator.java b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/ExecutionStatsAggregator.java
new file mode 100644
index 0000000000..0c93252c51
--- /dev/null
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/ExecutionStatsAggregator.java
@@ -0,0 +1,289 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.reduce;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.function.LongConsumer;
+import javax.annotation.Nullable;
+import org.apache.pinot.common.datatable.DataTable;
+import org.apache.pinot.common.metrics.BrokerMeter;
+import org.apache.pinot.common.metrics.BrokerMetrics;
+import org.apache.pinot.common.metrics.BrokerTimer;
+import org.apache.pinot.common.response.broker.BrokerResponseNative;
+import org.apache.pinot.common.response.broker.QueryProcessingException;
+import org.apache.pinot.core.transport.ServerRoutingInstance;
+import org.apache.pinot.spi.config.table.TableType;
+
+
+public class ExecutionStatsAggregator {
+  private final List<QueryProcessingException> _processingExceptions = new ArrayList<>();
+  private final Map<String, String> _traceInfo = new HashMap<>();
+  private final boolean _enableTrace;
+
+  private long _numDocsScanned = 0L;
+  private long _numEntriesScannedInFilter = 0L;
+  private long _numEntriesScannedPostFilter = 0L;
+  private long _numSegmentsQueried = 0L;
+  private long _numSegmentsProcessed = 0L;
+  private long _numSegmentsMatched = 0L;
+  private long _numConsumingSegmentsQueried = 0L;
+  private long _numConsumingSegmentsProcessed = 0L;
+  private long _numConsumingSegmentsMatched = 0L;
+  private long _minConsumingFreshnessTimeMs = Long.MAX_VALUE;
+  private long _numTotalDocs = 0L;
+  private long _offlineThreadCpuTimeNs = 0L;
+  private long _realtimeThreadCpuTimeNs = 0L;
+  private long _offlineSystemActivitiesCpuTimeNs = 0L;
+  private long _realtimeSystemActivitiesCpuTimeNs = 0L;
+  private long _offlineResponseSerializationCpuTimeNs = 0L;
+  private long _realtimeResponseSerializationCpuTimeNs = 0L;
+  private long _offlineTotalCpuTimeNs = 0L;
+  private long _realtimeTotalCpuTimeNs = 0L;
+  private long _numSegmentsPrunedByServer = 0L;
+  private long _numSegmentsPrunedInvalid = 0L;
+  private long _numSegmentsPrunedByLimit = 0L;
+  private long _numSegmentsPrunedByValue = 0L;
+  private long _explainPlanNumEmptyFilterSegments = 0L;
+  private long _explainPlanNumMatchAllFilterSegments = 0L;
+  private boolean _numGroupsLimitReached = false;
+
+  public ExecutionStatsAggregator(boolean enableTrace) {
+    _enableTrace = enableTrace;
+  }
+
+  public void aggregate(ServerRoutingInstance routingInstance, DataTable dataTable) {
+    aggregate(routingInstance, dataTable.getMetadata(), dataTable.getExceptions());
+  }
+
+  public synchronized void aggregate(@Nullable ServerRoutingInstance routingInstance, Map<String, String> metadata,
+      Map<Integer, String> exceptions) {
+    // Reduce on trace info.
+    if (_enableTrace) {
+      _traceInfo.put(routingInstance.getShortName(), metadata.get(DataTable.MetadataKey.TRACE_INFO.getName()));
+    }
+
+    // Reduce on exceptions.
+    for (int key : exceptions.keySet()) {
+      _processingExceptions.add(new QueryProcessingException(key, exceptions.get(key)));
+    }
+
+    // Reduce on execution statistics.
+    String numDocsScannedString = metadata.get(DataTable.MetadataKey.NUM_DOCS_SCANNED.getName());
+    if (numDocsScannedString != null) {
+      _numDocsScanned += Long.parseLong(numDocsScannedString);
+    }
+    String numEntriesScannedInFilterString =
+        metadata.get(DataTable.MetadataKey.NUM_ENTRIES_SCANNED_IN_FILTER.getName());
+    if (numEntriesScannedInFilterString != null) {
+      _numEntriesScannedInFilter += Long.parseLong(numEntriesScannedInFilterString);
+    }
+    String numEntriesScannedPostFilterString =
+        metadata.get(DataTable.MetadataKey.NUM_ENTRIES_SCANNED_POST_FILTER.getName());
+    if (numEntriesScannedPostFilterString != null) {
+      _numEntriesScannedPostFilter += Long.parseLong(numEntriesScannedPostFilterString);
+    }
+    String numSegmentsQueriedString = metadata.get(DataTable.MetadataKey.NUM_SEGMENTS_QUERIED.getName());
+    if (numSegmentsQueriedString != null) {
+      _numSegmentsQueried += Long.parseLong(numSegmentsQueriedString);
+    }
+
+    String numSegmentsProcessedString = metadata.get(DataTable.MetadataKey.NUM_SEGMENTS_PROCESSED.getName());
+    if (numSegmentsProcessedString != null) {
+      _numSegmentsProcessed += Long.parseLong(numSegmentsProcessedString);
+    }
+    String numSegmentsMatchedString = metadata.get(DataTable.MetadataKey.NUM_SEGMENTS_MATCHED.getName());
+    if (numSegmentsMatchedString != null) {
+      _numSegmentsMatched += Long.parseLong(numSegmentsMatchedString);
+    }
+
+    String numConsumingSegmentsQueriedString =
+        metadata.get(DataTable.MetadataKey.NUM_CONSUMING_SEGMENTS_QUERIED.getName());
+    if (numConsumingSegmentsQueriedString != null) {
+      _numConsumingSegmentsQueried += Long.parseLong(numConsumingSegmentsQueriedString);
+    }
+
+    String numConsumingSegmentsProcessed =
+        metadata.get(DataTable.MetadataKey.NUM_CONSUMING_SEGMENTS_PROCESSED.getName());
+    if (numConsumingSegmentsProcessed != null) {
+      _numConsumingSegmentsProcessed += Long.parseLong(numConsumingSegmentsProcessed);
+    }
+
+    String numConsumingSegmentsMatched = metadata.get(DataTable.MetadataKey.NUM_CONSUMING_SEGMENTS_MATCHED.getName());
+    if (numConsumingSegmentsMatched != null) {
+      _numConsumingSegmentsMatched += Long.parseLong(numConsumingSegmentsMatched);
+    }
+
+    String minConsumingFreshnessTimeMsString =
+        metadata.get(DataTable.MetadataKey.MIN_CONSUMING_FRESHNESS_TIME_MS.getName());
+    if (minConsumingFreshnessTimeMsString != null) {
+      _minConsumingFreshnessTimeMs =
+          Math.min(Long.parseLong(minConsumingFreshnessTimeMsString), _minConsumingFreshnessTimeMs);
+    }
+
+    String threadCpuTimeNsString = metadata.get(DataTable.MetadataKey.THREAD_CPU_TIME_NS.getName());
+    if (routingInstance != null && threadCpuTimeNsString != null) {
+      if (routingInstance.getTableType() == TableType.OFFLINE) {
+        _offlineThreadCpuTimeNs += Long.parseLong(threadCpuTimeNsString);
+      } else {
+        _realtimeThreadCpuTimeNs += Long.parseLong(threadCpuTimeNsString);
+      }
+    }
+
+    String systemActivitiesCpuTimeNsString =
+        metadata.get(DataTable.MetadataKey.SYSTEM_ACTIVITIES_CPU_TIME_NS.getName());
+    if (routingInstance != null && systemActivitiesCpuTimeNsString != null) {
+      if (routingInstance.getTableType() == TableType.OFFLINE) {
+        _offlineSystemActivitiesCpuTimeNs += Long.parseLong(systemActivitiesCpuTimeNsString);
+      } else {
+        _realtimeSystemActivitiesCpuTimeNs += Long.parseLong(systemActivitiesCpuTimeNsString);
+      }
+    }
+
+    String responseSerializationCpuTimeNsString =
+        metadata.get(DataTable.MetadataKey.RESPONSE_SER_CPU_TIME_NS.getName());
+    if (routingInstance != null && responseSerializationCpuTimeNsString != null) {
+      if (routingInstance.getTableType() == TableType.OFFLINE) {
+        _offlineResponseSerializationCpuTimeNs += Long.parseLong(responseSerializationCpuTimeNsString);
+      } else {
+        _realtimeResponseSerializationCpuTimeNs += Long.parseLong(responseSerializationCpuTimeNsString);
+      }
+    }
+    _offlineTotalCpuTimeNs =
+        _offlineThreadCpuTimeNs + _offlineSystemActivitiesCpuTimeNs + _offlineResponseSerializationCpuTimeNs;
+    _realtimeTotalCpuTimeNs =
+        _realtimeThreadCpuTimeNs + _realtimeSystemActivitiesCpuTimeNs + _realtimeResponseSerializationCpuTimeNs;
+
+    withNotNullLongMetadata(metadata, DataTable.MetadataKey.NUM_SEGMENTS_PRUNED_BY_SERVER,
+        l -> _numSegmentsPrunedByServer += l);
+    withNotNullLongMetadata(metadata, DataTable.MetadataKey.NUM_SEGMENTS_PRUNED_INVALID,
+        l -> _numSegmentsPrunedInvalid += l);
+    withNotNullLongMetadata(metadata, DataTable.MetadataKey.NUM_SEGMENTS_PRUNED_BY_LIMIT,
+        l -> _numSegmentsPrunedByLimit += l);
+    withNotNullLongMetadata(metadata, DataTable.MetadataKey.NUM_SEGMENTS_PRUNED_BY_VALUE,
+        l -> _numSegmentsPrunedByValue += l);
+
+    String explainPlanNumEmptyFilterSegments =
+        metadata.get(DataTable.MetadataKey.EXPLAIN_PLAN_NUM_EMPTY_FILTER_SEGMENTS.getName());
+    if (explainPlanNumEmptyFilterSegments != null) {
+      _explainPlanNumEmptyFilterSegments += Long.parseLong(explainPlanNumEmptyFilterSegments);
+    }
+
+    String explainPlanNumMatchAllFilterSegments =
+        metadata.get(DataTable.MetadataKey.EXPLAIN_PLAN_NUM_MATCH_ALL_FILTER_SEGMENTS.getName());
+    if (explainPlanNumMatchAllFilterSegments != null) {
+      _explainPlanNumMatchAllFilterSegments += Long.parseLong(explainPlanNumMatchAllFilterSegments);
+    }
+
+    String numTotalDocsString = metadata.get(DataTable.MetadataKey.TOTAL_DOCS.getName());
+    if (numTotalDocsString != null) {
+      _numTotalDocs += Long.parseLong(numTotalDocsString);
+    }
+    _numGroupsLimitReached |=
+        Boolean.parseBoolean(metadata.get(DataTable.MetadataKey.NUM_GROUPS_LIMIT_REACHED.getName()));
+  }
+
+  public void setStats(BrokerResponseNative brokerResponseNative) {
+    setStats(null, brokerResponseNative, null);
+  }
+
+  public void setStats(@Nullable String rawTableName, BrokerResponseNative brokerResponseNative,
+      @Nullable BrokerMetrics brokerMetrics) {
+    // set exception
+    List<QueryProcessingException> processingExceptions = brokerResponseNative.getProcessingExceptions();
+    processingExceptions.addAll(_processingExceptions);
+
+    // add all trace.
+    if (_enableTrace) {
+      brokerResponseNative.getTraceInfo().putAll(_traceInfo);
+    }
+
+    // Set execution statistics.
+    brokerResponseNative.setNumDocsScanned(_numDocsScanned);
+    brokerResponseNative.setNumEntriesScannedInFilter(_numEntriesScannedInFilter);
+    brokerResponseNative.setNumEntriesScannedPostFilter(_numEntriesScannedPostFilter);
+    brokerResponseNative.setNumSegmentsQueried(_numSegmentsQueried);
+    brokerResponseNative.setNumSegmentsProcessed(_numSegmentsProcessed);
+    brokerResponseNative.setNumSegmentsMatched(_numSegmentsMatched);
+    brokerResponseNative.setTotalDocs(_numTotalDocs);
+    brokerResponseNative.setNumGroupsLimitReached(_numGroupsLimitReached);
+    brokerResponseNative.setOfflineThreadCpuTimeNs(_offlineThreadCpuTimeNs);
+    brokerResponseNative.setRealtimeThreadCpuTimeNs(_realtimeThreadCpuTimeNs);
+    brokerResponseNative.setOfflineSystemActivitiesCpuTimeNs(_offlineSystemActivitiesCpuTimeNs);
+    brokerResponseNative.setRealtimeSystemActivitiesCpuTimeNs(_realtimeSystemActivitiesCpuTimeNs);
+    brokerResponseNative.setOfflineResponseSerializationCpuTimeNs(_offlineResponseSerializationCpuTimeNs);
+    brokerResponseNative.setRealtimeResponseSerializationCpuTimeNs(_realtimeResponseSerializationCpuTimeNs);
+    brokerResponseNative.setOfflineTotalCpuTimeNs(_offlineTotalCpuTimeNs);
+    brokerResponseNative.setRealtimeTotalCpuTimeNs(_realtimeTotalCpuTimeNs);
+    brokerResponseNative.setNumSegmentsPrunedByServer(_numSegmentsPrunedByServer);
+    brokerResponseNative.setNumSegmentsPrunedInvalid(_numSegmentsPrunedInvalid);
+    brokerResponseNative.setNumSegmentsPrunedByLimit(_numSegmentsPrunedByLimit);
+    brokerResponseNative.setNumSegmentsPrunedByValue(_numSegmentsPrunedByValue);
+    brokerResponseNative.setExplainPlanNumEmptyFilterSegments(_explainPlanNumEmptyFilterSegments);
+    brokerResponseNative.setExplainPlanNumMatchAllFilterSegments(_explainPlanNumMatchAllFilterSegments);
+    if (_numConsumingSegmentsQueried > 0) {
+      brokerResponseNative.setNumConsumingSegmentsQueried(_numConsumingSegmentsQueried);
+    }
+    if (_minConsumingFreshnessTimeMs != Long.MAX_VALUE) {
+      brokerResponseNative.setMinConsumingFreshnessTimeMs(_minConsumingFreshnessTimeMs);
+    }
+    brokerResponseNative.setNumConsumingSegmentsProcessed(_numConsumingSegmentsProcessed);
+    brokerResponseNative.setNumConsumingSegmentsMatched(_numConsumingSegmentsMatched);
+
+    // Update broker metrics.
+    if (brokerMetrics != null && rawTableName != null) {
+      brokerMetrics.addMeteredTableValue(rawTableName, BrokerMeter.DOCUMENTS_SCANNED, _numDocsScanned);
+      brokerMetrics.addMeteredTableValue(rawTableName, BrokerMeter.ENTRIES_SCANNED_IN_FILTER,
+          _numEntriesScannedInFilter);
+      brokerMetrics.addMeteredTableValue(rawTableName, BrokerMeter.ENTRIES_SCANNED_POST_FILTER,
+          _numEntriesScannedPostFilter);
+      brokerMetrics.addTimedTableValue(rawTableName, BrokerTimer.OFFLINE_THREAD_CPU_TIME_NS, _offlineThreadCpuTimeNs,
+          TimeUnit.NANOSECONDS);
+      brokerMetrics.addTimedTableValue(rawTableName, BrokerTimer.REALTIME_THREAD_CPU_TIME_NS, _realtimeThreadCpuTimeNs,
+          TimeUnit.NANOSECONDS);
+      brokerMetrics.addTimedTableValue(rawTableName, BrokerTimer.OFFLINE_SYSTEM_ACTIVITIES_CPU_TIME_NS,
+          _offlineSystemActivitiesCpuTimeNs, TimeUnit.NANOSECONDS);
+      brokerMetrics.addTimedTableValue(rawTableName, BrokerTimer.REALTIME_SYSTEM_ACTIVITIES_CPU_TIME_NS,
+          _realtimeSystemActivitiesCpuTimeNs, TimeUnit.NANOSECONDS);
+      brokerMetrics.addTimedTableValue(rawTableName, BrokerTimer.OFFLINE_RESPONSE_SER_CPU_TIME_NS,
+          _offlineResponseSerializationCpuTimeNs, TimeUnit.NANOSECONDS);
+      brokerMetrics.addTimedTableValue(rawTableName, BrokerTimer.REALTIME_RESPONSE_SER_CPU_TIME_NS,
+          _realtimeResponseSerializationCpuTimeNs, TimeUnit.NANOSECONDS);
+      brokerMetrics.addTimedTableValue(rawTableName, BrokerTimer.OFFLINE_TOTAL_CPU_TIME_NS, _offlineTotalCpuTimeNs,
+          TimeUnit.NANOSECONDS);
+      brokerMetrics.addTimedTableValue(rawTableName, BrokerTimer.REALTIME_TOTAL_CPU_TIME_NS, _realtimeTotalCpuTimeNs,
+          TimeUnit.NANOSECONDS);
+
+      if (_minConsumingFreshnessTimeMs != Long.MAX_VALUE) {
+        brokerMetrics.addTimedTableValue(rawTableName, BrokerTimer.FRESHNESS_LAG_MS,
+            System.currentTimeMillis() - _minConsumingFreshnessTimeMs, TimeUnit.MILLISECONDS);
+      }
+    }
+  }
+
+  private void withNotNullLongMetadata(Map<String, String> metadata, DataTable.MetadataKey key, LongConsumer consumer) {
+    String strValue = metadata.get(key.getName());
+    if (strValue != null) {
+      consumer.accept(Long.parseLong(strValue));
+    }
+  }
+}
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/query/reduce/StreamingReduceServiceTest.java b/pinot-core/src/test/java/org/apache/pinot/core/query/reduce/StreamingReduceServiceTest.java
index 9946ddfa15..a48b12563d 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/query/reduce/StreamingReduceServiceTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/query/reduce/StreamingReduceServiceTest.java
@@ -56,7 +56,7 @@ public class StreamingReduceServiceTest {
               threadPoolService,
               ImmutableMap.of(routingInstance, mockedResponse),
               1000,
-              mock(BaseReduceService.ExecutionStatsAggregator.class));
+              mock(ExecutionStatsAggregator.class));
           return null;
         }, cause -> cause.getMessage().contains(exceptionMessage))
     );
@@ -85,7 +85,7 @@ public class StreamingReduceServiceTest {
               threadPoolService,
               ImmutableMap.of(routingInstance, mockedResponse),
               10,
-              mock(BaseReduceService.ExecutionStatsAggregator.class));
+              mock(ExecutionStatsAggregator.class));
           return null;
         },
         (cause) -> cause instanceof TimeoutException));
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java
index d171336a52..3846e4d424 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java
@@ -154,8 +154,9 @@ public class QueryRunner {
       StageMetadata receivingStageMetadata = distributedStagePlan.getMetadataMap().get(sendNode.getReceiverStageId());
       MailboxSendOperator mailboxSendOperator = new MailboxSendOperator(_mailboxService,
           new LeafStageTransferableBlockOperator(serverQueryResults, sendNode.getDataSchema(), requestId,
-              sendNode.getStageId()), receivingStageMetadata.getServerInstances(), sendNode.getExchangeType(),
-          sendNode.getPartitionKeySelector(), _rootServer, serverQueryRequests.get(0).getRequestId(),
+              sendNode.getStageId(), _rootServer), receivingStageMetadata.getServerInstances(),
+          sendNode.getExchangeType(), sendNode.getPartitionKeySelector(), _rootServer,
+          serverQueryRequests.get(0).getRequestId(),
           sendNode.getStageId(), sendNode.getReceiverStageId());
       int blockCounter = 0;
       while (!TransferableBlockUtils.isEndOfStream(mailboxSendOperator.nextBlock())) {
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/AggregateOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/AggregateOperator.java
index 5b1e1e1de9..6b3babca82 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/AggregateOperator.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/AggregateOperator.java
@@ -36,6 +36,7 @@ import org.apache.pinot.common.datablock.DataBlock;
 import org.apache.pinot.common.utils.DataSchema;
 import org.apache.pinot.core.data.table.Key;
 import org.apache.pinot.query.planner.logical.RexExpression;
+import org.apache.pinot.query.routing.VirtualServerAddress;
 import org.apache.pinot.query.runtime.blocks.TransferableBlock;
 import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils;
 import org.apache.pinot.segment.local.customobject.PinotFourthMoment;
@@ -81,16 +82,18 @@ public class AggregateOperator extends MultiStageOperator {
   // groupSet has to be a list of InputRef and cannot be null
   // TODO: Add these two checks when we confirm we can handle error in upstream ctor call.
   public AggregateOperator(MultiStageOperator inputOperator, DataSchema dataSchema, List<RexExpression> aggCalls,
-      List<RexExpression> groupSet, DataSchema inputSchema, long requestId, int stageId) {
+      List<RexExpression> groupSet, DataSchema inputSchema, long requestId, int stageId,
+      VirtualServerAddress virtualServerAddress) {
     this(inputOperator, dataSchema, aggCalls, groupSet, inputSchema, AggregateOperator.Accumulator.MERGERS, requestId,
-        stageId);
+        stageId, virtualServerAddress);
   }
 
   @VisibleForTesting
   AggregateOperator(MultiStageOperator inputOperator, DataSchema dataSchema, List<RexExpression> aggCalls,
       List<RexExpression> groupSet, DataSchema inputSchema,
-      Map<String, Function<DataSchema.ColumnDataType, Merger>> mergers, long requestId, int stageId) {
-    super(requestId, stageId);
+      Map<String, Function<DataSchema.ColumnDataType, Merger>> mergers, long requestId, int stageId,
+      VirtualServerAddress serverAddress) {
+    super(requestId, stageId, serverAddress);
     _inputOperator = inputOperator;
     _groupSet = groupSet;
     _upstreamErrorBlock = null;
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/FilterOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/FilterOperator.java
index ba6f64f19e..0b819fdd46 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/FilterOperator.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/FilterOperator.java
@@ -25,6 +25,7 @@ import javax.annotation.Nullable;
 import org.apache.pinot.common.datablock.DataBlock;
 import org.apache.pinot.common.utils.DataSchema;
 import org.apache.pinot.query.planner.logical.RexExpression;
+import org.apache.pinot.query.routing.VirtualServerAddress;
 import org.apache.pinot.query.runtime.blocks.TransferableBlock;
 import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils;
 import org.apache.pinot.query.runtime.operator.operands.TransformOperand;
@@ -55,8 +56,8 @@ public class FilterOperator extends MultiStageOperator {
   private TransferableBlock _upstreamErrorBlock;
 
   public FilterOperator(MultiStageOperator upstreamOperator, DataSchema dataSchema, RexExpression filter,
-      long requestId, int stageId) {
-    super(requestId, stageId);
+      long requestId, int stageId, VirtualServerAddress serverAddress) {
+    super(requestId, stageId, serverAddress);
     _upstreamOperator = upstreamOperator;
     _dataSchema = dataSchema;
     _filterOperand = TransformOperand.toTransformOperand(filter, dataSchema);
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/HashJoinOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/HashJoinOperator.java
index 1c27281a96..a813f35db3 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/HashJoinOperator.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/HashJoinOperator.java
@@ -35,6 +35,7 @@ import org.apache.pinot.core.data.table.Key;
 import org.apache.pinot.query.planner.logical.RexExpression;
 import org.apache.pinot.query.planner.partitioning.KeySelector;
 import org.apache.pinot.query.planner.stage.JoinNode;
+import org.apache.pinot.query.routing.VirtualServerAddress;
 import org.apache.pinot.query.runtime.blocks.TransferableBlock;
 import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils;
 import org.apache.pinot.query.runtime.operator.operands.TransformOperand;
@@ -88,8 +89,8 @@ public class HashJoinOperator extends MultiStageOperator {
   private KeySelector<Object[], Object[]> _rightKeySelector;
 
   public HashJoinOperator(MultiStageOperator leftTableOperator, MultiStageOperator rightTableOperator,
-      DataSchema leftSchema, JoinNode node, long requestId, int stageId) {
-    super(requestId, stageId);
+      DataSchema leftSchema, JoinNode node, long requestId, int stageId, VirtualServerAddress serverAddress) {
+    super(requestId, stageId, serverAddress);
     Preconditions.checkState(SUPPORTED_JOIN_TYPES.contains(node.getJoinRelType()),
         "Join type: " + node.getJoinRelType() + " is not supported!");
     _joinType = node.getJoinRelType();
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/LeafStageTransferableBlockOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/LeafStageTransferableBlockOperator.java
index 1e11b51953..d2b9c4db16 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/LeafStageTransferableBlockOperator.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/LeafStageTransferableBlockOperator.java
@@ -39,6 +39,7 @@ import org.apache.pinot.core.operator.blocks.results.DistinctResultsBlock;
 import org.apache.pinot.core.operator.blocks.results.GroupByResultsBlock;
 import org.apache.pinot.core.operator.blocks.results.SelectionResultsBlock;
 import org.apache.pinot.core.query.selection.SelectionOperatorUtils;
+import org.apache.pinot.query.routing.VirtualServerAddress;
 import org.apache.pinot.query.runtime.blocks.TransferableBlock;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -67,8 +68,8 @@ public class LeafStageTransferableBlockOperator extends MultiStageOperator {
   private int _currentIndex;
 
   public LeafStageTransferableBlockOperator(List<InstanceResponseBlock> baseResultBlock, DataSchema dataSchema,
-      long requestId, int stageId) {
-    super(requestId, stageId);
+      long requestId, int stageId, VirtualServerAddress serverAddress) {
+    super(requestId, stageId, serverAddress);
     _baseResultBlock = baseResultBlock;
     _desiredDataSchema = dataSchema;
     _errorBlock = baseResultBlock.stream().filter(e -> !e.getExceptions().isEmpty()).findFirst().orElse(null);
@@ -86,7 +87,7 @@ public class LeafStageTransferableBlockOperator extends MultiStageOperator {
   @Nullable
   @Override
   public String toExplainString() {
-    return EXPLAIN_NAME;
+    return EXPLAIN_NAME;
   }
 
   @Override
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/LiteralValueOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/LiteralValueOperator.java
index 6e123dd6b1..36d3fe511f 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/LiteralValueOperator.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/LiteralValueOperator.java
@@ -25,6 +25,7 @@ import javax.annotation.Nullable;
 import org.apache.pinot.common.datablock.DataBlock;
 import org.apache.pinot.common.utils.DataSchema;
 import org.apache.pinot.query.planner.logical.RexExpression;
+import org.apache.pinot.query.routing.VirtualServerAddress;
 import org.apache.pinot.query.runtime.blocks.TransferableBlock;
 import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils;
 import org.slf4j.Logger;
@@ -40,8 +41,8 @@ public class LiteralValueOperator extends MultiStageOperator {
   private boolean _isLiteralBlockReturned;
 
   public LiteralValueOperator(DataSchema dataSchema, List<List<RexExpression>> rexLiteralRows,
-      long requestId, int stageId) {
-    super(requestId, stageId);
+      long requestId, int stageId, VirtualServerAddress serverAddress) {
+    super(requestId, stageId, serverAddress);
     _dataSchema = dataSchema;
     _rexLiteralBlock = constructBlock(rexLiteralRows);
     _isLiteralBlockReturned = false;
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperator.java
index 3062b8cd38..0b16a47229 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperator.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperator.java
@@ -80,7 +80,7 @@ public class MailboxReceiveOperator extends MultiStageOperator {
   public MailboxReceiveOperator(MailboxService<TransferableBlock> mailboxService,
       List<VirtualServer> sendingStageInstances, RelDistribution.Type exchangeType, VirtualServerAddress receiver,
       long jobId, int senderStageId, int receiverStageId, Long timeoutMs) {
-    super(jobId, senderStageId);
+    super(jobId, senderStageId, receiver);
     _mailboxService = mailboxService;
     Preconditions.checkState(SUPPORTED_EXCHANGE_TYPES.contains(exchangeType),
         "Exchange/Distribution type: " + exchangeType + " is not supported!");
@@ -164,7 +164,6 @@ public class MailboxReceiveOperator extends MultiStageOperator {
               return block;
             } else {
               if (!block.getResultMetadata().isEmpty()) {
-                _operatorStats.clearExecutionStats();
                 _operatorStatsMap.putAll(block.getResultMetadata());
               }
               eosMailboxCount++;
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java
index e26c46f850..504a0d997b 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java
@@ -73,7 +73,7 @@ public class MailboxSendOperator extends MultiStageOperator {
       VirtualServerAddress sendingServer, long jobId, int senderStageId, int receiverStageId) {
     this(mailboxService, dataTableBlockBaseOperator, receivingStageInstances, exchangeType, keySelector,
         server -> toMailboxId(server, jobId, senderStageId, receiverStageId, sendingServer), BlockExchange::getExchange,
-        jobId, senderStageId, receiverStageId);
+        jobId, senderStageId, receiverStageId, sendingServer);
   }
 
   @VisibleForTesting
@@ -81,8 +81,8 @@ public class MailboxSendOperator extends MultiStageOperator {
       MultiStageOperator dataTableBlockBaseOperator, List<VirtualServer> receivingStageInstances,
       RelDistribution.Type exchangeType, KeySelector<Object[], Object[]> keySelector,
       MailboxIdGenerator mailboxIdGenerator, BlockExchangeFactory blockExchangeFactory, long jobId, int senderStageId,
-      int receiverStageId) {
-    super(jobId, senderStageId);
+      int receiverStageId, VirtualServerAddress serverAddress) {
+    super(jobId, senderStageId, serverAddress);
     _dataTableBlockBaseOperator = dataTableBlockBaseOperator;
 
     List<MailboxIdentifier> receivingMailboxes;
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MultiStageOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MultiStageOperator.java
index b55b42145a..d9c0104d91 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MultiStageOperator.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MultiStageOperator.java
@@ -23,6 +23,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import org.apache.pinot.core.common.Operator;
+import org.apache.pinot.query.routing.VirtualServerAddress;
 import org.apache.pinot.query.runtime.blocks.TransferableBlock;
 import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils;
 import org.apache.pinot.query.runtime.operator.utils.OperatorUtils;
@@ -38,14 +39,18 @@ public abstract class MultiStageOperator implements Operator<TransferableBlock>,
   // TODO: Move to OperatorContext class.
   protected final long _requestId;
   protected final int _stageId;
+  protected final VirtualServerAddress _serverAddress;
   protected final OperatorStats _operatorStats;
   protected final Map<String, OperatorStats> _operatorStatsMap;
+  private final String _operatorId;
 
-  public MultiStageOperator(long requestId, int stageId) {
+  public MultiStageOperator(long requestId, int stageId, VirtualServerAddress serverAddress) {
     _requestId = requestId;
     _stageId = stageId;
-    _operatorStats = new OperatorStats(requestId, stageId, toExplainString());
+    _operatorStats = new OperatorStats(requestId, stageId, serverAddress, toExplainString());
+    _serverAddress = serverAddress;
     _operatorStatsMap = new HashMap<>();
+    _operatorId = Joiner.on("_").join(toExplainString(), _requestId, _stageId, _serverAddress);
   }
 
   public Map<String, OperatorStats> getOperatorStatsMap() {
@@ -62,7 +67,6 @@ public abstract class MultiStageOperator implements Operator<TransferableBlock>,
       TransferableBlock nextBlock = getNextBlock();
       _operatorStats.recordRow(1, nextBlock.getNumRows());
       _operatorStats.endTimer();
-      // TODO: move this to centralized reporting in broker
       if (nextBlock.isEndOfStreamBlock()) {
         if (nextBlock.isSuccessfulEndOfStreamBlock()) {
           for (MultiStageOperator op : getChildOperators()) {
@@ -70,8 +74,7 @@ public abstract class MultiStageOperator implements Operator<TransferableBlock>,
           }
 
           if (!_operatorStats.getExecutionStats().isEmpty()) {
-            String operatorId = Joiner.on("_").join(toExplainString(), _requestId, _stageId);
-            _operatorStatsMap.put(operatorId, _operatorStats);
+            _operatorStatsMap.put(_operatorId, _operatorStats);
           }
           return TransferableBlockUtils.getEndOfStreamTransferableBlock(
               OperatorUtils.getMetadataFromOperatorStats(_operatorStatsMap));
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/OperatorStats.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/OperatorStats.java
index 89456ecf9c..09a8d7ac60 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/OperatorStats.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/OperatorStats.java
@@ -19,9 +19,12 @@
 package org.apache.pinot.query.runtime.operator;
 
 import com.google.common.base.Stopwatch;
+import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
-import org.apache.pinot.query.runtime.operator.utils.StatsAggregator;
+import org.apache.pinot.query.routing.VirtualServerAddress;
+import org.apache.pinot.query.runtime.operator.utils.OperatorUtils;
+
 
 public class OperatorStats {
   private final Stopwatch _executeStopwatch = Stopwatch.createUnstarted();
@@ -29,18 +32,20 @@ public class OperatorStats {
   // TODO: add a operatorId for better tracking purpose.
   private final int _stageId;
   private final long _requestId;
+  private final VirtualServerAddress _serverAddress;
 
   private final String _operatorType;
 
   private int _numBlock = 0;
   private int _numRows = 0;
-  private StatsAggregator _statsAggregator;
+  private Map<String, String> _executionStats;
 
-  public OperatorStats(long requestId, int stageId, String operatorType) {
+  public OperatorStats(long requestId, int stageId, VirtualServerAddress serverAddress, String operatorType) {
     _stageId = stageId;
     _requestId = requestId;
+    _serverAddress = serverAddress;
     _operatorType = operatorType;
-    _statsAggregator = new StatsAggregator();
+    _executionStats = new HashMap<>();
   }
 
   public void startTimer() {
@@ -61,11 +66,15 @@ public class OperatorStats {
   }
 
   public void recordExecutionStats(Map<String, String> executionStats) {
-    _statsAggregator.aggregate(executionStats);
+    _executionStats = executionStats;
   }
 
   public Map<String, String> getExecutionStats() {
-    return _statsAggregator.getStats();
+    _executionStats.put(OperatorUtils.NUM_BLOCKS, String.valueOf(_numBlock));
+    _executionStats.put(OperatorUtils.NUM_ROWS, String.valueOf(_numRows));
+    _executionStats.put(OperatorUtils.THREAD_EXECUTION_TIME,
+        String.valueOf(_executeStopwatch.elapsed(TimeUnit.MILLISECONDS)));
+    return _executionStats;
   }
 
   public int getStageId() {
@@ -76,19 +85,16 @@ public class OperatorStats {
     return _requestId;
   }
 
-  public String getOperatorType() {
-    return _operatorType;
+  public VirtualServerAddress getServerAddress() {
+    return _serverAddress;
   }
 
-  public void clearExecutionStats() {
-    _statsAggregator = new StatsAggregator();
+  public String getOperatorType() {
+    return _operatorType;
   }
 
-  // TODO: Return the string as a JSON string.
   @Override
   public String toString() {
-    return String.format(
-        "OperatorStats[requestId: %s, stageId %s, type: %s] ExecutionWallTime: %sms, No. Rows: %s, No. Block: %s",
-        _requestId, _stageId, _operatorType, _executeStopwatch.elapsed(TimeUnit.MILLISECONDS), _numRows, _numBlock);
+    return OperatorUtils.operatorStatsToJson(this);
   }
 }
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/SortOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/SortOperator.java
index e2ce8f6499..2298dbb4c4 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/SortOperator.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/SortOperator.java
@@ -31,6 +31,7 @@ import org.apache.pinot.common.datablock.DataBlock;
 import org.apache.pinot.common.utils.DataSchema;
 import org.apache.pinot.core.query.selection.SelectionOperatorUtils;
 import org.apache.pinot.query.planner.logical.RexExpression;
+import org.apache.pinot.query.routing.VirtualServerAddress;
 import org.apache.pinot.query.runtime.blocks.TransferableBlock;
 import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils;
 import org.slf4j.Logger;
@@ -55,16 +56,16 @@ public class SortOperator extends MultiStageOperator {
 
   public SortOperator(MultiStageOperator upstreamOperator, List<RexExpression> collationKeys,
       List<RelFieldCollation.Direction> collationDirections, int fetch, int offset, DataSchema dataSchema,
-      long requestId, int stageId) {
+      long requestId, int stageId, VirtualServerAddress serverAddress) {
     this(upstreamOperator, collationKeys, collationDirections, fetch, offset, dataSchema,
-        SelectionOperatorUtils.MAX_ROW_HOLDER_INITIAL_CAPACITY, requestId, stageId);
+        SelectionOperatorUtils.MAX_ROW_HOLDER_INITIAL_CAPACITY, requestId, stageId, serverAddress);
   }
 
   @VisibleForTesting
   SortOperator(MultiStageOperator upstreamOperator, List<RexExpression> collationKeys,
       List<RelFieldCollation.Direction> collationDirections, int fetch, int offset, DataSchema dataSchema,
-      int defaultHolderCapacity, long requestId, int stageId) {
-    super(requestId, stageId);
+      int defaultHolderCapacity, long requestId, int stageId, VirtualServerAddress serverAddress) {
+    super(requestId, stageId, serverAddress);
     _upstreamOperator = upstreamOperator;
     _fetch = fetch;
     _offset = Math.max(offset, 0);
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/TransformOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/TransformOperator.java
index 691aed44fe..03f54ee6f4 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/TransformOperator.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/TransformOperator.java
@@ -26,6 +26,7 @@ import javax.annotation.Nullable;
 import org.apache.pinot.common.datablock.DataBlock;
 import org.apache.pinot.common.utils.DataSchema;
 import org.apache.pinot.query.planner.logical.RexExpression;
+import org.apache.pinot.query.routing.VirtualServerAddress;
 import org.apache.pinot.query.runtime.blocks.TransferableBlock;
 import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils;
 import org.apache.pinot.query.runtime.operator.operands.TransformOperand;
@@ -54,9 +55,9 @@ public class TransformOperator extends MultiStageOperator {
   private final DataSchema _resultSchema;
   private TransferableBlock _upstreamErrorBlock;
 
-  public TransformOperator(MultiStageOperator upstreamOperator, DataSchema resultSchema,
-      List<RexExpression> transforms, DataSchema upstreamDataSchema, long requestId, int stageId) {
-    super(requestId, stageId);
+  public TransformOperator(MultiStageOperator upstreamOperator, DataSchema resultSchema, List<RexExpression> transforms,
+      DataSchema upstreamDataSchema, long requestId, int stageId, VirtualServerAddress serverAddress) {
+    super(requestId, stageId, serverAddress);
     Preconditions.checkState(!transforms.isEmpty(), "transform operand should not be empty.");
     Preconditions.checkState(resultSchema.size() == transforms.size(),
         "result schema size:" + resultSchema.size() + " doesn't match transform operand size:" + transforms.size());
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/utils/OperatorUtils.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/utils/OperatorUtils.java
index a20fd658d1..532befd702 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/utils/OperatorUtils.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/utils/OperatorUtils.java
@@ -24,6 +24,7 @@ import java.util.HashMap;
 import java.util.Map;
 import org.apache.commons.lang.StringUtils;
 import org.apache.pinot.common.datablock.MetadataBlock;
+import org.apache.pinot.query.routing.VirtualServerAddress;
 import org.apache.pinot.query.runtime.operator.OperatorStats;
 import org.apache.pinot.spi.utils.JsonUtils;
 import org.slf4j.Logger;
@@ -31,6 +32,10 @@ import org.slf4j.LoggerFactory;
 
 
 public class OperatorUtils {
+  public static final String NUM_BLOCKS = "numBlocks";
+  public static final String NUM_ROWS = "numRows";
+  public static final String THREAD_EXECUTION_TIME = "threadExecutionTime";
+
   private static final Logger LOGGER = LoggerFactory.getLogger(OperatorUtils.class);
   private static final Map<String, String> OPERATOR_TOKEN_MAPPING = new HashMap<>();
 
@@ -69,6 +74,7 @@ public class OperatorUtils {
       Map<String, Object> jsonOut = new HashMap<>();
       jsonOut.put("requestId", operatorStats.getRequestId());
       jsonOut.put("stageId", operatorStats.getStageId());
+      jsonOut.put("serverAddress", operatorStats.getServerAddress().toString());
       jsonOut.put("operatorType", operatorStats.getOperatorType());
       jsonOut.put("executionStats", operatorStats.getExecutionStats());
       return JsonUtils.objectToString(jsonOut);
@@ -83,9 +89,12 @@ public class OperatorUtils {
       JsonNode operatorStatsNode = JsonUtils.stringToJsonNode(json);
       long requestId = operatorStatsNode.get("requestId").asLong();
       int stageId = operatorStatsNode.get("stageId").asInt();
+      String serverAddressStr = operatorStatsNode.get("serverAddress").asText();
+      VirtualServerAddress serverAddress = VirtualServerAddress.parse(serverAddressStr);
       String operatorType = operatorStatsNode.get("operatorType").asText();
 
-      OperatorStats operatorStats = new OperatorStats(requestId, stageId, operatorType);
+      OperatorStats operatorStats =
+          new OperatorStats(requestId, stageId, serverAddress, operatorType);
       operatorStats.recordExecutionStats(
           JsonUtils.jsonNodeToObject(operatorStatsNode.get("executionStats"), new TypeReference<Map<String, String>>() {
           }));
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/utils/StatsAggregator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/utils/StatsAggregator.java
deleted file mode 100644
index 79b4c87ed0..0000000000
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/utils/StatsAggregator.java
+++ /dev/null
@@ -1,170 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.query.runtime.operator.utils;
-
-import java.util.HashMap;
-import java.util.Map;
-import java.util.function.LongConsumer;
-import org.apache.pinot.common.datatable.DataTable;
-
-//TODO: Remove this and use BaseReduceService.ExecutionStatsAggregator
-public class StatsAggregator {
-  private long _numDocsScanned = 0L;
-  private long _numEntriesScannedInFilter = 0L;
-  private long _numEntriesScannedPostFilter = 0L;
-  private long _numSegmentsQueried = 0L;
-  private long _numSegmentsProcessed = 0L;
-  private long _numSegmentsMatched = 0L;
-  private long _numConsumingSegmentsQueried = 0L;
-  private long _numConsumingSegmentsProcessed = 0L;
-  private long _numConsumingSegmentsMatched = 0L;
-  private long _minConsumingFreshnessTimeMs = Long.MAX_VALUE;
-  private long _numTotalDocs = 0L;
-  private long _numSegmentsPrunedByServer = 0L;
-  private long _numSegmentsPrunedInvalid = 0L;
-  private long _numSegmentsPrunedByLimit = 0L;
-  private long _numSegmentsPrunedByValue = 0L;
-  private long _explainPlanNumEmptyFilterSegments = 0L;
-  private long _explainPlanNumMatchAllFilterSegments = 0L;
-  private boolean _numGroupsLimitReached = false;
-
-  public synchronized void aggregate(Map<String, String> metadata) {
-    // Reduce on execution statistics.
-    String numDocsScannedString = metadata.get(DataTable.MetadataKey.NUM_DOCS_SCANNED.getName());
-    if (numDocsScannedString != null) {
-      _numDocsScanned += Long.parseLong(numDocsScannedString);
-    }
-    String numEntriesScannedInFilterString =
-        metadata.get(DataTable.MetadataKey.NUM_ENTRIES_SCANNED_IN_FILTER.getName());
-    if (numEntriesScannedInFilterString != null) {
-      _numEntriesScannedInFilter += Long.parseLong(numEntriesScannedInFilterString);
-    }
-    String numEntriesScannedPostFilterString =
-        metadata.get(DataTable.MetadataKey.NUM_ENTRIES_SCANNED_POST_FILTER.getName());
-    if (numEntriesScannedPostFilterString != null) {
-      _numEntriesScannedPostFilter += Long.parseLong(numEntriesScannedPostFilterString);
-    }
-    String numSegmentsQueriedString = metadata.get(DataTable.MetadataKey.NUM_SEGMENTS_QUERIED.getName());
-    if (numSegmentsQueriedString != null) {
-      _numSegmentsQueried += Long.parseLong(numSegmentsQueriedString);
-    }
-
-    String numSegmentsProcessedString = metadata.get(DataTable.MetadataKey.NUM_SEGMENTS_PROCESSED.getName());
-    if (numSegmentsProcessedString != null) {
-      _numSegmentsProcessed += Long.parseLong(numSegmentsProcessedString);
-    }
-    String numSegmentsMatchedString = metadata.get(DataTable.MetadataKey.NUM_SEGMENTS_MATCHED.getName());
-    if (numSegmentsMatchedString != null) {
-      _numSegmentsMatched += Long.parseLong(numSegmentsMatchedString);
-    }
-
-    String numConsumingSegmentsQueriedString =
-        metadata.get(DataTable.MetadataKey.NUM_CONSUMING_SEGMENTS_QUERIED.getName());
-    if (numConsumingSegmentsQueriedString != null) {
-      _numConsumingSegmentsQueried += Long.parseLong(numConsumingSegmentsQueriedString);
-    }
-
-    String numConsumingSegmentsProcessed =
-        metadata.get(DataTable.MetadataKey.NUM_CONSUMING_SEGMENTS_PROCESSED.getName());
-    if (numConsumingSegmentsProcessed != null) {
-      _numConsumingSegmentsProcessed += Long.parseLong(numConsumingSegmentsProcessed);
-    }
-
-    String numConsumingSegmentsMatched = metadata.get(DataTable.MetadataKey.NUM_CONSUMING_SEGMENTS_MATCHED.getName());
-    if (numConsumingSegmentsMatched != null) {
-      _numConsumingSegmentsMatched += Long.parseLong(numConsumingSegmentsMatched);
-    }
-
-    String minConsumingFreshnessTimeMsString =
-        metadata.get(DataTable.MetadataKey.MIN_CONSUMING_FRESHNESS_TIME_MS.getName());
-    if (minConsumingFreshnessTimeMsString != null) {
-      _minConsumingFreshnessTimeMs =
-          Math.min(Long.parseLong(minConsumingFreshnessTimeMsString), _minConsumingFreshnessTimeMs);
-    }
-
-    withNotNullLongMetadata(metadata, DataTable.MetadataKey.NUM_SEGMENTS_PRUNED_BY_SERVER,
-        l -> _numSegmentsPrunedByServer += l);
-    withNotNullLongMetadata(metadata, DataTable.MetadataKey.NUM_SEGMENTS_PRUNED_INVALID,
-        l -> _numSegmentsPrunedInvalid += l);
-    withNotNullLongMetadata(metadata, DataTable.MetadataKey.NUM_SEGMENTS_PRUNED_BY_LIMIT,
-        l -> _numSegmentsPrunedByLimit += l);
-    withNotNullLongMetadata(metadata, DataTable.MetadataKey.NUM_SEGMENTS_PRUNED_BY_VALUE,
-        l -> _numSegmentsPrunedByValue += l);
-
-    String explainPlanNumEmptyFilterSegments =
-        metadata.get(DataTable.MetadataKey.EXPLAIN_PLAN_NUM_EMPTY_FILTER_SEGMENTS.getName());
-    if (explainPlanNumEmptyFilterSegments != null) {
-      _explainPlanNumEmptyFilterSegments += Long.parseLong(explainPlanNumEmptyFilterSegments);
-    }
-
-    String explainPlanNumMatchAllFilterSegments =
-        metadata.get(DataTable.MetadataKey.EXPLAIN_PLAN_NUM_MATCH_ALL_FILTER_SEGMENTS.getName());
-    if (explainPlanNumMatchAllFilterSegments != null) {
-      _explainPlanNumMatchAllFilterSegments += Long.parseLong(explainPlanNumMatchAllFilterSegments);
-    }
-
-    String numTotalDocsString = metadata.get(DataTable.MetadataKey.TOTAL_DOCS.getName());
-    if (numTotalDocsString != null) {
-      _numTotalDocs += Long.parseLong(numTotalDocsString);
-    }
-    _numGroupsLimitReached |=
-        Boolean.parseBoolean(metadata.get(DataTable.MetadataKey.NUM_GROUPS_LIMIT_REACHED.getName()));
-  }
-
-  public Map<String, String> getStats() {
-    Map<String, String> metadata = new HashMap<>();
-    metadata.put(DataTable.MetadataKey.NUM_DOCS_SCANNED.getName(), String.valueOf(_numDocsScanned));
-    metadata.put(DataTable.MetadataKey.TOTAL_DOCS.getName(), String.valueOf(_numTotalDocs));
-    metadata.put(DataTable.MetadataKey.NUM_ENTRIES_SCANNED_IN_FILTER.getName(),
-        String.valueOf(_numEntriesScannedInFilter));
-    metadata.put(DataTable.MetadataKey.NUM_ENTRIES_SCANNED_POST_FILTER.getName(),
-        String.valueOf(_numEntriesScannedPostFilter));
-    metadata.put(DataTable.MetadataKey.NUM_SEGMENTS_QUERIED.getName(), String.valueOf(_numSegmentsQueried));
-    metadata.put(DataTable.MetadataKey.NUM_SEGMENTS_PROCESSED.getName(), String.valueOf(_numSegmentsProcessed));
-    metadata.put(DataTable.MetadataKey.NUM_SEGMENTS_MATCHED.getName(), String.valueOf(_numSegmentsMatched));
-    metadata.put(DataTable.MetadataKey.NUM_CONSUMING_SEGMENTS_QUERIED.getName(),
-        String.valueOf(_numConsumingSegmentsQueried));
-    metadata.put(DataTable.MetadataKey.NUM_CONSUMING_SEGMENTS_PROCESSED.getName(),
-        String.valueOf(_numConsumingSegmentsProcessed));
-    metadata.put(DataTable.MetadataKey.NUM_CONSUMING_SEGMENTS_MATCHED.getName(),
-        String.valueOf(_numConsumingSegmentsMatched));
-    metadata.put(DataTable.MetadataKey.MIN_CONSUMING_FRESHNESS_TIME_MS.getName(),
-        String.valueOf(_minConsumingFreshnessTimeMs));
-    metadata.put(DataTable.MetadataKey.NUM_SEGMENTS_PRUNED_BY_SERVER.getName(),
-        String.valueOf(_numSegmentsPrunedByServer));
-    metadata.put(DataTable.MetadataKey.NUM_SEGMENTS_PRUNED_BY_LIMIT.getName(),
-        String.valueOf(_numSegmentsPrunedByLimit));
-    metadata.put(DataTable.MetadataKey.NUM_SEGMENTS_PRUNED_INVALID.getName(),
-        String.valueOf(_numSegmentsPrunedInvalid));
-    metadata.put(DataTable.MetadataKey.EXPLAIN_PLAN_NUM_EMPTY_FILTER_SEGMENTS.getName(),
-        String.valueOf(_explainPlanNumEmptyFilterSegments));
-    metadata.put(DataTable.MetadataKey.EXPLAIN_PLAN_NUM_MATCH_ALL_FILTER_SEGMENTS.getName(),
-        String.valueOf(_explainPlanNumMatchAllFilterSegments));
-    metadata.put(DataTable.MetadataKey.NUM_GROUPS_LIMIT_REACHED.getName(), String.valueOf(_numGroupsLimitReached));
-
-    return metadata;
-  }
-
-  private void withNotNullLongMetadata(Map<String, String> metadata, DataTable.MetadataKey key, LongConsumer consumer) {
-    String strValue = metadata.get(key.getName());
-    if (strValue != null) {
-      consumer.accept(Long.parseLong(strValue));
-    }
-  }
-}
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/PhysicalPlanVisitor.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/PhysicalPlanVisitor.java
index fb2de69d89..ea9c2c38e0 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/PhysicalPlanVisitor.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/PhysicalPlanVisitor.java
@@ -85,8 +85,8 @@ public class PhysicalPlanVisitor implements StageNodeVisitor<MultiStageOperator,
   @Override
   public MultiStageOperator visitAggregate(AggregateNode node, PlanRequestContext context) {
     MultiStageOperator nextOperator = node.getInputs().get(0).visit(this, context);
-    return new AggregateOperator(nextOperator, node.getDataSchema(), node.getAggCalls(),
-        node.getGroupSet(), node.getInputs().get(0).getDataSchema(), context._requestId, context._stageId);
+    return new AggregateOperator(nextOperator, node.getDataSchema(), node.getAggCalls(), node.getGroupSet(),
+        node.getInputs().get(0).getDataSchema(), context._requestId, context._stageId, context.getServer());
   }
 
   @Override
@@ -99,7 +99,7 @@ public class PhysicalPlanVisitor implements StageNodeVisitor<MultiStageOperator,
   public MultiStageOperator visitFilter(FilterNode node, PlanRequestContext context) {
     MultiStageOperator nextOperator = node.getInputs().get(0).visit(this, context);
     return new FilterOperator(nextOperator, node.getDataSchema(), node.getCondition(), context.getRequestId(),
-        context.getStageId());
+        context.getStageId(), context.getServer());
   }
 
   @Override
@@ -111,21 +111,21 @@ public class PhysicalPlanVisitor implements StageNodeVisitor<MultiStageOperator,
     MultiStageOperator rightOperator = right.visit(this, context);
 
     return new HashJoinOperator(leftOperator, rightOperator, left.getDataSchema(), node, context.getRequestId(),
-        context.getStageId());
+        context.getStageId(), context.getServer());
   }
 
   @Override
   public MultiStageOperator visitProject(ProjectNode node, PlanRequestContext context) {
     MultiStageOperator nextOperator = node.getInputs().get(0).visit(this, context);
     return new TransformOperator(nextOperator, node.getDataSchema(), node.getProjects(),
-        node.getInputs().get(0).getDataSchema(), context.getRequestId(), context.getStageId());
+        node.getInputs().get(0).getDataSchema(), context.getRequestId(), context.getStageId(), context.getServer());
   }
 
   @Override
   public MultiStageOperator visitSort(SortNode node, PlanRequestContext context) {
     MultiStageOperator nextOperator = node.getInputs().get(0).visit(this, context);
-    return new SortOperator(nextOperator, node.getCollationKeys(), node.getCollationDirections(),
-        node.getFetch(), node.getOffset(), node.getDataSchema(), context.getRequestId(), context.getStageId());
+    return new SortOperator(nextOperator, node.getCollationKeys(), node.getCollationDirections(), node.getFetch(),
+        node.getOffset(), node.getDataSchema(), context.getRequestId(), context.getStageId(), context.getServer());
   }
 
   @Override
@@ -136,6 +136,6 @@ public class PhysicalPlanVisitor implements StageNodeVisitor<MultiStageOperator,
   @Override
   public MultiStageOperator visitValue(ValueNode node, PlanRequestContext context) {
     return new LiteralValueOperator(node.getDataSchema(), node.getLiteralRows(), context.getRequestId(),
-        context.getStageId());
+        context.getStageId(), context.getServer());
   }
 }
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/QueryDispatcher.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/QueryDispatcher.java
index b0f959fcd7..9c7b73a9e0 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/QueryDispatcher.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/QueryDispatcher.java
@@ -26,6 +26,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
+import javax.annotation.Nullable;
 import org.apache.calcite.rel.RelDistribution;
 import org.apache.calcite.util.Pair;
 import org.apache.pinot.common.datablock.DataBlock;
@@ -36,6 +37,7 @@ import org.apache.pinot.common.proto.Worker;
 import org.apache.pinot.common.response.broker.ResultTable;
 import org.apache.pinot.common.utils.DataSchema;
 import org.apache.pinot.core.common.ObjectSerDeUtils;
+import org.apache.pinot.core.query.reduce.ExecutionStatsAggregator;
 import org.apache.pinot.query.mailbox.MailboxService;
 import org.apache.pinot.query.planner.QueryPlan;
 import org.apache.pinot.query.planner.StageMetadata;
@@ -47,7 +49,6 @@ import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils;
 import org.apache.pinot.query.runtime.operator.MailboxReceiveOperator;
 import org.apache.pinot.query.runtime.operator.OperatorStats;
 import org.apache.pinot.query.runtime.operator.utils.OperatorUtils;
-import org.apache.pinot.query.runtime.operator.utils.StatsAggregator;
 import org.apache.pinot.query.runtime.plan.DistributedStagePlan;
 import org.apache.pinot.query.runtime.plan.serde.QueryPlanSerDeUtils;
 import org.roaringbitmap.RoaringBitmap;
@@ -66,15 +67,9 @@ public class QueryDispatcher {
   public QueryDispatcher() {
   }
 
-  public ResultTable submitAndReduce(long requestId, QueryPlan queryPlan,
-      MailboxService<TransferableBlock> mailboxService, long timeoutMs, Map<String, String> queryOptions)
-      throws Exception {
-    return submitAndReduce(requestId, queryPlan, mailboxService, timeoutMs, queryOptions, new HashMap<>());
-  }
-
   public ResultTable submitAndReduce(long requestId, QueryPlan queryPlan,
       MailboxService<TransferableBlock> mailboxService, long timeoutMs, Map<String, String> queryOptions,
-      Map<String, String> metadata)
+      ExecutionStatsAggregator executionStatsAggregator)
       throws Exception {
     // submit all the distributed stages.
     int reduceStageId = submit(requestId, queryPlan, timeoutMs, queryOptions);
@@ -84,7 +79,8 @@ public class QueryDispatcher {
         queryPlan.getStageMetadataMap().get(reduceNode.getSenderStageId()).getServerInstances(), requestId,
         reduceNode.getSenderStageId(), reduceStageId, reduceNode.getDataSchema(),
         new VirtualServerAddress(mailboxService.getHostname(), mailboxService.getMailboxPort(), 0), timeoutMs);
-    List<DataBlock> resultDataBlocks = reduceMailboxReceive(mailboxReceiveOperator, timeoutMs, metadata);
+    List<DataBlock> resultDataBlocks =
+        reduceMailboxReceive(mailboxReceiveOperator, timeoutMs, executionStatsAggregator);
     return toResultTable(resultDataBlocks, queryPlan.getQueryResultFields(),
         queryPlan.getQueryStageMap().get(0).getDataSchema());
   }
@@ -132,29 +128,11 @@ public class QueryDispatcher {
   }
 
   public static List<DataBlock> reduceMailboxReceive(MailboxReceiveOperator mailboxReceiveOperator, long timeoutMs) {
-    List<DataBlock> resultDataBlocks = new ArrayList<>();
-    TransferableBlock transferableBlock;
-    long timeoutWatermark = System.nanoTime() + timeoutMs * 1_000_000L;
-    while (System.nanoTime() < timeoutWatermark) {
-      transferableBlock = mailboxReceiveOperator.nextBlock();
-      if (TransferableBlockUtils.isEndOfStream(transferableBlock) && transferableBlock.isErrorBlock()) {
-        // TODO: we only received bubble up error from the execution stage tree.
-        // TODO: query dispatch should also send cancel signal to the rest of the execution stage tree.
-        throw new RuntimeException(
-            "Received error query execution result block: " + transferableBlock.getDataBlock().getExceptions());
-      }
-      if (transferableBlock.isNoOpBlock()) {
-        continue;
-      } else if (transferableBlock.isEndOfStreamBlock()) {
-        return resultDataBlocks;
-      }
-      resultDataBlocks.add(transferableBlock.getDataBlock());
-    }
-    throw new RuntimeException("Timed out while receiving from mailbox: " + QueryException.EXECUTION_TIMEOUT_ERROR);
+    return reduceMailboxReceive(mailboxReceiveOperator, timeoutMs, null);
   }
 
   public static List<DataBlock> reduceMailboxReceive(MailboxReceiveOperator mailboxReceiveOperator, long timeoutMs,
-      Map<String, String> metadata) {
+      @Nullable ExecutionStatsAggregator executionStatsAggregator) {
     List<DataBlock> resultDataBlocks = new ArrayList<>();
     TransferableBlock transferableBlock;
     long timeoutWatermark = System.nanoTime() + timeoutMs * 1_000_000L;
@@ -169,13 +147,13 @@ public class QueryDispatcher {
       if (transferableBlock.isNoOpBlock()) {
         continue;
       } else if (transferableBlock.isEndOfStreamBlock()) {
-        StatsAggregator statsAggregator = new StatsAggregator();
-        for (Map.Entry<String, OperatorStats> entry : transferableBlock.getResultMetadata().entrySet()) {
-          LOGGER.info("Broker Query Execution Stats, OperatorId: {}, OperatorStats: {}", entry.getKey(),
-              OperatorUtils.operatorStatsToJson(entry.getValue()));
-          statsAggregator.aggregate(entry.getValue().getExecutionStats());
+        if (executionStatsAggregator != null) {
+          for (Map.Entry<String, OperatorStats> entry : transferableBlock.getResultMetadata().entrySet()) {
+            LOGGER.info("Broker Query Execution Stats - OperatorId: {}, OperatorStats: {}", entry.getKey(),
+                OperatorUtils.operatorStatsToJson(entry.getValue()));
+            executionStatsAggregator.aggregate(null, entry.getValue().getExecutionStats(), new HashMap<>());
+          }
         }
-        metadata.putAll(statsAggregator.getStats());
         return resultDataBlocks;
       }
       resultDataBlocks.add(transferableBlock.getDataBlock());
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTest.java
index 6ab4bc3a3e..1ed74db296 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTest.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTest.java
@@ -163,14 +163,14 @@ public class QueryRunnerTest extends QueryRunnerTestBase {
   @Test(dataProvider = "testDataWithSqlToFinalRowCount")
   public void testSqlWithFinalRowCountChecker(String sql, int expectedRows)
       throws Exception {
-    List<Object[]> resultRows = queryRunner(sql);
+    List<Object[]> resultRows = queryRunner(sql, null);
     Assert.assertEquals(resultRows.size(), expectedRows);
   }
 
   @Test(dataProvider = "testSql")
   public void testSqlWithH2Checker(String sql)
       throws Exception {
-    List<Object[]> resultRows = queryRunner(sql);
+    List<Object[]> resultRows = queryRunner(sql, null);
     // query H2 for data
     List<Object[]> expectedRows = queryH2(sql);
     compareRowEquals(resultRows, expectedRows);
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTestBase.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTestBase.java
index 412b622c5d..6ebc8cdb1c 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTestBase.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTestBase.java
@@ -40,6 +40,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Random;
 import java.util.stream.Collectors;
+import org.apache.pinot.core.query.reduce.ExecutionStatsAggregator;
 import org.apache.pinot.core.transport.ServerInstance;
 import org.apache.pinot.query.QueryEnvironment;
 import org.apache.pinot.query.QueryServerEnclosure;
@@ -81,7 +82,7 @@ public abstract class QueryRunnerTestBase extends QueryTestSet {
   // --------------------------------------------------------------------------
   // QUERY UTILS
   // --------------------------------------------------------------------------
-  protected List<Object[]> queryRunner(String sql) {
+  protected List<Object[]> queryRunner(String sql, ExecutionStatsAggregator executionStatsAggregator) {
     QueryPlan queryPlan = _queryEnvironment.planQuery(sql);
     Map<String, String> requestMetadataMap =
         ImmutableMap.of(QueryConfig.KEY_OF_BROKER_REQUEST_ID, String.valueOf(RANDOM_REQUEST_ID_GEN.nextLong()),
@@ -107,7 +108,8 @@ public abstract class QueryRunnerTestBase extends QueryTestSet {
     }
     Preconditions.checkNotNull(mailboxReceiveOperator);
     return QueryDispatcher.toResultTable(
-        QueryDispatcher.reduceMailboxReceive(mailboxReceiveOperator, CommonConstants.Broker.DEFAULT_BROKER_TIMEOUT_MS),
+        QueryDispatcher.reduceMailboxReceive(mailboxReceiveOperator, CommonConstants.Broker.DEFAULT_BROKER_TIMEOUT_MS,
+            executionStatsAggregator),
         queryPlan.getQueryResultFields(), queryPlan.getQueryStageMap().get(0).getDataSchema()).getRows();
   }
 
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/AggregateOperatorTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/AggregateOperatorTest.java
index 0bda46470e..6cd0127b76 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/AggregateOperatorTest.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/AggregateOperatorTest.java
@@ -26,6 +26,7 @@ import org.apache.calcite.sql.SqlKind;
 import org.apache.pinot.common.utils.DataSchema;
 import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
 import org.apache.pinot.query.planner.logical.RexExpression;
+import org.apache.pinot.query.routing.VirtualServerAddress;
 import org.apache.pinot.query.runtime.blocks.TransferableBlock;
 import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils;
 import org.apache.pinot.spi.data.FieldSpec;
@@ -49,9 +50,13 @@ public class AggregateOperatorTest {
   @Mock
   private MultiStageOperator _input;
 
+  @Mock
+  private VirtualServerAddress _serverAddress;
+
   @BeforeMethod
   public void setUp() {
     _mocks = MockitoAnnotations.openMocks(this);
+    Mockito.when(_serverAddress.toString()).thenReturn(new VirtualServerAddress("mock", 80, 0).toString());
   }
 
   @AfterMethod
@@ -71,7 +76,7 @@ public class AggregateOperatorTest {
 
     DataSchema inSchema = new DataSchema(new String[]{"group", "arg"}, new ColumnDataType[]{INT, INT});
     DataSchema outSchema = new DataSchema(new String[]{"sum"}, new ColumnDataType[]{DOUBLE});
-    AggregateOperator operator = new AggregateOperator(_input, outSchema, calls, group, inSchema, 1, 2);
+    AggregateOperator operator = new AggregateOperator(_input, outSchema, calls, group, inSchema, 1, 2, _serverAddress);
 
     // When:
     TransferableBlock block1 = operator.nextBlock(); // build
@@ -91,7 +96,7 @@ public class AggregateOperatorTest {
 
     DataSchema inSchema = new DataSchema(new String[]{"group", "arg"}, new ColumnDataType[]{INT, INT});
     DataSchema outSchema = new DataSchema(new String[]{"sum"}, new ColumnDataType[]{DOUBLE});
-    AggregateOperator operator = new AggregateOperator(_input, outSchema, calls, group, inSchema, 1, 2);
+    AggregateOperator operator = new AggregateOperator(_input, outSchema, calls, group, inSchema, 1, 2, _serverAddress);
 
     // When:
     TransferableBlock block = operator.nextBlock();
@@ -113,7 +118,7 @@ public class AggregateOperatorTest {
         .thenReturn(TransferableBlockUtils.getEndOfStreamTransferableBlock());
 
     DataSchema outSchema = new DataSchema(new String[]{"sum"}, new ColumnDataType[]{DOUBLE});
-    AggregateOperator operator = new AggregateOperator(_input, outSchema, calls, group, inSchema, 1, 2);
+    AggregateOperator operator = new AggregateOperator(_input, outSchema, calls, group, inSchema, 1, 2, _serverAddress);
 
     // When:
     TransferableBlock block1 = operator.nextBlock(); // build when reading NoOp block
@@ -136,7 +141,7 @@ public class AggregateOperatorTest {
         .thenReturn(TransferableBlockUtils.getEndOfStreamTransferableBlock());
 
     DataSchema outSchema = new DataSchema(new String[]{"sum"}, new ColumnDataType[]{DOUBLE});
-    AggregateOperator operator = new AggregateOperator(_input, outSchema, calls, group, inSchema, 1, 2);
+    AggregateOperator operator = new AggregateOperator(_input, outSchema, calls, group, inSchema, 1, 2, _serverAddress);
 
     // When:
     TransferableBlock block1 = operator.nextBlock();
@@ -161,7 +166,7 @@ public class AggregateOperatorTest {
         .thenReturn(TransferableBlockUtils.getEndOfStreamTransferableBlock());
 
     DataSchema outSchema = new DataSchema(new String[]{"sum"}, new ColumnDataType[]{DOUBLE});
-    AggregateOperator operator = new AggregateOperator(_input, outSchema, calls, group, inSchema, 1, 2);
+    AggregateOperator operator = new AggregateOperator(_input, outSchema, calls, group, inSchema, 1, 2, _serverAddress);
 
     // When:
     TransferableBlock block1 = operator.nextBlock();
@@ -193,7 +198,8 @@ public class AggregateOperatorTest {
     Mockito.when(merger.initialize(Mockito.any(), Mockito.any())).thenReturn(1d);
     DataSchema outSchema = new DataSchema(new String[]{"sum"}, new ColumnDataType[]{DOUBLE});
     AggregateOperator operator =
-        new AggregateOperator(_input, outSchema, calls, group, inSchema, ImmutableMap.of("SUM", cdt -> merger), 1, 2);
+        new AggregateOperator(_input, outSchema, calls, group, inSchema, ImmutableMap.of("SUM", cdt -> merger), 1, 2,
+            _serverAddress);
 
     // When:
     TransferableBlock resultBlock = operator.nextBlock(); // (output result)
@@ -215,7 +221,7 @@ public class AggregateOperatorTest {
     DataSchema inSchema = new DataSchema(new String[]{"group", "arg"}, new ColumnDataType[]{INT, INT});
     AggregateOperator sum0GroupBy1 =
         new AggregateOperator(upstreamOperator, OperatorTestUtil.getDataSchema(OperatorTestUtil.OP_1),
-            Arrays.asList(agg), Arrays.asList(new RexExpression.InputRef(1)), inSchema, 1, 2);
+            Arrays.asList(agg), Arrays.asList(new RexExpression.InputRef(1)), inSchema, 1, 2, _serverAddress);
     TransferableBlock result = sum0GroupBy1.getNextBlock();
     while (result.isNoOpBlock()) {
       result = sum0GroupBy1.getNextBlock();
@@ -238,7 +244,7 @@ public class AggregateOperatorTest {
     DataSchema inSchema = new DataSchema(new String[]{"unknown"}, new ColumnDataType[]{DOUBLE});
 
     // When:
-    AggregateOperator operator = new AggregateOperator(_input, outSchema, calls, group, inSchema, 1, 2);
+    AggregateOperator operator = new AggregateOperator(_input, outSchema, calls, group, inSchema, 1, 2, _serverAddress);
   }
 
   @Test
@@ -255,7 +261,7 @@ public class AggregateOperatorTest {
         .thenReturn(TransferableBlockUtils.getEndOfStreamTransferableBlock());
 
     DataSchema outSchema = new DataSchema(new String[]{"sum"}, new ColumnDataType[]{DOUBLE});
-    AggregateOperator operator = new AggregateOperator(_input, outSchema, calls, group, inSchema, 1, 2);
+    AggregateOperator operator = new AggregateOperator(_input, outSchema, calls, group, inSchema, 1, 2, _serverAddress);
 
     // When:
     TransferableBlock block = operator.nextBlock();
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/FilterOperatorTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/FilterOperatorTest.java
index cf5cb80151..26e40cc4ab 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/FilterOperatorTest.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/FilterOperatorTest.java
@@ -25,6 +25,7 @@ import org.apache.pinot.common.datablock.DataBlock;
 import org.apache.pinot.common.exception.QueryException;
 import org.apache.pinot.common.utils.DataSchema;
 import org.apache.pinot.query.planner.logical.RexExpression;
+import org.apache.pinot.query.routing.VirtualServerAddress;
 import org.apache.pinot.query.runtime.blocks.TransferableBlock;
 import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils;
 import org.apache.pinot.spi.data.FieldSpec;
@@ -42,9 +43,13 @@ public class FilterOperatorTest {
   @Mock
   private MultiStageOperator _upstreamOperator;
 
+  @Mock
+  private VirtualServerAddress _serverAddress;
+
   @BeforeMethod
   public void setUp() {
     _mocks = MockitoAnnotations.openMocks(this);
+    Mockito.when(_serverAddress.toString()).thenReturn(new VirtualServerAddress("mock", 80, 0).toString());
   }
 
   @AfterMethod
@@ -61,7 +66,7 @@ public class FilterOperatorTest {
     DataSchema inputSchema = new DataSchema(new String[]{"boolCol"}, new DataSchema.ColumnDataType[]{
         DataSchema.ColumnDataType.BOOLEAN
     });
-    FilterOperator op = new FilterOperator(_upstreamOperator, inputSchema, booleanLiteral, 1, 2);
+    FilterOperator op = new FilterOperator(_upstreamOperator, inputSchema, booleanLiteral, 1, 2, _serverAddress);
     TransferableBlock errorBlock = op.getNextBlock();
     Assert.assertTrue(errorBlock.isErrorBlock());
     DataBlock error = errorBlock.getDataBlock();
@@ -76,7 +81,7 @@ public class FilterOperatorTest {
         DataSchema.ColumnDataType.INT
     });
     Mockito.when(_upstreamOperator.nextBlock()).thenReturn(TransferableBlockUtils.getEndOfStreamTransferableBlock());
-    FilterOperator op = new FilterOperator(_upstreamOperator, inputSchema, booleanLiteral, 1, 2);
+    FilterOperator op = new FilterOperator(_upstreamOperator, inputSchema, booleanLiteral, 1, 2, _serverAddress);
     TransferableBlock dataBlock = op.getNextBlock();
     Assert.assertTrue(dataBlock.isEndOfStreamBlock());
   }
@@ -89,7 +94,7 @@ public class FilterOperatorTest {
         DataSchema.ColumnDataType.INT
     });
     Mockito.when(_upstreamOperator.nextBlock()).thenReturn(TransferableBlockUtils.getNoOpTransferableBlock());
-    FilterOperator op = new FilterOperator(_upstreamOperator, inputSchema, booleanLiteral, 1, 2);
+    FilterOperator op = new FilterOperator(_upstreamOperator, inputSchema, booleanLiteral, 1, 2, _serverAddress);
     TransferableBlock dataBlock = op.getNextBlock();
     Assert.assertTrue(dataBlock.isNoOpBlock());
   }
@@ -104,7 +109,7 @@ public class FilterOperatorTest {
     Mockito.when(_upstreamOperator.nextBlock())
         .thenReturn(OperatorTestUtil.block(inputSchema, new Object[]{0}, new Object[]{1}))
         .thenReturn(TransferableBlockUtils.getEndOfStreamTransferableBlock());
-    FilterOperator op = new FilterOperator(_upstreamOperator, inputSchema, booleanLiteral, 1, 2);
+    FilterOperator op = new FilterOperator(_upstreamOperator, inputSchema, booleanLiteral, 1, 2, _serverAddress);
     TransferableBlock dataBlock = op.getNextBlock();
     Assert.assertFalse(dataBlock.isErrorBlock());
     List<Object[]> result = dataBlock.getContainer();
@@ -122,7 +127,7 @@ public class FilterOperatorTest {
     });
     Mockito.when(_upstreamOperator.nextBlock())
         .thenReturn(OperatorTestUtil.block(inputSchema, new Object[]{1}, new Object[]{2}));
-    FilterOperator op = new FilterOperator(_upstreamOperator, inputSchema, booleanLiteral, 1, 2);
+    FilterOperator op = new FilterOperator(_upstreamOperator, inputSchema, booleanLiteral, 1, 2, _serverAddress);
     TransferableBlock dataBlock = op.getNextBlock();
     Assert.assertFalse(dataBlock.isErrorBlock());
     List<Object[]> result = dataBlock.getContainer();
@@ -137,7 +142,7 @@ public class FilterOperatorTest {
     });
     Mockito.when(_upstreamOperator.nextBlock())
         .thenReturn(OperatorTestUtil.block(inputSchema, new Object[]{1}, new Object[]{2}));
-    FilterOperator op = new FilterOperator(_upstreamOperator, inputSchema, booleanLiteral, 1, 2);
+    FilterOperator op = new FilterOperator(_upstreamOperator, inputSchema, booleanLiteral, 1, 2, _serverAddress);
     TransferableBlock errorBlock = op.getNextBlock();
     Assert.assertTrue(errorBlock.isErrorBlock());
     DataBlock data = errorBlock.getDataBlock();
@@ -152,7 +157,7 @@ public class FilterOperatorTest {
     });
     Mockito.when(_upstreamOperator.nextBlock())
         .thenReturn(OperatorTestUtil.block(inputSchema, new Object[]{1}, new Object[]{2}));
-    FilterOperator op = new FilterOperator(_upstreamOperator, inputSchema, ref0, 1, 2);
+    FilterOperator op = new FilterOperator(_upstreamOperator, inputSchema, ref0, 1, 2, _serverAddress);
     TransferableBlock errorBlock = op.getNextBlock();
     Assert.assertTrue(errorBlock.isErrorBlock());
     DataBlock data = errorBlock.getDataBlock();
@@ -167,7 +172,7 @@ public class FilterOperatorTest {
     });
     Mockito.when(_upstreamOperator.nextBlock())
         .thenReturn(OperatorTestUtil.block(inputSchema, new Object[]{1, true}, new Object[]{2, false}));
-    FilterOperator op = new FilterOperator(_upstreamOperator, inputSchema, ref1, 1, 2);
+    FilterOperator op = new FilterOperator(_upstreamOperator, inputSchema, ref1, 1, 2, _serverAddress);
     TransferableBlock dataBlock = op.getNextBlock();
     Assert.assertFalse(dataBlock.isErrorBlock());
     List<Object[]> result = dataBlock.getContainer();
@@ -187,7 +192,7 @@ public class FilterOperatorTest {
     RexExpression.FunctionCall andCall = new RexExpression.FunctionCall(SqlKind.AND, FieldSpec.DataType.BOOLEAN, "AND",
         ImmutableList.of(new RexExpression.InputRef(0), new RexExpression.InputRef(1)));
 
-    FilterOperator op = new FilterOperator(_upstreamOperator, inputSchema, andCall, 1, 2);
+    FilterOperator op = new FilterOperator(_upstreamOperator, inputSchema, andCall, 1, 2, _serverAddress);
     TransferableBlock dataBlock = op.getNextBlock();
     Assert.assertFalse(dataBlock.isErrorBlock());
     List<Object[]> result = dataBlock.getContainer();
@@ -207,7 +212,7 @@ public class FilterOperatorTest {
     RexExpression.FunctionCall orCall = new RexExpression.FunctionCall(SqlKind.OR, FieldSpec.DataType.BOOLEAN, "OR",
         ImmutableList.of(new RexExpression.InputRef(0), new RexExpression.InputRef(1)));
 
-    FilterOperator op = new FilterOperator(_upstreamOperator, inputSchema, orCall, 1, 2);
+    FilterOperator op = new FilterOperator(_upstreamOperator, inputSchema, orCall, 1, 2, _serverAddress);
     TransferableBlock dataBlock = op.getNextBlock();
     Assert.assertFalse(dataBlock.isErrorBlock());
     List<Object[]> result = dataBlock.getContainer();
@@ -229,7 +234,7 @@ public class FilterOperatorTest {
     RexExpression.FunctionCall notCall = new RexExpression.FunctionCall(SqlKind.NOT, FieldSpec.DataType.BOOLEAN, "NOT",
         ImmutableList.of(new RexExpression.InputRef(0)));
 
-    FilterOperator op = new FilterOperator(_upstreamOperator, inputSchema, notCall, 1, 2);
+    FilterOperator op = new FilterOperator(_upstreamOperator, inputSchema, notCall, 1, 2, _serverAddress);
     TransferableBlock dataBlock = op.getNextBlock();
     Assert.assertFalse(dataBlock.isErrorBlock());
     List<Object[]> result = dataBlock.getContainer();
@@ -248,7 +253,7 @@ public class FilterOperatorTest {
     RexExpression.FunctionCall greaterThan =
         new RexExpression.FunctionCall(SqlKind.GREATER_THAN, FieldSpec.DataType.BOOLEAN, "greaterThan",
             ImmutableList.of(new RexExpression.InputRef(0), new RexExpression.InputRef(1)));
-    FilterOperator op = new FilterOperator(_upstreamOperator, inputSchema, greaterThan, 1, 2);
+    FilterOperator op = new FilterOperator(_upstreamOperator, inputSchema, greaterThan, 1, 2, _serverAddress);
     TransferableBlock dataBlock = op.getNextBlock();
     Assert.assertFalse(dataBlock.isErrorBlock());
     List<Object[]> result = dataBlock.getContainer();
@@ -268,7 +273,7 @@ public class FilterOperatorTest {
         new RexExpression.FunctionCall(SqlKind.OTHER, FieldSpec.DataType.BOOLEAN, "startsWith",
             ImmutableList.of(new RexExpression.InputRef(0),
                 new RexExpression.Literal(FieldSpec.DataType.STRING, "star")));
-    FilterOperator op = new FilterOperator(_upstreamOperator, inputSchema, startsWith, 1, 2);
+    FilterOperator op = new FilterOperator(_upstreamOperator, inputSchema, startsWith, 1, 2, _serverAddress);
     TransferableBlock dataBlock = op.getNextBlock();
     Assert.assertFalse(dataBlock.isErrorBlock());
     List<Object[]> result = dataBlock.getContainer();
@@ -289,6 +294,6 @@ public class FilterOperatorTest {
         new RexExpression.FunctionCall(SqlKind.OTHER, FieldSpec.DataType.BOOLEAN, "startsWithError",
             ImmutableList.of(new RexExpression.InputRef(0),
                 new RexExpression.Literal(FieldSpec.DataType.STRING, "star")));
-    FilterOperator op = new FilterOperator(_upstreamOperator, inputSchema, startsWith, 1, 2);
+    FilterOperator op = new FilterOperator(_upstreamOperator, inputSchema, startsWith, 1, 2, _serverAddress);
   }
 }
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/HashJoinOperatorTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/HashJoinOperatorTest.java
index b9a95ba9a0..c77107a58c 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/HashJoinOperatorTest.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/HashJoinOperatorTest.java
@@ -30,6 +30,7 @@ import org.apache.pinot.common.utils.DataSchema;
 import org.apache.pinot.query.planner.logical.RexExpression;
 import org.apache.pinot.query.planner.partitioning.FieldSelectionKeySelector;
 import org.apache.pinot.query.planner.stage.JoinNode;
+import org.apache.pinot.query.routing.VirtualServerAddress;
 import org.apache.pinot.query.runtime.blocks.TransferableBlock;
 import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils;
 import org.apache.pinot.spi.data.FieldSpec;
@@ -51,9 +52,13 @@ public class HashJoinOperatorTest {
   @Mock
   private MultiStageOperator _rightOperator;
 
+  @Mock
+  private VirtualServerAddress _serverAddress;
+
   @BeforeMethod
   public void setUp() {
     _mocks = MockitoAnnotations.openMocks(this);
+    Mockito.when(_serverAddress.toString()).thenReturn(new VirtualServerAddress("mock", 80, 0).toString());
   }
 
   @AfterMethod
@@ -90,7 +95,8 @@ public class HashJoinOperatorTest {
         });
     JoinNode node = new JoinNode(1, resultSchema, leftSchema, rightSchema, JoinRelType.INNER,
         getJoinKeys(Arrays.asList(1), Arrays.asList(1)), joinClauses);
-    HashJoinOperator joinOnString = new HashJoinOperator(_leftOperator, _rightOperator, leftSchema, node, 1, 2);
+    HashJoinOperator joinOnString =
+        new HashJoinOperator(_leftOperator, _rightOperator, leftSchema, node, 1, 2, _serverAddress);
 
     TransferableBlock result = joinOnString.nextBlock();
     while (result.isNoOpBlock()) {
@@ -127,7 +133,8 @@ public class HashJoinOperatorTest {
         });
     JoinNode node = new JoinNode(1, resultSchema, leftSchema, rightSchema, JoinRelType.INNER,
         getJoinKeys(Arrays.asList(0), Arrays.asList(0)), joinClauses);
-    HashJoinOperator joinOnInt = new HashJoinOperator(_leftOperator, _rightOperator, leftSchema, node, 1, 2);
+    HashJoinOperator joinOnInt =
+        new HashJoinOperator(_leftOperator, _rightOperator, leftSchema, node, 1, 2, _serverAddress);
     TransferableBlock result = joinOnInt.nextBlock();
     while (result.isNoOpBlock()) {
       result = joinOnInt.nextBlock();
@@ -161,7 +168,8 @@ public class HashJoinOperatorTest {
         });
     JoinNode node = new JoinNode(1, resultSchema, leftSchema, rightSchema, JoinRelType.INNER,
         getJoinKeys(new ArrayList<>(), new ArrayList<>()), joinClauses);
-    HashJoinOperator joinOnInt = new HashJoinOperator(_leftOperator, _rightOperator, leftSchema, node, 1, 2);
+    HashJoinOperator joinOnInt =
+        new HashJoinOperator(_leftOperator, _rightOperator, leftSchema, node, 1, 2, _serverAddress);
     TransferableBlock result = joinOnInt.nextBlock();
     while (result.isNoOpBlock()) {
       result = joinOnInt.nextBlock();
@@ -202,7 +210,7 @@ public class HashJoinOperatorTest {
         });
     JoinNode node = new JoinNode(1, resultSchema, leftSchema, rightSchema, JoinRelType.LEFT,
         getJoinKeys(Arrays.asList(1), Arrays.asList(1)), joinClauses);
-    HashJoinOperator join = new HashJoinOperator(_leftOperator, _rightOperator, leftSchema, node, 1, 2);
+    HashJoinOperator join = new HashJoinOperator(_leftOperator, _rightOperator, leftSchema, node, 1, 2, _serverAddress);
 
     TransferableBlock result = join.nextBlock();
     while (result.isNoOpBlock()) {
@@ -236,7 +244,7 @@ public class HashJoinOperatorTest {
     List<RexExpression> joinClauses = new ArrayList<>();
     JoinNode node = new JoinNode(1, resultSchema, leftSchema, rightSchema, JoinRelType.INNER,
         getJoinKeys(Arrays.asList(0), Arrays.asList(0)), joinClauses);
-    HashJoinOperator join = new HashJoinOperator(_leftOperator, _rightOperator, leftSchema, node, 1, 2);
+    HashJoinOperator join = new HashJoinOperator(_leftOperator, _rightOperator, leftSchema, node, 1, 2, _serverAddress);
 
     TransferableBlock result = join.nextBlock();
     while (result.isNoOpBlock()) {
@@ -267,7 +275,7 @@ public class HashJoinOperatorTest {
         });
     JoinNode node = new JoinNode(1, resultSchema, leftSchema, rightSchema, JoinRelType.LEFT,
         getJoinKeys(Arrays.asList(0), Arrays.asList(0)), joinClauses);
-    HashJoinOperator join = new HashJoinOperator(_leftOperator, _rightOperator, leftSchema, node, 1, 2);
+    HashJoinOperator join = new HashJoinOperator(_leftOperator, _rightOperator, leftSchema, node, 1, 2, _serverAddress);
 
     TransferableBlock result = join.nextBlock();
     while (result.isNoOpBlock()) {
@@ -299,9 +307,10 @@ public class HashJoinOperatorTest {
             DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.STRING, DataSchema.ColumnDataType.INT,
             DataSchema.ColumnDataType.STRING
         });
+
     JoinNode node = new JoinNode(1, resultSchema, leftSchema, rightSchema, JoinRelType.INNER,
         getJoinKeys(Arrays.asList(0), Arrays.asList(0)), joinClauses);
-    HashJoinOperator join = new HashJoinOperator(_leftOperator, _rightOperator, leftSchema, node, 1, 2);
+    HashJoinOperator join = new HashJoinOperator(_leftOperator, _rightOperator, leftSchema, node, 1, 2, _serverAddress);
 
     TransferableBlock result = join.nextBlock();
     while (result.isNoOpBlock()) {
@@ -339,7 +348,7 @@ public class HashJoinOperatorTest {
         });
     JoinNode node = new JoinNode(1, resultSchema, leftSchema, rightSchema, JoinRelType.INNER,
         getJoinKeys(new ArrayList<>(), new ArrayList<>()), joinClauses);
-    HashJoinOperator join = new HashJoinOperator(_leftOperator, _rightOperator, leftSchema, node, 1, 2);
+    HashJoinOperator join = new HashJoinOperator(_leftOperator, _rightOperator, leftSchema, node, 1, 2, _serverAddress);
     TransferableBlock result = join.nextBlock();
     while (result.isNoOpBlock()) {
       result = join.nextBlock();
@@ -377,7 +386,7 @@ public class HashJoinOperatorTest {
         });
     JoinNode node = new JoinNode(1, resultSchema, leftSchema, rightSchema, JoinRelType.INNER,
         getJoinKeys(new ArrayList<>(), new ArrayList<>()), joinClauses);
-    HashJoinOperator join = new HashJoinOperator(_leftOperator, _rightOperator, leftSchema, node, 1, 2);
+    HashJoinOperator join = new HashJoinOperator(_leftOperator, _rightOperator, leftSchema, node, 1, 2, _serverAddress);
     TransferableBlock result = join.nextBlock();
     while (result.isNoOpBlock()) {
       result = join.nextBlock();
@@ -411,7 +420,8 @@ public class HashJoinOperatorTest {
     });
     JoinNode node = new JoinNode(1, resultSchema, leftSchema, rightSchema, JoinRelType.RIGHT,
         getJoinKeys(Arrays.asList(0), Arrays.asList(0)), joinClauses);
-    HashJoinOperator joinOnNum = new HashJoinOperator(_leftOperator, _rightOperator, leftSchema, node, 1, 2);
+    HashJoinOperator joinOnNum =
+        new HashJoinOperator(_leftOperator, _rightOperator, leftSchema, node, 1, 2, _serverAddress);
     TransferableBlock result = joinOnNum.nextBlock();
     while (result.isNoOpBlock()) {
       result = joinOnNum.nextBlock();
@@ -460,14 +470,14 @@ public class HashJoinOperatorTest {
     });
     JoinNode node = new JoinNode(1, resultSchema, leftSchema, rightSchema, JoinRelType.SEMI,
         getJoinKeys(Arrays.asList(1), Arrays.asList(1)), joinClauses);
-    HashJoinOperator join = new HashJoinOperator(_leftOperator, _rightOperator, leftSchema, node, 1, 2);
+    HashJoinOperator join = new HashJoinOperator(_leftOperator, _rightOperator, leftSchema, node, 1, 2, _serverAddress);
     TransferableBlock result = join.nextBlock();
     while (result.isNoOpBlock()) {
       result = join.nextBlock();
     }
     List<Object[]> resultRows = result.getContainer();
-    List<Object[]> expectedRows = ImmutableList.of(new Object[]{1, "Aa", null, null},
-        new Object[]{2, "BB", null, null});
+    List<Object[]> expectedRows =
+        ImmutableList.of(new Object[]{1, "Aa", null, null}, new Object[]{2, "BB", null, null});
     Assert.assertEquals(resultRows.size(), expectedRows.size());
     Assert.assertEquals(resultRows.get(0), expectedRows.get(0));
     Assert.assertEquals(resultRows.get(1), expectedRows.get(1));
@@ -499,7 +509,7 @@ public class HashJoinOperatorTest {
     });
     JoinNode node = new JoinNode(1, resultSchema, leftSchema, rightSchema, JoinRelType.FULL,
         getJoinKeys(Arrays.asList(0), Arrays.asList(0)), joinClauses);
-    HashJoinOperator join = new HashJoinOperator(_leftOperator, _rightOperator, leftSchema, node, 1, 2);
+    HashJoinOperator join = new HashJoinOperator(_leftOperator, _rightOperator, leftSchema, node, 1, 2, _serverAddress);
     TransferableBlock result = join.nextBlock();
     while (result.isNoOpBlock()) {
       result = join.nextBlock();
@@ -551,7 +561,7 @@ public class HashJoinOperatorTest {
     });
     JoinNode node = new JoinNode(1, resultSchema, leftSchema, rightSchema, JoinRelType.ANTI,
         getJoinKeys(Arrays.asList(1), Arrays.asList(1)), joinClauses);
-    HashJoinOperator join = new HashJoinOperator(_leftOperator, _rightOperator, leftSchema, node, 1, 2);
+    HashJoinOperator join = new HashJoinOperator(_leftOperator, _rightOperator, leftSchema, node, 1, 2, _serverAddress);
     TransferableBlock result = join.nextBlock();
     while (result.isNoOpBlock()) {
       result = join.nextBlock();
@@ -589,7 +599,7 @@ public class HashJoinOperatorTest {
         });
     JoinNode node = new JoinNode(1, resultSchema, leftSchema, rightSchema, JoinRelType.INNER,
         getJoinKeys(Arrays.asList(0), Arrays.asList(0)), joinClauses);
-    HashJoinOperator join = new HashJoinOperator(_leftOperator, _rightOperator, leftSchema, node, 1, 2);
+    HashJoinOperator join = new HashJoinOperator(_leftOperator, _rightOperator, leftSchema, node, 1, 2, _serverAddress);
 
     TransferableBlock result = join.nextBlock();
     while (result.isNoOpBlock()) {
@@ -622,7 +632,7 @@ public class HashJoinOperatorTest {
         });
     JoinNode node = new JoinNode(1, resultSchema, leftSchema, rightSchema, JoinRelType.INNER,
         getJoinKeys(Arrays.asList(0), Arrays.asList(0)), joinClauses);
-    HashJoinOperator join = new HashJoinOperator(_leftOperator, _rightOperator, leftSchema, node, 1, 2);
+    HashJoinOperator join = new HashJoinOperator(_leftOperator, _rightOperator, leftSchema, node, 1, 2, _serverAddress);
 
     TransferableBlock result = join.nextBlock();
     while (result.isNoOpBlock()) {
@@ -658,7 +668,7 @@ public class HashJoinOperatorTest {
         });
     JoinNode node = new JoinNode(1, resultSchema, leftSchema, rightSchema, JoinRelType.INNER,
         getJoinKeys(Arrays.asList(0), Arrays.asList(0)), joinClauses);
-    HashJoinOperator join = new HashJoinOperator(_leftOperator, _rightOperator, leftSchema, node, 1, 2);
+    HashJoinOperator join = new HashJoinOperator(_leftOperator, _rightOperator, leftSchema, node, 1, 2, _serverAddress);
 
     TransferableBlock result = join.nextBlock(); // first no-op consumes first right data block.
     Assert.assertTrue(result.isNoOpBlock());
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/LeafStageTransferableBlockOperatorTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/LeafStageTransferableBlockOperatorTest.java
index 185a6d5d53..a8332f22d2 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/LeafStageTransferableBlockOperatorTest.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/LeafStageTransferableBlockOperatorTest.java
@@ -34,8 +34,14 @@ import org.apache.pinot.core.query.aggregation.function.DistinctAggregationFunct
 import org.apache.pinot.core.query.distinct.DistinctTable;
 import org.apache.pinot.core.query.request.context.QueryContext;
 import org.apache.pinot.core.query.request.context.utils.QueryContextConverterUtils;
+import org.apache.pinot.query.routing.VirtualServerAddress;
 import org.apache.pinot.query.runtime.blocks.TransferableBlock;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.MockitoAnnotations;
 import org.testng.Assert;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
 import static org.mockito.Mockito.mock;
@@ -43,6 +49,22 @@ import static org.mockito.Mockito.mock;
 
 // TODO: add tests for Agg / GroupBy / Distinct result blocks
 public class LeafStageTransferableBlockOperatorTest {
+  private AutoCloseable _mocks;
+
+  @Mock
+  private VirtualServerAddress _serverAddress;
+
+  @BeforeMethod
+  public void setUp() {
+    _mocks = MockitoAnnotations.openMocks(this);
+    Mockito.when(_serverAddress.toString()).thenReturn(new VirtualServerAddress("mock", 80, 0).toString());
+  }
+
+  @AfterMethod
+  public void tearDown()
+      throws Exception {
+    _mocks.close();
+  }
 
   @Test
   public void shouldReturnDataBlockThenMetadataBlock() {
@@ -53,7 +75,7 @@ public class LeafStageTransferableBlockOperatorTest {
     List<InstanceResponseBlock> resultsBlockList = Collections.singletonList(new InstanceResponseBlock(
         new SelectionResultsBlock(schema, Arrays.asList(new Object[]{"foo", 1}, new Object[]{"", 2})), queryContext));
     LeafStageTransferableBlockOperator operator =
-        new LeafStageTransferableBlockOperator(resultsBlockList, schema, 1, 2);
+        new LeafStageTransferableBlockOperator(resultsBlockList, schema, 1, 2, _serverAddress);
 
     // When:
     TransferableBlock resultBlock = operator.nextBlock();
@@ -79,7 +101,7 @@ public class LeafStageTransferableBlockOperatorTest {
         new SelectionResultsBlock(resultSchema,
             Arrays.asList(new Object[]{1, 1660000000000L}, new Object[]{0, 1600000000000L})), queryContext));
     LeafStageTransferableBlockOperator operator =
-        new LeafStageTransferableBlockOperator(resultsBlockList, desiredSchema, 1, 2);
+        new LeafStageTransferableBlockOperator(resultsBlockList, desiredSchema, 1, 2, _serverAddress);
 
     // When:
     TransferableBlock resultBlock = operator.nextBlock();
@@ -101,7 +123,7 @@ public class LeafStageTransferableBlockOperatorTest {
         new SelectionResultsBlock(schema,
             Arrays.asList(new Object[]{1, 1660000000000L}, new Object[]{0, 1600000000000L})), queryContext));
     LeafStageTransferableBlockOperator operator =
-        new LeafStageTransferableBlockOperator(resultsBlockList, schema, 1, 2);
+        new LeafStageTransferableBlockOperator(resultsBlockList, schema, 1, 2, _serverAddress);
 
     // When:
     TransferableBlock resultBlock = operator.nextBlock();
@@ -126,7 +148,7 @@ public class LeafStageTransferableBlockOperatorTest {
             queryContext),
         new InstanceResponseBlock(new SelectionResultsBlock(schema, Collections.emptyList()), queryContext));
     LeafStageTransferableBlockOperator operator =
-        new LeafStageTransferableBlockOperator(resultsBlockList, schema, 1, 2);
+        new LeafStageTransferableBlockOperator(resultsBlockList, schema, 1, 2, _serverAddress);
 
     // When:
     TransferableBlock resultBlock1 = operator.nextBlock();
@@ -156,7 +178,7 @@ public class LeafStageTransferableBlockOperatorTest {
         errorBlock,
         new InstanceResponseBlock(new SelectionResultsBlock(schema, Collections.emptyList()), queryContext));
     LeafStageTransferableBlockOperator operator =
-        new LeafStageTransferableBlockOperator(resultsBlockList, schema, 1, 2);
+        new LeafStageTransferableBlockOperator(resultsBlockList, schema, 1, 2, _serverAddress);
 
     // When:
     TransferableBlock resultBlock = operator.nextBlock();
@@ -177,7 +199,7 @@ public class LeafStageTransferableBlockOperatorTest {
         new DistinctResultsBlock(mock(DistinctAggregationFunction.class), new DistinctTable(schema,
             Arrays.asList(new Record(new Object[]{1, "foo"}), new Record(new Object[]{2, "bar"})))), queryContext));
     LeafStageTransferableBlockOperator operator =
-        new LeafStageTransferableBlockOperator(resultsBlockList, schema, 1, 2);
+        new LeafStageTransferableBlockOperator(resultsBlockList, schema, 1, 2, _serverAddress);
 
     // When:
     TransferableBlock resultBlock = operator.nextBlock();
@@ -198,7 +220,7 @@ public class LeafStageTransferableBlockOperatorTest {
         new DistinctResultsBlock(mock(DistinctAggregationFunction.class), new DistinctTable(schema,
             Arrays.asList(new Record(new Object[]{"foo", 1}), new Record(new Object[]{"bar", 2})))), queryContext));
     LeafStageTransferableBlockOperator operator =
-        new LeafStageTransferableBlockOperator(resultsBlockList, schema, 1, 2);
+        new LeafStageTransferableBlockOperator(resultsBlockList, schema, 1, 2, _serverAddress);
 
     // When:
     TransferableBlock resultBlock = operator.nextBlock();
@@ -222,7 +244,7 @@ public class LeafStageTransferableBlockOperatorTest {
     List<InstanceResponseBlock> resultsBlockList = Collections.singletonList(
         new InstanceResponseBlock(new GroupByResultsBlock(schema, Collections.emptyList()), queryContext));
     LeafStageTransferableBlockOperator operator =
-        new LeafStageTransferableBlockOperator(resultsBlockList, schema, 1, 2);
+        new LeafStageTransferableBlockOperator(resultsBlockList, schema, 1, 2, _serverAddress);
 
     // When:
     TransferableBlock resultBlock = operator.nextBlock();
@@ -245,7 +267,7 @@ public class LeafStageTransferableBlockOperatorTest {
     List<InstanceResponseBlock> resultsBlockList = Collections.singletonList(
         new InstanceResponseBlock(new GroupByResultsBlock(schema, Collections.emptyList()), queryContext));
     LeafStageTransferableBlockOperator operator =
-        new LeafStageTransferableBlockOperator(resultsBlockList, schema, 1, 2);
+        new LeafStageTransferableBlockOperator(resultsBlockList, schema, 1, 2, _serverAddress);
 
     // When:
     TransferableBlock resultBlock = operator.nextBlock();
@@ -264,7 +286,7 @@ public class LeafStageTransferableBlockOperatorTest {
     List<InstanceResponseBlock> resultsBlockList = Collections.singletonList(new InstanceResponseBlock(
         new AggregationResultsBlock(queryContext.getAggregationFunctions(), Collections.emptyList()), queryContext));
     LeafStageTransferableBlockOperator operator =
-        new LeafStageTransferableBlockOperator(resultsBlockList, schema, 1, 2);
+        new LeafStageTransferableBlockOperator(resultsBlockList, schema, 1, 2, _serverAddress);
 
     // When:
     TransferableBlock resultBlock = operator.nextBlock();
@@ -286,7 +308,7 @@ public class LeafStageTransferableBlockOperatorTest {
     List<InstanceResponseBlock> responseBlockList = Collections.singletonList(
         new InstanceResponseBlock(new SelectionResultsBlock(resultSchema, Collections.emptyList()), queryContext));
     LeafStageTransferableBlockOperator operator =
-        new LeafStageTransferableBlockOperator(responseBlockList, desiredSchema, 1, 2);
+        new LeafStageTransferableBlockOperator(responseBlockList, desiredSchema, 1, 2, _serverAddress);
     TransferableBlock resultBlock = operator.nextBlock();
 
     // Then:
@@ -309,7 +331,7 @@ public class LeafStageTransferableBlockOperatorTest {
         new DistinctResultsBlock(mock(DistinctAggregationFunction.class),
             new DistinctTable(resultSchema, Collections.emptyList())), queryContext));
     LeafStageTransferableBlockOperator operator =
-        new LeafStageTransferableBlockOperator(responseBlockList, desiredSchema, 1, 2);
+        new LeafStageTransferableBlockOperator(responseBlockList, desiredSchema, 1, 2, _serverAddress);
     TransferableBlock resultBlock = operator.nextBlock();
 
     // Then:
@@ -331,7 +353,7 @@ public class LeafStageTransferableBlockOperatorTest {
     List<InstanceResponseBlock> responseBlockList = Collections.singletonList(
         new InstanceResponseBlock(new GroupByResultsBlock(resultSchema, Collections.emptyList()), queryContext));
     LeafStageTransferableBlockOperator operator =
-        new LeafStageTransferableBlockOperator(responseBlockList, desiredSchema, 1, 2);
+        new LeafStageTransferableBlockOperator(responseBlockList, desiredSchema, 1, 2, _serverAddress);
     TransferableBlock resultBlock = operator.nextBlock();
 
     // Then:
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/LiteralValueOperatorTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/LiteralValueOperatorTest.java
index 856965cfb4..749431e090 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/LiteralValueOperatorTest.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/LiteralValueOperatorTest.java
@@ -23,10 +23,12 @@ import java.util.List;
 import org.apache.pinot.common.utils.DataSchema;
 import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
 import org.apache.pinot.query.planner.logical.RexExpression;
+import org.apache.pinot.query.routing.VirtualServerAddress;
 import org.apache.pinot.query.runtime.blocks.TransferableBlock;
 import org.apache.pinot.query.runtime.plan.PlanRequestContext;
 import org.apache.pinot.spi.data.FieldSpec.DataType;
 import org.mockito.Mock;
+import org.mockito.Mockito;
 import org.mockito.MockitoAnnotations;
 import org.testng.Assert;
 import org.testng.annotations.AfterMethod;
@@ -41,9 +43,13 @@ public class LiteralValueOperatorTest {
   @Mock
   private PlanRequestContext _context;
 
+  @Mock
+  private VirtualServerAddress _serverAddress;
+
   @BeforeMethod
   public void setUp() {
     _mocks = MockitoAnnotations.openMocks(this);
+    Mockito.when(_serverAddress.toString()).thenReturn(new VirtualServerAddress("mock", 80, 0).toString());
   }
 
   @AfterMethod
@@ -58,14 +64,9 @@ public class LiteralValueOperatorTest {
     DataSchema schema = new DataSchema(new String[]{"sLiteral", "iLiteral"},
         new ColumnDataType[]{ColumnDataType.STRING, ColumnDataType.INT});
     List<List<RexExpression>> literals = ImmutableList.of(
-        ImmutableList.of(
-            new RexExpression.Literal(DataType.STRING, "foo"),
-            new RexExpression.Literal(DataType.INT, 1)),
-        ImmutableList.of(
-            new RexExpression.Literal(DataType.STRING, ""),
-            new RexExpression.Literal(DataType.INT, 2))
-    );
-    LiteralValueOperator operator = new LiteralValueOperator(schema, literals, 1, 2);
+        ImmutableList.of(new RexExpression.Literal(DataType.STRING, "foo"), new RexExpression.Literal(DataType.INT, 1)),
+        ImmutableList.of(new RexExpression.Literal(DataType.STRING, ""), new RexExpression.Literal(DataType.INT, 2)));
+    LiteralValueOperator operator = new LiteralValueOperator(schema, literals, 1, 2, _serverAddress);
 
     // When:
     TransferableBlock transferableBlock = operator.nextBlock();
@@ -81,7 +82,7 @@ public class LiteralValueOperatorTest {
     // Given:
     DataSchema schema = new DataSchema(new String[]{}, new ColumnDataType[]{});
     List<List<RexExpression>> literals = ImmutableList.of(ImmutableList.of());
-    LiteralValueOperator operator = new LiteralValueOperator(schema, literals, 1, 2);
+    LiteralValueOperator operator = new LiteralValueOperator(schema, literals, 1, 2, _serverAddress);
 
     // When:
     TransferableBlock transferableBlock = operator.nextBlock();
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MailboxSendOperatorTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MailboxSendOperatorTest.java
index 71f23ede1d..b916b5cc5e 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MailboxSendOperatorTest.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MailboxSendOperatorTest.java
@@ -27,6 +27,7 @@ import org.apache.pinot.query.mailbox.JsonMailboxIdentifier;
 import org.apache.pinot.query.mailbox.MailboxService;
 import org.apache.pinot.query.planner.partitioning.KeySelector;
 import org.apache.pinot.query.routing.VirtualServer;
+import org.apache.pinot.query.routing.VirtualServerAddress;
 import org.apache.pinot.query.runtime.blocks.TransferableBlock;
 import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils;
 import org.apache.pinot.query.runtime.operator.exchange.BlockExchange;
@@ -39,6 +40,7 @@ import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
+
 public class MailboxSendOperatorTest {
 
   private static final int DEFAULT_SENDER_STAGE_ID = 0;
@@ -64,6 +66,10 @@ public class MailboxSendOperatorTest {
     _mocks = MockitoAnnotations.openMocks(this);
     Mockito.when(_exchangeFactory.build(Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any()))
         .thenReturn(_exchange);
+
+    Mockito.when(_server.getHostname()).thenReturn("mock");
+    Mockito.when(_server.getQueryMailboxPort()).thenReturn(0);
+    Mockito.when(_server.getVirtualId()).thenReturn(0);
   }
 
   @AfterMethod
@@ -75,12 +81,12 @@ public class MailboxSendOperatorTest {
   @Test
   public void shouldSwallowNoOpBlockFromUpstream() {
     // Given:
-    MailboxSendOperator operator = new MailboxSendOperator(
-        _mailboxService, _input, ImmutableList.of(_server), RelDistribution.Type.HASH_DISTRIBUTED, _selector,
+    MailboxSendOperator operator = new MailboxSendOperator(_mailboxService, _input, ImmutableList.of(_server),
+        RelDistribution.Type.HASH_DISTRIBUTED, _selector,
         server -> new JsonMailboxIdentifier("123", "0@from:1", "0@to:2", DEFAULT_SENDER_STAGE_ID,
-            DEFAULT_RECEIVER_STAGE_ID), _exchangeFactory, 1, DEFAULT_SENDER_STAGE_ID, DEFAULT_RECEIVER_STAGE_ID);
-    Mockito.when(_input.nextBlock())
-        .thenReturn(TransferableBlockUtils.getNoOpTransferableBlock());
+            DEFAULT_RECEIVER_STAGE_ID), _exchangeFactory, 1, DEFAULT_SENDER_STAGE_ID, DEFAULT_RECEIVER_STAGE_ID,
+        new VirtualServerAddress(_server));
+    Mockito.when(_input.nextBlock()).thenReturn(TransferableBlockUtils.getNoOpTransferableBlock());
 
     // When:
     TransferableBlock block = operator.nextBlock();
@@ -93,13 +99,13 @@ public class MailboxSendOperatorTest {
   @Test
   public void shouldSendErrorBlock() {
     // Given:
-    MailboxSendOperator operator = new MailboxSendOperator(
-        _mailboxService, _input, ImmutableList.of(_server), RelDistribution.Type.HASH_DISTRIBUTED, _selector,
+    MailboxSendOperator operator = new MailboxSendOperator(_mailboxService, _input, ImmutableList.of(_server),
+        RelDistribution.Type.HASH_DISTRIBUTED, _selector,
         server -> new JsonMailboxIdentifier("123", "0@from:1", "0@to:2", DEFAULT_SENDER_STAGE_ID,
-            DEFAULT_RECEIVER_STAGE_ID), _exchangeFactory, 1, DEFAULT_SENDER_STAGE_ID, DEFAULT_RECEIVER_STAGE_ID);
+            DEFAULT_RECEIVER_STAGE_ID), _exchangeFactory, 1, DEFAULT_SENDER_STAGE_ID, DEFAULT_RECEIVER_STAGE_ID,
+        new VirtualServerAddress(_server));
     TransferableBlock errorBlock = TransferableBlockUtils.getErrorTransferableBlock(new Exception("foo!"));
-    Mockito.when(_input.nextBlock())
-        .thenReturn(errorBlock);
+    Mockito.when(_input.nextBlock()).thenReturn(errorBlock);
 
     // When:
     TransferableBlock block = operator.nextBlock();
@@ -112,12 +118,12 @@ public class MailboxSendOperatorTest {
   @Test
   public void shouldSendErrorBlockWhenInputThrows() {
     // Given:
-    MailboxSendOperator operator = new MailboxSendOperator(
-        _mailboxService, _input, ImmutableList.of(_server), RelDistribution.Type.HASH_DISTRIBUTED, _selector,
+    MailboxSendOperator operator = new MailboxSendOperator(_mailboxService, _input, ImmutableList.of(_server),
+        RelDistribution.Type.HASH_DISTRIBUTED, _selector,
         server -> new JsonMailboxIdentifier("123", "0@from:1", "0@to:2", DEFAULT_SENDER_STAGE_ID,
-            DEFAULT_RECEIVER_STAGE_ID), _exchangeFactory, 1, DEFAULT_SENDER_STAGE_ID, DEFAULT_RECEIVER_STAGE_ID);
-    Mockito.when(_input.nextBlock())
-        .thenThrow(new RuntimeException("foo!"));
+            DEFAULT_RECEIVER_STAGE_ID), _exchangeFactory, 1, DEFAULT_SENDER_STAGE_ID, DEFAULT_RECEIVER_STAGE_ID,
+        new VirtualServerAddress(_server));
+    Mockito.when(_input.nextBlock()).thenThrow(new RuntimeException("foo!"));
     ArgumentCaptor<TransferableBlock> captor = ArgumentCaptor.forClass(TransferableBlock.class);
 
     // When:
@@ -132,13 +138,13 @@ public class MailboxSendOperatorTest {
   @Test
   public void shouldSendEosBlock() {
     // Given:
-    MailboxSendOperator operator = new MailboxSendOperator(
-        _mailboxService, _input, ImmutableList.of(_server), RelDistribution.Type.HASH_DISTRIBUTED, _selector,
+    MailboxSendOperator operator = new MailboxSendOperator(_mailboxService, _input, ImmutableList.of(_server),
+        RelDistribution.Type.HASH_DISTRIBUTED, _selector,
         server -> new JsonMailboxIdentifier("123", "0@from:1", "0@to:2", DEFAULT_SENDER_STAGE_ID,
-            DEFAULT_RECEIVER_STAGE_ID), _exchangeFactory, 1, DEFAULT_SENDER_STAGE_ID, DEFAULT_RECEIVER_STAGE_ID);
+            DEFAULT_RECEIVER_STAGE_ID), _exchangeFactory, 1, DEFAULT_SENDER_STAGE_ID, DEFAULT_RECEIVER_STAGE_ID,
+        new VirtualServerAddress(_server));
     TransferableBlock eosBlock = TransferableBlockUtils.getEndOfStreamTransferableBlock();
-    Mockito.when(_input.nextBlock())
-        .thenReturn(eosBlock);
+    Mockito.when(_input.nextBlock()).thenReturn(eosBlock);
 
     // When:
     TransferableBlock block = operator.nextBlock();
@@ -151,13 +157,13 @@ public class MailboxSendOperatorTest {
   @Test
   public void shouldSendDataBlock() {
     // Given:
-    MailboxSendOperator operator = new MailboxSendOperator(
-        _mailboxService, _input, ImmutableList.of(_server), RelDistribution.Type.HASH_DISTRIBUTED, _selector,
+    MailboxSendOperator operator = new MailboxSendOperator(_mailboxService, _input, ImmutableList.of(_server),
+        RelDistribution.Type.HASH_DISTRIBUTED, _selector,
         server -> new JsonMailboxIdentifier("123", "0@from:1", "0@to:2", DEFAULT_SENDER_STAGE_ID,
-            DEFAULT_RECEIVER_STAGE_ID), _exchangeFactory, 1, DEFAULT_SENDER_STAGE_ID, DEFAULT_RECEIVER_STAGE_ID);
+            DEFAULT_RECEIVER_STAGE_ID), _exchangeFactory, 1, DEFAULT_SENDER_STAGE_ID, DEFAULT_RECEIVER_STAGE_ID,
+        new VirtualServerAddress(_server));
     TransferableBlock dataBlock = block(new DataSchema(new String[]{}, new DataSchema.ColumnDataType[]{}));
-    Mockito.when(_input.nextBlock())
-        .thenReturn(dataBlock)
+    Mockito.when(_input.nextBlock()).thenReturn(dataBlock)
         .thenReturn(TransferableBlockUtils.getNoOpTransferableBlock());
 
     // When:
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/SortOperatorTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/SortOperatorTest.java
index a7053c226c..04c3b8c700 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/SortOperatorTest.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/SortOperatorTest.java
@@ -26,6 +26,7 @@ import org.apache.calcite.rel.RelFieldCollation.Direction;
 import org.apache.pinot.common.datablock.DataBlock;
 import org.apache.pinot.common.utils.DataSchema;
 import org.apache.pinot.query.planner.logical.RexExpression;
+import org.apache.pinot.query.routing.VirtualServerAddress;
 import org.apache.pinot.query.runtime.blocks.TransferableBlock;
 import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils;
 import org.mockito.Mock;
@@ -47,9 +48,13 @@ public class SortOperatorTest {
   @Mock
   private MultiStageOperator _input;
 
+  @Mock
+  private VirtualServerAddress _serverAddress;
+
   @BeforeMethod
   public void setUp() {
     _mocks = MockitoAnnotations.openMocks(this);
+    Mockito.when(_serverAddress.toString()).thenReturn(new VirtualServerAddress("mock", 80, 0).toString());
   }
 
   @AfterMethod
@@ -64,7 +69,7 @@ public class SortOperatorTest {
     List<RexExpression> collation = collation(0);
     List<Direction> directions = ImmutableList.of(Direction.ASCENDING);
     DataSchema schema = new DataSchema(new String[]{"sort"}, new DataSchema.ColumnDataType[]{INT});
-    SortOperator op = new SortOperator(_input, collation, directions, 10, 0, schema, 1, 2);
+    SortOperator op = new SortOperator(_input, collation, directions, 10, 0, schema, 1, 2, _serverAddress);
 
     Mockito.when(_input.nextBlock())
         .thenReturn(TransferableBlockUtils.getErrorTransferableBlock(new Exception("foo!")));
@@ -82,7 +87,7 @@ public class SortOperatorTest {
     List<RexExpression> collation = collation(0);
     List<Direction> directions = ImmutableList.of(Direction.ASCENDING);
     DataSchema schema = new DataSchema(new String[]{"sort"}, new DataSchema.ColumnDataType[]{INT});
-    SortOperator op = new SortOperator(_input, collation, directions, 10, 0, schema, 1, 2);
+    SortOperator op = new SortOperator(_input, collation, directions, 10, 0, schema, 1, 2, _serverAddress);
 
     Mockito.when(_input.nextBlock()).thenReturn(TransferableBlockUtils.getNoOpTransferableBlock());
 
@@ -99,7 +104,7 @@ public class SortOperatorTest {
     List<RexExpression> collation = collation(0);
     List<Direction> directions = ImmutableList.of(Direction.ASCENDING);
     DataSchema schema = new DataSchema(new String[]{"sort"}, new DataSchema.ColumnDataType[]{INT});
-    SortOperator op = new SortOperator(_input, collation, directions, 10, 0, schema, 1, 2);
+    SortOperator op = new SortOperator(_input, collation, directions, 10, 0, schema, 1, 2, _serverAddress);
 
     Mockito.when(_input.nextBlock()).thenReturn(TransferableBlockUtils.getEndOfStreamTransferableBlock());
 
@@ -116,10 +121,9 @@ public class SortOperatorTest {
     List<RexExpression> collation = collation(0);
     List<Direction> directions = ImmutableList.of(Direction.ASCENDING);
     DataSchema schema = new DataSchema(new String[]{"sort"}, new DataSchema.ColumnDataType[]{INT});
-    SortOperator op = new SortOperator(_input, collation, directions, 10, 0, schema, 1, 2);
+    SortOperator op = new SortOperator(_input, collation, directions, 10, 0, schema, 1, 2, _serverAddress);
 
-    Mockito.when(_input.nextBlock())
-        .thenReturn(block(schema, new Object[]{2}, new Object[]{1}))
+    Mockito.when(_input.nextBlock()).thenReturn(block(schema, new Object[]{2}, new Object[]{1}))
         .thenReturn(TransferableBlockUtils.getEndOfStreamTransferableBlock());
 
     // When:
@@ -139,10 +143,9 @@ public class SortOperatorTest {
     List<RexExpression> collation = collation(1);
     List<Direction> directions = ImmutableList.of(Direction.ASCENDING);
     DataSchema schema = new DataSchema(new String[]{"ignored", "sort"}, new DataSchema.ColumnDataType[]{INT, INT});
-    SortOperator op = new SortOperator(_input, collation, directions, 10, 0, schema, 1, 2);
+    SortOperator op = new SortOperator(_input, collation, directions, 10, 0, schema, 1, 2, _serverAddress);
 
-    Mockito.when(_input.nextBlock())
-        .thenReturn(block(schema, new Object[]{1, 2}, new Object[]{2, 1}))
+    Mockito.when(_input.nextBlock()).thenReturn(block(schema, new Object[]{1, 2}, new Object[]{2, 1}))
         .thenReturn(TransferableBlockUtils.getEndOfStreamTransferableBlock());
 
     // When:
@@ -162,10 +165,9 @@ public class SortOperatorTest {
     List<RexExpression> collation = collation(0);
     List<Direction> directions = ImmutableList.of(Direction.ASCENDING);
     DataSchema schema = new DataSchema(new String[]{"sort"}, new DataSchema.ColumnDataType[]{STRING});
-    SortOperator op = new SortOperator(_input, collation, directions, 10, 0, schema, 1, 2);
+    SortOperator op = new SortOperator(_input, collation, directions, 10, 0, schema, 1, 2, _serverAddress);
 
-    Mockito.when(_input.nextBlock())
-        .thenReturn(block(schema, new Object[]{"b"}, new Object[]{"a"}))
+    Mockito.when(_input.nextBlock()).thenReturn(block(schema, new Object[]{"b"}, new Object[]{"a"}))
         .thenReturn(TransferableBlockUtils.getEndOfStreamTransferableBlock());
 
     // When:
@@ -185,10 +187,9 @@ public class SortOperatorTest {
     List<RexExpression> collation = collation(0);
     List<Direction> directions = ImmutableList.of(Direction.DESCENDING);
     DataSchema schema = new DataSchema(new String[]{"sort"}, new DataSchema.ColumnDataType[]{INT});
-    SortOperator op = new SortOperator(_input, collation, directions, 10, 0, schema, 1, 2);
+    SortOperator op = new SortOperator(_input, collation, directions, 10, 0, schema, 1, 2, _serverAddress);
 
-    Mockito.when(_input.nextBlock())
-        .thenReturn(block(schema, new Object[]{2}, new Object[]{1}))
+    Mockito.when(_input.nextBlock()).thenReturn(block(schema, new Object[]{2}, new Object[]{1}))
         .thenReturn(TransferableBlockUtils.getEndOfStreamTransferableBlock());
 
     // When:
@@ -208,10 +209,9 @@ public class SortOperatorTest {
     List<RexExpression> collation = collation(0);
     List<Direction> directions = ImmutableList.of(Direction.ASCENDING);
     DataSchema schema = new DataSchema(new String[]{"sort"}, new DataSchema.ColumnDataType[]{INT});
-    SortOperator op = new SortOperator(_input, collation, directions, 10, 1, schema, 1, 2);
+    SortOperator op = new SortOperator(_input, collation, directions, 10, 1, schema, 1, 2, _serverAddress);
 
-    Mockito.when(_input.nextBlock())
-        .thenReturn(block(schema, new Object[]{2}, new Object[]{1}, new Object[]{3}))
+    Mockito.when(_input.nextBlock()).thenReturn(block(schema, new Object[]{2}, new Object[]{1}, new Object[]{3}))
         .thenReturn(TransferableBlockUtils.getEndOfStreamTransferableBlock());
 
     // When:
@@ -231,10 +231,9 @@ public class SortOperatorTest {
     List<RexExpression> collation = collation(0);
     List<Direction> directions = ImmutableList.of(Direction.ASCENDING);
     DataSchema schema = new DataSchema(new String[]{"sort"}, new DataSchema.ColumnDataType[]{INT});
-    SortOperator op = new SortOperator(_input, collation, directions, 1, 1, schema, 1, 2);
+    SortOperator op = new SortOperator(_input, collation, directions, 1, 1, schema, 1, 2, _serverAddress);
 
-    Mockito.when(_input.nextBlock())
-        .thenReturn(block(schema, new Object[]{2}, new Object[]{1}, new Object[]{3}))
+    Mockito.when(_input.nextBlock()).thenReturn(block(schema, new Object[]{2}, new Object[]{1}, new Object[]{3}))
         .thenReturn(TransferableBlockUtils.getEndOfStreamTransferableBlock());
 
     // When:
@@ -253,10 +252,9 @@ public class SortOperatorTest {
     List<RexExpression> collation = collation(0);
     List<Direction> directions = ImmutableList.of(Direction.ASCENDING);
     DataSchema schema = new DataSchema(new String[]{"sort"}, new DataSchema.ColumnDataType[]{INT});
-    SortOperator op = new SortOperator(_input, collation, directions, 0, 0, schema, 1, 1, 2);
+    SortOperator op = new SortOperator(_input, collation, directions, 0, 0, schema, 1, 1, 2, _serverAddress);
 
-    Mockito.when(_input.nextBlock())
-        .thenReturn(block(schema, new Object[]{2}, new Object[]{1}, new Object[]{3}))
+    Mockito.when(_input.nextBlock()).thenReturn(block(schema, new Object[]{2}, new Object[]{1}, new Object[]{3}))
         .thenReturn(TransferableBlockUtils.getEndOfStreamTransferableBlock());
 
     // When:
@@ -275,10 +273,9 @@ public class SortOperatorTest {
     List<RexExpression> collation = collation(0);
     List<Direction> directions = ImmutableList.of(Direction.ASCENDING);
     DataSchema schema = new DataSchema(new String[]{"sort"}, new DataSchema.ColumnDataType[]{INT});
-    SortOperator op = new SortOperator(_input, collation, directions, -1, 0, schema, 1, 2);
+    SortOperator op = new SortOperator(_input, collation, directions, -1, 0, schema, 1, 2, _serverAddress);
 
-    Mockito.when(_input.nextBlock())
-        .thenReturn(block(schema, new Object[]{2}, new Object[]{1}, new Object[]{3}))
+    Mockito.when(_input.nextBlock()).thenReturn(block(schema, new Object[]{2}, new Object[]{1}, new Object[]{3}))
         .thenReturn(TransferableBlockUtils.getEndOfStreamTransferableBlock());
 
     // When:
@@ -296,10 +293,9 @@ public class SortOperatorTest {
     List<RexExpression> collation = collation(0);
     List<Direction> directions = ImmutableList.of(Direction.ASCENDING);
     DataSchema schema = new DataSchema(new String[]{"sort"}, new DataSchema.ColumnDataType[]{INT});
-    SortOperator op = new SortOperator(_input, collation, directions, 10, 0, schema, 1, 2);
+    SortOperator op = new SortOperator(_input, collation, directions, 10, 0, schema, 1, 2, _serverAddress);
 
-    Mockito.when(_input.nextBlock())
-        .thenReturn(block(schema, new Object[]{2}))
+    Mockito.when(_input.nextBlock()).thenReturn(block(schema, new Object[]{2}))
         .thenReturn(block(schema, new Object[]{1}))
         .thenReturn(TransferableBlockUtils.getEndOfStreamTransferableBlock());
 
@@ -320,7 +316,7 @@ public class SortOperatorTest {
     List<RexExpression> collation = collation(0, 1);
     List<Direction> directions = ImmutableList.of(Direction.ASCENDING, Direction.ASCENDING);
     DataSchema schema = new DataSchema(new String[]{"first", "second"}, new DataSchema.ColumnDataType[]{INT, INT});
-    SortOperator op = new SortOperator(_input, collation, directions, 10, 0, schema, 1, 2);
+    SortOperator op = new SortOperator(_input, collation, directions, 10, 0, schema, 1, 2, _serverAddress);
 
     Mockito.when(_input.nextBlock())
         .thenReturn(block(schema, new Object[]{1, 2}, new Object[]{1, 1}, new Object[]{1, 3}))
@@ -344,7 +340,7 @@ public class SortOperatorTest {
     List<RexExpression> collation = collation(0, 1);
     List<Direction> directions = ImmutableList.of(Direction.ASCENDING, Direction.DESCENDING);
     DataSchema schema = new DataSchema(new String[]{"first", "second"}, new DataSchema.ColumnDataType[]{INT, INT});
-    SortOperator op = new SortOperator(_input, collation, directions, 10, 0, schema, 1, 2);
+    SortOperator op = new SortOperator(_input, collation, directions, 10, 0, schema, 1, 2, _serverAddress);
 
     Mockito.when(_input.nextBlock())
         .thenReturn(block(schema, new Object[]{1, 2}, new Object[]{1, 1}, new Object[]{1, 3}))
@@ -368,12 +364,10 @@ public class SortOperatorTest {
     List<RexExpression> collation = collation(0);
     List<Direction> directions = ImmutableList.of(Direction.ASCENDING);
     DataSchema schema = new DataSchema(new String[]{"sort"}, new DataSchema.ColumnDataType[]{INT});
-    SortOperator op = new SortOperator(_input, collation, directions, 10, 0, schema, 1, 2);
+    SortOperator op = new SortOperator(_input, collation, directions, 10, 0, schema, 1, 2, _serverAddress);
 
-    Mockito.when(_input.nextBlock())
-        .thenReturn(block(schema, new Object[]{2}))
-        .thenReturn(TransferableBlockUtils.getNoOpTransferableBlock())
-        .thenReturn(block(schema, new Object[]{1}))
+    Mockito.when(_input.nextBlock()).thenReturn(block(schema, new Object[]{2}))
+        .thenReturn(TransferableBlockUtils.getNoOpTransferableBlock()).thenReturn(block(schema, new Object[]{1}))
         .thenReturn(TransferableBlockUtils.getEndOfStreamTransferableBlock());
 
     // When:
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/TransformOperatorTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/TransformOperatorTest.java
index 52ffff08fb..f990882f74 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/TransformOperatorTest.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/TransformOperatorTest.java
@@ -26,6 +26,7 @@ import org.apache.pinot.common.datablock.DataBlock;
 import org.apache.pinot.common.exception.QueryException;
 import org.apache.pinot.common.utils.DataSchema;
 import org.apache.pinot.query.planner.logical.RexExpression;
+import org.apache.pinot.query.routing.VirtualServerAddress;
 import org.apache.pinot.query.runtime.blocks.TransferableBlock;
 import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils;
 import org.apache.pinot.spi.data.FieldSpec;
@@ -47,9 +48,13 @@ public class TransformOperatorTest {
   @Mock
   private MultiStageOperator _upstreamOp;
 
+  @Mock
+  private VirtualServerAddress _serverAddress;
+
   @BeforeMethod
   public void setUp() {
     _mocks = MockitoAnnotations.openMocks(this);
+    Mockito.when(_serverAddress.toString()).thenReturn(new VirtualServerAddress("mock", 80, 0).toString());
   }
 
   @AfterMethod
@@ -71,7 +76,8 @@ public class TransformOperatorTest {
     RexExpression.InputRef ref0 = new RexExpression.InputRef(0);
     RexExpression.InputRef ref1 = new RexExpression.InputRef(1);
     TransformOperator op =
-        new TransformOperator(_upstreamOp, resultSchema, ImmutableList.of(ref0, ref1), upStreamSchema, 1, 2);
+        new TransformOperator(_upstreamOp, resultSchema, ImmutableList.of(ref0, ref1), upStreamSchema, 1, 2,
+            _serverAddress);
     TransferableBlock result = op.nextBlock();
 
     Assert.assertTrue(!result.isErrorBlock());
@@ -96,7 +102,7 @@ public class TransformOperatorTest {
     RexExpression.Literal strLiteral = new RexExpression.Literal(FieldSpec.DataType.STRING, "str");
     TransformOperator op =
         new TransformOperator(_upstreamOp, resultSchema, ImmutableList.of(boolLiteral, strLiteral), upStreamSchema, 1,
-            2);
+            2, _serverAddress);
     TransferableBlock result = op.nextBlock();
     // Literal operands should just output original literals.
     Assert.assertTrue(!result.isErrorBlock());
@@ -126,7 +132,8 @@ public class TransformOperatorTest {
     DataSchema resultSchema = new DataSchema(new String[]{"plusR", "minusR"},
         new DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.DOUBLE, DataSchema.ColumnDataType.DOUBLE});
     TransformOperator op =
-        new TransformOperator(_upstreamOp, resultSchema, ImmutableList.of(plus01, minus01), upStreamSchema, 1, 2);
+        new TransformOperator(_upstreamOp, resultSchema, ImmutableList.of(plus01, minus01), upStreamSchema, 1, 2,
+            _serverAddress);
     TransferableBlock result = op.nextBlock();
     Assert.assertTrue(!result.isErrorBlock());
     List<Object[]> resultRows = result.getContainer();
@@ -154,7 +161,8 @@ public class TransformOperatorTest {
     DataSchema resultSchema = new DataSchema(new String[]{"plusR", "minusR"},
         new DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.DOUBLE, DataSchema.ColumnDataType.DOUBLE});
     TransformOperator op =
-        new TransformOperator(_upstreamOp, resultSchema, ImmutableList.of(plus01, minus01), upStreamSchema, 1, 2);
+        new TransformOperator(_upstreamOp, resultSchema, ImmutableList.of(plus01, minus01), upStreamSchema, 1, 2,
+            _serverAddress);
 
     TransferableBlock result = op.nextBlock();
     Assert.assertTrue(result.isErrorBlock());
@@ -175,7 +183,7 @@ public class TransformOperatorTest {
         new DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.STRING});
     TransformOperator op =
         new TransformOperator(_upstreamOp, resultSchema, ImmutableList.of(boolLiteral, strLiteral), upStreamSchema, 1,
-            2);
+            2, _serverAddress);
     TransferableBlock result = op.nextBlock();
     Assert.assertTrue(result.isErrorBlock());
     DataBlock data = result.getDataBlock();
@@ -199,7 +207,7 @@ public class TransformOperatorTest {
         new DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.BOOLEAN, DataSchema.ColumnDataType.STRING});
     TransformOperator op =
         new TransformOperator(_upstreamOp, resultSchema, ImmutableList.of(boolLiteral, strLiteral), upStreamSchema, 1,
-            2);
+            2, _serverAddress);
     TransferableBlock result = op.nextBlock();
     // First block has two rows
     Assert.assertFalse(result.isErrorBlock());
@@ -231,7 +239,7 @@ public class TransformOperatorTest {
         DataSchema.ColumnDataType.STRING, DataSchema.ColumnDataType.STRING
     });
     TransformOperator transform =
-        new TransformOperator(_upstreamOp, resultSchema, new ArrayList<>(), upStreamSchema, 1, 2);
+        new TransformOperator(_upstreamOp, resultSchema, new ArrayList<>(), upStreamSchema, 1, 2, _serverAddress);
   }
 
   @Test(expectedExceptions = IllegalStateException.class, expectedExceptionsMessageRegExp = ".*doesn't match "
@@ -244,6 +252,6 @@ public class TransformOperatorTest {
     });
     RexExpression.InputRef ref0 = new RexExpression.InputRef(0);
     TransformOperator transform =
-        new TransformOperator(_upstreamOp, resultSchema, ImmutableList.of(ref0), upStreamSchema, 1, 2);
+        new TransformOperator(_upstreamOp, resultSchema, ImmutableList.of(ref0), upStreamSchema, 1, 2, _serverAddress);
   }
 };
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/queries/ResourceBasedQueriesTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/queries/ResourceBasedQueriesTest.java
index b0a09f3f71..d311e9f467 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/queries/ResourceBasedQueriesTest.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/queries/ResourceBasedQueriesTest.java
@@ -33,11 +33,14 @@ import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Random;
+import java.util.Set;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 import java.util.stream.Collectors;
 import org.apache.pinot.common.datatable.DataTableFactory;
+import org.apache.pinot.common.response.broker.BrokerResponseNative;
 import org.apache.pinot.core.common.datatable.DataTableBuilderFactory;
+import org.apache.pinot.core.query.reduce.ExecutionStatsAggregator;
 import org.apache.pinot.query.QueryEnvironmentTestBase;
 import org.apache.pinot.query.QueryServerEnclosure;
 import org.apache.pinot.query.mailbox.GrpcMailboxService;
@@ -65,6 +68,8 @@ public class ResourceBasedQueriesTest extends QueryRunnerTestBase {
   private static final Random RANDOM = new Random(42);
   private static final String FILE_FILTER_PROPERTY = "pinot.fileFilter";
 
+  private static Map<String, Set<String>> _tableToSegmentMap = new HashMap<>();
+
   @BeforeClass
   public void setUp()
       throws Exception {
@@ -158,10 +163,26 @@ public class ResourceBasedQueriesTest extends QueryRunnerTestBase {
     Map<String, Object> reducerConfig = new HashMap<>();
     reducerConfig.put(QueryConfig.KEY_OF_QUERY_RUNNER_PORT, _reducerGrpcPort);
     reducerConfig.put(QueryConfig.KEY_OF_QUERY_RUNNER_HOSTNAME, _reducerHostname);
-    _mailboxService = new GrpcMailboxService(_reducerHostname, _reducerGrpcPort, new PinotConfiguration(reducerConfig),
-        ignored -> { });
+    _mailboxService =
+        new GrpcMailboxService(_reducerHostname, _reducerGrpcPort, new PinotConfiguration(reducerConfig), ignored -> {
+        });
     _mailboxService.start();
 
+    Map<String, List<String>> tableToSegmentMap1 = factory1.buildTableSegmentNameMap();
+    Map<String, List<String>> tableToSegmentMap2 = factory2.buildTableSegmentNameMap();
+
+    for (Map.Entry<String, List<String>> entry : tableToSegmentMap1.entrySet()) {
+      _tableToSegmentMap.put(entry.getKey(), new HashSet<>(entry.getValue()));
+    }
+
+    for (Map.Entry<String, List<String>> entry : tableToSegmentMap2.entrySet()) {
+      if (_tableToSegmentMap.containsKey(entry.getKey())) {
+        _tableToSegmentMap.get(entry.getKey()).addAll(entry.getValue());
+      } else {
+        _tableToSegmentMap.put(entry.getKey(), new HashSet<>(entry.getValue()));
+      }
+    }
+
     _queryEnvironment =
         QueryEnvironmentTestBase.getQueryEnvironment(_reducerGrpcPort, server1.getPort(), server2.getPort(),
             factory1.buildSchemaMap(), factory1.buildTableSegmentNameMap(), factory2.buildTableSegmentNameMap());
@@ -189,7 +210,7 @@ public class ResourceBasedQueriesTest extends QueryRunnerTestBase {
   public void testQueryTestCasesWithH2(String testCaseName, String sql, String expect)
       throws Exception {
     // query pinot
-    runQuery(sql, expect).ifPresent(rows -> {
+    runQuery(sql, expect, null).ifPresent(rows -> {
       try {
         compareRowEquals(rows, queryH2(sql));
       } catch (Exception e) {
@@ -201,13 +222,25 @@ public class ResourceBasedQueriesTest extends QueryRunnerTestBase {
   @Test(dataProvider = "testResourceQueryTestCaseProviderBoth")
   public void testQueryTestCasesWithOutput(String testCaseName, String sql, List<Object[]> expectedRows, String expect)
       throws Exception {
-    runQuery(sql, expect).ifPresent(rows -> compareRowEquals(rows, expectedRows));
+    runQuery(sql, expect, null).ifPresent(rows -> compareRowEquals(rows, expectedRows));
+  }
+
+  @Test(dataProvider = "testResourceQueryTestCaseProviderWithMetadata")
+  public void testQueryTestCasesWithMetadata(String testCaseName, String sql, String expect, int numSegments)
+      throws Exception {
+    ExecutionStatsAggregator executionStatsAggregator = new ExecutionStatsAggregator(false);
+    runQuery(sql, expect, executionStatsAggregator).ifPresent(rows -> {
+      BrokerResponseNative brokerResponseNative = new BrokerResponseNative();
+      executionStatsAggregator.setStats(brokerResponseNative);
+      Assert.assertEquals(brokerResponseNative.getNumSegmentsQueried(), numSegments);
+    });
   }
 
-  private Optional<List<Object[]>> runQuery(String sql, final String except) {
+  private Optional<List<Object[]>> runQuery(String sql, final String except,
+      ExecutionStatsAggregator executionStatsAggregator) {
     try {
       // query pinot
-      List<Object[]> resultRows = queryRunner(sql);
+      List<Object[]> resultRows = queryRunner(sql, executionStatsAggregator);
 
       Assert.assertNull(except,
           "Expected error with message '" + except + "'. But instead rows were returned: " + resultRows.stream()
@@ -252,6 +285,7 @@ public class ResourceBasedQueriesTest extends QueryRunnerTestBase {
           for (List<Object> objs : orgRows) {
             expectedRows.add(objs.toArray());
           }
+
           Object[] testEntry = new Object[]{testCaseName, sql, expectedRows, queryCase._expectedException};
           providerContent.add(testEntry);
         }
@@ -260,6 +294,48 @@ public class ResourceBasedQueriesTest extends QueryRunnerTestBase {
     return providerContent.toArray(new Object[][]{});
   }
 
+  @DataProvider
+  private static Object[][] testResourceQueryTestCaseProviderWithMetadata()
+      throws Exception {
+    Map<String, QueryTestCase> testCaseMap = getTestCases();
+    List<Object[]> providerContent = new ArrayList<>();
+    for (Map.Entry<String, QueryTestCase> testCaseEntry : testCaseMap.entrySet()) {
+      String testCaseName = testCaseEntry.getKey();
+      if (testCaseEntry.getValue()._ignored) {
+        continue;
+      }
+
+      List<QueryTestCase.Query> queryCases = testCaseEntry.getValue()._queries;
+      for (QueryTestCase.Query queryCase : queryCases) {
+        if (queryCase._ignored) {
+          continue;
+        }
+
+        if (queryCase._outputs != null) {
+          String sql = replaceTableName(testCaseName, queryCase._sql);
+          if (!sql.contains("basic_test")) {
+            continue;
+          }
+
+          List<List<Object>> orgRows = queryCase._outputs;
+          List<Object[]> expectedRows = new ArrayList<>(orgRows.size());
+          for (List<Object> objs : orgRows) {
+            expectedRows.add(objs.toArray());
+          }
+          int segmentCount = 0;
+          for (String tableName : testCaseEntry.getValue()._tables.keySet()) {
+            segmentCount +=
+                _tableToSegmentMap.getOrDefault(testCaseName + "_" + tableName + "_OFFLINE", new HashSet<>()).size();
+          }
+
+          Object[] testEntry = new Object[]{testCaseName, sql, queryCase._expectedException, segmentCount};
+          providerContent.add(testEntry);
+        }
+      }
+    }
+    return providerContent.toArray(new Object[][]{});
+  }
+
   @DataProvider
   private static Object[][] testResourceQueryTestCaseProviderInputOnly()
       throws Exception {
@@ -319,8 +395,9 @@ public class ResourceBasedQueriesTest extends QueryRunnerTestBase {
       URL testFileUrl = classLoader.getResource(testCaseFile);
       // This test only supports local resource loading (e.g. must be a file), not support JAR test loading.
       if (testFileUrl != null && new File(testFileUrl.getFile()).exists()) {
-        Map<String, QueryTestCase> testCases = MAPPER.readValue(new File(testFileUrl.getFile()),
-            new TypeReference<Map<String, QueryTestCase>>() { });
+        Map<String, QueryTestCase> testCases =
+            MAPPER.readValue(new File(testFileUrl.getFile()), new TypeReference<Map<String, QueryTestCase>>() {
+            });
         {
           HashSet<String> hashSet = new HashSet<>(testCaseMap.keySet());
           hashSet.retainAll(testCases.keySet());


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org