You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ja...@apache.org on 2024/02/08 23:32:04 UTC

(pinot) branch master updated: [Multi-stage] Optimize mailbox info in query plan (#12382)

This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 2d41b3806e [Multi-stage] Optimize mailbox info in query plan (#12382)
2d41b3806e is described below

commit 2d41b3806e14b4d9822c6eddfa57acf613356db2
Author: Xiaotian (Jackie) Jiang <17...@users.noreply.github.com>
AuthorDate: Thu Feb 8 15:31:58 2024 -0800

    [Multi-stage] Optimize mailbox info in query plan (#12382)
---
 pinot-common/src/main/proto/worker.proto           |  25 ++-
 .../explain/PhysicalExplainPlanVisitor.java        |  35 +--
 .../planner/physical/DispatchablePlanContext.java  |   8 +-
 .../planner/physical/DispatchablePlanMetadata.java |   8 +-
 .../planner/physical/MailboxAssignmentVisitor.java | 124 ++++++-----
 .../query/planner/physical/MailboxIdUtils.java     |  43 ++--
 .../{MailboxMetadata.java => MailboxInfo.java}     |  39 ++--
 .../apache/pinot/query/routing/MailboxInfos.java   |  35 ++-
 .../pinot/query/routing}/QueryPlanSerDeUtils.java  |  64 +++---
 .../apache/pinot/query/routing/RoutingInfo.java    |  38 ++--
 .../pinot/query/routing/SharedMailboxInfos.java    |  38 ++--
 .../apache/pinot/query/routing}/StageMetadata.java |  11 +-
 .../org/apache/pinot/query/routing}/StagePlan.java |  10 +-
 .../apache/pinot/query/routing/WorkerMetadata.java |  24 +--
 .../apache/pinot/query/QueryCompilationTest.java   | 239 ++++++++++-----------
 .../apache/pinot/query/runtime/QueryRunner.java    |  49 +++--
 .../operator/BaseMailboxReceiveOperator.java       |  10 +-
 .../runtime/operator/MailboxSendOperator.java      |  26 +--
 .../runtime/plan/OpChainExecutionContext.java      |  18 +-
 .../plan/pipeline/PipelineBreakerExecutor.java     |   6 +-
 .../plan/server/ServerPlanRequestContext.java      |   2 +-
 .../plan/server/ServerPlanRequestUtils.java        |   4 +-
 .../query/service/dispatch/QueryDispatcher.java    |  16 +-
 .../pinot/query/service/server/QueryServer.java    |  10 +-
 .../apache/pinot/query/QueryServerEnclosure.java   |   2 +-
 .../executor/OpChainSchedulerServiceTest.java      |  17 +-
 .../operator/MailboxReceiveOperatorTest.java       |  29 ++-
 .../runtime/operator/MailboxSendOperatorTest.java  |  21 +-
 .../pinot/query/runtime/operator/OpChainTest.java  |  58 ++---
 .../query/runtime/operator/OperatorTestUtil.java   |  17 +-
 .../operator/SortedMailboxReceiveOperatorTest.java |  29 ++-
 .../plan/pipeline/PipelineBreakerExecutorTest.java |  33 +--
 .../query/runtime/queries/QueryRunnerTestBase.java |  10 +-
 .../query/service/server/QueryServerTest.java      |  18 +-
 34 files changed, 543 insertions(+), 573 deletions(-)

diff --git a/pinot-common/src/main/proto/worker.proto b/pinot-common/src/main/proto/worker.proto
index b7e492fcc5..8ad277c96d 100644
--- a/pinot-common/src/main/proto/worker.proto
+++ b/pinot-common/src/main/proto/worker.proto
@@ -49,25 +49,30 @@ message QueryResponse {
 }
 
 message StagePlan {
-  int32 stageId = 1;
-  bytes rootNode = 2; // Serialized StageNode
-  StageMetadata stageMetadata = 3;
+  bytes rootNode = 1; // Serialized StageNode
+  StageMetadata stageMetadata = 2;
 }
 
 message StageMetadata {
-  repeated WorkerMetadata workerMetadata = 1;
-  bytes customProperty = 2; // Serialized Properties
+  int32 stageId = 1;
+  repeated WorkerMetadata workerMetadata = 2;
+  bytes customProperty = 3; // Serialized Properties
 }
 
 message WorkerMetadata {
-  string virtualAddress = 1;
-  map<int32, MailboxMetadata> mailboxMetadata = 2;
+  int32 workedId = 1;
+  map<int32, bytes> mailboxInfos = 2; // Stage id to serialized MailboxInfos
   map<string, string> customProperty = 3;
 }
 
-message MailboxMetadata {
-  repeated string mailboxId = 1;
-  repeated string virtualAddress = 2;
+message MailboxInfos {
+  repeated MailboxInfo mailboxInfo = 1;
+}
+
+message MailboxInfo {
+  string hostname = 1;
+  int32 port = 2;
+  repeated int32 workerId = 3;
 }
 
 message Properties {
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/explain/PhysicalExplainPlanVisitor.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/explain/PhysicalExplainPlanVisitor.java
index ea9bef1139..61dd1a23f0 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/explain/PhysicalExplainPlanVisitor.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/explain/PhysicalExplainPlanVisitor.java
@@ -18,12 +18,10 @@
  */
 package org.apache.pinot.query.planner.explain;
 
-import java.util.ArrayList;
 import java.util.Comparator;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
-import java.util.TreeMap;
 import java.util.stream.Collectors;
 import org.apache.pinot.query.planner.physical.DispatchablePlanFragment;
 import org.apache.pinot.query.planner.physical.DispatchableSubPlan;
@@ -41,8 +39,8 @@ import org.apache.pinot.query.planner.plannode.SortNode;
 import org.apache.pinot.query.planner.plannode.TableScanNode;
 import org.apache.pinot.query.planner.plannode.ValueNode;
 import org.apache.pinot.query.planner.plannode.WindowNode;
+import org.apache.pinot.query.routing.MailboxInfo;
 import org.apache.pinot.query.routing.QueryServerInstance;
-import org.apache.pinot.query.routing.VirtualServerAddress;
 
 
 /**
@@ -214,14 +212,13 @@ public class PhysicalExplainPlanVisitor implements PlanNodeVisitor<StringBuilder
     appendInfo(node, context);
 
     int receiverStageId = node.getReceiverStageId();
-    List<VirtualServerAddress> serverAddressList =
+    List<MailboxInfo> receiverMailboxInfos =
         _dispatchableSubPlan.getQueryStageList().get(node.getPlanFragmentId()).getWorkerMetadataList()
-            .get(context._workerId).getMailboxMetadataMap().get(receiverStageId).getVirtualAddresses();
-    List<String> serverInstanceToWorkerIdList = stringifyVirtualServerAddresses(serverAddressList);
+            .get(context._workerId).getMailboxInfosMap().get(receiverStageId).getMailboxInfos();
     context._builder.append("->");
-    String receivers = serverInstanceToWorkerIdList.stream()
-        .map(s -> "[" + receiverStageId + "]@" + s)
-        .collect(Collectors.joining(",", "{", "}"));
+    // Sort to ensure print order
+    String receivers = receiverMailboxInfos.stream().sorted(Comparator.comparingInt(MailboxInfo::getPort))
+        .map(v -> "[" + receiverStageId + "]@" + v).collect(Collectors.joining(",", "{", "}"));
     return context._builder.append(receivers);
   }
 
@@ -276,24 +273,4 @@ public class PhysicalExplainPlanVisitor implements PlanNodeVisitor<StringBuilder
       );
     }
   }
-
-  public static List<String> stringifyVirtualServerAddresses(List<VirtualServerAddress> serverAddressList) {
-    // using tree map to ensure print order.
-    Map<QueryServerInstance, List<Integer>> serverToWorkerIdMap = new TreeMap<>(
-        Comparator.comparing(QueryServerInstance::toString));
-    for (VirtualServerAddress serverAddress : serverAddressList) {
-      QueryServerInstance server = new QueryServerInstance(serverAddress.hostname(), serverAddress.port(), -1);
-      List<Integer> workerIds = serverToWorkerIdMap.getOrDefault(server, new ArrayList<>());
-      workerIds.add(serverAddress.workerId());
-      serverToWorkerIdMap.put(server, workerIds);
-    }
-    return serverToWorkerIdMap.entrySet().stream()
-        .map(PhysicalExplainPlanVisitor::stringifyQueryServerInstanceToWorkerIdsEntry)
-        .collect(Collectors.toList());
-  }
-
-  public static String stringifyQueryServerInstanceToWorkerIdsEntry(Map.Entry<QueryServerInstance, List<Integer>> e) {
-    QueryServerInstance server = e.getKey();
-    return server.getHostname() + ":" + server.getQueryServicePort() + "|" + e.getValue();
-  }
 }
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/DispatchablePlanContext.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/DispatchablePlanContext.java
index 22744dda0e..6d421e90d9 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/DispatchablePlanContext.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/DispatchablePlanContext.java
@@ -29,9 +29,8 @@ import org.apache.calcite.util.Pair;
 import org.apache.pinot.query.context.PlannerContext;
 import org.apache.pinot.query.planner.PlanFragment;
 import org.apache.pinot.query.planner.plannode.PlanNode;
-import org.apache.pinot.query.routing.MailboxMetadata;
+import org.apache.pinot.query.routing.MailboxInfos;
 import org.apache.pinot.query.routing.QueryServerInstance;
-import org.apache.pinot.query.routing.VirtualServerAddress;
 import org.apache.pinot.query.routing.WorkerManager;
 import org.apache.pinot.query.routing.WorkerMetadata;
 
@@ -100,7 +99,7 @@ public class DispatchablePlanContext {
           dispatchablePlanMetadata.getWorkerIdToServerInstanceMap();
       Map<Integer, Map<String, List<String>>> workerIdToSegmentsMap =
           dispatchablePlanMetadata.getWorkerIdToSegmentsMap();
-      Map<Integer, Map<Integer, MailboxMetadata>> workerIdToMailboxesMap =
+      Map<Integer, Map<Integer, MailboxInfos>> workerIdToMailboxesMap =
           dispatchablePlanMetadata.getWorkerIdToMailboxesMap();
       Map<QueryServerInstance, List<Integer>> serverInstanceToWorkerIdsMap = new HashMap<>();
       WorkerMetadata[] workerMetadataArray = new WorkerMetadata[workerIdToServerInstanceMap.size()];
@@ -108,8 +107,7 @@ public class DispatchablePlanContext {
         int workerId = serverEntry.getKey();
         QueryServerInstance queryServerInstance = serverEntry.getValue();
         serverInstanceToWorkerIdsMap.computeIfAbsent(queryServerInstance, k -> new ArrayList<>()).add(workerId);
-        WorkerMetadata workerMetadata = new WorkerMetadata(new VirtualServerAddress(queryServerInstance, workerId),
-            workerIdToMailboxesMap.get(workerId));
+        WorkerMetadata workerMetadata = new WorkerMetadata(workerId, workerIdToMailboxesMap.get(workerId));
         if (workerIdToSegmentsMap != null) {
           workerMetadata.setTableSegmentsMap(workerIdToSegmentsMap.get(workerId));
         }
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/DispatchablePlanMetadata.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/DispatchablePlanMetadata.java
index 5415ca7019..db68010103 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/DispatchablePlanMetadata.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/DispatchablePlanMetadata.java
@@ -28,7 +28,7 @@ import java.util.Map;
 import java.util.Set;
 import javax.annotation.Nullable;
 import org.apache.pinot.core.routing.TimeBoundaryInfo;
-import org.apache.pinot.query.routing.MailboxMetadata;
+import org.apache.pinot.query.routing.MailboxInfos;
 import org.apache.pinot.query.routing.QueryServerInstance;
 
 
@@ -54,6 +54,8 @@ public class DispatchablePlanMetadata implements Serializable {
   // info from PlanNode that requires singleton (e.g. SortNode/AggregateNode)
   private boolean _requiresSingletonInstance;
 
+  // TODO: Change the following maps to lists
+
   // --------------------------------------------------------------------------
   // Fields extracted with {@link PinotDispatchPlanner}
   // --------------------------------------------------------------------------
@@ -67,7 +69,7 @@ public class DispatchablePlanMetadata implements Serializable {
 
   // used for build mailboxes between workers.
   // workerId -> {planFragmentId -> mailbox list}
-  private final Map<Integer, Map<Integer, MailboxMetadata>> _workerIdToMailboxesMap;
+  private final Map<Integer, Map<Integer, MailboxInfos>> _workerIdToMailboxesMap;
 
   // used for tracking unavailable segments from routing table, then assemble missing segments exception.
   private final Map<String, Set<String>> _tableToUnavailableSegmentsMap;
@@ -123,7 +125,7 @@ public class DispatchablePlanMetadata implements Serializable {
     _workerIdToSegmentsMap = workerIdToSegmentsMap;
   }
 
-  public Map<Integer, Map<Integer, MailboxMetadata>> getWorkerIdToMailboxesMap() {
+  public Map<Integer, Map<Integer, MailboxInfos>> getWorkerIdToMailboxesMap() {
     return _workerIdToMailboxesMap;
   }
 
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/MailboxAssignmentVisitor.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/MailboxAssignmentVisitor.java
index 421e7bbc9c..7bdcdb1343 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/MailboxAssignmentVisitor.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/MailboxAssignmentVisitor.java
@@ -19,18 +19,20 @@
 package org.apache.pinot.query.planner.physical;
 
 import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.stream.Collectors;
 import org.apache.calcite.rel.RelDistribution;
 import org.apache.pinot.query.planner.plannode.DefaultPostOrderTraversalVisitor;
 import org.apache.pinot.query.planner.plannode.MailboxSendNode;
 import org.apache.pinot.query.planner.plannode.PlanNode;
-import org.apache.pinot.query.routing.MailboxMetadata;
+import org.apache.pinot.query.routing.MailboxInfo;
+import org.apache.pinot.query.routing.MailboxInfos;
 import org.apache.pinot.query.routing.QueryServerInstance;
-import org.apache.pinot.query.routing.VirtualServerAddress;
+import org.apache.pinot.query.routing.SharedMailboxInfos;
 
 
 public class MailboxAssignmentVisitor extends DefaultPostOrderTraversalVisitor<Void, DispatchablePlanContext> {
@@ -47,8 +49,8 @@ public class MailboxAssignmentVisitor extends DefaultPostOrderTraversalVisitor<V
       DispatchablePlanMetadata receiverMetadata = metadataMap.get(receiverFragmentId);
       Map<Integer, QueryServerInstance> senderServerMap = senderMetadata.getWorkerIdToServerInstanceMap();
       Map<Integer, QueryServerInstance> receiverServerMap = receiverMetadata.getWorkerIdToServerInstanceMap();
-      Map<Integer, Map<Integer, MailboxMetadata>> senderMailboxesMap = senderMetadata.getWorkerIdToMailboxesMap();
-      Map<Integer, Map<Integer, MailboxMetadata>> receiverMailboxesMap = receiverMetadata.getWorkerIdToMailboxesMap();
+      Map<Integer, Map<Integer, MailboxInfos>> senderMailboxesMap = senderMetadata.getWorkerIdToMailboxesMap();
+      Map<Integer, Map<Integer, MailboxInfos>> receiverMailboxesMap = receiverMetadata.getWorkerIdToMailboxesMap();
 
       int numSenders = senderServerMap.size();
       int numReceivers = receiverServerMap.size();
@@ -63,11 +65,11 @@ public class MailboxAssignmentVisitor extends DefaultPostOrderTraversalVisitor<V
           Preconditions.checkState(senderServer.equals(receiverServer),
               "Got different server for SINGLETON distribution type for worker id: %s, sender: %s, receiver: %s",
               workerId, senderServer, receiverServer);
-          MailboxMetadata mailboxMetadata = new MailboxMetadata(Collections.singletonList(
-              MailboxIdUtils.toPlanMailboxId(senderFragmentId, workerId, receiverFragmentId, workerId)),
-              Collections.singletonList(new VirtualServerAddress(senderServer, workerId)));
-          senderMailboxesMap.computeIfAbsent(workerId, k -> new HashMap<>()).put(receiverFragmentId, mailboxMetadata);
-          receiverMailboxesMap.computeIfAbsent(workerId, k -> new HashMap<>()).put(senderFragmentId, mailboxMetadata);
+          MailboxInfos mailboxInfos = new SharedMailboxInfos(
+              new MailboxInfo(senderServer.getHostname(), senderServer.getQueryMailboxPort(),
+                  ImmutableList.of(workerId)));
+          senderMailboxesMap.computeIfAbsent(workerId, k -> new HashMap<>()).put(receiverFragmentId, mailboxInfos);
+          receiverMailboxesMap.computeIfAbsent(workerId, k -> new HashMap<>()).put(senderFragmentId, mailboxInfos);
         }
       } else if (senderMetadata.isPrePartitioned() && isDirectExchangeCompatible(senderMetadata, receiverMetadata)) {
         // - direct exchange possible:
@@ -78,82 +80,77 @@ public class MailboxAssignmentVisitor extends DefaultPostOrderTraversalVisitor<V
         if (partitionParallelism == 1) {
           // 1-to-1 mapping
           for (int workerId = 0; workerId < numSenders; workerId++) {
-            String mailboxId = MailboxIdUtils.toPlanMailboxId(senderFragmentId, workerId, receiverFragmentId, workerId);
-            MailboxMetadata serderMailboxMetadata = new MailboxMetadata(Collections.singletonList(mailboxId),
-                Collections.singletonList(new VirtualServerAddress(receiverServerMap.get(workerId), workerId)));
-            MailboxMetadata receiverMailboxMetadata = new MailboxMetadata(Collections.singletonList(mailboxId),
-                Collections.singletonList(new VirtualServerAddress(senderServerMap.get(workerId), workerId)));
+            QueryServerInstance senderServer = senderServerMap.get(workerId);
+            QueryServerInstance receiverServer = receiverServerMap.get(workerId);
+            List<Integer> workerIds = ImmutableList.of(workerId);
+            MailboxInfos senderMailboxInfos;
+            MailboxInfos receiverMailboxInfos;
+            if (senderServer.equals(receiverServer)) {
+              senderMailboxInfos = new SharedMailboxInfos(
+                  new MailboxInfo(senderServer.getHostname(), senderServer.getQueryMailboxPort(), workerIds));
+              receiverMailboxInfos = senderMailboxInfos;
+            } else {
+              senderMailboxInfos = new MailboxInfos(
+                  new MailboxInfo(senderServer.getHostname(), senderServer.getQueryMailboxPort(), workerIds));
+              receiverMailboxInfos = new MailboxInfos(
+                  new MailboxInfo(receiverServer.getHostname(), receiverServer.getQueryMailboxPort(), workerIds));
+            }
             senderMailboxesMap.computeIfAbsent(workerId, k -> new HashMap<>())
-                .put(receiverFragmentId, serderMailboxMetadata);
+                .put(receiverFragmentId, receiverMailboxInfos);
             receiverMailboxesMap.computeIfAbsent(workerId, k -> new HashMap<>())
-                .put(senderFragmentId, receiverMailboxMetadata);
+                .put(senderFragmentId, senderMailboxInfos);
           }
         } else {
           // 1-to-<partition_parallelism> mapping
           int receiverWorkerId = 0;
           for (int senderWorkerId = 0; senderWorkerId < numSenders; senderWorkerId++) {
-            VirtualServerAddress senderAddress =
-                new VirtualServerAddress(senderServerMap.get(senderWorkerId), senderWorkerId);
-            List<String> receivingMailboxIds = new ArrayList<>(partitionParallelism);
-            List<VirtualServerAddress> receivingAddresses = new ArrayList<>(partitionParallelism);
-            MailboxMetadata senderMailboxMetadata = new MailboxMetadata(receivingMailboxIds, receivingAddresses);
-            senderMailboxesMap.computeIfAbsent(senderWorkerId, k -> new HashMap<>())
-                .put(receiverFragmentId, senderMailboxMetadata);
+            QueryServerInstance senderServer = senderServerMap.get(senderWorkerId);
+            QueryServerInstance receiverServer = receiverServerMap.get(receiverWorkerId);
+            List<Integer> receiverWorkerIds = new ArrayList<>(partitionParallelism);
+            senderMailboxesMap.computeIfAbsent(senderWorkerId, k -> new HashMap<>()).put(receiverFragmentId,
+                new MailboxInfos(new MailboxInfo(receiverServer.getHostname(), receiverServer.getQueryMailboxPort(),
+                    receiverWorkerIds)));
+            MailboxInfos senderMailboxInfos = new SharedMailboxInfos(
+                new MailboxInfo(senderServer.getHostname(), senderServer.getQueryMailboxPort(),
+                    ImmutableList.of(senderWorkerId)));
             for (int i = 0; i < partitionParallelism; i++) {
-              String mailboxId = MailboxIdUtils.toPlanMailboxId(senderFragmentId, senderWorkerId, receiverFragmentId,
-                  receiverWorkerId);
-              receivingMailboxIds.add(mailboxId);
-              receivingAddresses.add(
-                  new VirtualServerAddress(receiverServerMap.get(receiverWorkerId), receiverWorkerId));
-
-              MailboxMetadata receiverMailboxMetadata =
-                  receiverMailboxesMap.computeIfAbsent(receiverWorkerId, k -> new HashMap<>())
-                      .computeIfAbsent(senderFragmentId, k -> new MailboxMetadata());
-              receiverMailboxMetadata.getMailboxIds().add(mailboxId);
-              receiverMailboxMetadata.getVirtualAddresses().add(senderAddress);
-
+              receiverWorkerIds.add(receiverWorkerId);
+              receiverMailboxesMap.computeIfAbsent(receiverWorkerId, k -> new HashMap<>())
+                  .put(senderFragmentId, senderMailboxInfos);
               receiverWorkerId++;
             }
           }
         }
       } else {
         // For other exchange types, send the data to all the instances in the receiver fragment
-        // NOTE: Keep the receiver worker id sequential in the senderMailboxMetadata so that the partitionId aligns with
-        //       the workerId. It is useful for JOIN query when only left table is partitioned.
         // TODO: Add support for more exchange types
+        List<MailboxInfo> receiverMailboxInfoList = getMailboxInfos(receiverServerMap);
+        MailboxInfos receiverMailboxInfos = numSenders > 1 ? new SharedMailboxInfos(receiverMailboxInfoList)
+            : new MailboxInfos(receiverMailboxInfoList);
         for (int senderWorkerId = 0; senderWorkerId < numSenders; senderWorkerId++) {
-          VirtualServerAddress senderAddress =
-              new VirtualServerAddress(senderServerMap.get(senderWorkerId), senderWorkerId);
-          List<String> receivingMailboxIds = new ArrayList<>(numReceivers);
-          List<VirtualServerAddress> receivingAddresses = new ArrayList<>(numReceivers);
-          MailboxMetadata senderMailboxMetadata = new MailboxMetadata(receivingMailboxIds, receivingAddresses);
           senderMailboxesMap.computeIfAbsent(senderWorkerId, k -> new HashMap<>())
-              .put(receiverFragmentId, senderMailboxMetadata);
-          for (int receiverWorkerId = 0; receiverWorkerId < numReceivers; receiverWorkerId++) {
-            String mailboxId =
-                MailboxIdUtils.toPlanMailboxId(senderFragmentId, senderWorkerId, receiverFragmentId, receiverWorkerId);
-            receivingMailboxIds.add(mailboxId);
-            receivingAddresses.add(new VirtualServerAddress(receiverServerMap.get(receiverWorkerId), receiverWorkerId));
-
-            MailboxMetadata receiverMailboxMetadata =
-                receiverMailboxesMap.computeIfAbsent(receiverWorkerId, k -> new HashMap<>())
-                    .computeIfAbsent(senderFragmentId, k -> new MailboxMetadata());
-            receiverMailboxMetadata.getMailboxIds().add(mailboxId);
-            receiverMailboxMetadata.getVirtualAddresses().add(senderAddress);
-          }
+              .put(receiverFragmentId, receiverMailboxInfos);
+        }
+        List<MailboxInfo> senderMailboxInfoList = getMailboxInfos(senderServerMap);
+        MailboxInfos senderMailboxInfos =
+            numReceivers > 1 ? new SharedMailboxInfos(senderMailboxInfoList) : new MailboxInfos(senderMailboxInfoList);
+        for (int receiverWorkerId = 0; receiverWorkerId < numReceivers; receiverWorkerId++) {
+          receiverMailboxesMap.computeIfAbsent(receiverWorkerId, k -> new HashMap<>())
+              .put(senderFragmentId, senderMailboxInfos);
         }
       }
     }
     return null;
   }
 
-  private boolean isDirectExchangeCompatible(DispatchablePlanMetadata sender, DispatchablePlanMetadata receiver) {
+  private static boolean isDirectExchangeCompatible(DispatchablePlanMetadata sender,
+      DispatchablePlanMetadata receiver) {
     Map<Integer, QueryServerInstance> senderServerMap = sender.getWorkerIdToServerInstanceMap();
     Map<Integer, QueryServerInstance> receiverServerMap = receiver.getWorkerIdToServerInstanceMap();
 
     int numSenders = senderServerMap.size();
     int numReceivers = receiverServerMap.size();
-    if (sender.getScannedTables().size() > 0 && receiver.getScannedTables().size() == 0) {
+    if (!sender.getScannedTables().isEmpty() && receiver.getScannedTables().isEmpty()) {
       // leaf-to-intermediate condition
       return numSenders * sender.getPartitionParallelism() == numReceivers && sender.getPartitionFunction() != null
           && sender.getPartitionFunction().equalsIgnoreCase(receiver.getPartitionFunction());
@@ -163,4 +160,15 @@ public class MailboxAssignmentVisitor extends DefaultPostOrderTraversalVisitor<V
           .equalsIgnoreCase(receiver.getPartitionFunction());
     }
   }
+
+  private static List<MailboxInfo> getMailboxInfos(Map<Integer, QueryServerInstance> workerIdToServerMap) {
+    Map<QueryServerInstance, List<Integer>> serverToWorkerIdsMap = new HashMap<>();
+    int numServers = workerIdToServerMap.size();
+    for (int workerId = 0; workerId < numServers; workerId++) {
+      serverToWorkerIdsMap.computeIfAbsent(workerIdToServerMap.get(workerId), k -> new ArrayList<>()).add(workerId);
+    }
+    return serverToWorkerIdsMap.entrySet().stream()
+        .map(e -> new MailboxInfo(e.getKey().getHostname(), e.getKey().getQueryMailboxPort(), e.getValue()))
+        .collect(Collectors.toList());
+  }
 }
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/MailboxIdUtils.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/MailboxIdUtils.java
index 32c7d3197a..f81e243dc7 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/MailboxIdUtils.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/MailboxIdUtils.java
@@ -19,9 +19,10 @@
 package org.apache.pinot.query.planner.physical;
 
 import com.google.common.annotations.VisibleForTesting;
+import java.util.ArrayList;
 import java.util.List;
-import java.util.stream.Collectors;
-import org.apache.pinot.query.routing.MailboxMetadata;
+import org.apache.pinot.query.routing.MailboxInfo;
+import org.apache.pinot.query.routing.RoutingInfo;
 
 
 public class MailboxIdUtils {
@@ -30,23 +31,35 @@ public class MailboxIdUtils {
 
   public static final char SEPARATOR = '|';
 
-  public static String toPlanMailboxId(int senderStageId, int senderWorkerId, int receiverStageId,
+  @VisibleForTesting
+  public static String toMailboxId(long requestId, int senderStageId, int senderWorkerId, int receiverStageId,
       int receiverWorkerId) {
-    return Integer.toString(senderStageId) + SEPARATOR + senderWorkerId + SEPARATOR + receiverStageId + SEPARATOR
-        + receiverWorkerId;
-  }
-
-  public static String toMailboxId(long requestId, String planMailboxId) {
-    return Long.toString(requestId) + SEPARATOR + planMailboxId;
+    return Long.toString(requestId) + SEPARATOR + senderStageId + SEPARATOR + senderWorkerId + SEPARATOR
+        + receiverStageId + SEPARATOR + receiverWorkerId;
   }
 
-  public static List<String> toMailboxIds(long requestId, MailboxMetadata mailboxMetadata) {
-    return mailboxMetadata.getMailboxIds().stream().map(v -> toMailboxId(requestId, v)).collect(Collectors.toList());
+  public static List<RoutingInfo> toRoutingInfos(long requestId, int senderStageId, int senderWorkerId,
+      int receiverStageId, List<MailboxInfo> receiverMailboxInfos) {
+    List<RoutingInfo> routingInfos = new ArrayList<>();
+    for (MailboxInfo mailboxInfo : receiverMailboxInfos) {
+      String hostname = mailboxInfo.getHostname();
+      int port = mailboxInfo.getPort();
+      for (int receiverWorkerId : mailboxInfo.getWorkerIds()) {
+        routingInfos.add(new RoutingInfo(hostname, port,
+            toMailboxId(requestId, senderStageId, senderWorkerId, receiverStageId, receiverWorkerId)));
+      }
+    }
+    return routingInfos;
   }
 
-  @VisibleForTesting
-  public static String toMailboxId(long requestId, int senderStageId, int senderWorkerId, int receiverStageId,
-      int receiverWorkerId) {
-    return toMailboxId(requestId, toPlanMailboxId(senderStageId, senderWorkerId, receiverStageId, receiverWorkerId));
+  public static List<String> toMailboxIds(long requestId, int senderStageId, List<MailboxInfo> senderMailboxInfos,
+      int receiverStageId, int receiverWorkerId) {
+    List<String> mailboxIds = new ArrayList<>();
+    for (MailboxInfo mailboxInfo : senderMailboxInfos) {
+      for (int senderWorkerId : mailboxInfo.getWorkerIds()) {
+        mailboxIds.add(toMailboxId(requestId, senderStageId, senderWorkerId, receiverStageId, receiverWorkerId));
+      }
+    }
+    return mailboxIds;
   }
 }
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/MailboxMetadata.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/MailboxInfo.java
similarity index 50%
rename from pinot-query-planner/src/main/java/org/apache/pinot/query/routing/MailboxMetadata.java
rename to pinot-query-planner/src/main/java/org/apache/pinot/query/routing/MailboxInfo.java
index b3484d1a7b..b7ad8c5f61 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/MailboxMetadata.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/MailboxInfo.java
@@ -18,42 +18,37 @@
  */
 package org.apache.pinot.query.routing;
 
-import java.util.ArrayList;
 import java.util.List;
 
 
 /**
- * {@code MailboxMetadata} wraps around a list of mailboxes information from/to one connected stage.
- *  It contains the following information:
- *  <ul>
- *    <li>MailboxId: the unique id of the mailbox</li>
- *    <li>VirtualAddress: the virtual address of the mailbox</li>
- *  </ul>
+ * {@code MailboxInfo} wraps the mailbox information from/to one connected server.
  */
-public class MailboxMetadata {
-  private final List<String> _mailboxIds;
-  private final List<VirtualServerAddress> _virtualAddresses;
-
-  public MailboxMetadata() {
-    _mailboxIds = new ArrayList<>();
-    _virtualAddresses = new ArrayList<>();
+public class MailboxInfo {
+  private final String _hostname;
+  private final int _port;
+  private final List<Integer> _workerIds;
+
+  public MailboxInfo(String hostname, int port, List<Integer> workerIds) {
+    _hostname = hostname;
+    _port = port;
+    _workerIds = workerIds;
   }
 
-  public MailboxMetadata(List<String> mailboxIds, List<VirtualServerAddress> virtualAddresses) {
-    _mailboxIds = mailboxIds;
-    _virtualAddresses = virtualAddresses;
+  public String getHostname() {
+    return _hostname;
   }
 
-  public List<String> getMailboxIds() {
-    return _mailboxIds;
+  public int getPort() {
+    return _port;
   }
 
-  public List<VirtualServerAddress> getVirtualAddresses() {
-    return _virtualAddresses;
+  public List<Integer> getWorkerIds() {
+    return _workerIds;
   }
 
   @Override
   public String toString() {
-    return _mailboxIds + "@" + _virtualAddresses;
+    return _hostname + ":" + _port + "|" + _workerIds;
   }
 }
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/StagePlan.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/MailboxInfos.java
similarity index 50%
copy from pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/StagePlan.java
copy to pinot-query-planner/src/main/java/org/apache/pinot/query/routing/MailboxInfos.java
index a45b48c5de..76c6e94224 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/StagePlan.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/MailboxInfos.java
@@ -16,36 +16,29 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pinot.query.runtime.plan;
+package org.apache.pinot.query.routing;
 
-import org.apache.pinot.query.planner.plannode.PlanNode;
+import com.google.common.collect.ImmutableList;
+import com.google.protobuf.ByteString;
+import java.util.List;
 
 
-/**
- * {@code StagePlan} is the deserialized version of the {@link org.apache.pinot.common.proto.Worker.StagePlan}.
- *
- * <p>It is also the extended version of the {@link org.apache.pinot.core.query.request.ServerQueryRequest}.
- */
-public class StagePlan {
-  private final int _stageId;
-  private final PlanNode _rootNode;
-  private final StageMetadata _stageMetadata;
+public class MailboxInfos {
+  private final List<MailboxInfo> _mailboxInfos;
 
-  public StagePlan(int stageId, PlanNode rootNode, StageMetadata stageMetadata) {
-    _stageId = stageId;
-    _rootNode = rootNode;
-    _stageMetadata = stageMetadata;
+  public MailboxInfos(List<MailboxInfo> mailboxInfos) {
+    _mailboxInfos = mailboxInfos;
   }
 
-  public int getStageId() {
-    return _stageId;
+  public MailboxInfos(MailboxInfo mailboxInfo) {
+    _mailboxInfos = ImmutableList.of(mailboxInfo);
   }
 
-  public PlanNode getRootNode() {
-    return _rootNode;
+  public List<MailboxInfo> getMailboxInfos() {
+    return _mailboxInfos;
   }
 
-  public StageMetadata getStageMetadata() {
-    return _stageMetadata;
+  public ByteString toProtoBytes() {
+    return QueryPlanSerDeUtils.toProtoMailboxInfos(_mailboxInfos).toByteString();
   }
 }
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/serde/QueryPlanSerDeUtils.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/QueryPlanSerDeUtils.java
similarity index 53%
rename from pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/serde/QueryPlanSerDeUtils.java
rename to pinot-query-planner/src/main/java/org/apache/pinot/query/routing/QueryPlanSerDeUtils.java
index fbfb9487b4..d692d634b4 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/serde/QueryPlanSerDeUtils.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/QueryPlanSerDeUtils.java
@@ -16,10 +16,12 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pinot.query.runtime.plan.serde;
+package org.apache.pinot.query.routing;
 
+import com.google.common.collect.Maps;
 import com.google.protobuf.ByteString;
 import com.google.protobuf.InvalidProtocolBufferException;
+import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
@@ -27,11 +29,6 @@ import org.apache.pinot.common.proto.Plan;
 import org.apache.pinot.common.proto.Worker;
 import org.apache.pinot.query.planner.plannode.AbstractPlanNode;
 import org.apache.pinot.query.planner.plannode.StageNodeSerDeUtils;
-import org.apache.pinot.query.routing.MailboxMetadata;
-import org.apache.pinot.query.routing.VirtualServerAddress;
-import org.apache.pinot.query.routing.WorkerMetadata;
-import org.apache.pinot.query.runtime.plan.StageMetadata;
-import org.apache.pinot.query.runtime.plan.StagePlan;
 
 
 /**
@@ -46,30 +43,35 @@ public class QueryPlanSerDeUtils {
     AbstractPlanNode rootNode =
         StageNodeSerDeUtils.deserializeStageNode(Plan.StageNode.parseFrom(protoStagePlan.getRootNode()));
     StageMetadata stageMetadata = fromProtoStageMetadata(protoStagePlan.getStageMetadata());
-    return new StagePlan(protoStagePlan.getStageId(), rootNode, stageMetadata);
+    return new StagePlan(rootNode, stageMetadata);
   }
 
   private static StageMetadata fromProtoStageMetadata(Worker.StageMetadata protoStageMetadata)
       throws InvalidProtocolBufferException {
-    List<WorkerMetadata> workerMetadataList =
-        protoStageMetadata.getWorkerMetadataList().stream().map(QueryPlanSerDeUtils::fromProtoWorkerMetadata)
-            .collect(Collectors.toList());
+    List<Worker.WorkerMetadata> protoWorkerMetadataList = protoStageMetadata.getWorkerMetadataList();
+    List<WorkerMetadata> workerMetadataList = new ArrayList<>(protoWorkerMetadataList.size());
+    for (Worker.WorkerMetadata protoWorkerMetadata : protoWorkerMetadataList) {
+      workerMetadataList.add(fromProtoWorkerMetadata(protoWorkerMetadata));
+    }
     Map<String, String> customProperties = fromProtoProperties(protoStageMetadata.getCustomProperty());
-    return new StageMetadata(workerMetadataList, customProperties);
+    return new StageMetadata(protoStageMetadata.getStageId(), workerMetadataList, customProperties);
   }
 
-  private static WorkerMetadata fromProtoWorkerMetadata(Worker.WorkerMetadata protoWorkerMetadata) {
-    VirtualServerAddress virtualAddress = VirtualServerAddress.parse(protoWorkerMetadata.getVirtualAddress());
-    Map<Integer, MailboxMetadata> mailboxMetadataMap = protoWorkerMetadata.getMailboxMetadataMap().entrySet().stream()
-        .collect(Collectors.toMap(Map.Entry::getKey, e -> fromProtoMailbox(e.getValue())));
-    return new WorkerMetadata(virtualAddress, mailboxMetadataMap, protoWorkerMetadata.getCustomPropertyMap());
+  private static WorkerMetadata fromProtoWorkerMetadata(Worker.WorkerMetadata protoWorkerMetadata)
+      throws InvalidProtocolBufferException {
+    Map<Integer, ByteString> protoMailboxInfosMap = protoWorkerMetadata.getMailboxInfosMap();
+    Map<Integer, MailboxInfos> mailboxInfosMap = Maps.newHashMapWithExpectedSize(protoMailboxInfosMap.size());
+    for (Map.Entry<Integer, ByteString> entry : protoMailboxInfosMap.entrySet()) {
+      mailboxInfosMap.put(entry.getKey(), fromProtoMailboxInfos(entry.getValue()));
+    }
+    return new WorkerMetadata(protoWorkerMetadata.getWorkedId(), mailboxInfosMap,
+        protoWorkerMetadata.getCustomPropertyMap());
   }
 
-  private static MailboxMetadata fromProtoMailbox(Worker.MailboxMetadata protoMailboxMetadata) {
-    List<VirtualServerAddress> virtualAddresses =
-        protoMailboxMetadata.getVirtualAddressList().stream().map(VirtualServerAddress::parse)
-            .collect(Collectors.toList());
-    return new MailboxMetadata(protoMailboxMetadata.getMailboxIdList(), virtualAddresses);
+  private static MailboxInfos fromProtoMailboxInfos(ByteString protoMailboxInfos)
+      throws InvalidProtocolBufferException {
+    return new MailboxInfos(Worker.MailboxInfos.parseFrom(protoMailboxInfos).getMailboxInfoList().stream()
+        .map(v -> new MailboxInfo(v.getHostname(), v.getPort(), v.getWorkerIdList())).collect(Collectors.toList()));
   }
 
   public static Map<String, String> fromProtoProperties(ByteString protoProperties)
@@ -82,19 +84,17 @@ public class QueryPlanSerDeUtils {
   }
 
   private static Worker.WorkerMetadata toProtoWorkerMetadata(WorkerMetadata workerMetadata) {
-    Map<Integer, Worker.MailboxMetadata> protoMailboxMetadataMap =
-        workerMetadata.getMailboxMetadataMap().entrySet().stream()
-            .collect(Collectors.toMap(Map.Entry::getKey, e -> toProtoMailboxMetadata(e.getValue())));
-    return Worker.WorkerMetadata.newBuilder().setVirtualAddress(workerMetadata.getVirtualAddress().toString())
-        .putAllMailboxMetadata(protoMailboxMetadataMap).putAllCustomProperty(workerMetadata.getCustomProperties())
-        .build();
+    Map<Integer, ByteString> mailboxInfosMap = workerMetadata.getMailboxInfosMap().entrySet().stream()
+        .collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().toProtoBytes()));
+    return Worker.WorkerMetadata.newBuilder().setWorkedId(workerMetadata.getWorkerId())
+        .putAllMailboxInfos(mailboxInfosMap).putAllCustomProperty(workerMetadata.getCustomProperties()).build();
   }
 
-  private static Worker.MailboxMetadata toProtoMailboxMetadata(MailboxMetadata mailboxMetadata) {
-    List<String> virtualAddresses =
-        mailboxMetadata.getVirtualAddresses().stream().map(VirtualServerAddress::toString).collect(Collectors.toList());
-    return Worker.MailboxMetadata.newBuilder().addAllMailboxId(mailboxMetadata.getMailboxIds())
-        .addAllVirtualAddress(virtualAddresses).build();
+  public static Worker.MailboxInfos toProtoMailboxInfos(List<MailboxInfo> mailboxInfos) {
+    List<Worker.MailboxInfo> protoMailboxInfos = mailboxInfos.stream().map(
+        v -> Worker.MailboxInfo.newBuilder().setHostname(v.getHostname()).setPort(v.getPort())
+            .addAllWorkerId(v.getWorkerIds()).build()).collect(Collectors.toList());
+    return Worker.MailboxInfos.newBuilder().addAllMailboxInfo(protoMailboxInfos).build();
   }
 
   public static ByteString toProtoProperties(Map<String, String> properties) {
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/StagePlan.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/RoutingInfo.java
similarity index 50%
copy from pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/StagePlan.java
copy to pinot-query-planner/src/main/java/org/apache/pinot/query/routing/RoutingInfo.java
index a45b48c5de..7b92899237 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/StagePlan.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/RoutingInfo.java
@@ -16,36 +16,28 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pinot.query.runtime.plan;
+package org.apache.pinot.query.routing;
 
-import org.apache.pinot.query.planner.plannode.PlanNode;
+public class RoutingInfo {
+  private final String _hostname;
+  private final int _port;
+  private final String _mailboxId;
 
-
-/**
- * {@code StagePlan} is the deserialized version of the {@link org.apache.pinot.common.proto.Worker.StagePlan}.
- *
- * <p>It is also the extended version of the {@link org.apache.pinot.core.query.request.ServerQueryRequest}.
- */
-public class StagePlan {
-  private final int _stageId;
-  private final PlanNode _rootNode;
-  private final StageMetadata _stageMetadata;
-
-  public StagePlan(int stageId, PlanNode rootNode, StageMetadata stageMetadata) {
-    _stageId = stageId;
-    _rootNode = rootNode;
-    _stageMetadata = stageMetadata;
+  public RoutingInfo(String hostname, int port, String mailboxId) {
+    _hostname = hostname;
+    _port = port;
+    _mailboxId = mailboxId;
   }
 
-  public int getStageId() {
-    return _stageId;
+  public String getHostname() {
+    return _hostname;
   }
 
-  public PlanNode getRootNode() {
-    return _rootNode;
+  public int getPort() {
+    return _port;
   }
 
-  public StageMetadata getStageMetadata() {
-    return _stageMetadata;
+  public String getMailboxId() {
+    return _mailboxId;
   }
 }
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/StagePlan.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/SharedMailboxInfos.java
similarity index 50%
copy from pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/StagePlan.java
copy to pinot-query-planner/src/main/java/org/apache/pinot/query/routing/SharedMailboxInfos.java
index a45b48c5de..15fb1e666a 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/StagePlan.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/SharedMailboxInfos.java
@@ -16,36 +16,32 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pinot.query.runtime.plan;
+package org.apache.pinot.query.routing;
 
-import org.apache.pinot.query.planner.plannode.PlanNode;
+import com.google.protobuf.ByteString;
+import java.util.List;
 
 
 /**
- * {@code StagePlan} is the deserialized version of the {@link org.apache.pinot.common.proto.Worker.StagePlan}.
- *
- * <p>It is also the extended version of the {@link org.apache.pinot.core.query.request.ServerQueryRequest}.
+ * {@code SharedMailboxInfos} is the shared version of the {@link MailboxInfos} which can cache the proto bytes and
+ * reduce overhead of serialization.
  */
-public class StagePlan {
-  private final int _stageId;
-  private final PlanNode _rootNode;
-  private final StageMetadata _stageMetadata;
-
-  public StagePlan(int stageId, PlanNode rootNode, StageMetadata stageMetadata) {
-    _stageId = stageId;
-    _rootNode = rootNode;
-    _stageMetadata = stageMetadata;
-  }
+public class SharedMailboxInfos extends MailboxInfos {
+  private ByteString _protoBytes;
 
-  public int getStageId() {
-    return _stageId;
+  public SharedMailboxInfos(List<MailboxInfo> mailboxInfos) {
+    super(mailboxInfos);
   }
 
-  public PlanNode getRootNode() {
-    return _rootNode;
+  public SharedMailboxInfos(MailboxInfo mailboxInfo) {
+    super(mailboxInfo);
   }
 
-  public StageMetadata getStageMetadata() {
-    return _stageMetadata;
+  @Override
+  public synchronized ByteString toProtoBytes() {
+    if (_protoBytes == null) {
+      _protoBytes = super.toProtoBytes();
+    }
+    return _protoBytes;
   }
 }
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/StageMetadata.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/StageMetadata.java
similarity index 88%
rename from pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/StageMetadata.java
rename to pinot-query-planner/src/main/java/org/apache/pinot/query/routing/StageMetadata.java
index a07a04a0b7..6eca815d68 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/StageMetadata.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/StageMetadata.java
@@ -16,27 +16,32 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pinot.query.runtime.plan;
+package org.apache.pinot.query.routing;
 
 import java.util.List;
 import java.util.Map;
 import org.apache.pinot.core.routing.TimeBoundaryInfo;
 import org.apache.pinot.query.planner.physical.DispatchablePlanFragment;
-import org.apache.pinot.query.routing.WorkerMetadata;
 
 
 /**
  * {@code StageMetadata} is used to send plan fragment-level info about how to execute a stage physically.
  */
 public class StageMetadata {
+  private final int _stageId;
   private final List<WorkerMetadata> _workerMetadataList;
   private final Map<String, String> _customProperties;
 
-  public StageMetadata(List<WorkerMetadata> workerMetadataList, Map<String, String> customProperties) {
+  public StageMetadata(int stageId, List<WorkerMetadata> workerMetadataList, Map<String, String> customProperties) {
+    _stageId = stageId;
     _workerMetadataList = workerMetadataList;
     _customProperties = customProperties;
   }
 
+  public int getStageId() {
+    return _stageId;
+  }
+
   public List<WorkerMetadata> getWorkerMetadataList() {
     return _workerMetadataList;
   }
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/StagePlan.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/StagePlan.java
similarity index 85%
rename from pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/StagePlan.java
rename to pinot-query-planner/src/main/java/org/apache/pinot/query/routing/StagePlan.java
index a45b48c5de..7dfa00268d 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/StagePlan.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/StagePlan.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pinot.query.runtime.plan;
+package org.apache.pinot.query.routing;
 
 import org.apache.pinot.query.planner.plannode.PlanNode;
 
@@ -27,20 +27,14 @@ import org.apache.pinot.query.planner.plannode.PlanNode;
  * <p>It is also the extended version of the {@link org.apache.pinot.core.query.request.ServerQueryRequest}.
  */
 public class StagePlan {
-  private final int _stageId;
   private final PlanNode _rootNode;
   private final StageMetadata _stageMetadata;
 
-  public StagePlan(int stageId, PlanNode rootNode, StageMetadata stageMetadata) {
-    _stageId = stageId;
+  public StagePlan(PlanNode rootNode, StageMetadata stageMetadata) {
     _rootNode = rootNode;
     _stageMetadata = stageMetadata;
   }
 
-  public int getStageId() {
-    return _stageId;
-  }
-
   public PlanNode getRootNode() {
     return _rootNode;
   }
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/WorkerMetadata.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/WorkerMetadata.java
index 3392261980..ab1226862e 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/WorkerMetadata.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/WorkerMetadata.java
@@ -44,29 +44,29 @@ import org.apache.pinot.spi.utils.JsonUtils;
 public class WorkerMetadata {
   public static final String TABLE_SEGMENTS_MAP_KEY = "tableSegmentsMap";
 
-  private final VirtualServerAddress _virtualAddress;
-  private final Map<Integer, MailboxMetadata> _mailboxMetadataMap;
+  private final int _workerId;
+  private final Map<Integer, MailboxInfos> _mailboxInfosMap;
   private final Map<String, String> _customProperties;
 
-  public WorkerMetadata(VirtualServerAddress virtualAddress, Map<Integer, MailboxMetadata> mailboxMetadataMap) {
-    _virtualAddress = virtualAddress;
-    _mailboxMetadataMap = mailboxMetadataMap;
+  public WorkerMetadata(int workerId, Map<Integer, MailboxInfos> mailboxInfosMap) {
+    _workerId = workerId;
+    _mailboxInfosMap = mailboxInfosMap;
     _customProperties = new HashMap<>();
   }
 
-  public WorkerMetadata(VirtualServerAddress virtualAddress, Map<Integer, MailboxMetadata> mailboxMetadataMap,
+  public WorkerMetadata(int workerId, Map<Integer, MailboxInfos> mailboxInfosMap,
       Map<String, String> customProperties) {
-    _virtualAddress = virtualAddress;
-    _mailboxMetadataMap = mailboxMetadataMap;
+    _workerId = workerId;
+    _mailboxInfosMap = mailboxInfosMap;
     _customProperties = customProperties;
   }
 
-  public VirtualServerAddress getVirtualAddress() {
-    return _virtualAddress;
+  public int getWorkerId() {
+    return _workerId;
   }
 
-  public Map<Integer, MailboxMetadata> getMailboxMetadataMap() {
-    return _mailboxMetadataMap;
+  public Map<Integer, MailboxInfos> getMailboxInfosMap() {
+    return _mailboxInfosMap;
   }
 
   public Map<String, String> getCustomProperties() {
diff --git a/pinot-query-planner/src/test/java/org/apache/pinot/query/QueryCompilationTest.java b/pinot-query-planner/src/test/java/org/apache/pinot/query/QueryCompilationTest.java
index 20d798c9c6..4bf911c805 100644
--- a/pinot-query-planner/src/test/java/org/apache/pinot/query/QueryCompilationTest.java
+++ b/pinot-query-planner/src/test/java/org/apache/pinot/query/QueryCompilationTest.java
@@ -19,7 +19,6 @@
 package org.apache.pinot.query;
 
 import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableSet;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
@@ -30,7 +29,6 @@ import java.util.concurrent.locks.ReentrantLock;
 import java.util.stream.Collectors;
 import org.apache.calcite.rel.RelDistribution;
 import org.apache.pinot.query.planner.PlannerUtils;
-import org.apache.pinot.query.planner.explain.PhysicalExplainPlanVisitor;
 import org.apache.pinot.query.planner.physical.DispatchablePlanFragment;
 import org.apache.pinot.query.planner.physical.DispatchableSubPlan;
 import org.apache.pinot.query.planner.plannode.AbstractPlanNode;
@@ -40,53 +38,43 @@ import org.apache.pinot.query.planner.plannode.JoinNode;
 import org.apache.pinot.query.planner.plannode.MailboxReceiveNode;
 import org.apache.pinot.query.planner.plannode.PlanNode;
 import org.apache.pinot.query.planner.plannode.ProjectNode;
-import org.testng.Assert;
+import org.apache.pinot.query.routing.QueryServerInstance;
 import org.testng.annotations.DataProvider;
 import org.testng.annotations.Test;
 
+import static org.testng.Assert.*;
+
 
 public class QueryCompilationTest extends QueryEnvironmentTestBase {
 
   @Test(dataProvider = "testQueryLogicalPlanDataProvider")
-  public void testQueryPlanExplainLogical(String query, String digest)
-      throws Exception {
+  public void testQueryPlanExplainLogical(String query, String digest) {
     testQueryPlanExplain(query, digest);
   }
 
   private void testQueryPlanExplain(String query, String digest) {
-    try {
-      long requestId = RANDOM_REQUEST_ID_GEN.nextLong();
-      String explainedPlan = _queryEnvironment.explainQuery(query, requestId);
-      Assert.assertEquals(explainedPlan, digest);
-    } catch (RuntimeException e) {
-      Assert.fail("failed to explain query: " + query, e);
-    }
+    long requestId = RANDOM_REQUEST_ID_GEN.nextLong();
+    String explainedPlan = _queryEnvironment.explainQuery(query, requestId);
+    assertEquals(explainedPlan, digest);
   }
 
   @Test(dataProvider = "testQueryDataProvider")
-  public void testQueryPlanWithoutException(String query)
-      throws Exception {
-    try {
-      DispatchableSubPlan dispatchableSubPlan = _queryEnvironment.planQuery(query);
-      Assert.assertNotNull(dispatchableSubPlan);
-    } catch (RuntimeException e) {
-      Assert.fail("failed to plan query: " + query, e);
-    }
+  public void testQueryPlanWithoutException(String query) {
+    DispatchableSubPlan dispatchableSubPlan = _queryEnvironment.planQuery(query);
+    assertNotNull(dispatchableSubPlan);
   }
 
   @Test(dataProvider = "testQueryExceptionDataProvider")
   public void testQueryWithException(String query, String exceptionSnippet) {
     try {
       _queryEnvironment.planQuery(query);
-      Assert.fail("query plan should throw exception");
+      fail("query plan should throw exception");
     } catch (RuntimeException e) {
-      Assert.assertTrue(e.getCause().getMessage().contains(exceptionSnippet));
+      assertTrue(e.getCause().getMessage().contains(exceptionSnippet));
     }
   }
 
-  private static void assertGroupBySingletonAfterJoin(DispatchableSubPlan dispatchableSubPlan, boolean shouldRewrite)
-      throws Exception {
-
+  private static void assertGroupBySingletonAfterJoin(DispatchableSubPlan dispatchableSubPlan, boolean shouldRewrite) {
     for (int stageId = 0; stageId < dispatchableSubPlan.getQueryStageList().size(); stageId++) {
       if (dispatchableSubPlan.getTableNames().size() == 0 && !PlannerUtils.isRootPlanFragment(stageId)) {
         PlanNode node = dispatchableSubPlan.getQueryStageList().get(stageId).getPlanFragment().getFragmentRoot();
@@ -95,17 +83,17 @@ public class QueryCompilationTest extends QueryEnvironmentTestBase {
             // JOIN is exchanged with hash distribution (data shuffle)
             MailboxReceiveNode left = (MailboxReceiveNode) node.getInputs().get(0);
             MailboxReceiveNode right = (MailboxReceiveNode) node.getInputs().get(1);
-            Assert.assertEquals(left.getDistributionType(), RelDistribution.Type.HASH_DISTRIBUTED);
-            Assert.assertEquals(right.getDistributionType(), RelDistribution.Type.HASH_DISTRIBUTED);
+            assertEquals(left.getDistributionType(), RelDistribution.Type.HASH_DISTRIBUTED);
+            assertEquals(right.getDistributionType(), RelDistribution.Type.HASH_DISTRIBUTED);
             break;
           }
           if (node instanceof AggregateNode && node.getInputs().get(0) instanceof MailboxReceiveNode) {
             // AGG is exchanged with singleton since it has already been distributed by JOIN.
             MailboxReceiveNode input = (MailboxReceiveNode) node.getInputs().get(0);
             if (shouldRewrite) {
-              Assert.assertEquals(input.getDistributionType(), RelDistribution.Type.SINGLETON);
+              assertEquals(input.getDistributionType(), RelDistribution.Type.SINGLETON);
             } else {
-              Assert.assertNotEquals(input.getDistributionType(), RelDistribution.Type.SINGLETON);
+              assertNotEquals(input.getDistributionType(), RelDistribution.Type.SINGLETON);
             }
             break;
           }
@@ -116,34 +104,40 @@ public class QueryCompilationTest extends QueryEnvironmentTestBase {
   }
 
   @Test
-  public void testQueryAndAssertStageContentForJoin()
-      throws Exception {
+  public void testQueryAndAssertStageContentForJoin() {
     String query = "SELECT * FROM a JOIN b ON a.col1 = b.col2";
     DispatchableSubPlan dispatchableSubPlan = _queryEnvironment.planQuery(query);
-    Assert.assertEquals(dispatchableSubPlan.getQueryStageList().size(), 4);
-
-    for (int stageId = 0; stageId < dispatchableSubPlan.getQueryStageList().size(); stageId++) {
-      DispatchablePlanFragment dispatchablePlanFragment = dispatchableSubPlan.getQueryStageList().get(stageId);
-      String tableName = dispatchablePlanFragment.getTableName();
+    List<DispatchablePlanFragment> stagePlans = dispatchableSubPlan.getQueryStageList();
+    int numStages = stagePlans.size();
+    assertEquals(numStages, 4);
+    for (int stageId = 0; stageId < numStages; stageId++) {
+      DispatchablePlanFragment stagePlan = stagePlans.get(stageId);
+      Map<QueryServerInstance, List<Integer>> serverToWorkerIdsMap = stagePlan.getServerInstanceToWorkerIdMap();
+      int numServers = serverToWorkerIdsMap.size();
+      String tableName = stagePlan.getTableName();
       if (tableName != null) {
         // table scan stages; for tableA it should have 2 hosts, for tableB it should have only 1
-        Assert.assertEquals(dispatchablePlanFragment.getServerInstanceToWorkerIdMap().entrySet().stream()
-                .map(PhysicalExplainPlanVisitor::stringifyQueryServerInstanceToWorkerIdsEntry)
-                .collect(Collectors.toSet()),
-            tableName.equals("a") ? ImmutableList.of("localhost:1|[1]", "localhost:2|[0]")
-                : ImmutableList.of("localhost:1|[0]"));
+        if (tableName.equals("a")) {
+          assertEquals(numServers, 2);
+          for (QueryServerInstance server : serverToWorkerIdsMap.keySet()) {
+            int port = server.getQueryMailboxPort();
+            assertTrue(port == 1 || port == 2);
+          }
+        } else {
+          assertEquals(numServers, 1);
+          assertEquals(serverToWorkerIdsMap.keySet().iterator().next().getQueryMailboxPort(), 1);
+        }
       } else if (!PlannerUtils.isRootPlanFragment(stageId)) {
         // join stage should have both servers used.
-        Assert.assertEquals(dispatchablePlanFragment.getServerInstanceToWorkerIdMap().entrySet().stream()
-                .map(PhysicalExplainPlanVisitor::stringifyQueryServerInstanceToWorkerIdsEntry)
-                .collect(Collectors.toSet()),
-            ImmutableSet.of("localhost:1|[1]", "localhost:2|[0]"));
+        assertEquals(numServers, 2);
+        for (QueryServerInstance server : serverToWorkerIdsMap.keySet()) {
+          int port = server.getQueryMailboxPort();
+          assertTrue(port == 1 || port == 2);
+        }
       } else {
         // reduce stage should have the reducer instance.
-        Assert.assertEquals(dispatchablePlanFragment.getServerInstanceToWorkerIdMap().entrySet().stream()
-                .map(PhysicalExplainPlanVisitor::stringifyQueryServerInstanceToWorkerIdsEntry)
-                .collect(Collectors.toSet()),
-            ImmutableSet.of("localhost:3|[0]"));
+        assertEquals(numServers, 1);
+        assertEquals(serverToWorkerIdsMap.keySet().iterator().next().getQueryMailboxPort(), 3);
       }
     }
   }
@@ -167,24 +161,27 @@ public class QueryCompilationTest extends QueryEnvironmentTestBase {
   public void testQueryRoutingManagerCompilation() {
     String query = "SELECT * FROM d_OFFLINE";
     DispatchableSubPlan dispatchableSubPlan = _queryEnvironment.planQuery(query);
-    List<DispatchablePlanFragment> tableScanMetadataList = dispatchableSubPlan.getQueryStageList().stream()
-        .filter(stageMetadata -> stageMetadata.getTableName() != null).collect(Collectors.toList());
-    Assert.assertEquals(tableScanMetadataList.size(), 1);
-    Assert.assertEquals(tableScanMetadataList.get(0).getServerInstanceToWorkerIdMap().size(), 2);
+    List<DispatchablePlanFragment> tableScanMetadataList =
+        dispatchableSubPlan.getQueryStageList().stream().filter(stageMetadata -> stageMetadata.getTableName() != null)
+            .collect(Collectors.toList());
+    assertEquals(tableScanMetadataList.size(), 1);
+    assertEquals(tableScanMetadataList.get(0).getServerInstanceToWorkerIdMap().size(), 2);
 
     query = "SELECT * FROM d_REALTIME";
     dispatchableSubPlan = _queryEnvironment.planQuery(query);
-    tableScanMetadataList = dispatchableSubPlan.getQueryStageList().stream()
-        .filter(stageMetadata -> stageMetadata.getTableName() != null).collect(Collectors.toList());
-    Assert.assertEquals(tableScanMetadataList.size(), 1);
-    Assert.assertEquals(tableScanMetadataList.get(0).getServerInstanceToWorkerIdMap().size(), 1);
+    tableScanMetadataList =
+        dispatchableSubPlan.getQueryStageList().stream().filter(stageMetadata -> stageMetadata.getTableName() != null)
+            .collect(Collectors.toList());
+    assertEquals(tableScanMetadataList.size(), 1);
+    assertEquals(tableScanMetadataList.get(0).getServerInstanceToWorkerIdMap().size(), 1);
 
     query = "SELECT * FROM d";
     dispatchableSubPlan = _queryEnvironment.planQuery(query);
-    tableScanMetadataList = dispatchableSubPlan.getQueryStageList().stream()
-        .filter(stageMetadata -> stageMetadata.getTableName() != null).collect(Collectors.toList());
-    Assert.assertEquals(tableScanMetadataList.size(), 1);
-    Assert.assertEquals(tableScanMetadataList.get(0).getServerInstanceToWorkerIdMap().size(), 2);
+    tableScanMetadataList =
+        dispatchableSubPlan.getQueryStageList().stream().filter(stageMetadata -> stageMetadata.getTableName() != null)
+            .collect(Collectors.toList());
+    assertEquals(tableScanMetadataList.size(), 1);
+    assertEquals(tableScanMetadataList.get(0).getServerInstanceToWorkerIdMap().size(), 2);
   }
 
   // Test that plan query can be run as multi-thread.
@@ -232,34 +229,36 @@ public class QueryCompilationTest extends QueryEnvironmentTestBase {
     }
     for (ArrayList<DispatchableSubPlan> plans : queryPlans.values()) {
       for (DispatchableSubPlan plan : plans) {
-        Assert.assertTrue(plan.equals(plans.get(0)));
+        assertTrue(plan.equals(plans.get(0)));
       }
     }
   }
 
   @Test
-  public void testQueryWithHint()
-      throws Exception {
+  public void testQueryWithHint() {
     // Hinting the query to use final stage aggregation makes server directly return final result
     // This is useful when data is already partitioned by col1
     String query = "SELECT /*+ aggOptionsInternal(agg_type='DIRECT') */ col1, COUNT(*) FROM b GROUP BY col1";
     DispatchableSubPlan dispatchableSubPlan = _queryEnvironment.planQuery(query);
-    Assert.assertEquals(dispatchableSubPlan.getQueryStageList().size(), 2);
-    for (int stageId = 0; stageId < dispatchableSubPlan.getQueryStageList().size(); stageId++) {
-      DispatchablePlanFragment dispatchablePlanFragment = dispatchableSubPlan.getQueryStageList().get(stageId);
-      String tableName = dispatchablePlanFragment.getTableName();
+    List<DispatchablePlanFragment> stagePlans = dispatchableSubPlan.getQueryStageList();
+    int numStages = stagePlans.size();
+    assertEquals(numStages, 2);
+    for (int stageId = 0; stageId < numStages; stageId++) {
+      DispatchablePlanFragment stagePlan = stagePlans.get(stageId);
+      Map<QueryServerInstance, List<Integer>> serverToWorkerIdsMap = stagePlan.getServerInstanceToWorkerIdMap();
+      int numServers = serverToWorkerIdsMap.size();
+      String tableName = stagePlan.getTableName();
       if (tableName != null) {
         // table scan stages; for tableB it should have only 1
-        Assert.assertEquals(dispatchablePlanFragment.getServerInstanceToWorkerIdMap().entrySet().stream()
-                .map(PhysicalExplainPlanVisitor::stringifyQueryServerInstanceToWorkerIdsEntry)
-                .collect(Collectors.toSet()),
-            ImmutableList.of("localhost:1|[0]"));
+        assertEquals(numServers, 1);
+        assertEquals(stagePlan.getServerInstanceToWorkerIdMap().keySet().iterator().next().getQueryMailboxPort(), 1);
       } else if (!PlannerUtils.isRootPlanFragment(stageId)) {
         // join stage should have both servers used.
-        Assert.assertEquals(dispatchablePlanFragment.getServerInstanceToWorkerIdMap().entrySet().stream()
-                .map(PhysicalExplainPlanVisitor::stringifyQueryServerInstanceToWorkerIdsEntry)
-                .collect(Collectors.toSet()),
-            ImmutableList.of("localhost:1|[1]", "localhost:2|[0]"));
+        assertEquals(numServers, 2);
+        for (QueryServerInstance server : serverToWorkerIdsMap.keySet()) {
+          int port = server.getQueryMailboxPort();
+          assertTrue(port == 1 || port == 2);
+        }
       }
     }
   }
@@ -269,105 +268,103 @@ public class QueryCompilationTest extends QueryEnvironmentTestBase {
     // A simple filter query with one table
     String query = "Select * from a where col1 = 'a'";
     List<String> tableNames = _queryEnvironment.getTableNamesForQuery(query);
-    Assert.assertEquals(tableNames.size(), 1);
-    Assert.assertEquals(tableNames.get(0), "a");
+    assertEquals(tableNames.size(), 1);
+    assertEquals(tableNames.get(0), "a");
 
     // query with IN / NOT IN clause
-    query = "SELECT COUNT(*) FROM a WHERE col1 IN (SELECT col1 FROM b) "
-        + "and col1 NOT IN (SELECT col1 from c)";
+    query = "SELECT COUNT(*) FROM a WHERE col1 IN (SELECT col1 FROM b) " + "and col1 NOT IN (SELECT col1 from c)";
     tableNames = _queryEnvironment.getTableNamesForQuery(query);
-    Assert.assertEquals(tableNames.size(), 3);
+    assertEquals(tableNames.size(), 3);
     Collections.sort(tableNames);
-    Assert.assertEquals(tableNames.get(0), "a");
-    Assert.assertEquals(tableNames.get(1), "b");
-    Assert.assertEquals(tableNames.get(2), "c");
+    assertEquals(tableNames.get(0), "a");
+    assertEquals(tableNames.get(1), "b");
+    assertEquals(tableNames.get(2), "c");
 
     // query with JOIN clause
     query = "SELECT a.col1, b.col2 FROM a JOIN b ON a.col3 = b.col3 WHERE a.col1 = 'a'";
     tableNames = _queryEnvironment.getTableNamesForQuery(query);
-    Assert.assertEquals(tableNames.size(), 2);
+    assertEquals(tableNames.size(), 2);
     Collections.sort(tableNames);
-    Assert.assertEquals(tableNames.get(0), "a");
-    Assert.assertEquals(tableNames.get(1), "b");
+    assertEquals(tableNames.get(0), "a");
+    assertEquals(tableNames.get(1), "b");
 
     // query with WHERE clause JOIN
     query = "SELECT a.col1, b.col2 FROM a, b WHERE a.col3 = b.col3 AND a.col1 = 'a'";
     tableNames = _queryEnvironment.getTableNamesForQuery(query);
-    Assert.assertEquals(tableNames.size(), 2);
+    assertEquals(tableNames.size(), 2);
     Collections.sort(tableNames);
-    Assert.assertEquals(tableNames.get(0), "a");
-    Assert.assertEquals(tableNames.get(1), "b");
+    assertEquals(tableNames.get(0), "a");
+    assertEquals(tableNames.get(1), "b");
 
     // query with JOIN clause and table alias
     query = "SELECT A.col1, B.col2 FROM a AS A JOIN b AS B ON A.col3 = B.col3 WHERE A.col1 = 'a'";
     tableNames = _queryEnvironment.getTableNamesForQuery(query);
-    Assert.assertEquals(tableNames.size(), 2);
+    assertEquals(tableNames.size(), 2);
     Collections.sort(tableNames);
-    Assert.assertEquals(tableNames.get(0), "a");
-    Assert.assertEquals(tableNames.get(1), "b");
+    assertEquals(tableNames.get(0), "a");
+    assertEquals(tableNames.get(1), "b");
 
     // query with UNION clause
     query = "SELECT * FROM a UNION ALL SELECT * FROM b UNION ALL SELECT * FROM c";
     tableNames = _queryEnvironment.getTableNamesForQuery(query);
-    Assert.assertEquals(tableNames.size(), 3);
+    assertEquals(tableNames.size(), 3);
     Collections.sort(tableNames);
-    Assert.assertEquals(tableNames.get(0), "a");
-    Assert.assertEquals(tableNames.get(1), "b");
-    Assert.assertEquals(tableNames.get(2), "c");
+    assertEquals(tableNames.get(0), "a");
+    assertEquals(tableNames.get(1), "b");
+    assertEquals(tableNames.get(2), "c");
 
     // query with UNION clause and table alias
     query = "SELECT * FROM (SELECT * FROM a) AS t1 UNION SELECT * FROM ( SELECT * FROM b) AS t2";
     tableNames = _queryEnvironment.getTableNamesForQuery(query);
-    Assert.assertEquals(tableNames.size(), 2);
+    assertEquals(tableNames.size(), 2);
     Collections.sort(tableNames);
-    Assert.assertEquals(tableNames.get(0), "a");
-    Assert.assertEquals(tableNames.get(1), "b");
+    assertEquals(tableNames.get(0), "a");
+    assertEquals(tableNames.get(1), "b");
 
     // query with UNION clause and table alias using WITH clause
-    query = "WITH tmp1 AS (SELECT * FROM a), \n"
-        + "tmp2 AS (SELECT * FROM b) \n"
+    query = "WITH tmp1 AS (SELECT * FROM a), \n" + "tmp2 AS (SELECT * FROM b) \n"
         + "SELECT * FROM tmp1 UNION ALL SELECT * FROM tmp2";
     tableNames = _queryEnvironment.getTableNamesForQuery(query);
-    Assert.assertEquals(tableNames.size(), 2);
+    assertEquals(tableNames.size(), 2);
     Collections.sort(tableNames);
-    Assert.assertEquals(tableNames.get(0), "a");
-    Assert.assertEquals(tableNames.get(1), "b");
+    assertEquals(tableNames.get(0), "a");
+    assertEquals(tableNames.get(1), "b");
 
     // query with aliases, JOIN, IN/NOT-IN, group-by
     query = "with tmp as (select col1, sum(col3) as col3, count(*) from a where col1 = 'a' group by col1), "
         + "tmp2 as (select A.col1, B.col3 from b as A JOIN c AS B on A.col1 = B.col1) "
         + "select sum(col3) from tmp where col1 in (select col1 from tmp2) and col1 not in (select col1 from d)";
     tableNames = _queryEnvironment.getTableNamesForQuery(query);
-    Assert.assertEquals(tableNames.size(), 4);
-    Assert.assertEquals(tableNames.get(0), "a");
-    Assert.assertEquals(tableNames.get(1), "b");
-    Assert.assertEquals(tableNames.get(2), "c");
-    Assert.assertEquals(tableNames.get(3), "d");
+    assertEquals(tableNames.size(), 4);
+    assertEquals(tableNames.get(0), "a");
+    assertEquals(tableNames.get(1), "b");
+    assertEquals(tableNames.get(2), "c");
+    assertEquals(tableNames.get(3), "d");
 
     // query with aliases, JOIN, IN/NOT-IN, group-by and explain
     query = "explain plan for with tmp as (select col1, sum(col3) as col3, count(*) from a where col1 = 'a' "
         + "group by col1), tmp2 as (select A.col1, B.col3 from b as A JOIN c AS B on A.col1 = B.col1) "
         + "select sum(col3) from tmp where col1 in (select col1 from tmp2) and col1 not in (select col1 from d)";
     tableNames = _queryEnvironment.getTableNamesForQuery(query);
-    Assert.assertEquals(tableNames.size(), 4);
-    Assert.assertEquals(tableNames.get(0), "a");
-    Assert.assertEquals(tableNames.get(1), "b");
-    Assert.assertEquals(tableNames.get(2), "c");
-    Assert.assertEquals(tableNames.get(3), "d");
+    assertEquals(tableNames.size(), 4);
+    assertEquals(tableNames.get(0), "a");
+    assertEquals(tableNames.get(1), "b");
+    assertEquals(tableNames.get(2), "c");
+    assertEquals(tableNames.get(3), "d");
 
     // lateral join query
     query = "EXPLAIN PLAN FOR SELECT a.col1, newb.sum_col3 FROM a JOIN LATERAL "
         + "(SELECT SUM(col3) as sum_col3 FROM b WHERE col2 = a.col2) AS newb ON TRUE";
     tableNames = _queryEnvironment.getTableNamesForQuery(query);
-    Assert.assertEquals(tableNames.size(), 2);
-    Assert.assertEquals(tableNames.get(0), "a");
-    Assert.assertEquals(tableNames.get(1), "b");
+    assertEquals(tableNames.size(), 2);
+    assertEquals(tableNames.get(0), "a");
+    assertEquals(tableNames.get(1), "b");
 
     // test for self join queries
     query = "SELECT a.col1 FROM a JOIN(SELECT col2 FROM a) as self ON a.col1=self.col2 ";
     tableNames = _queryEnvironment.getTableNamesForQuery(query);
-    Assert.assertEquals(tableNames.size(), 1);
-    Assert.assertEquals(tableNames.get(0), "a");
+    assertEquals(tableNames.size(), 1);
+    assertEquals(tableNames.get(0), "a");
   }
 
   // --------------------------------------------------------------------------
@@ -375,7 +372,7 @@ public class QueryCompilationTest extends QueryEnvironmentTestBase {
   // --------------------------------------------------------------------------
 
   private static void assertNodeTypeNotIn(PlanNode node, List<Class<? extends AbstractPlanNode>> bannedNodeType) {
-    Assert.assertFalse(isOneOf(bannedNodeType, node));
+    assertFalse(isOneOf(bannedNodeType, node));
     for (PlanNode child : node.getInputs()) {
       assertNodeTypeNotIn(child, bannedNodeType);
     }
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java
index 4796383a0b..fe078e87c6 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java
@@ -34,8 +34,10 @@ import org.apache.pinot.core.query.executor.ServerQueryExecutorV1Impl;
 import org.apache.pinot.query.mailbox.MailboxService;
 import org.apache.pinot.query.planner.physical.MailboxIdUtils;
 import org.apache.pinot.query.planner.plannode.MailboxSendNode;
-import org.apache.pinot.query.routing.MailboxMetadata;
-import org.apache.pinot.query.routing.VirtualServerAddress;
+import org.apache.pinot.query.routing.MailboxInfo;
+import org.apache.pinot.query.routing.RoutingInfo;
+import org.apache.pinot.query.routing.StageMetadata;
+import org.apache.pinot.query.routing.StagePlan;
 import org.apache.pinot.query.routing.WorkerMetadata;
 import org.apache.pinot.query.runtime.blocks.TransferableBlock;
 import org.apache.pinot.query.runtime.executor.ExecutorServiceUtils;
@@ -43,8 +45,6 @@ import org.apache.pinot.query.runtime.executor.OpChainSchedulerService;
 import org.apache.pinot.query.runtime.operator.OpChain;
 import org.apache.pinot.query.runtime.plan.OpChainExecutionContext;
 import org.apache.pinot.query.runtime.plan.PhysicalPlanVisitor;
-import org.apache.pinot.query.runtime.plan.StageMetadata;
-import org.apache.pinot.query.runtime.plan.StagePlan;
 import org.apache.pinot.query.runtime.plan.pipeline.PipelineBreakerExecutor;
 import org.apache.pinot.query.runtime.plan.pipeline.PipelineBreakerResult;
 import org.apache.pinot.query.runtime.plan.server.ServerPlanRequestUtils;
@@ -62,6 +62,8 @@ import org.slf4j.LoggerFactory;
 public class QueryRunner {
   private static final Logger LOGGER = LoggerFactory.getLogger(QueryRunner.class);
 
+  private String _hostname;
+  private int _port;
   private HelixManager _helixManager;
   private ServerMetrics _serverMetrics;
 
@@ -88,15 +90,16 @@ public class QueryRunner {
    */
   public void init(PinotConfiguration config, InstanceDataManager instanceDataManager, HelixManager helixManager,
       ServerMetrics serverMetrics) {
-    _helixManager = helixManager;
-    _serverMetrics = serverMetrics;
-
     String hostname = config.getProperty(CommonConstants.MultiStageQueryRunner.KEY_OF_QUERY_RUNNER_HOSTNAME);
     if (hostname.startsWith(CommonConstants.Helix.PREFIX_OF_SERVER_INSTANCE)) {
       hostname = hostname.substring(CommonConstants.Helix.SERVER_INSTANCE_PREFIX_LENGTH);
     }
     int port = config.getProperty(CommonConstants.MultiStageQueryRunner.KEY_OF_QUERY_RUNNER_PORT,
         CommonConstants.MultiStageQueryRunner.DEFAULT_QUERY_RUNNER_PORT);
+    _hostname = hostname;
+    _port = port;
+    _helixManager = helixManager;
+    _serverMetrics = serverMetrics;
 
     // TODO: Consider using separate config for intermediate stage and leaf stage
     String numGroupsLimitStr = config.getProperty(CommonConstants.Server.CONFIG_OF_QUERY_EXECUTOR_NUM_GROUPS_LIMIT);
@@ -163,25 +166,25 @@ public class QueryRunner {
     // Send error block to all the receivers if pipeline breaker fails
     if (pipelineBreakerResult != null && pipelineBreakerResult.getErrorBlock() != null) {
       TransferableBlock errorBlock = pipelineBreakerResult.getErrorBlock();
+      int stageId = stageMetadata.getStageId();
       LOGGER.error("Error executing pipeline breaker for request: {}, stage: {}, sending error block: {}", requestId,
-          stagePlan.getStageId(), errorBlock.getExceptions());
+          stageId, errorBlock.getExceptions());
       int receiverStageId = ((MailboxSendNode) stagePlan.getRootNode()).getReceiverStageId();
-      MailboxMetadata mailboxMetadata = workerMetadata.getMailboxMetadataMap().get(receiverStageId);
-      List<String> mailboxIds = MailboxIdUtils.toMailboxIds(requestId, mailboxMetadata);
-      List<VirtualServerAddress> virtualAddresses = mailboxMetadata.getVirtualAddresses();
-      int numMailboxes = mailboxIds.size();
-      for (int i = 0; i < numMailboxes; i++) {
-        String mailboxId = mailboxIds.get(i);
-        VirtualServerAddress virtualAddress = virtualAddresses.get(i);
+      List<MailboxInfo> receiverMailboxInfos =
+          workerMetadata.getMailboxInfosMap().get(receiverStageId).getMailboxInfos();
+      List<RoutingInfo> routingInfos =
+          MailboxIdUtils.toRoutingInfos(requestId, stageId, workerMetadata.getWorkerId(), receiverStageId,
+              receiverMailboxInfos);
+      for (RoutingInfo routingInfo : routingInfos) {
         try {
-          _mailboxService.getSendingMailbox(virtualAddress.hostname(), virtualAddress.port(), mailboxId, deadlineMs)
-              .send(errorBlock);
+          _mailboxService.getSendingMailbox(routingInfo.getHostname(), routingInfo.getPort(),
+              routingInfo.getMailboxId(), deadlineMs).send(errorBlock);
         } catch (TimeoutException e) {
-          LOGGER.warn("Timed out sending error block to mailbox: {} for request: {}, stage: {}", mailboxId, requestId,
-              stagePlan.getStageId(), e);
+          LOGGER.warn("Timed out sending error block to mailbox: {} for request: {}, stage: {}",
+              routingInfo.getMailboxId(), requestId, stageId, e);
         } catch (Exception e) {
-          LOGGER.error("Caught exception sending error block to mailbox: {} for request: {}, stage: {}", mailboxId,
-              requestId, stagePlan.getStageId(), e);
+          LOGGER.error("Caught exception sending error block to mailbox: {} for request: {}, stage: {}",
+              routingInfo.getMailboxId(), requestId, stageId, e);
         }
       }
       return;
@@ -189,8 +192,8 @@ public class QueryRunner {
 
     // run OpChain
     OpChainExecutionContext executionContext =
-        new OpChainExecutionContext(_mailboxService, requestId, stagePlan.getStageId(), deadlineMs, opChainMetadata,
-            stageMetadata, workerMetadata, pipelineBreakerResult);
+        new OpChainExecutionContext(_mailboxService, requestId, deadlineMs, opChainMetadata, stageMetadata,
+            workerMetadata, pipelineBreakerResult);
     OpChain opChain;
     if (workerMetadata.isLeafStageWorker()) {
       opChain = ServerPlanRequestUtils.compileLeafStage(executionContext, stagePlan, _helixManager, _serverMetrics,
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/BaseMailboxReceiveOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/BaseMailboxReceiveOperator.java
index 37903c2f72..808caba04e 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/BaseMailboxReceiveOperator.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/BaseMailboxReceiveOperator.java
@@ -27,7 +27,7 @@ import org.apache.calcite.rel.RelDistribution;
 import org.apache.pinot.query.mailbox.MailboxService;
 import org.apache.pinot.query.mailbox.ReceivingMailbox;
 import org.apache.pinot.query.planner.physical.MailboxIdUtils;
-import org.apache.pinot.query.routing.MailboxMetadata;
+import org.apache.pinot.query.routing.MailboxInfos;
 import org.apache.pinot.query.runtime.blocks.TransferableBlock;
 import org.apache.pinot.query.runtime.operator.utils.AsyncStream;
 import org.apache.pinot.query.runtime.operator.utils.BlockingMultiStreamConsumer;
@@ -58,9 +58,11 @@ public abstract class BaseMailboxReceiveOperator extends MultiStageOperator {
     _exchangeType = exchangeType;
 
     long requestId = context.getRequestId();
-    MailboxMetadata mailboxMetadata = context.getWorkerMetadata().getMailboxMetadataMap().get(senderStageId);
-    if (mailboxMetadata != null && !mailboxMetadata.getMailboxIds().isEmpty()) {
-      _mailboxIds = MailboxIdUtils.toMailboxIds(requestId, mailboxMetadata);
+    MailboxInfos mailboxInfos = context.getWorkerMetadata().getMailboxInfosMap().get(senderStageId);
+    if (mailboxInfos != null) {
+      _mailboxIds =
+          MailboxIdUtils.toMailboxIds(requestId, senderStageId, mailboxInfos.getMailboxInfos(), context.getStageId(),
+              context.getWorkerId());
     } else {
       _mailboxIds = Collections.emptyList();
     }
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java
index f74feb1cac..5d66d6afac 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java
@@ -20,11 +20,11 @@ package org.apache.pinot.query.runtime.operator;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
-import java.util.ArrayList;
 import java.util.Collections;
 import java.util.EnumSet;
 import java.util.List;
 import java.util.concurrent.TimeoutException;
+import java.util.stream.Collectors;
 import javax.annotation.Nullable;
 import org.apache.calcite.rel.RelDistribution;
 import org.apache.calcite.rel.RelFieldCollation;
@@ -32,8 +32,8 @@ import org.apache.pinot.query.mailbox.MailboxService;
 import org.apache.pinot.query.mailbox.SendingMailbox;
 import org.apache.pinot.query.planner.logical.RexExpression;
 import org.apache.pinot.query.planner.physical.MailboxIdUtils;
-import org.apache.pinot.query.routing.MailboxMetadata;
-import org.apache.pinot.query.routing.VirtualServerAddress;
+import org.apache.pinot.query.routing.MailboxInfo;
+import org.apache.pinot.query.routing.RoutingInfo;
 import org.apache.pinot.query.runtime.blocks.TransferableBlock;
 import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils;
 import org.apache.pinot.query.runtime.operator.exchange.BlockExchange;
@@ -91,18 +91,14 @@ public class MailboxSendOperator extends MultiStageOperator {
     long requestId = context.getRequestId();
     long deadlineMs = context.getDeadlineMs();
 
-    MailboxMetadata mailboxMetadata = context.getWorkerMetadata().getMailboxMetadataMap().get(receiverStageId);
-    List<String> sendingMailboxIds = MailboxIdUtils.toMailboxIds(requestId, mailboxMetadata);
-    List<VirtualServerAddress> sendingAddresses = mailboxMetadata.getVirtualAddresses();
-    int numMailboxes = sendingMailboxIds.size();
-    List<SendingMailbox> sendingMailboxes = new ArrayList<>(numMailboxes);
-    for (int i = 0; i < numMailboxes; i++) {
-      String sendingMailboxId = sendingMailboxIds.get(i);
-      VirtualServerAddress sendingAddress = sendingAddresses.get(i);
-      sendingMailboxes.add(
-          mailboxService.getSendingMailbox(sendingAddress.hostname(), sendingAddress.port(), sendingMailboxId,
-              deadlineMs));
-    }
+    List<MailboxInfo> mailboxInfos =
+        context.getWorkerMetadata().getMailboxInfosMap().get(receiverStageId).getMailboxInfos();
+    List<RoutingInfo> routingInfos =
+        MailboxIdUtils.toRoutingInfos(requestId, context.getStageId(), context.getWorkerId(), receiverStageId,
+            mailboxInfos);
+    List<SendingMailbox> sendingMailboxes = routingInfos.stream()
+        .map(v -> mailboxService.getSendingMailbox(v.getHostname(), v.getPort(), v.getMailboxId(), deadlineMs))
+        .collect(Collectors.toList());
     return BlockExchange.getExchange(sendingMailboxes, distributionType, distributionKeys,
         TransferableBlockUtils::splitBlock);
   }
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/OpChainExecutionContext.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/OpChainExecutionContext.java
index 5059b2f8ec..51d61e5dbc 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/OpChainExecutionContext.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/OpChainExecutionContext.java
@@ -21,6 +21,7 @@ package org.apache.pinot.query.runtime.plan;
 import java.util.Collections;
 import java.util.Map;
 import org.apache.pinot.query.mailbox.MailboxService;
+import org.apache.pinot.query.routing.StageMetadata;
 import org.apache.pinot.query.routing.VirtualServerAddress;
 import org.apache.pinot.query.routing.WorkerMetadata;
 import org.apache.pinot.query.runtime.operator.OpChainId;
@@ -38,11 +39,11 @@ import org.apache.pinot.spi.utils.CommonConstants;
 public class OpChainExecutionContext {
   private final MailboxService _mailboxService;
   private final long _requestId;
-  private final int _stageId;
   private final long _deadlineMs;
   private final Map<String, String> _opChainMetadata;
   private final StageMetadata _stageMetadata;
   private final WorkerMetadata _workerMetadata;
+  private final VirtualServerAddress _server;
   private final OpChainId _id;
   private final OpChainStats _stats;
   private final PipelineBreakerResult _pipelineBreakerResult;
@@ -50,17 +51,18 @@ public class OpChainExecutionContext {
 
   private ServerPlanRequestContext _leafStageContext;
 
-  public OpChainExecutionContext(MailboxService mailboxService, long requestId, int stageId, long deadlineMs,
+  public OpChainExecutionContext(MailboxService mailboxService, long requestId, long deadlineMs,
       Map<String, String> opChainMetadata, StageMetadata stageMetadata, WorkerMetadata workerMetadata,
       PipelineBreakerResult pipelineBreakerResult) {
     _mailboxService = mailboxService;
     _requestId = requestId;
-    _stageId = stageId;
     _deadlineMs = deadlineMs;
     _opChainMetadata = Collections.unmodifiableMap(opChainMetadata);
     _stageMetadata = stageMetadata;
     _workerMetadata = workerMetadata;
-    _id = new OpChainId(requestId, workerMetadata.getVirtualAddress().workerId(), stageId);
+    _server =
+        new VirtualServerAddress(mailboxService.getHostname(), mailboxService.getPort(), workerMetadata.getWorkerId());
+    _id = new OpChainId(requestId, workerMetadata.getWorkerId(), stageMetadata.getStageId());
     _stats = new OpChainStats(_id.toString());
     _pipelineBreakerResult = pipelineBreakerResult;
     if (pipelineBreakerResult != null && pipelineBreakerResult.getOpChainStats() != null) {
@@ -78,11 +80,15 @@ public class OpChainExecutionContext {
   }
 
   public int getStageId() {
-    return _stageId;
+    return _stageMetadata.getStageId();
+  }
+
+  public int getWorkerId() {
+    return _workerMetadata.getWorkerId();
   }
 
   public VirtualServerAddress getServer() {
-    return _workerMetadata.getVirtualAddress();
+    return _server;
   }
 
   public long getDeadlineMs() {
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/pipeline/PipelineBreakerExecutor.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/pipeline/PipelineBreakerExecutor.java
index aec7998e16..a033df03d7 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/pipeline/PipelineBreakerExecutor.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/pipeline/PipelineBreakerExecutor.java
@@ -29,6 +29,7 @@ import org.apache.pinot.core.common.Operator;
 import org.apache.pinot.query.mailbox.MailboxService;
 import org.apache.pinot.query.planner.plannode.MailboxReceiveNode;
 import org.apache.pinot.query.planner.plannode.PlanNode;
+import org.apache.pinot.query.routing.StagePlan;
 import org.apache.pinot.query.routing.WorkerMetadata;
 import org.apache.pinot.query.runtime.blocks.TransferableBlock;
 import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils;
@@ -36,7 +37,6 @@ import org.apache.pinot.query.runtime.executor.OpChainSchedulerService;
 import org.apache.pinot.query.runtime.operator.OpChain;
 import org.apache.pinot.query.runtime.plan.OpChainExecutionContext;
 import org.apache.pinot.query.runtime.plan.PhysicalPlanVisitor;
-import org.apache.pinot.query.runtime.plan.StagePlan;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -77,12 +77,12 @@ public class PipelineBreakerExecutor {
         //     OpChain receive-mail callbacks.
         // see also: MailboxIdUtils TODOs, de-couple mailbox id from query information
         OpChainExecutionContext opChainExecutionContext =
-            new OpChainExecutionContext(mailboxService, requestId, stagePlan.getStageId(), deadlineMs, opChainMetadata,
+            new OpChainExecutionContext(mailboxService, requestId, deadlineMs, opChainMetadata,
                 stagePlan.getStageMetadata(), workerMetadata, null);
         return execute(scheduler, pipelineBreakerContext, opChainExecutionContext);
       } catch (Exception e) {
         LOGGER.error("Caught exception executing pipeline breaker for request: {}, stage: {}", requestId,
-            stagePlan.getStageId(), e);
+            stagePlan.getStageMetadata().getStageId(), e);
         return new PipelineBreakerResult(pipelineBreakerContext.getNodeIdMap(), Collections.emptyMap(),
             TransferableBlockUtils.getErrorTransferableBlock(e), null);
       }
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/server/ServerPlanRequestContext.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/server/ServerPlanRequestContext.java
index 3c03fa1539..cc9d131962 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/server/ServerPlanRequestContext.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/server/ServerPlanRequestContext.java
@@ -24,7 +24,7 @@ import org.apache.pinot.common.request.PinotQuery;
 import org.apache.pinot.core.query.executor.QueryExecutor;
 import org.apache.pinot.core.query.request.ServerQueryRequest;
 import org.apache.pinot.query.planner.plannode.PlanNode;
-import org.apache.pinot.query.runtime.plan.StagePlan;
+import org.apache.pinot.query.routing.StagePlan;
 import org.apache.pinot.query.runtime.plan.pipeline.PipelineBreakerResult;
 
 
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/server/ServerPlanRequestUtils.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/server/ServerPlanRequestUtils.java
index 4c504f71d6..9b8d19d77d 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/server/ServerPlanRequestUtils.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/server/ServerPlanRequestUtils.java
@@ -47,11 +47,11 @@ import org.apache.pinot.core.query.request.ServerQueryRequest;
 import org.apache.pinot.core.routing.TimeBoundaryInfo;
 import org.apache.pinot.query.planner.plannode.JoinNode;
 import org.apache.pinot.query.planner.plannode.PlanNode;
+import org.apache.pinot.query.routing.StageMetadata;
+import org.apache.pinot.query.routing.StagePlan;
 import org.apache.pinot.query.runtime.operator.OpChain;
 import org.apache.pinot.query.runtime.plan.OpChainExecutionContext;
 import org.apache.pinot.query.runtime.plan.PhysicalPlanVisitor;
-import org.apache.pinot.query.runtime.plan.StageMetadata;
-import org.apache.pinot.query.runtime.plan.StagePlan;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.config.table.TableType;
 import org.apache.pinot.spi.data.FieldSpec;
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/QueryDispatcher.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/QueryDispatcher.java
index 06df28f561..f217c9b01a 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/QueryDispatcher.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/QueryDispatcher.java
@@ -54,7 +54,9 @@ import org.apache.pinot.query.planner.plannode.AbstractPlanNode;
 import org.apache.pinot.query.planner.plannode.MailboxReceiveNode;
 import org.apache.pinot.query.planner.plannode.PlanNode;
 import org.apache.pinot.query.planner.plannode.StageNodeSerDeUtils;
+import org.apache.pinot.query.routing.QueryPlanSerDeUtils;
 import org.apache.pinot.query.routing.QueryServerInstance;
+import org.apache.pinot.query.routing.StageMetadata;
 import org.apache.pinot.query.routing.WorkerMetadata;
 import org.apache.pinot.query.runtime.blocks.TransferableBlock;
 import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils;
@@ -63,8 +65,6 @@ import org.apache.pinot.query.runtime.operator.OpChainStats;
 import org.apache.pinot.query.runtime.operator.OperatorStats;
 import org.apache.pinot.query.runtime.operator.utils.OperatorUtils;
 import org.apache.pinot.query.runtime.plan.OpChainExecutionContext;
-import org.apache.pinot.query.runtime.plan.StageMetadata;
-import org.apache.pinot.query.runtime.plan.serde.QueryPlanSerDeUtils;
 import org.apache.pinot.spi.trace.RequestContext;
 import org.apache.pinot.spi.utils.CommonConstants;
 import org.slf4j.Logger;
@@ -170,11 +170,11 @@ public class QueryDispatcher {
                   QueryPlanSerDeUtils.toProtoWorkerMetadataList(workerMetadataList);
               StageInfo stageInfo = stageInfos.get(i);
               Worker.StageMetadata stageMetadata =
-                  Worker.StageMetadata.newBuilder().addAllWorkerMetadata(protoWorkerMetadataList)
+                  Worker.StageMetadata.newBuilder().setStageId(stageId).addAllWorkerMetadata(protoWorkerMetadataList)
                       .setCustomProperty(stageInfo._customProperty).build();
               requestBuilder.addStagePlan(
-                  Worker.StagePlan.newBuilder().setStageId(stageId).setRootNode(stageInfo._rootNode)
-                      .setStageMetadata(stageMetadata).build());
+                  Worker.StagePlan.newBuilder().setRootNode(stageInfo._rootNode).setStageMetadata(stageMetadata)
+                      .build());
             }
           }
           requestBuilder.setMetadata(protoRequestMetadata);
@@ -264,10 +264,10 @@ public class QueryDispatcher {
     List<WorkerMetadata> workerMetadataList = dispatchableStagePlan.getWorkerMetadataList();
     Preconditions.checkState(workerMetadataList.size() == 1, "Expecting single worker for reduce stage, got: %s",
         workerMetadataList.size());
-    StageMetadata stageMetadata = new StageMetadata(workerMetadataList, dispatchableStagePlan.getCustomProperties());
+    StageMetadata stageMetadata = new StageMetadata(0, workerMetadataList, dispatchableStagePlan.getCustomProperties());
     OpChainExecutionContext opChainExecutionContext =
-        new OpChainExecutionContext(mailboxService, requestId, planFragment.getFragmentId(),
-            System.currentTimeMillis() + timeoutMs, queryOptions, stageMetadata, workerMetadataList.get(0), null);
+        new OpChainExecutionContext(mailboxService, requestId, System.currentTimeMillis() + timeoutMs, queryOptions,
+            stageMetadata, workerMetadataList.get(0), null);
     MailboxReceiveOperator receiveOperator =
         new MailboxReceiveOperator(opChainExecutionContext, receiveNode.getDistributionType(),
             receiveNode.getSenderStageId());
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/server/QueryServer.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/server/QueryServer.java
index 2e52c28a5a..c8caed9100 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/server/QueryServer.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/server/QueryServer.java
@@ -31,11 +31,11 @@ import org.apache.pinot.common.exception.QueryException;
 import org.apache.pinot.common.proto.PinotQueryWorkerGrpc;
 import org.apache.pinot.common.proto.Worker;
 import org.apache.pinot.common.utils.NamedThreadFactory;
+import org.apache.pinot.query.routing.QueryPlanSerDeUtils;
+import org.apache.pinot.query.routing.StageMetadata;
+import org.apache.pinot.query.routing.StagePlan;
 import org.apache.pinot.query.routing.WorkerMetadata;
 import org.apache.pinot.query.runtime.QueryRunner;
-import org.apache.pinot.query.runtime.plan.StageMetadata;
-import org.apache.pinot.query.runtime.plan.StagePlan;
-import org.apache.pinot.query.runtime.plan.serde.QueryPlanSerDeUtils;
 import org.apache.pinot.query.service.dispatch.QueryDispatcher;
 import org.apache.pinot.spi.utils.CommonConstants;
 import org.slf4j.Logger;
@@ -124,7 +124,7 @@ public class QueryServer extends PinotQueryWorkerGrpc.PinotQueryWorkerImplBase {
         } catch (Exception e) {
           throw new RuntimeException(
               String.format("Caught exception while deserializing stage plan for request: %d, stage: %d", requestId,
-                  protoStagePlan.getStageId()), e);
+                  protoStagePlan.getStageMetadata().getStageId()), e);
         }
         StageMetadata stageMetadata = stagePlan.getStageMetadata();
         List<WorkerMetadata> workerMetadataList = stageMetadata.getWorkerMetadataList();
@@ -142,7 +142,7 @@ public class QueryServer extends PinotQueryWorkerGrpc.PinotQueryWorkerImplBase {
         } catch (Exception e) {
           throw new RuntimeException(
               String.format("Caught exception while submitting request: %d, stage: %d", requestId,
-                  protoStagePlan.getStageId()), e);
+                  stageMetadata.getStageId()), e);
         } finally {
           for (CompletableFuture<?> future : workerSubmissionStubs) {
             if (!future.isDone()) {
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/QueryServerEnclosure.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/QueryServerEnclosure.java
index 1811218eda..3de9cc6fdb 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/QueryServerEnclosure.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/QueryServerEnclosure.java
@@ -27,9 +27,9 @@ import org.apache.helix.zookeeper.datamodel.ZNRecord;
 import org.apache.pinot.common.metrics.ServerMetrics;
 import org.apache.pinot.common.utils.SchemaUtils;
 import org.apache.pinot.core.data.manager.InstanceDataManager;
+import org.apache.pinot.query.routing.StagePlan;
 import org.apache.pinot.query.routing.WorkerMetadata;
 import org.apache.pinot.query.runtime.QueryRunner;
-import org.apache.pinot.query.runtime.plan.StagePlan;
 import org.apache.pinot.query.testutils.MockInstanceDataManagerFactory;
 import org.apache.pinot.query.testutils.QueryTestUtils;
 import org.apache.pinot.spi.data.Schema;
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/executor/OpChainSchedulerServiceTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/executor/OpChainSchedulerServiceTest.java
index c34c858e76..7a35202529 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/executor/OpChainSchedulerServiceTest.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/executor/OpChainSchedulerServiceTest.java
@@ -25,13 +25,13 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 import org.apache.pinot.common.utils.NamedThreadFactory;
-import org.apache.pinot.query.routing.VirtualServerAddress;
+import org.apache.pinot.query.mailbox.MailboxService;
+import org.apache.pinot.query.routing.StageMetadata;
 import org.apache.pinot.query.routing.WorkerMetadata;
 import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils;
 import org.apache.pinot.query.runtime.operator.MultiStageOperator;
 import org.apache.pinot.query.runtime.operator.OpChain;
 import org.apache.pinot.query.runtime.plan.OpChainExecutionContext;
-import org.apache.pinot.query.runtime.plan.StageMetadata;
 import org.mockito.Mockito;
 import org.mockito.MockitoAnnotations;
 import org.testng.Assert;
@@ -41,6 +41,8 @@ import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
 import static org.mockito.Mockito.clearInvocations;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
 
 
 public class OpChainSchedulerServiceTest {
@@ -70,10 +72,13 @@ public class OpChainSchedulerServiceTest {
   }
 
   private OpChain getChain(MultiStageOperator operator) {
-    WorkerMetadata workerMetadata =
-        new WorkerMetadata(new VirtualServerAddress("localhost", 123, 0), ImmutableMap.of(), ImmutableMap.of());
-    OpChainExecutionContext context = new OpChainExecutionContext(null, 123L, 1, Long.MAX_VALUE, ImmutableMap.of(),
-        new StageMetadata(ImmutableList.of(workerMetadata), ImmutableMap.of()), workerMetadata, null);
+    MailboxService mailboxService = mock(MailboxService.class);
+    when(mailboxService.getHostname()).thenReturn("localhost");
+    when(mailboxService.getPort()).thenReturn(1234);
+    WorkerMetadata workerMetadata = new WorkerMetadata(0, ImmutableMap.of(), ImmutableMap.of());
+    OpChainExecutionContext context =
+        new OpChainExecutionContext(mailboxService, 123L, Long.MAX_VALUE, ImmutableMap.of(),
+            new StageMetadata(0, ImmutableList.of(workerMetadata), ImmutableMap.of()), workerMetadata, null);
     return new OpChain(context, operator);
   }
 
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperatorTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperatorTest.java
index 1a2949b142..2868f6526c 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperatorTest.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperatorTest.java
@@ -29,13 +29,14 @@ import org.apache.pinot.common.utils.DataSchema;
 import org.apache.pinot.query.mailbox.MailboxService;
 import org.apache.pinot.query.mailbox.ReceivingMailbox;
 import org.apache.pinot.query.planner.physical.MailboxIdUtils;
-import org.apache.pinot.query.routing.MailboxMetadata;
-import org.apache.pinot.query.routing.VirtualServerAddress;
+import org.apache.pinot.query.routing.MailboxInfo;
+import org.apache.pinot.query.routing.MailboxInfos;
+import org.apache.pinot.query.routing.SharedMailboxInfos;
+import org.apache.pinot.query.routing.StageMetadata;
 import org.apache.pinot.query.routing.WorkerMetadata;
 import org.apache.pinot.query.runtime.blocks.TransferableBlock;
 import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils;
 import org.apache.pinot.query.runtime.plan.OpChainExecutionContext;
-import org.apache.pinot.query.runtime.plan.StageMetadata;
 import org.mockito.Mock;
 import org.mockito.MockitoAnnotations;
 import org.testng.annotations.AfterMethod;
@@ -70,25 +71,21 @@ public class MailboxReceiveOperatorTest {
 
   @BeforeClass
   public void setUp() {
-    VirtualServerAddress server1 = new VirtualServerAddress("localhost", 123, 0);
-    VirtualServerAddress server2 = new VirtualServerAddress("localhost", 123, 1);
-    _stageMetadataBoth = new StageMetadata(Stream.of(server1, server2).map(s -> new WorkerMetadata(s, ImmutableMap.of(0,
-        new MailboxMetadata(
-            ImmutableList.of(MailboxIdUtils.toPlanMailboxId(1, 0, 0, 0), MailboxIdUtils.toPlanMailboxId(1, 1, 0, 0)),
-            ImmutableList.of(server1, server2)), 1, new MailboxMetadata(
-            ImmutableList.of(MailboxIdUtils.toPlanMailboxId(1, 0, 0, 0), MailboxIdUtils.toPlanMailboxId(1, 1, 0, 0)),
-            ImmutableList.of(server1, server2))), ImmutableMap.of())).collect(Collectors.toList()), ImmutableMap.of());
-    _stageMetadata1 = new StageMetadata(ImmutableList.of(new WorkerMetadata(server1, ImmutableMap.of(0,
-        new MailboxMetadata(ImmutableList.of(MailboxIdUtils.toPlanMailboxId(1, 0, 0, 0)), ImmutableList.of(server1)), 1,
-        new MailboxMetadata(ImmutableList.of(MailboxIdUtils.toPlanMailboxId(1, 0, 0, 0)), ImmutableList.of(server1))),
-        ImmutableMap.of())), ImmutableMap.of());
+    MailboxInfos mailboxInfosBoth = new SharedMailboxInfos(new MailboxInfo("localhost", 1234, ImmutableList.of(0, 1)));
+    _stageMetadataBoth = new StageMetadata(0, Stream.of(0, 1)
+        .map(workerId -> new WorkerMetadata(workerId, ImmutableMap.of(1, mailboxInfosBoth), ImmutableMap.of()))
+        .collect(Collectors.toList()), ImmutableMap.of());
+    MailboxInfos mailboxInfos1 = new SharedMailboxInfos(new MailboxInfo("localhost", 1234, ImmutableList.of(0)));
+    _stageMetadata1 = new StageMetadata(0,
+        ImmutableList.of(new WorkerMetadata(0, ImmutableMap.of(1, mailboxInfos1), ImmutableMap.of())),
+        ImmutableMap.of());
   }
 
   @BeforeMethod
   public void setUpMethod() {
     _mocks = MockitoAnnotations.openMocks(this);
     when(_mailboxService.getHostname()).thenReturn("localhost");
-    when(_mailboxService.getPort()).thenReturn(123);
+    when(_mailboxService.getPort()).thenReturn(1234);
   }
 
   @AfterMethod
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MailboxSendOperatorTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MailboxSendOperatorTest.java
index 86b2ac0000..92f7b2ce7a 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MailboxSendOperatorTest.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MailboxSendOperatorTest.java
@@ -25,13 +25,12 @@ import java.util.Map;
 import java.util.concurrent.TimeoutException;
 import org.apache.pinot.common.utils.DataSchema;
 import org.apache.pinot.query.mailbox.MailboxService;
-import org.apache.pinot.query.routing.VirtualServerAddress;
+import org.apache.pinot.query.routing.StageMetadata;
 import org.apache.pinot.query.routing.WorkerMetadata;
 import org.apache.pinot.query.runtime.blocks.TransferableBlock;
 import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils;
 import org.apache.pinot.query.runtime.operator.exchange.BlockExchange;
 import org.apache.pinot.query.runtime.plan.OpChainExecutionContext;
-import org.apache.pinot.query.runtime.plan.StageMetadata;
 import org.mockito.ArgumentCaptor;
 import org.mockito.Mock;
 import org.testng.annotations.AfterMethod;
@@ -52,20 +51,17 @@ public class MailboxSendOperatorTest {
 
   private AutoCloseable _mocks;
   @Mock
-  private VirtualServerAddress _server;
+  private MailboxService _mailboxService;
   @Mock
   private MultiStageOperator _sourceOperator;
   @Mock
-  private MailboxService _mailboxService;
-  @Mock
   private BlockExchange _exchange;
 
   @BeforeMethod
   public void setUpMethod() {
     _mocks = openMocks(this);
-    when(_server.hostname()).thenReturn("mock");
-    when(_server.port()).thenReturn(0);
-    when(_server.workerId()).thenReturn(0);
+    when(_mailboxService.getHostname()).thenReturn("localhost");
+    when(_mailboxService.getPort()).thenReturn(1234);
   }
 
   @AfterMethod
@@ -198,11 +194,12 @@ public class MailboxSendOperatorTest {
   }
 
   private MailboxSendOperator getMailboxSendOperator() {
-    WorkerMetadata workerMetadata = new WorkerMetadata(_server, ImmutableMap.of(), ImmutableMap.of());
-    StageMetadata stageMetadata = new StageMetadata(ImmutableList.of(workerMetadata), ImmutableMap.of());
+    WorkerMetadata workerMetadata = new WorkerMetadata(0, ImmutableMap.of(), ImmutableMap.of());
+    StageMetadata stageMetadata =
+        new StageMetadata(SENDER_STAGE_ID, ImmutableList.of(workerMetadata), ImmutableMap.of());
     OpChainExecutionContext context =
-        new OpChainExecutionContext(_mailboxService, 0, SENDER_STAGE_ID, Long.MAX_VALUE, ImmutableMap.of(),
-            stageMetadata, workerMetadata, null);
+        new OpChainExecutionContext(_mailboxService, 123L, Long.MAX_VALUE, ImmutableMap.of(), stageMetadata,
+            workerMetadata, null);
     return new MailboxSendOperator(context, _sourceOperator, _exchange, null, null, false);
   }
 }
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/OpChainTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/OpChainTest.java
index a74dec4e6f..f4cd38a42e 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/OpChainTest.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/OpChainTest.java
@@ -44,15 +44,15 @@ import org.apache.pinot.core.query.request.context.utils.QueryContextConverterUt
 import org.apache.pinot.query.mailbox.MailboxService;
 import org.apache.pinot.query.mailbox.ReceivingMailbox;
 import org.apache.pinot.query.planner.logical.RexExpression;
-import org.apache.pinot.query.planner.physical.MailboxIdUtils;
-import org.apache.pinot.query.routing.MailboxMetadata;
-import org.apache.pinot.query.routing.VirtualServerAddress;
+import org.apache.pinot.query.routing.MailboxInfo;
+import org.apache.pinot.query.routing.MailboxInfos;
+import org.apache.pinot.query.routing.SharedMailboxInfos;
+import org.apache.pinot.query.routing.StageMetadata;
 import org.apache.pinot.query.routing.WorkerMetadata;
 import org.apache.pinot.query.runtime.blocks.TransferableBlock;
 import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils;
 import org.apache.pinot.query.runtime.operator.exchange.BlockExchange;
 import org.apache.pinot.query.runtime.plan.OpChainExecutionContext;
-import org.apache.pinot.query.runtime.plan.StageMetadata;
 import org.apache.pinot.spi.utils.CommonConstants;
 import org.mockito.Mock;
 import org.mockito.MockitoAnnotations;
@@ -76,15 +76,12 @@ public class OpChainTest {
   private final List<TransferableBlock> _blockList = new ArrayList<>();
   private final ExecutorService _executor = Executors.newCachedThreadPool();
   private final AtomicReference<LeafStageTransferableBlockOperator> _leafOpRef = new AtomicReference<>();
-  private final VirtualServerAddress _serverAddress = new VirtualServerAddress("localhost", 123, 0);
-  private final WorkerMetadata _workerMetadata = new WorkerMetadata(_serverAddress, ImmutableMap.of(0,
-      new MailboxMetadata(ImmutableList.of(MailboxIdUtils.toPlanMailboxId(0, 0, 0, 0)),
-          ImmutableList.of(_serverAddress)), 1,
-      new MailboxMetadata(ImmutableList.of(MailboxIdUtils.toPlanMailboxId(0, 0, 0, 0)),
-          ImmutableList.of(_serverAddress)), 2,
-      new MailboxMetadata(ImmutableList.of(MailboxIdUtils.toPlanMailboxId(0, 0, 0, 0)),
-          ImmutableList.of(_serverAddress))), ImmutableMap.of());
-  private final StageMetadata _stageMetadata = new StageMetadata(ImmutableList.of(_workerMetadata), ImmutableMap.of());
+  private final MailboxInfos _mailboxInfos =
+      new SharedMailboxInfos(new MailboxInfo("localhost", 1234, ImmutableList.of(0)));
+  private final WorkerMetadata _workerMetadata =
+      new WorkerMetadata(0, ImmutableMap.of(0, _mailboxInfos, 1, _mailboxInfos, 2, _mailboxInfos), ImmutableMap.of());
+  private final StageMetadata _stageMetadata =
+      new StageMetadata(0, ImmutableList.of(_workerMetadata), ImmutableMap.of());
 
   private AutoCloseable _mocks;
   @Mock
@@ -199,13 +196,9 @@ public class OpChainTest {
   public void testStatsCollectionTracingEnabledMultipleOperators() {
     long dummyOperatorWaitTime = 1000L;
 
-    int receivedStageId = 2;
-    int senderStageId = 1;
-    OpChainExecutionContext context = new OpChainExecutionContext(_mailboxService1, 1, senderStageId, Long.MAX_VALUE,
+    OpChainExecutionContext context = new OpChainExecutionContext(_mailboxService1, 123L, Long.MAX_VALUE,
         ImmutableMap.of(CommonConstants.Broker.Request.TRACE, "true"), _stageMetadata, _workerMetadata, null);
-
-    Stack<MultiStageOperator> operators =
-        getFullOpchain(receivedStageId, senderStageId, context, dummyOperatorWaitTime);
+    Stack<MultiStageOperator> operators = getFullOpChain(context, dummyOperatorWaitTime);
 
     OpChain opChain = new OpChain(context, operators.peek());
     opChain.getStats().executing();
@@ -214,12 +207,10 @@ public class OpChainTest {
     }
     opChain.getStats().queued();
 
-    OpChainExecutionContext secondStageContext =
-        new OpChainExecutionContext(_mailboxService2, 1, senderStageId + 1, Long.MAX_VALUE,
-            ImmutableMap.of(CommonConstants.Broker.Request.TRACE, "true"), _stageMetadata, _workerMetadata, null);
-
+    OpChainExecutionContext secondStageContext = new OpChainExecutionContext(_mailboxService2, 123L, Long.MAX_VALUE,
+        ImmutableMap.of(CommonConstants.Broker.Request.TRACE, "true"), _stageMetadata, _workerMetadata, null);
     MailboxReceiveOperator secondStageReceiveOp =
-        new MailboxReceiveOperator(secondStageContext, RelDistribution.Type.BROADCAST_DISTRIBUTED, senderStageId + 1);
+        new MailboxReceiveOperator(secondStageContext, RelDistribution.Type.BROADCAST_DISTRIBUTED, 1);
 
     assertTrue(opChain.getStats().getExecutionTime() >= dummyOperatorWaitTime);
     int numOperators = operators.size();
@@ -238,14 +229,10 @@ public class OpChainTest {
   public void testStatsCollectionTracingDisableMultipleOperators() {
     long dummyOperatorWaitTime = 1000L;
 
-    int receivedStageId = 2;
-    int senderStageId = 1;
     OpChainExecutionContext context =
-        new OpChainExecutionContext(_mailboxService1, 1, senderStageId, Long.MAX_VALUE, ImmutableMap.of(),
-            _stageMetadata, _workerMetadata, null);
-
-    Stack<MultiStageOperator> operators =
-        getFullOpchain(receivedStageId, senderStageId, context, dummyOperatorWaitTime);
+        new OpChainExecutionContext(_mailboxService1, 123L, Long.MAX_VALUE, ImmutableMap.of(), _stageMetadata,
+            _workerMetadata, null);
+    Stack<MultiStageOperator> operators = getFullOpChain(context, dummyOperatorWaitTime);
 
     OpChain opChain = new OpChain(context, operators.peek());
     opChain.getStats().executing();
@@ -253,10 +240,10 @@ public class OpChainTest {
     opChain.getStats().queued();
 
     OpChainExecutionContext secondStageContext =
-        new OpChainExecutionContext(_mailboxService2, 1, senderStageId + 1, Long.MAX_VALUE, ImmutableMap.of(),
-            _stageMetadata, _workerMetadata, null);
+        new OpChainExecutionContext(_mailboxService2, 123L, Long.MAX_VALUE, ImmutableMap.of(), _stageMetadata,
+            _workerMetadata, null);
     MailboxReceiveOperator secondStageReceiveOp =
-        new MailboxReceiveOperator(secondStageContext, RelDistribution.Type.BROADCAST_DISTRIBUTED, senderStageId);
+        new MailboxReceiveOperator(secondStageContext, RelDistribution.Type.BROADCAST_DISTRIBUTED, 1);
 
     assertTrue(opChain.getStats().getExecutionTime() >= dummyOperatorWaitTime);
     assertEquals(opChain.getStats().getOperatorStatsMap().size(), 2);
@@ -275,8 +262,7 @@ public class OpChainTest {
     assertEquals(secondStageContext.getStats().getOperatorStatsMap().size(), 2);
   }
 
-  private Stack<MultiStageOperator> getFullOpchain(int receivedStageId, int senderStageId,
-      OpChainExecutionContext context, long waitTimeInMillis) {
+  private Stack<MultiStageOperator> getFullOpChain(OpChainExecutionContext context, long waitTimeInMillis) {
     Stack<MultiStageOperator> operators = new Stack<>();
     DataSchema upStreamSchema = new DataSchema(new String[]{"intCol"}, new ColumnDataType[]{ColumnDataType.INT});
     //Mailbox Receive Operator
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/OperatorTestUtil.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/OperatorTestUtil.java
index 3c132269c7..5280724541 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/OperatorTestUtil.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/OperatorTestUtil.java
@@ -26,15 +26,18 @@ import java.util.Map;
 import org.apache.pinot.common.datablock.DataBlock;
 import org.apache.pinot.common.utils.DataSchema;
 import org.apache.pinot.query.mailbox.MailboxService;
+import org.apache.pinot.query.routing.StageMetadata;
 import org.apache.pinot.query.routing.VirtualServerAddress;
 import org.apache.pinot.query.routing.WorkerMetadata;
 import org.apache.pinot.query.runtime.blocks.TransferableBlock;
 import org.apache.pinot.query.runtime.operator.utils.OperatorUtils;
 import org.apache.pinot.query.runtime.plan.OpChainExecutionContext;
-import org.apache.pinot.query.runtime.plan.StageMetadata;
 import org.apache.pinot.query.testutils.MockDataBlockOperatorFactory;
 import org.apache.pinot.spi.utils.CommonConstants;
 
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
 
 public class OperatorTestUtil {
   // simple key-value collision schema/data test set: "Aa" and "BB" have same hash code in java.
@@ -78,7 +81,7 @@ public class OperatorTestUtil {
 
   public static OpChainExecutionContext getOpChainContext(MailboxService mailboxService, long deadlineMs,
       StageMetadata stageMetadata) {
-    return new OpChainExecutionContext(mailboxService, 0, 0, deadlineMs, ImmutableMap.of(), stageMetadata,
+    return new OpChainExecutionContext(mailboxService, 0, deadlineMs, ImmutableMap.of(), stageMetadata,
         stageMetadata.getWorkerMetadataList().get(0), null);
   }
 
@@ -91,9 +94,11 @@ public class OperatorTestUtil {
   }
 
   private static OpChainExecutionContext getDefaultContext(Map<String, String> opChainMetadata) {
-    WorkerMetadata workerMetadata =
-        new WorkerMetadata(new VirtualServerAddress("mock", 80, 0), ImmutableMap.of(), ImmutableMap.of());
-    return new OpChainExecutionContext(null, 1, 2, Long.MAX_VALUE, opChainMetadata,
-        new StageMetadata(ImmutableList.of(workerMetadata), ImmutableMap.of()), workerMetadata, null);
+    MailboxService mailboxService = mock(MailboxService.class);
+    when(mailboxService.getHostname()).thenReturn("localhost");
+    when(mailboxService.getPort()).thenReturn(1234);
+    WorkerMetadata workerMetadata = new WorkerMetadata(0, ImmutableMap.of(), ImmutableMap.of());
+    return new OpChainExecutionContext(mailboxService, 123L, Long.MAX_VALUE, opChainMetadata,
+        new StageMetadata(0, ImmutableList.of(workerMetadata), ImmutableMap.of()), workerMetadata, null);
   }
 }
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/SortedMailboxReceiveOperatorTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/SortedMailboxReceiveOperatorTest.java
index 1e71018215..314081588f 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/SortedMailboxReceiveOperatorTest.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/SortedMailboxReceiveOperatorTest.java
@@ -34,13 +34,14 @@ import org.apache.pinot.query.mailbox.MailboxService;
 import org.apache.pinot.query.mailbox.ReceivingMailbox;
 import org.apache.pinot.query.planner.logical.RexExpression;
 import org.apache.pinot.query.planner.physical.MailboxIdUtils;
-import org.apache.pinot.query.routing.MailboxMetadata;
-import org.apache.pinot.query.routing.VirtualServerAddress;
+import org.apache.pinot.query.routing.MailboxInfo;
+import org.apache.pinot.query.routing.MailboxInfos;
+import org.apache.pinot.query.routing.SharedMailboxInfos;
+import org.apache.pinot.query.routing.StageMetadata;
 import org.apache.pinot.query.routing.WorkerMetadata;
 import org.apache.pinot.query.runtime.blocks.TransferableBlock;
 import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils;
 import org.apache.pinot.query.runtime.plan.OpChainExecutionContext;
-import org.apache.pinot.query.runtime.plan.StageMetadata;
 import org.mockito.Mock;
 import org.mockito.MockitoAnnotations;
 import org.testng.annotations.AfterMethod;
@@ -78,25 +79,21 @@ public class SortedMailboxReceiveOperatorTest {
 
   @BeforeClass
   public void setUp() {
-    VirtualServerAddress server1 = new VirtualServerAddress("localhost", 123, 0);
-    VirtualServerAddress server2 = new VirtualServerAddress("localhost", 123, 1);
-    _stageMetadataBoth = new StageMetadata(Stream.of(server1, server2).map(s -> new WorkerMetadata(s, ImmutableMap.of(0,
-        new MailboxMetadata(
-            ImmutableList.of(MailboxIdUtils.toPlanMailboxId(1, 0, 0, 0), MailboxIdUtils.toPlanMailboxId(1, 1, 0, 0)),
-            ImmutableList.of(server1, server2)), 1, new MailboxMetadata(
-            ImmutableList.of(MailboxIdUtils.toPlanMailboxId(1, 0, 0, 0), MailboxIdUtils.toPlanMailboxId(1, 1, 0, 0)),
-            ImmutableList.of(server1, server2))), ImmutableMap.of())).collect(Collectors.toList()), ImmutableMap.of());
-    _stageMetadata1 = new StageMetadata(ImmutableList.of(new WorkerMetadata(server1, ImmutableMap.of(0,
-        new MailboxMetadata(ImmutableList.of(MailboxIdUtils.toPlanMailboxId(1, 0, 0, 0)), ImmutableList.of(server1)), 1,
-        new MailboxMetadata(ImmutableList.of(MailboxIdUtils.toPlanMailboxId(1, 0, 0, 0)), ImmutableList.of(server1))),
-        ImmutableMap.of())), ImmutableMap.of());
+    MailboxInfos mailboxInfosBoth = new SharedMailboxInfos(new MailboxInfo("localhost", 1234, ImmutableList.of(0, 1)));
+    _stageMetadataBoth = new StageMetadata(0, Stream.of(0, 1)
+        .map(workerId -> new WorkerMetadata(workerId, ImmutableMap.of(1, mailboxInfosBoth), ImmutableMap.of()))
+        .collect(Collectors.toList()), ImmutableMap.of());
+    MailboxInfos mailboxInfos1 = new SharedMailboxInfos(new MailboxInfo("localhost", 1234, ImmutableList.of(0)));
+    _stageMetadata1 = new StageMetadata(0,
+        ImmutableList.of(new WorkerMetadata(0, ImmutableMap.of(1, mailboxInfos1), ImmutableMap.of())),
+        ImmutableMap.of());
   }
 
   @BeforeMethod
   public void setUpMethod() {
     _mocks = MockitoAnnotations.openMocks(this);
     when(_mailboxService.getHostname()).thenReturn("localhost");
-    when(_mailboxService.getPort()).thenReturn(123);
+    when(_mailboxService.getPort()).thenReturn(1234);
   }
 
   @AfterMethod
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/plan/pipeline/PipelineBreakerExecutorTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/plan/pipeline/PipelineBreakerExecutorTest.java
index 94d5e2b873..58dcd2106e 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/plan/pipeline/PipelineBreakerExecutorTest.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/plan/pipeline/PipelineBreakerExecutorTest.java
@@ -34,7 +34,11 @@ import org.apache.pinot.query.mailbox.ReceivingMailbox;
 import org.apache.pinot.query.planner.physical.MailboxIdUtils;
 import org.apache.pinot.query.planner.plannode.JoinNode;
 import org.apache.pinot.query.planner.plannode.MailboxReceiveNode;
-import org.apache.pinot.query.routing.MailboxMetadata;
+import org.apache.pinot.query.routing.MailboxInfo;
+import org.apache.pinot.query.routing.MailboxInfos;
+import org.apache.pinot.query.routing.SharedMailboxInfos;
+import org.apache.pinot.query.routing.StageMetadata;
+import org.apache.pinot.query.routing.StagePlan;
 import org.apache.pinot.query.routing.VirtualServerAddress;
 import org.apache.pinot.query.routing.WorkerMetadata;
 import org.apache.pinot.query.runtime.blocks.TransferableBlock;
@@ -42,8 +46,6 @@ import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils;
 import org.apache.pinot.query.runtime.executor.ExecutorServiceUtils;
 import org.apache.pinot.query.runtime.executor.OpChainSchedulerService;
 import org.apache.pinot.query.runtime.operator.OperatorTestUtil;
-import org.apache.pinot.query.runtime.plan.StageMetadata;
-import org.apache.pinot.query.runtime.plan.StagePlan;
 import org.mockito.Mock;
 import org.mockito.MockitoAnnotations;
 import org.testng.Assert;
@@ -65,13 +67,12 @@ public class PipelineBreakerExecutorTest {
   private final VirtualServerAddress _server = new VirtualServerAddress("localhost", 123, 0);
   private final ExecutorService _executor = Executors.newCachedThreadPool();
   private final OpChainSchedulerService _scheduler = new OpChainSchedulerService(_executor);
-  private final WorkerMetadata _workerMetadata = new WorkerMetadata(_server, ImmutableMap.of(0, new MailboxMetadata(
-          ImmutableList.of(MailboxIdUtils.toPlanMailboxId(1, 0, 0, 0), MailboxIdUtils.toPlanMailboxId(2, 0, 0, 0)),
-          ImmutableList.of(_server, _server)), 1,
-      new MailboxMetadata(ImmutableList.of(MailboxIdUtils.toPlanMailboxId(1, 0, 0, 0)), ImmutableList.of(_server)), 2,
-      new MailboxMetadata(ImmutableList.of(MailboxIdUtils.toPlanMailboxId(2, 0, 0, 0)), ImmutableList.of(_server))),
-      ImmutableMap.of());
-  private final StageMetadata _stageMetadata = new StageMetadata(ImmutableList.of(_workerMetadata), ImmutableMap.of());
+  private final MailboxInfos _mailboxInfos =
+      new SharedMailboxInfos(new MailboxInfo("localhost", 123, ImmutableList.of(0)));
+  private final WorkerMetadata _workerMetadata =
+      new WorkerMetadata(0, ImmutableMap.of(1, _mailboxInfos, 2, _mailboxInfos), ImmutableMap.of());
+  private final StageMetadata _stageMetadata =
+      new StageMetadata(0, ImmutableList.of(_workerMetadata), ImmutableMap.of());
 
   private AutoCloseable _mocks;
   @Mock
@@ -107,7 +108,7 @@ public class PipelineBreakerExecutorTest {
     MailboxReceiveNode mailboxReceiveNode =
         new MailboxReceiveNode(0, DATA_SCHEMA, 1, RelDistribution.Type.SINGLETON, PinotRelExchangeType.PIPELINE_BREAKER,
             null, null, false, false, null);
-    StagePlan stagePlan = new StagePlan(0, mailboxReceiveNode, _stageMetadata);
+    StagePlan stagePlan = new StagePlan(mailboxReceiveNode, _stageMetadata);
 
     // when
     when(_mailboxService.getReceivingMailbox(MAILBOX_ID_1)).thenReturn(_mailbox1);
@@ -145,7 +146,7 @@ public class PipelineBreakerExecutorTest {
         new JoinNode(0, DATA_SCHEMA, DATA_SCHEMA, DATA_SCHEMA, JoinRelType.INNER, null, null, ImmutableList.of());
     joinNode.addInput(mailboxReceiveNode1);
     joinNode.addInput(mailboxReceiveNode2);
-    StagePlan stagePlan = new StagePlan(0, joinNode, _stageMetadata);
+    StagePlan stagePlan = new StagePlan(joinNode, _stageMetadata);
 
     // when
     when(_mailboxService.getReceivingMailbox(MAILBOX_ID_1)).thenReturn(_mailbox1);
@@ -181,7 +182,7 @@ public class PipelineBreakerExecutorTest {
     MailboxReceiveNode incorrectlyConfiguredMailboxNode =
         new MailboxReceiveNode(0, DATA_SCHEMA, 3, RelDistribution.Type.SINGLETON, PinotRelExchangeType.PIPELINE_BREAKER,
             null, null, false, false, null);
-    StagePlan stagePlan = new StagePlan(0, incorrectlyConfiguredMailboxNode, _stageMetadata);
+    StagePlan stagePlan = new StagePlan(incorrectlyConfiguredMailboxNode, _stageMetadata);
 
     // when
     PipelineBreakerResult pipelineBreakerResult =
@@ -204,7 +205,7 @@ public class PipelineBreakerExecutorTest {
     MailboxReceiveNode mailboxReceiveNode =
         new MailboxReceiveNode(0, DATA_SCHEMA, 1, RelDistribution.Type.SINGLETON, PinotRelExchangeType.PIPELINE_BREAKER,
             null, null, false, false, null);
-    StagePlan stagePlan = new StagePlan(0, mailboxReceiveNode, _stageMetadata);
+    StagePlan stagePlan = new StagePlan(mailboxReceiveNode, _stageMetadata);
 
     // when
     when(_mailboxService.getReceivingMailbox(MAILBOX_ID_1)).thenReturn(_mailbox1);
@@ -240,7 +241,7 @@ public class PipelineBreakerExecutorTest {
         new JoinNode(0, DATA_SCHEMA, DATA_SCHEMA, DATA_SCHEMA, JoinRelType.INNER, null, null, ImmutableList.of());
     joinNode.addInput(mailboxReceiveNode1);
     joinNode.addInput(incorrectlyConfiguredMailboxNode);
-    StagePlan stagePlan = new StagePlan(0, joinNode, _stageMetadata);
+    StagePlan stagePlan = new StagePlan(joinNode, _stageMetadata);
 
     // when
     when(_mailboxService.getReceivingMailbox(MAILBOX_ID_1)).thenReturn(_mailbox1);
@@ -278,7 +279,7 @@ public class PipelineBreakerExecutorTest {
         new JoinNode(0, DATA_SCHEMA, DATA_SCHEMA, DATA_SCHEMA, JoinRelType.INNER, null, null, ImmutableList.of());
     joinNode.addInput(mailboxReceiveNode1);
     joinNode.addInput(incorrectlyConfiguredMailboxNode);
-    StagePlan stagePlan = new StagePlan(0, joinNode, _stageMetadata);
+    StagePlan stagePlan = new StagePlan(joinNode, _stageMetadata);
 
     // when
     when(_mailboxService.getReceivingMailbox(MAILBOX_ID_1)).thenReturn(_mailbox1);
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/queries/QueryRunnerTestBase.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/queries/QueryRunnerTestBase.java
index f1315aa1bf..298af4deaf 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/queries/QueryRunnerTestBase.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/queries/QueryRunnerTestBase.java
@@ -58,9 +58,9 @@ import org.apache.pinot.query.mailbox.MailboxService;
 import org.apache.pinot.query.planner.physical.DispatchablePlanFragment;
 import org.apache.pinot.query.planner.physical.DispatchableSubPlan;
 import org.apache.pinot.query.routing.QueryServerInstance;
+import org.apache.pinot.query.routing.StageMetadata;
+import org.apache.pinot.query.routing.StagePlan;
 import org.apache.pinot.query.routing.WorkerMetadata;
-import org.apache.pinot.query.runtime.plan.StageMetadata;
-import org.apache.pinot.query.runtime.plan.StagePlan;
 import org.apache.pinot.query.service.dispatch.QueryDispatcher;
 import org.apache.pinot.spi.data.FieldSpec;
 import org.apache.pinot.spi.data.Schema;
@@ -162,9 +162,9 @@ public abstract class QueryRunnerTestBase extends QueryTestSet {
       QueryServerEnclosure serverEnclosure = _servers.get(entry.getKey());
       List<WorkerMetadata> workerMetadataList =
           entry.getValue().stream().map(stageWorkerMetadataList::get).collect(Collectors.toList());
-      StageMetadata stageMetadata = new StageMetadata(workerMetadataList, dispatchableStagePlan.getCustomProperties());
-      StagePlan stagePlan =
-          new StagePlan(stageId, dispatchableStagePlan.getPlanFragment().getFragmentRoot(), stageMetadata);
+      StageMetadata stageMetadata =
+          new StageMetadata(stageId, workerMetadataList, dispatchableStagePlan.getCustomProperties());
+      StagePlan stagePlan = new StagePlan(dispatchableStagePlan.getPlanFragment().getFragmentRoot(), stageMetadata);
       for (WorkerMetadata workerMetadata : workerMetadataList) {
         submissionStubs.add(serverEnclosure.processQuery(workerMetadata, stagePlan, requestMetadataMap));
       }
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/service/server/QueryServerTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/service/server/QueryServerTest.java
index 679f46c60c..2595d48617 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/service/server/QueryServerTest.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/service/server/QueryServerTest.java
@@ -42,11 +42,11 @@ import org.apache.pinot.query.planner.physical.DispatchableSubPlan;
 import org.apache.pinot.query.planner.plannode.AbstractPlanNode;
 import org.apache.pinot.query.planner.plannode.PlanNode;
 import org.apache.pinot.query.planner.plannode.StageNodeSerDeUtils;
+import org.apache.pinot.query.routing.QueryPlanSerDeUtils;
 import org.apache.pinot.query.routing.QueryServerInstance;
+import org.apache.pinot.query.routing.StageMetadata;
 import org.apache.pinot.query.routing.WorkerMetadata;
 import org.apache.pinot.query.runtime.QueryRunner;
-import org.apache.pinot.query.runtime.plan.StageMetadata;
-import org.apache.pinot.query.runtime.plan.serde.QueryPlanSerDeUtils;
 import org.apache.pinot.query.testutils.QueryTestUtils;
 import org.apache.pinot.spi.utils.CommonConstants;
 import org.apache.pinot.spi.utils.EqualityUtils;
@@ -135,7 +135,8 @@ public class QueryServerTest extends QueryTestSet {
 
       DispatchablePlanFragment dispatchableStagePlan = stagePlans.get(stageId);
       List<WorkerMetadata> workerMetadataList = dispatchableStagePlan.getWorkerMetadataList();
-      StageMetadata stageMetadata = new StageMetadata(workerMetadataList, dispatchableStagePlan.getCustomProperties());
+      StageMetadata stageMetadata =
+          new StageMetadata(stageId, workerMetadataList, dispatchableStagePlan.getCustomProperties());
 
       // ensure mock query runner received correctly deserialized payload.
       QueryRunner mockRunner = _queryRunnerMap.get(Integer.parseInt(requestMetadata.get(KEY_OF_SERVER_INSTANCE_PORT)));
@@ -190,8 +191,8 @@ public class QueryServerTest extends QueryTestSet {
   }
 
   private static boolean isWorkerMetadataEqual(WorkerMetadata expected, WorkerMetadata actual) {
-    return expected.getVirtualAddress().equals(actual.getVirtualAddress()) && EqualityUtils.isEqual(
-        expected.getTableSegmentsMap(), actual.getTableSegmentsMap());
+    return expected.getWorkerId() == actual.getWorkerId() && EqualityUtils.isEqual(expected.getTableSegmentsMap(),
+        actual.getTableSegmentsMap());
   }
 
   private static boolean isStageNodesEqual(PlanNode left, PlanNode right) {
@@ -235,11 +236,10 @@ public class QueryServerTest extends QueryTestSet {
     // as it is not testing the multi-tenancy dispatch (which is in the QueryDispatcherTest)
     QueryServerInstance serverInstance = stagePlan.getServerInstanceToWorkerIdMap().keySet().iterator().next();
     Worker.StageMetadata stageMetadata =
-        Worker.StageMetadata.newBuilder().addAllWorkerMetadata(workerMetadataList).setCustomProperty(customProperty)
-            .build();
+        Worker.StageMetadata.newBuilder().setStageId(stageId).addAllWorkerMetadata(workerMetadataList)
+            .setCustomProperty(customProperty).build();
     Worker.StagePlan protoStagePlan =
-        Worker.StagePlan.newBuilder().setStageId(stageId).setRootNode(rootNode.toByteString())
-            .setStageMetadata(stageMetadata).build();
+        Worker.StagePlan.newBuilder().setRootNode(rootNode.toByteString()).setStageMetadata(stageMetadata).build();
 
     Map<String, String> requestMetadata = new HashMap<>();
     // the default configurations that must exist.


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org