Posted to commits@pinot.apache.org by ja...@apache.org on 2024/02/07 19:16:55 UTC

(pinot) branch master updated: [Multi-stage] Optimize query plan serialization (#12370)

This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 8434158758 [Multi-stage] Optimize query plan serialization (#12370)
8434158758 is described below

commit 8434158758e1824d3c38de5912925516b7bc5c8d
Author: Xiaotian (Jackie) Jiang <17...@users.noreply.github.com>
AuthorDate: Wed Feb 7 11:16:49 2024 -0800

    [Multi-stage] Optimize query plan serialization (#12370)
---
 pinot-common/src/main/proto/mailbox.proto          |  18 --
 pinot-common/src/main/proto/plan.proto             |   2 +-
 pinot-common/src/main/proto/server.proto           |  18 --
 pinot-common/src/main/proto/worker.proto           |  33 +---
 .../explain/PhysicalExplainPlanVisitor.java        |   6 +-
 .../planner/physical/DispatchablePlanContext.java  |   9 +-
 .../planner/physical/MailboxAssignmentVisitor.java |  47 +++--
 .../query/planner/physical/MailboxIdUtils.java     |  26 ++-
 .../pinot/query/routing/MailboxMetadata.java       |  63 ++-----
 .../apache/pinot/query/routing/WorkerMetadata.java |  97 +++++------
 .../apache/pinot/query/mailbox/MailboxIdUtils.java |  50 ------
 .../apache/pinot/query/runtime/QueryRunner.java    |  58 ++++---
 .../operator/BaseMailboxReceiveOperator.java       |   8 +-
 .../runtime/operator/MailboxSendOperator.java      |  20 ++-
 .../runtime/plan/OpChainExecutionContext.java      |  17 +-
 .../pinot/query/runtime/plan/StageMetadata.java    |  58 +------
 .../{DistributedStagePlan.java => StagePlan.java}  |  36 +---
 .../plan/pipeline/PipelineBreakerExecutor.java     |  19 ++-
 .../runtime/plan/serde/QueryPlanSerDeUtils.java    | 154 +++++------------
 .../plan/server/ServerPlanRequestContext.java      |  10 +-
 .../plan/server/ServerPlanRequestUtils.java        |  65 +++----
 .../query/service/dispatch/QueryDispatcher.java    |  76 ++++++---
 .../pinot/query/service/server/QueryServer.java    |  42 +++--
 .../apache/pinot/query/QueryServerEnclosure.java   |   7 +-
 .../pinot/query/mailbox/MailboxServiceTest.java    |  13 +-
 .../executor/OpChainSchedulerServiceTest.java      |  12 +-
 .../operator/MailboxReceiveOperatorTest.java       |  70 ++++----
 .../runtime/operator/MailboxSendOperatorTest.java  |  17 +-
 .../pinot/query/runtime/operator/OpChainTest.java  |  56 +++---
 .../query/runtime/operator/OperatorTestUtil.java   |  30 ++--
 .../operator/SortedMailboxReceiveOperatorTest.java |  71 ++++----
 .../plan/pipeline/PipelineBreakerExecutorTest.java | 100 +++++------
 .../plan/serde/QueryPlanSerDeUtilsTest.java        |  58 -------
 .../query/runtime/queries/QueryRunnerTestBase.java |  33 ++--
 .../query/service/server/QueryServerTest.java      | 189 ++++++++++-----------
 35 files changed, 626 insertions(+), 962 deletions(-)

diff --git a/pinot-common/src/main/proto/mailbox.proto b/pinot-common/src/main/proto/mailbox.proto
index 6e1cca9a92..2ffe923c4d 100644
--- a/pinot-common/src/main/proto/mailbox.proto
+++ b/pinot-common/src/main/proto/mailbox.proto
@@ -17,24 +17,6 @@
 // under the License.
 //
 
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
 syntax = "proto3";
 
 package org.apache.pinot.common.proto;
diff --git a/pinot-common/src/main/proto/plan.proto b/pinot-common/src/main/proto/plan.proto
index b36fa652b0..144a6fd7cc 100644
--- a/pinot-common/src/main/proto/plan.proto
+++ b/pinot-common/src/main/proto/plan.proto
@@ -78,4 +78,4 @@ message ListField {
 // The key of the map is a string and the value of the map is a MemberVariableField.
 message MapField {
   map<string, MemberVariableField> content = 1;
-}
\ No newline at end of file
+}
diff --git a/pinot-common/src/main/proto/server.proto b/pinot-common/src/main/proto/server.proto
index 0239ae125f..7781d6f96e 100644
--- a/pinot-common/src/main/proto/server.proto
+++ b/pinot-common/src/main/proto/server.proto
@@ -17,24 +17,6 @@
 // under the License.
 //
 
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
 syntax = "proto3";
 
 package org.apache.pinot.common.proto;
diff --git a/pinot-common/src/main/proto/worker.proto b/pinot-common/src/main/proto/worker.proto
index dfb1cd53eb..b7e492fcc5 100644
--- a/pinot-common/src/main/proto/worker.proto
+++ b/pinot-common/src/main/proto/worker.proto
@@ -17,30 +17,10 @@
 // under the License.
 //
 
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
 syntax = "proto3";
 
 package org.apache.pinot.common.proto;
 
-import "plan.proto";
-
 service PinotQueryWorker {
   // Dispatch a QueryRequest to a PinotQueryWorker
   rpc Submit(QueryRequest) returns (QueryResponse);
@@ -59,7 +39,7 @@ message CancelResponse {
 // QueryRequest is the dispatched content for all query stages to a physical worker.
 message QueryRequest {
   repeated StagePlan stagePlan = 1;
-  map<string, string> metadata = 2;
+  bytes metadata = 2; // Serialized Properties
 }
 
 // QueryResponse is the dispatched response from worker, it doesn't contain actual data, only dispatch status.
@@ -70,15 +50,13 @@ message QueryResponse {
 
 message StagePlan {
   int32 stageId = 1;
-  StageNode stageRoot = 2;
+  bytes rootNode = 2; // Serialized StageNode
   StageMetadata stageMetadata = 3;
 }
 
 message StageMetadata {
   repeated WorkerMetadata workerMetadata = 1;
-  map<string, string> customProperty = 2;
-  string serverAddress = 3;
-  repeated int32 workerIds = 4;
+  bytes customProperty = 2; // Serialized Properties
 }
 
 message WorkerMetadata {
@@ -90,5 +68,8 @@ message WorkerMetadata {
 message MailboxMetadata {
   repeated string mailboxId = 1;
   repeated string virtualAddress = 2;
-  map<string, string> customProperty = 3;
+}
+
+message Properties {
+  map<string, string> property = 1;
 }
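
Note on the worker.proto change above: the request metadata and stage custom properties are now carried as opaque bytes (a serialized Properties message), and the stage root as a serialized StageNode, instead of proto map/message fields. As a rough round-trip illustration (not part of this commit), assuming the generated org.apache.pinot.common.proto.Worker.Properties class and hypothetical helper names:

    import com.google.protobuf.ByteString;
    import com.google.protobuf.InvalidProtocolBufferException;
    import java.util.Map;
    import org.apache.pinot.common.proto.Worker;

    public class PropertiesSerDeSketch {
      // Encode a plain metadata map into the new bytes field (serialized Properties).
      public static ByteString toProperties(Map<String, String> metadata) {
        return Worker.Properties.newBuilder().putAllProperty(metadata).build().toByteString();
      }

      // Decode the bytes field back into a metadata map on the receiving side.
      public static Map<String, String> fromProperties(ByteString bytes)
          throws InvalidProtocolBufferException {
        return Worker.Properties.parseFrom(bytes).getPropertyMap();
      }
    }
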
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/explain/PhysicalExplainPlanVisitor.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/explain/PhysicalExplainPlanVisitor.java
index e1a6ac1176..ea9bef1139 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/explain/PhysicalExplainPlanVisitor.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/explain/PhysicalExplainPlanVisitor.java
@@ -61,7 +61,6 @@ public class PhysicalExplainPlanVisitor implements PlanNodeVisitor<StringBuilder
   /**
    * Explains the query plan.
    *
-   * @see DispatchableSubPlan#explain()
    * @param dispatchableSubPlan the queryPlan to explain
    * @return a String representation of the query plan tree
    */
@@ -216,9 +215,8 @@ public class PhysicalExplainPlanVisitor implements PlanNodeVisitor<StringBuilder
 
     int receiverStageId = node.getReceiverStageId();
     List<VirtualServerAddress> serverAddressList =
-        _dispatchableSubPlan.getQueryStageList().get(node.getPlanFragmentId())
-            .getWorkerMetadataList().get(context._workerId)
-            .getMailBoxInfosMap().get(receiverStageId).getVirtualAddressList();
+        _dispatchableSubPlan.getQueryStageList().get(node.getPlanFragmentId()).getWorkerMetadataList()
+            .get(context._workerId).getMailboxMetadataMap().get(receiverStageId).getVirtualAddresses();
     List<String> serverInstanceToWorkerIdList = stringifyVirtualServerAddresses(serverAddressList);
     context._builder.append("->");
     String receivers = serverInstanceToWorkerIdList.stream()
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/DispatchablePlanContext.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/DispatchablePlanContext.java
index f17f48fd2f..22744dda0e 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/DispatchablePlanContext.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/DispatchablePlanContext.java
@@ -108,13 +108,12 @@ public class DispatchablePlanContext {
         int workerId = serverEntry.getKey();
         QueryServerInstance queryServerInstance = serverEntry.getValue();
         serverInstanceToWorkerIdsMap.computeIfAbsent(queryServerInstance, k -> new ArrayList<>()).add(workerId);
-        WorkerMetadata.Builder workerMetadataBuilder = new WorkerMetadata.Builder().setVirtualServerAddress(
-            new VirtualServerAddress(queryServerInstance, workerId));
+        WorkerMetadata workerMetadata = new WorkerMetadata(new VirtualServerAddress(queryServerInstance, workerId),
+            workerIdToMailboxesMap.get(workerId));
         if (workerIdToSegmentsMap != null) {
-          workerMetadataBuilder.addTableSegmentsMap(workerIdToSegmentsMap.get(workerId));
+          workerMetadata.setTableSegmentsMap(workerIdToSegmentsMap.get(workerId));
         }
-        workerMetadataBuilder.putAllMailBoxInfosMap(workerIdToMailboxesMap.get(workerId));
-        workerMetadataArray[workerId] = workerMetadataBuilder.build();
+        workerMetadataArray[workerId] = workerMetadata;
       }
 
       // set the stageMetadata
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/MailboxAssignmentVisitor.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/MailboxAssignmentVisitor.java
index 736b443c53..421e7bbc9c 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/MailboxAssignmentVisitor.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/MailboxAssignmentVisitor.java
@@ -19,8 +19,10 @@
 package org.apache.pinot.query.planner.physical;
 
 import com.google.common.base.Preconditions;
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import org.apache.calcite.rel.RelDistribution;
 import org.apache.pinot.query.planner.plannode.DefaultPostOrderTraversalVisitor;
@@ -63,7 +65,7 @@ public class MailboxAssignmentVisitor extends DefaultPostOrderTraversalVisitor<V
               workerId, senderServer, receiverServer);
           MailboxMetadata mailboxMetadata = new MailboxMetadata(Collections.singletonList(
               MailboxIdUtils.toPlanMailboxId(senderFragmentId, workerId, receiverFragmentId, workerId)),
-              Collections.singletonList(new VirtualServerAddress(senderServer, workerId)), Collections.emptyMap());
+              Collections.singletonList(new VirtualServerAddress(senderServer, workerId)));
           senderMailboxesMap.computeIfAbsent(workerId, k -> new HashMap<>()).put(receiverFragmentId, mailboxMetadata);
           receiverMailboxesMap.computeIfAbsent(workerId, k -> new HashMap<>()).put(senderFragmentId, mailboxMetadata);
         }
@@ -78,11 +80,9 @@ public class MailboxAssignmentVisitor extends DefaultPostOrderTraversalVisitor<V
           for (int workerId = 0; workerId < numSenders; workerId++) {
             String mailboxId = MailboxIdUtils.toPlanMailboxId(senderFragmentId, workerId, receiverFragmentId, workerId);
             MailboxMetadata serderMailboxMetadata = new MailboxMetadata(Collections.singletonList(mailboxId),
-                Collections.singletonList(new VirtualServerAddress(receiverServerMap.get(workerId), workerId)),
-                Collections.emptyMap());
+                Collections.singletonList(new VirtualServerAddress(receiverServerMap.get(workerId), workerId)));
             MailboxMetadata receiverMailboxMetadata = new MailboxMetadata(Collections.singletonList(mailboxId),
-                Collections.singletonList(new VirtualServerAddress(senderServerMap.get(workerId), workerId)),
-                Collections.emptyMap());
+                Collections.singletonList(new VirtualServerAddress(senderServerMap.get(workerId), workerId)));
             senderMailboxesMap.computeIfAbsent(workerId, k -> new HashMap<>())
                 .put(receiverFragmentId, serderMailboxMetadata);
             receiverMailboxesMap.computeIfAbsent(workerId, k -> new HashMap<>())
@@ -94,22 +94,23 @@ public class MailboxAssignmentVisitor extends DefaultPostOrderTraversalVisitor<V
           for (int senderWorkerId = 0; senderWorkerId < numSenders; senderWorkerId++) {
             VirtualServerAddress senderAddress =
                 new VirtualServerAddress(senderServerMap.get(senderWorkerId), senderWorkerId);
-            MailboxMetadata senderMailboxMetadata = new MailboxMetadata();
+            List<String> receivingMailboxIds = new ArrayList<>(partitionParallelism);
+            List<VirtualServerAddress> receivingAddresses = new ArrayList<>(partitionParallelism);
+            MailboxMetadata senderMailboxMetadata = new MailboxMetadata(receivingMailboxIds, receivingAddresses);
             senderMailboxesMap.computeIfAbsent(senderWorkerId, k -> new HashMap<>())
                 .put(receiverFragmentId, senderMailboxMetadata);
             for (int i = 0; i < partitionParallelism; i++) {
-              VirtualServerAddress receiverAddress =
-                  new VirtualServerAddress(receiverServerMap.get(receiverWorkerId), receiverWorkerId);
               String mailboxId = MailboxIdUtils.toPlanMailboxId(senderFragmentId, senderWorkerId, receiverFragmentId,
                   receiverWorkerId);
-              senderMailboxMetadata.getMailBoxIdList().add(mailboxId);
-              senderMailboxMetadata.getVirtualAddressList().add(receiverAddress);
+              receivingMailboxIds.add(mailboxId);
+              receivingAddresses.add(
+                  new VirtualServerAddress(receiverServerMap.get(receiverWorkerId), receiverWorkerId));
 
               MailboxMetadata receiverMailboxMetadata =
                   receiverMailboxesMap.computeIfAbsent(receiverWorkerId, k -> new HashMap<>())
                       .computeIfAbsent(senderFragmentId, k -> new MailboxMetadata());
-              receiverMailboxMetadata.getMailBoxIdList().add(mailboxId);
-              receiverMailboxMetadata.getVirtualAddressList().add(senderAddress);
+              receiverMailboxMetadata.getMailboxIds().add(mailboxId);
+              receiverMailboxMetadata.getVirtualAddresses().add(senderAddress);
 
               receiverWorkerId++;
             }
@@ -123,22 +124,22 @@ public class MailboxAssignmentVisitor extends DefaultPostOrderTraversalVisitor<V
         for (int senderWorkerId = 0; senderWorkerId < numSenders; senderWorkerId++) {
           VirtualServerAddress senderAddress =
               new VirtualServerAddress(senderServerMap.get(senderWorkerId), senderWorkerId);
-          MailboxMetadata senderMailboxMetadata = new MailboxMetadata();
+          List<String> receivingMailboxIds = new ArrayList<>(numReceivers);
+          List<VirtualServerAddress> receivingAddresses = new ArrayList<>(numReceivers);
+          MailboxMetadata senderMailboxMetadata = new MailboxMetadata(receivingMailboxIds, receivingAddresses);
           senderMailboxesMap.computeIfAbsent(senderWorkerId, k -> new HashMap<>())
               .put(receiverFragmentId, senderMailboxMetadata);
           for (int receiverWorkerId = 0; receiverWorkerId < numReceivers; receiverWorkerId++) {
-            VirtualServerAddress receiverAddress =
-                new VirtualServerAddress(receiverServerMap.get(receiverWorkerId), receiverWorkerId);
             String mailboxId =
                 MailboxIdUtils.toPlanMailboxId(senderFragmentId, senderWorkerId, receiverFragmentId, receiverWorkerId);
-            senderMailboxMetadata.getMailBoxIdList().add(mailboxId);
-            senderMailboxMetadata.getVirtualAddressList().add(receiverAddress);
+            receivingMailboxIds.add(mailboxId);
+            receivingAddresses.add(new VirtualServerAddress(receiverServerMap.get(receiverWorkerId), receiverWorkerId));
 
             MailboxMetadata receiverMailboxMetadata =
                 receiverMailboxesMap.computeIfAbsent(receiverWorkerId, k -> new HashMap<>())
                     .computeIfAbsent(senderFragmentId, k -> new MailboxMetadata());
-            receiverMailboxMetadata.getMailBoxIdList().add(mailboxId);
-            receiverMailboxMetadata.getVirtualAddressList().add(senderAddress);
+            receiverMailboxMetadata.getMailboxIds().add(mailboxId);
+            receiverMailboxMetadata.getVirtualAddresses().add(senderAddress);
           }
         }
       }
@@ -154,14 +155,12 @@ public class MailboxAssignmentVisitor extends DefaultPostOrderTraversalVisitor<V
     int numReceivers = receiverServerMap.size();
     if (sender.getScannedTables().size() > 0 && receiver.getScannedTables().size() == 0) {
       // leaf-to-intermediate condition
-      return numSenders * sender.getPartitionParallelism() == numReceivers
-          && sender.getPartitionFunction() != null
+      return numSenders * sender.getPartitionParallelism() == numReceivers && sender.getPartitionFunction() != null
           && sender.getPartitionFunction().equalsIgnoreCase(receiver.getPartitionFunction());
     } else {
       // dynamic-broadcast condition || intermediate-to-intermediate
-      return numSenders == numReceivers
-          && sender.getPartitionFunction() != null
-          && sender.getPartitionFunction().equalsIgnoreCase(receiver.getPartitionFunction());
+      return numSenders == numReceivers && sender.getPartitionFunction() != null && sender.getPartitionFunction()
+          .equalsIgnoreCase(receiver.getPartitionFunction());
     }
   }
 }
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/MailboxIdUtils.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/MailboxIdUtils.java
index c949c6598f..32c7d3197a 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/MailboxIdUtils.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/MailboxIdUtils.java
@@ -18,15 +18,35 @@
  */
 package org.apache.pinot.query.planner.physical;
 
+import com.google.common.annotations.VisibleForTesting;
+import java.util.List;
+import java.util.stream.Collectors;
+import org.apache.pinot.query.routing.MailboxMetadata;
+
+
 public class MailboxIdUtils {
   private MailboxIdUtils() {
   }
 
-  private static final char SEPARATOR = '|';
+  public static final char SEPARATOR = '|';
 
   public static String toPlanMailboxId(int senderStageId, int senderWorkerId, int receiverStageId,
       int receiverWorkerId) {
-    return Integer.toString(senderStageId) + SEPARATOR + senderWorkerId + SEPARATOR
-        + receiverStageId + SEPARATOR + receiverWorkerId;
+    return Integer.toString(senderStageId) + SEPARATOR + senderWorkerId + SEPARATOR + receiverStageId + SEPARATOR
+        + receiverWorkerId;
+  }
+
+  public static String toMailboxId(long requestId, String planMailboxId) {
+    return Long.toString(requestId) + SEPARATOR + planMailboxId;
+  }
+
+  public static List<String> toMailboxIds(long requestId, MailboxMetadata mailboxMetadata) {
+    return mailboxMetadata.getMailboxIds().stream().map(v -> toMailboxId(requestId, v)).collect(Collectors.toList());
+  }
+
+  @VisibleForTesting
+  public static String toMailboxId(long requestId, int senderStageId, int senderWorkerId, int receiverStageId,
+      int receiverWorkerId) {
+    return toMailboxId(requestId, toPlanMailboxId(senderStageId, senderWorkerId, receiverStageId, receiverWorkerId));
   }
 }
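
As a usage sketch of the consolidated MailboxIdUtils above (the request id and stage/worker ids are arbitrary), both the plan-level id and the runtime id are pipe-separated strings:

    String planMailboxId = MailboxIdUtils.toPlanMailboxId(1, 0, 2, 3);  // "1|0|2|3"
    String mailboxId = MailboxIdUtils.toMailboxId(123L, planMailboxId); // "123|1|0|2|3"
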
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/MailboxMetadata.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/MailboxMetadata.java
index dcc46c8271..b3484d1a7b 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/MailboxMetadata.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/MailboxMetadata.java
@@ -19,10 +19,7 @@
 package org.apache.pinot.query.routing;
 
 import java.util.ArrayList;
-import java.util.HashMap;
 import java.util.List;
-import java.util.Map;
-import java.util.Objects;
 
 
 /**
@@ -31,68 +28,32 @@ import java.util.Objects;
  *  <ul>
  *    <li>MailboxId: the unique id of the mailbox</li>
  *    <li>VirtualAddress: the virtual address of the mailbox</li>
- *    <li>CustomProperties: the custom properties of the mailbox</li>
  *  </ul>
  */
 public class MailboxMetadata {
-  private final List<String> _mailBoxIdList;
-  private final List<VirtualServerAddress> _virtualAddressList;
-  private final Map<String, String> _customProperties;
+  private final List<String> _mailboxIds;
+  private final List<VirtualServerAddress> _virtualAddresses;
 
   public MailboxMetadata() {
-    _mailBoxIdList = new ArrayList<>();
-    _virtualAddressList = new ArrayList<>();
-    _customProperties = new HashMap<>();
+    _mailboxIds = new ArrayList<>();
+    _virtualAddresses = new ArrayList<>();
   }
 
-  public MailboxMetadata(List<String> mailBoxIdList, List<VirtualServerAddress> virtualAddressList,
-      Map<String, String> customProperties) {
-    _mailBoxIdList = mailBoxIdList;
-    _virtualAddressList = virtualAddressList;
-    _customProperties = customProperties;
+  public MailboxMetadata(List<String> mailboxIds, List<VirtualServerAddress> virtualAddresses) {
+    _mailboxIds = mailboxIds;
+    _virtualAddresses = virtualAddresses;
   }
 
-  public List<String> getMailBoxIdList() {
-    return _mailBoxIdList;
+  public List<String> getMailboxIds() {
+    return _mailboxIds;
   }
 
-  public String getMailBoxId(int index) {
-    return _mailBoxIdList.get(index);
-  }
-
-  public List<VirtualServerAddress> getVirtualAddressList() {
-    return _virtualAddressList;
-  }
-
-  public VirtualServerAddress getVirtualAddress(int index) {
-    return _virtualAddressList.get(index);
-  }
-
-  public Map<String, String> getCustomProperties() {
-    return _customProperties;
+  public List<VirtualServerAddress> getVirtualAddresses() {
+    return _virtualAddresses;
   }
 
   @Override
   public String toString() {
-    return _mailBoxIdList + "@" + _virtualAddressList.toString() + "#" + _customProperties.toString();
-  }
-
-  @Override
-  public int hashCode() {
-    return Objects.hash(_mailBoxIdList, _virtualAddressList, _customProperties);
-  }
-
-  @Override
-  public boolean equals(Object o) {
-    if (this == o) {
-      return true;
-    }
-    if (o == null || getClass() != o.getClass()) {
-      return false;
-    }
-    MailboxMetadata that = (MailboxMetadata) o;
-    return Objects.equals(_mailBoxIdList, that._mailBoxIdList)
-        && Objects.equals(_virtualAddressList, that._virtualAddressList)
-        && _customProperties.equals(that._customProperties);
+    return _mailboxIds + "@" + _virtualAddresses;
   }
 }
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/WorkerMetadata.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/WorkerMetadata.java
index 9d92bfb697..3392261980 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/WorkerMetadata.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/WorkerMetadata.java
@@ -24,6 +24,7 @@ import java.io.IOException;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import javax.annotation.Nullable;
 import org.apache.pinot.spi.utils.JsonUtils;
 
 
@@ -41,85 +42,63 @@ import org.apache.pinot.spi.utils.JsonUtils;
  * MailboxSendNode and MailboxReceiveNode to derive the info during runtime. this should changed to plan time soon.
  */
 public class WorkerMetadata {
-  private final VirtualServerAddress _virtualServerAddress;
-  private final Map<Integer, MailboxMetadata> _mailBoxInfosMap;
+  public static final String TABLE_SEGMENTS_MAP_KEY = "tableSegmentsMap";
+
+  private final VirtualServerAddress _virtualAddress;
+  private final Map<Integer, MailboxMetadata> _mailboxMetadataMap;
   private final Map<String, String> _customProperties;
 
-  private WorkerMetadata(VirtualServerAddress virtualServerAddress, Map<Integer, MailboxMetadata> mailBoxInfosMap,
+  public WorkerMetadata(VirtualServerAddress virtualAddress, Map<Integer, MailboxMetadata> mailboxMetadataMap) {
+    _virtualAddress = virtualAddress;
+    _mailboxMetadataMap = mailboxMetadataMap;
+    _customProperties = new HashMap<>();
+  }
+
+  public WorkerMetadata(VirtualServerAddress virtualAddress, Map<Integer, MailboxMetadata> mailboxMetadataMap,
       Map<String, String> customProperties) {
-    _virtualServerAddress = virtualServerAddress;
-    _mailBoxInfosMap = mailBoxInfosMap;
+    _virtualAddress = virtualAddress;
+    _mailboxMetadataMap = mailboxMetadataMap;
     _customProperties = customProperties;
   }
 
-  public VirtualServerAddress getVirtualServerAddress() {
-    return _virtualServerAddress;
+  public VirtualServerAddress getVirtualAddress() {
+    return _virtualAddress;
   }
 
-  public Map<Integer, MailboxMetadata> getMailBoxInfosMap() {
-    return _mailBoxInfosMap;
+  public Map<Integer, MailboxMetadata> getMailboxMetadataMap() {
+    return _mailboxMetadataMap;
   }
 
   public Map<String, String> getCustomProperties() {
     return _customProperties;
   }
 
-  public static class Builder {
-    public static final String TABLE_SEGMENTS_MAP_KEY = "tableSegmentsMap";
-    private VirtualServerAddress _virtualServerAddress;
-    private Map<Integer, MailboxMetadata> _mailBoxInfosMap;
-    private Map<String, String> _customProperties;
-
-    public Builder() {
-      _mailBoxInfosMap = new HashMap<>();
-      _customProperties = new HashMap<>();
-    }
-
-    public Builder setVirtualServerAddress(VirtualServerAddress virtualServerAddress) {
-      _virtualServerAddress = virtualServerAddress;
-      return this;
-    }
-
-    public Builder putAllMailBoxInfosMap(Map<Integer, MailboxMetadata> mailBoxInfosMap) {
-      _mailBoxInfosMap.putAll(mailBoxInfosMap);
-      return this;
-    }
-
-    public Builder addMailBoxInfoMap(Integer planFragmentId, MailboxMetadata mailBoxMetadata) {
-      _mailBoxInfosMap.put(planFragmentId, mailBoxMetadata);
-      return this;
-    }
-
-    public Builder addTableSegmentsMap(Map<String, List<String>> tableSegmentsMap) {
-      try {
-        String tableSegmentsMapStr = JsonUtils.objectToString(tableSegmentsMap);
-        _customProperties.put(TABLE_SEGMENTS_MAP_KEY, tableSegmentsMapStr);
-      } catch (JsonProcessingException e) {
-        throw new RuntimeException("Unable to serialize table segments map", e);
-      }
-      return this;
-    }
-
-    public WorkerMetadata build() {
-      return new WorkerMetadata(_virtualServerAddress, _mailBoxInfosMap, _customProperties);
-    }
-
-    public void putAllCustomProperties(Map<String, String> customPropertyMap) {
-      _customProperties.putAll(customPropertyMap);
-    }
-  }
-
-  public static Map<String, List<String>> getTableSegmentsMap(WorkerMetadata workerMetadata) {
-    String tableSegmentKeyStr = workerMetadata.getCustomProperties().get(Builder.TABLE_SEGMENTS_MAP_KEY);
-    if (tableSegmentKeyStr != null) {
+  @Nullable
+  public Map<String, List<String>> getTableSegmentsMap() {
+    String tableSegmentsMapStr = _customProperties.get(TABLE_SEGMENTS_MAP_KEY);
+    if (tableSegmentsMapStr != null) {
       try {
-        return JsonUtils.stringToObject(tableSegmentKeyStr, new TypeReference<Map<String, List<String>>>() {
+        return JsonUtils.stringToObject(tableSegmentsMapStr, new TypeReference<Map<String, List<String>>>() {
         });
       } catch (IOException e) {
-        throw new RuntimeException("Unable to deserialize table segments map", e);
+        throw new RuntimeException("Unable to deserialize table segments map: " + tableSegmentsMapStr, e);
       }
     } else {
       return null;
     }
   }
+
+  public boolean isLeafStageWorker() {
+    return _customProperties.containsKey(TABLE_SEGMENTS_MAP_KEY);
+  }
+
+  public void setTableSegmentsMap(Map<String, List<String>> tableSegmentsMap) {
+    String tableSegmentsMapStr;
+    try {
+      tableSegmentsMapStr = JsonUtils.objectToString(tableSegmentsMap);
+    } catch (JsonProcessingException e) {
+      throw new RuntimeException("Unable to serialize table segments map: " + tableSegmentsMap, e);
+    }
+    _customProperties.put(TABLE_SEGMENTS_MAP_KEY, tableSegmentsMapStr);
+  }
 }
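
A minimal usage sketch of the reworked WorkerMetadata above, assuming a VirtualServerAddress and mailbox metadata map are already in hand; the table and segment names are made up:

    WorkerMetadata workerMetadata = new WorkerMetadata(virtualAddress, mailboxMetadataMap);
    // Setting the table-to-segments map marks this worker as a leaf-stage worker.
    workerMetadata.setTableSegmentsMap(Map.of("myTable_OFFLINE", List.of("segment_0", "segment_1")));
    assert workerMetadata.isLeafStageWorker();
    Map<String, List<String>> tableSegmentsMap = workerMetadata.getTableSegmentsMap();
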
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/MailboxIdUtils.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/MailboxIdUtils.java
deleted file mode 100644
index 7168afc486..0000000000
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/MailboxIdUtils.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.query.mailbox;
-
-import com.google.common.annotations.VisibleForTesting;
-import java.util.List;
-import java.util.stream.Collectors;
-import org.apache.pinot.query.routing.MailboxMetadata;
-
-
-// TODO: De-couple mailbox id from query information
-public class MailboxIdUtils {
-  private MailboxIdUtils() {
-  }
-
-  private static final char SEPARATOR = '|';
-
-  @VisibleForTesting
-  public static String toMailboxId(long requestId, int senderStageId, int senderWorkerId, int receiverStageId,
-      int receiverWorkerId) {
-    return Long.toString(requestId) + SEPARATOR + senderStageId + SEPARATOR + senderWorkerId + SEPARATOR
-        + receiverStageId + SEPARATOR + receiverWorkerId;
-  }
-
-  public static List<String> toMailboxIds(long requestId, MailboxMetadata mailBoxMetadata) {
-    return toMailboxIds(requestId, mailBoxMetadata.getMailBoxIdList());
-  }
-
-  public static List<String> toMailboxIds(long requestId, List<String> mailboxMetadataIdList) {
-    return mailboxMetadataIdList.stream()
-        .map(mailboxIdFromBroker -> Long.toString(requestId) + SEPARATOR + mailboxIdFromBroker)
-        .collect(Collectors.toList());
-  }
-}
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java
index 526a489fb6..4796383a0b 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java
@@ -31,17 +31,20 @@ import org.apache.pinot.common.utils.config.QueryOptionsUtils;
 import org.apache.pinot.core.data.manager.InstanceDataManager;
 import org.apache.pinot.core.query.executor.QueryExecutor;
 import org.apache.pinot.core.query.executor.ServerQueryExecutorV1Impl;
-import org.apache.pinot.query.mailbox.MailboxIdUtils;
 import org.apache.pinot.query.mailbox.MailboxService;
+import org.apache.pinot.query.planner.physical.MailboxIdUtils;
 import org.apache.pinot.query.planner.plannode.MailboxSendNode;
 import org.apache.pinot.query.routing.MailboxMetadata;
+import org.apache.pinot.query.routing.VirtualServerAddress;
+import org.apache.pinot.query.routing.WorkerMetadata;
 import org.apache.pinot.query.runtime.blocks.TransferableBlock;
 import org.apache.pinot.query.runtime.executor.ExecutorServiceUtils;
 import org.apache.pinot.query.runtime.executor.OpChainSchedulerService;
 import org.apache.pinot.query.runtime.operator.OpChain;
-import org.apache.pinot.query.runtime.plan.DistributedStagePlan;
 import org.apache.pinot.query.runtime.plan.OpChainExecutionContext;
 import org.apache.pinot.query.runtime.plan.PhysicalPlanVisitor;
+import org.apache.pinot.query.runtime.plan.StageMetadata;
+import org.apache.pinot.query.runtime.plan.StagePlan;
 import org.apache.pinot.query.runtime.plan.pipeline.PipelineBreakerExecutor;
 import org.apache.pinot.query.runtime.plan.pipeline.PipelineBreakerResult;
 import org.apache.pinot.query.runtime.plan.server.ServerPlanRequestUtils;
@@ -54,7 +57,7 @@ import org.slf4j.LoggerFactory;
 
 
 /**
- * {@link QueryRunner} accepts a {@link DistributedStagePlan} and runs it.
+ * {@link QueryRunner} accepts a {@link StagePlan} and runs it.
  */
 public class QueryRunner {
   private static final Logger LOGGER = LoggerFactory.getLogger(QueryRunner.class);
@@ -139,42 +142,46 @@ public class QueryRunner {
   }
 
   /**
-   * Execute a {@link DistributedStagePlan}.
+   * Execute a {@link StagePlan}.
    *
    * <p>This execution entry point should be asynchronously called by the request handler and caller should not wait
    * for results/exceptions.</p>
    */
-  public void processQuery(DistributedStagePlan distributedStagePlan, Map<String, String> requestMetadata) {
+  public void processQuery(WorkerMetadata workerMetadata, StagePlan stagePlan, Map<String, String> requestMetadata) {
     long requestId = Long.parseLong(requestMetadata.get(CommonConstants.Query.Request.MetadataKeys.REQUEST_ID));
     long timeoutMs = Long.parseLong(requestMetadata.get(CommonConstants.Broker.Request.QueryOptionKey.TIMEOUT_MS));
-    Map<String, String> opChainMetadata = consolidateMetadata(
-        distributedStagePlan.getStageMetadata().getCustomProperties(), requestMetadata);
     long deadlineMs = System.currentTimeMillis() + timeoutMs;
 
+    StageMetadata stageMetadata = stagePlan.getStageMetadata();
+    Map<String, String> opChainMetadata = consolidateMetadata(stageMetadata.getCustomProperties(), requestMetadata);
+
     // run pre-stage execution for all pipeline breakers
     PipelineBreakerResult pipelineBreakerResult =
-        PipelineBreakerExecutor.executePipelineBreakers(_opChainScheduler, _mailboxService, distributedStagePlan,
+        PipelineBreakerExecutor.executePipelineBreakers(_opChainScheduler, _mailboxService, workerMetadata, stagePlan,
             opChainMetadata, requestId, deadlineMs);
 
     // Send error block to all the receivers if pipeline breaker fails
     if (pipelineBreakerResult != null && pipelineBreakerResult.getErrorBlock() != null) {
       TransferableBlock errorBlock = pipelineBreakerResult.getErrorBlock();
       LOGGER.error("Error executing pipeline breaker for request: {}, stage: {}, sending error block: {}", requestId,
-          distributedStagePlan.getStageId(), errorBlock.getExceptions());
-      int receiverStageId = ((MailboxSendNode) distributedStagePlan.getStageRoot()).getReceiverStageId();
-      MailboxMetadata mailboxMetadata = distributedStagePlan.getStageMetadata().getWorkerMetadataList()
-          .get(distributedStagePlan.getServer().workerId()).getMailBoxInfosMap().get(receiverStageId);
+          stagePlan.getStageId(), errorBlock.getExceptions());
+      int receiverStageId = ((MailboxSendNode) stagePlan.getRootNode()).getReceiverStageId();
+      MailboxMetadata mailboxMetadata = workerMetadata.getMailboxMetadataMap().get(receiverStageId);
       List<String> mailboxIds = MailboxIdUtils.toMailboxIds(requestId, mailboxMetadata);
-      for (int i = 0; i < mailboxIds.size(); i++) {
+      List<VirtualServerAddress> virtualAddresses = mailboxMetadata.getVirtualAddresses();
+      int numMailboxes = mailboxIds.size();
+      for (int i = 0; i < numMailboxes; i++) {
+        String mailboxId = mailboxIds.get(i);
+        VirtualServerAddress virtualAddress = virtualAddresses.get(i);
         try {
-          _mailboxService.getSendingMailbox(mailboxMetadata.getVirtualAddress(i).hostname(),
-              mailboxMetadata.getVirtualAddress(i).port(), mailboxIds.get(i), deadlineMs).send(errorBlock);
+          _mailboxService.getSendingMailbox(virtualAddress.hostname(), virtualAddress.port(), mailboxId, deadlineMs)
+              .send(errorBlock);
         } catch (TimeoutException e) {
-          LOGGER.warn("Timed out sending error block to mailbox: {} for request: {}, stage: {}", mailboxIds.get(i),
-              requestId, distributedStagePlan.getStageId(), e);
+          LOGGER.warn("Timed out sending error block to mailbox: {} for request: {}, stage: {}", mailboxId, requestId,
+              stagePlan.getStageId(), e);
         } catch (Exception e) {
-          LOGGER.error("Caught exception sending error block to mailbox: {} for request: {}, stage: {}",
-              mailboxIds.get(i), requestId, distributedStagePlan.getStageId(), e);
+          LOGGER.error("Caught exception sending error block to mailbox: {} for request: {}, stage: {}", mailboxId,
+              requestId, stagePlan.getStageId(), e);
         }
       }
       return;
@@ -182,15 +189,14 @@ public class QueryRunner {
 
     // run OpChain
     OpChainExecutionContext executionContext =
-        new OpChainExecutionContext(_mailboxService, requestId, distributedStagePlan.getStageId(),
-            distributedStagePlan.getServer(), deadlineMs, opChainMetadata, distributedStagePlan.getStageMetadata(),
-            pipelineBreakerResult);
+        new OpChainExecutionContext(_mailboxService, requestId, stagePlan.getStageId(), deadlineMs, opChainMetadata,
+            stageMetadata, workerMetadata, pipelineBreakerResult);
     OpChain opChain;
-    if (DistributedStagePlan.isLeafStage(distributedStagePlan)) {
-      opChain = ServerPlanRequestUtils.compileLeafStage(executionContext, distributedStagePlan, _helixManager,
-          _serverMetrics, _leafQueryExecutor, _executorService);
+    if (workerMetadata.isLeafStageWorker()) {
+      opChain = ServerPlanRequestUtils.compileLeafStage(executionContext, stagePlan, _helixManager, _serverMetrics,
+          _leafQueryExecutor, _executorService);
     } else {
-      opChain = PhysicalPlanVisitor.walkPlanNode(distributedStagePlan.getStageRoot(), executionContext);
+      opChain = PhysicalPlanVisitor.walkPlanNode(stagePlan.getRootNode(), executionContext);
     }
     _opChainScheduler.register(opChain);
   }
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/BaseMailboxReceiveOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/BaseMailboxReceiveOperator.java
index 438cec8494..37903c2f72 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/BaseMailboxReceiveOperator.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/BaseMailboxReceiveOperator.java
@@ -24,9 +24,9 @@ import java.util.List;
 import java.util.stream.Collectors;
 import javax.annotation.Nullable;
 import org.apache.calcite.rel.RelDistribution;
-import org.apache.pinot.query.mailbox.MailboxIdUtils;
 import org.apache.pinot.query.mailbox.MailboxService;
 import org.apache.pinot.query.mailbox.ReceivingMailbox;
+import org.apache.pinot.query.planner.physical.MailboxIdUtils;
 import org.apache.pinot.query.routing.MailboxMetadata;
 import org.apache.pinot.query.runtime.blocks.TransferableBlock;
 import org.apache.pinot.query.runtime.operator.utils.AsyncStream;
@@ -58,10 +58,8 @@ public abstract class BaseMailboxReceiveOperator extends MultiStageOperator {
     _exchangeType = exchangeType;
 
     long requestId = context.getRequestId();
-    int workerId = context.getServer().workerId();
-    MailboxMetadata mailboxMetadata =
-        context.getStageMetadata().getWorkerMetadataList().get(workerId).getMailBoxInfosMap().get(senderStageId);
-    if (mailboxMetadata != null && !mailboxMetadata.getMailBoxIdList().isEmpty()) {
+    MailboxMetadata mailboxMetadata = context.getWorkerMetadata().getMailboxMetadataMap().get(senderStageId);
+    if (mailboxMetadata != null && !mailboxMetadata.getMailboxIds().isEmpty()) {
       _mailboxIds = MailboxIdUtils.toMailboxIds(requestId, mailboxMetadata);
     } else {
       _mailboxIds = Collections.emptyList();
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java
index f6f25510df..f74feb1cac 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java
@@ -28,11 +28,12 @@ import java.util.concurrent.TimeoutException;
 import javax.annotation.Nullable;
 import org.apache.calcite.rel.RelDistribution;
 import org.apache.calcite.rel.RelFieldCollation;
-import org.apache.pinot.query.mailbox.MailboxIdUtils;
 import org.apache.pinot.query.mailbox.MailboxService;
 import org.apache.pinot.query.mailbox.SendingMailbox;
 import org.apache.pinot.query.planner.logical.RexExpression;
+import org.apache.pinot.query.planner.physical.MailboxIdUtils;
 import org.apache.pinot.query.routing.MailboxMetadata;
+import org.apache.pinot.query.routing.VirtualServerAddress;
 import org.apache.pinot.query.runtime.blocks.TransferableBlock;
 import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils;
 import org.apache.pinot.query.runtime.operator.exchange.BlockExchange;
@@ -90,14 +91,17 @@ public class MailboxSendOperator extends MultiStageOperator {
     long requestId = context.getRequestId();
     long deadlineMs = context.getDeadlineMs();
 
-    int workerId = context.getServer().workerId();
-    MailboxMetadata mailboxMetadata =
-        context.getStageMetadata().getWorkerMetadataList().get(workerId).getMailBoxInfosMap().get(receiverStageId);
+    MailboxMetadata mailboxMetadata = context.getWorkerMetadata().getMailboxMetadataMap().get(receiverStageId);
     List<String> sendingMailboxIds = MailboxIdUtils.toMailboxIds(requestId, mailboxMetadata);
-    List<SendingMailbox> sendingMailboxes = new ArrayList<>(sendingMailboxIds.size());
-    for (int i = 0; i < sendingMailboxIds.size(); i++) {
-      sendingMailboxes.add(mailboxService.getSendingMailbox(mailboxMetadata.getVirtualAddress(i).hostname(),
-          mailboxMetadata.getVirtualAddress(i).port(), sendingMailboxIds.get(i), deadlineMs));
+    List<VirtualServerAddress> sendingAddresses = mailboxMetadata.getVirtualAddresses();
+    int numMailboxes = sendingMailboxIds.size();
+    List<SendingMailbox> sendingMailboxes = new ArrayList<>(numMailboxes);
+    for (int i = 0; i < numMailboxes; i++) {
+      String sendingMailboxId = sendingMailboxIds.get(i);
+      VirtualServerAddress sendingAddress = sendingAddresses.get(i);
+      sendingMailboxes.add(
+          mailboxService.getSendingMailbox(sendingAddress.hostname(), sendingAddress.port(), sendingMailboxId,
+              deadlineMs));
     }
     return BlockExchange.getExchange(sendingMailboxes, distributionType, distributionKeys,
         TransferableBlockUtils::splitBlock);
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/OpChainExecutionContext.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/OpChainExecutionContext.java
index 10069167d6..5059b2f8ec 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/OpChainExecutionContext.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/OpChainExecutionContext.java
@@ -22,6 +22,7 @@ import java.util.Collections;
 import java.util.Map;
 import org.apache.pinot.query.mailbox.MailboxService;
 import org.apache.pinot.query.routing.VirtualServerAddress;
+import org.apache.pinot.query.routing.WorkerMetadata;
 import org.apache.pinot.query.runtime.operator.OpChainId;
 import org.apache.pinot.query.runtime.operator.OpChainStats;
 import org.apache.pinot.query.runtime.plan.pipeline.PipelineBreakerResult;
@@ -38,10 +39,10 @@ public class OpChainExecutionContext {
   private final MailboxService _mailboxService;
   private final long _requestId;
   private final int _stageId;
-  private final VirtualServerAddress _server;
   private final long _deadlineMs;
   private final Map<String, String> _opChainMetadata;
   private final StageMetadata _stageMetadata;
+  private final WorkerMetadata _workerMetadata;
   private final OpChainId _id;
   private final OpChainStats _stats;
   private final PipelineBreakerResult _pipelineBreakerResult;
@@ -49,17 +50,17 @@ public class OpChainExecutionContext {
 
   private ServerPlanRequestContext _leafStageContext;
 
-  public OpChainExecutionContext(MailboxService mailboxService, long requestId, int stageId,
-      VirtualServerAddress server, long deadlineMs, Map<String, String> opChainMetadata, StageMetadata stageMetadata,
+  public OpChainExecutionContext(MailboxService mailboxService, long requestId, int stageId, long deadlineMs,
+      Map<String, String> opChainMetadata, StageMetadata stageMetadata, WorkerMetadata workerMetadata,
       PipelineBreakerResult pipelineBreakerResult) {
     _mailboxService = mailboxService;
     _requestId = requestId;
     _stageId = stageId;
-    _server = server;
     _deadlineMs = deadlineMs;
     _opChainMetadata = Collections.unmodifiableMap(opChainMetadata);
     _stageMetadata = stageMetadata;
-    _id = new OpChainId(requestId, server.workerId(), stageId);
+    _workerMetadata = workerMetadata;
+    _id = new OpChainId(requestId, workerMetadata.getVirtualAddress().workerId(), stageId);
     _stats = new OpChainStats(_id.toString());
     _pipelineBreakerResult = pipelineBreakerResult;
     if (pipelineBreakerResult != null && pipelineBreakerResult.getOpChainStats() != null) {
@@ -81,7 +82,7 @@ public class OpChainExecutionContext {
   }
 
   public VirtualServerAddress getServer() {
-    return _server;
+    return _workerMetadata.getVirtualAddress();
   }
 
   public long getDeadlineMs() {
@@ -96,6 +97,10 @@ public class OpChainExecutionContext {
     return _stageMetadata;
   }
 
+  public WorkerMetadata getWorkerMetadata() {
+    return _workerMetadata;
+  }
+
   public OpChainId getId() {
     return _id;
   }
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/StageMetadata.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/StageMetadata.java
index f2543a3363..a07a04a0b7 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/StageMetadata.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/StageMetadata.java
@@ -18,11 +18,10 @@
  */
 package org.apache.pinot.query.runtime.plan;
 
-import java.util.Collections;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import org.apache.pinot.core.routing.TimeBoundaryInfo;
+import org.apache.pinot.query.planner.physical.DispatchablePlanFragment;
 import org.apache.pinot.query.routing.WorkerMetadata;
 
 
@@ -33,9 +32,9 @@ public class StageMetadata {
   private final List<WorkerMetadata> _workerMetadataList;
   private final Map<String, String> _customProperties;
 
-  StageMetadata(List<WorkerMetadata> workerMetadataList, Map<String, String> customProperties) {
+  public StageMetadata(List<WorkerMetadata> workerMetadataList, Map<String, String> customProperties) {
     _workerMetadataList = workerMetadataList;
-    _customProperties = Collections.unmodifiableMap(customProperties);
+    _customProperties = customProperties;
   }
 
   public List<WorkerMetadata> getWorkerMetadataList() {
@@ -46,54 +45,13 @@ public class StageMetadata {
     return _customProperties;
   }
 
-  public static class Builder {
-    public static final String TABLE_NAME_KEY = "tableName";
-    public static final String TIME_BOUNDARY_COLUMN_KEY = "timeBoundaryInfo.timeColumn";
-    public static final String TIME_BOUNDARY_VALUE_KEY = "timeBoundaryInfo.timeValue";
-    private List<WorkerMetadata> _workerMetadataList;
-    private Map<String, String> _customProperties;
-
-    public Builder() {
-      _customProperties = new HashMap<>();
-    }
-
-    public Builder setWorkerMetadataList(List<WorkerMetadata> workerMetadataList) {
-      _workerMetadataList = workerMetadataList;
-      return this;
-    }
-
-    public Builder addTableName(String tableName) {
-      _customProperties.put(TABLE_NAME_KEY, tableName);
-      return this;
-    }
-
-    public Builder addTimeBoundaryInfo(TimeBoundaryInfo timeBoundaryInfo) {
-      _customProperties.put(TIME_BOUNDARY_COLUMN_KEY, timeBoundaryInfo.getTimeColumn());
-      _customProperties.put(TIME_BOUNDARY_VALUE_KEY, timeBoundaryInfo.getTimeValue());
-      return this;
-    }
-
-    public Builder addCustomProperties(Map<String, String> customPropertyMap) {
-      _customProperties.putAll(customPropertyMap);
-      return this;
-    }
-
-    public StageMetadata build() {
-      return new StageMetadata(_workerMetadataList, _customProperties);
-    }
-
-    public void putAllCustomProperties(Map<String, String> customPropertyMap) {
-      _customProperties.putAll(customPropertyMap);
-    }
-  }
-
-  public static String getTableName(StageMetadata metadata) {
-    return metadata.getCustomProperties().get(Builder.TABLE_NAME_KEY);
+  public String getTableName() {
+    return _customProperties.get(DispatchablePlanFragment.TABLE_NAME_KEY);
   }
 
-  public static TimeBoundaryInfo getTimeBoundary(StageMetadata metadata) {
-    String timeColumn = metadata.getCustomProperties().get(Builder.TIME_BOUNDARY_COLUMN_KEY);
-    String timeValue = metadata.getCustomProperties().get(Builder.TIME_BOUNDARY_VALUE_KEY);
+  public TimeBoundaryInfo getTimeBoundary() {
+    String timeColumn = _customProperties.get(DispatchablePlanFragment.TIME_BOUNDARY_COLUMN_KEY);
+    String timeValue = _customProperties.get(DispatchablePlanFragment.TIME_BOUNDARY_VALUE_KEY);
     return timeColumn != null && timeValue != null ? new TimeBoundaryInfo(timeColumn, timeValue) : null;
   }
 }
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/DistributedStagePlan.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/StagePlan.java
similarity index 53%
rename from pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/DistributedStagePlan.java
rename to pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/StagePlan.java
index 62e8d19254..a45b48c5de 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/DistributedStagePlan.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/StagePlan.java
@@ -18,30 +18,22 @@
  */
 package org.apache.pinot.query.runtime.plan;
 
-import java.util.List;
-import java.util.Map;
 import org.apache.pinot.query.planner.plannode.PlanNode;
-import org.apache.pinot.query.routing.VirtualServerAddress;
-import org.apache.pinot.query.routing.WorkerMetadata;
 
 
 /**
- * {@code DistributedStagePlan} is the deserialized version of the
- * {@link org.apache.pinot.common.proto.Worker.StagePlan}.
+ * {@code StagePlan} is the deserialized version of the {@link org.apache.pinot.common.proto.Worker.StagePlan}.
  *
  * <p>It is also the extended version of the {@link org.apache.pinot.core.query.request.ServerQueryRequest}.
  */
-public class DistributedStagePlan {
+public class StagePlan {
   private final int _stageId;
-  private final VirtualServerAddress _server;
-  private final PlanNode _stageRoot;
+  private final PlanNode _rootNode;
   private final StageMetadata _stageMetadata;
 
-  public DistributedStagePlan(int stageId, VirtualServerAddress server, PlanNode stageRoot,
-      StageMetadata stageMetadata) {
+  public StagePlan(int stageId, PlanNode rootNode, StageMetadata stageMetadata) {
     _stageId = stageId;
-    _server = server;
-    _stageRoot = stageRoot;
+    _rootNode = rootNode;
     _stageMetadata = stageMetadata;
   }
 
@@ -49,25 +41,11 @@ public class DistributedStagePlan {
     return _stageId;
   }
 
-  public VirtualServerAddress getServer() {
-    return _server;
-  }
-
-  public PlanNode getStageRoot() {
-    return _stageRoot;
+  public PlanNode getRootNode() {
+    return _rootNode;
   }
 
   public StageMetadata getStageMetadata() {
     return _stageMetadata;
   }
-
-  public WorkerMetadata getCurrentWorkerMetadata() {
-    return _stageMetadata.getWorkerMetadataList().get(_server.workerId());
-  }
-
-  public static boolean isLeafStage(DistributedStagePlan distributedStagePlan) {
-    WorkerMetadata workerMetadata = distributedStagePlan.getCurrentWorkerMetadata();
-    Map<String, List<String>> segments = WorkerMetadata.getTableSegmentsMap(workerMetadata);
-    return segments != null && segments.size() > 0;
-  }
 }
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/pipeline/PipelineBreakerExecutor.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/pipeline/PipelineBreakerExecutor.java
index 3db86807d7..aec7998e16 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/pipeline/PipelineBreakerExecutor.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/pipeline/PipelineBreakerExecutor.java
@@ -29,13 +29,14 @@ import org.apache.pinot.core.common.Operator;
 import org.apache.pinot.query.mailbox.MailboxService;
 import org.apache.pinot.query.planner.plannode.MailboxReceiveNode;
 import org.apache.pinot.query.planner.plannode.PlanNode;
+import org.apache.pinot.query.routing.WorkerMetadata;
 import org.apache.pinot.query.runtime.blocks.TransferableBlock;
 import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils;
 import org.apache.pinot.query.runtime.executor.OpChainSchedulerService;
 import org.apache.pinot.query.runtime.operator.OpChain;
-import org.apache.pinot.query.runtime.plan.DistributedStagePlan;
 import org.apache.pinot.query.runtime.plan.OpChainExecutionContext;
 import org.apache.pinot.query.runtime.plan.PhysicalPlanVisitor;
+import org.apache.pinot.query.runtime.plan.StagePlan;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -55,7 +56,8 @@ public class PipelineBreakerExecutor {
    *
    * @param scheduler scheduler service to run the pipeline breaker main thread.
    * @param mailboxService mailbox service to attach the {@link MailboxReceiveNode} against.
-   * @param distributedStagePlan the distributed stage plan to run pipeline breaker on.
+   * @param workerMetadata worker metadata for the current worker.
+   * @param stagePlan the distributed stage plan to run pipeline breaker on.
    * @param opChainMetadata request metadata, including query options
    * @param requestId request ID
    * @param deadlineMs execution deadline
@@ -65,23 +67,22 @@ public class PipelineBreakerExecutor {
    */
   @Nullable
   public static PipelineBreakerResult executePipelineBreakers(OpChainSchedulerService scheduler,
-      MailboxService mailboxService, DistributedStagePlan distributedStagePlan, Map<String, String> opChainMetadata,
-      long requestId, long deadlineMs) {
+      MailboxService mailboxService, WorkerMetadata workerMetadata, StagePlan stagePlan,
+      Map<String, String> opChainMetadata, long requestId, long deadlineMs) {
     PipelineBreakerContext pipelineBreakerContext = new PipelineBreakerContext();
-    PipelineBreakerVisitor.visitPlanRoot(distributedStagePlan.getStageRoot(), pipelineBreakerContext);
+    PipelineBreakerVisitor.visitPlanRoot(stagePlan.getRootNode(), pipelineBreakerContext);
     if (!pipelineBreakerContext.getPipelineBreakerMap().isEmpty()) {
       try {
         // TODO: This PlanRequestContext needs to indicate it is a pre-stage opChain and only listens to pre-stage
         //     OpChain receive-mail callbacks.
         // see also: MailboxIdUtils TODOs, de-couple mailbox id from query information
         OpChainExecutionContext opChainExecutionContext =
-            new OpChainExecutionContext(mailboxService, requestId, distributedStagePlan.getStageId(),
-                distributedStagePlan.getServer(), deadlineMs, opChainMetadata, distributedStagePlan.getStageMetadata(),
-                null);
+            new OpChainExecutionContext(mailboxService, requestId, stagePlan.getStageId(), deadlineMs, opChainMetadata,
+                stagePlan.getStageMetadata(), workerMetadata, null);
         return execute(scheduler, pipelineBreakerContext, opChainExecutionContext);
       } catch (Exception e) {
         LOGGER.error("Caught exception executing pipeline breaker for request: {}, stage: {}", requestId,
-            distributedStagePlan.getStageId(), e);
+            stagePlan.getStageId(), e);
         return new PipelineBreakerResult(pipelineBreakerContext.getNodeIdMap(), Collections.emptyMap(),
             TransferableBlockUtils.getErrorTransferableBlock(e), null);
       }
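
A rough sketch of the new call shape (not part of this commit; the wrapper is hypothetical): the caller now supplies the WorkerMetadata alongside the shared StagePlan, and the result may be null when the stage contains no pipeline breaker.

    import java.util.Map;
    import javax.annotation.Nullable;
    import org.apache.pinot.query.mailbox.MailboxService;
    import org.apache.pinot.query.routing.WorkerMetadata;
    import org.apache.pinot.query.runtime.executor.OpChainSchedulerService;
    import org.apache.pinot.query.runtime.plan.StagePlan;
    import org.apache.pinot.query.runtime.plan.pipeline.PipelineBreakerExecutor;
    import org.apache.pinot.query.runtime.plan.pipeline.PipelineBreakerResult;

    class PipelineBreakerExample {
      @Nullable
      static PipelineBreakerResult run(OpChainSchedulerService scheduler, MailboxService mailboxService,
          WorkerMetadata workerMetadata, StagePlan stagePlan, Map<String, String> opChainMetadata, long requestId,
          long deadlineMs) {
        // Execution failures are wrapped into the returned result as an error block rather than thrown.
        return PipelineBreakerExecutor.executePipelineBreakers(scheduler, mailboxService, workerMetadata, stagePlan,
            opChainMetadata, requestId, deadlineMs);
      }
    }
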
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/serde/QueryPlanSerDeUtils.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/serde/QueryPlanSerDeUtils.java
index f4b34a145a..fbfb9487b4 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/serde/QueryPlanSerDeUtils.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/serde/QueryPlanSerDeUtils.java
@@ -18,152 +18,86 @@
  */
 package org.apache.pinot.query.runtime.plan.serde;
 
-import java.util.ArrayList;
-import java.util.HashMap;
+import com.google.protobuf.ByteString;
+import com.google.protobuf.InvalidProtocolBufferException;
 import java.util.List;
 import java.util.Map;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-import org.apache.commons.lang.StringUtils;
+import java.util.stream.Collectors;
+import org.apache.pinot.common.proto.Plan;
 import org.apache.pinot.common.proto.Worker;
-import org.apache.pinot.query.planner.physical.DispatchablePlanFragment;
 import org.apache.pinot.query.planner.plannode.AbstractPlanNode;
 import org.apache.pinot.query.planner.plannode.StageNodeSerDeUtils;
 import org.apache.pinot.query.routing.MailboxMetadata;
-import org.apache.pinot.query.routing.QueryServerInstance;
 import org.apache.pinot.query.routing.VirtualServerAddress;
 import org.apache.pinot.query.routing.WorkerMetadata;
-import org.apache.pinot.query.runtime.plan.DistributedStagePlan;
 import org.apache.pinot.query.runtime.plan.StageMetadata;
+import org.apache.pinot.query.runtime.plan.StagePlan;
 
 
 /**
  * This utility class serializes/deserializes between {@link Worker.StagePlan} elements and Planner elements.
  */
 public class QueryPlanSerDeUtils {
-  private static final Pattern VIRTUAL_SERVER_PATTERN =
-      Pattern.compile("(?<virtualid>[0-9]+)@(?<host>[^:]+):(?<port>[0-9]+)");
-
   private QueryPlanSerDeUtils() {
-    // do not instantiate.
-  }
-
-  public static VirtualServerAddress protoToAddress(String virtualAddressStr) {
-    Matcher matcher = VIRTUAL_SERVER_PATTERN.matcher(virtualAddressStr);
-    if (!matcher.matches()) {
-      throw new IllegalArgumentException("Unexpected virtualAddressStr '" + virtualAddressStr + "'. This might "
-          + "happen if you are upgrading from an old version of the multistage engine to the current one in a rolling "
-          + "fashion.");
-    }
-
-    // Skipped netty and grpc port as they are not used in worker instance.
-    return new VirtualServerAddress(matcher.group("host"), Integer.parseInt(matcher.group("port")),
-        Integer.parseInt(matcher.group("virtualid")));
   }
 
-  public static String addressToProto(VirtualServerAddress serverAddress) {
-    return String.format("%s@%s:%s", serverAddress.workerId(), serverAddress.hostname(), serverAddress.port());
+  public static StagePlan fromProtoStagePlan(Worker.StagePlan protoStagePlan)
+      throws InvalidProtocolBufferException {
+    AbstractPlanNode rootNode =
+        StageNodeSerDeUtils.deserializeStageNode(Plan.StageNode.parseFrom(protoStagePlan.getRootNode()));
+    StageMetadata stageMetadata = fromProtoStageMetadata(protoStagePlan.getStageMetadata());
+    return new StagePlan(protoStagePlan.getStageId(), rootNode, stageMetadata);
   }
 
-  public static List<DistributedStagePlan> deserializeStagePlan(Worker.StagePlan stagePlan) {
-    int stageId = stagePlan.getStageId();
-    Worker.StageMetadata protoStageMetadata = stagePlan.getStageMetadata();
-    String serverAddress = protoStageMetadata.getServerAddress();
-    String[] hostPort = StringUtils.split(serverAddress, ':');
-    String hostname = hostPort[0];
-    int port = Integer.parseInt(hostPort[1]);
-    AbstractPlanNode stageRoot = StageNodeSerDeUtils.deserializeStageNode(stagePlan.getStageRoot());
-    StageMetadata stageMetadata = fromProtoStageMetadata(protoStageMetadata);
-    List<Integer> workerIds = protoStageMetadata.getWorkerIdsList();
-    List<DistributedStagePlan> distributedStagePlans = new ArrayList<>(workerIds.size());
-    for (int workerId : workerIds) {
-      distributedStagePlans.add(
-          new DistributedStagePlan(stageId, new VirtualServerAddress(hostname, port, workerId), stageRoot,
-              stageMetadata));
-    }
-    return distributedStagePlans;
-  }
-
-  private static StageMetadata fromProtoStageMetadata(Worker.StageMetadata protoStageMetadata) {
-    StageMetadata.Builder builder = new StageMetadata.Builder();
-    List<WorkerMetadata> workerMetadataList = new ArrayList<>();
-    for (Worker.WorkerMetadata protoWorkerMetadata : protoStageMetadata.getWorkerMetadataList()) {
-      workerMetadataList.add(fromProtoWorkerMetadata(protoWorkerMetadata));
-    }
-    builder.setWorkerMetadataList(workerMetadataList);
-    builder.putAllCustomProperties(protoStageMetadata.getCustomPropertyMap());
-    return builder.build();
+  private static StageMetadata fromProtoStageMetadata(Worker.StageMetadata protoStageMetadata)
+      throws InvalidProtocolBufferException {
+    List<WorkerMetadata> workerMetadataList =
+        protoStageMetadata.getWorkerMetadataList().stream().map(QueryPlanSerDeUtils::fromProtoWorkerMetadata)
+            .collect(Collectors.toList());
+    Map<String, String> customProperties = fromProtoProperties(protoStageMetadata.getCustomProperty());
+    return new StageMetadata(workerMetadataList, customProperties);
   }
 
   private static WorkerMetadata fromProtoWorkerMetadata(Worker.WorkerMetadata protoWorkerMetadata) {
-    WorkerMetadata.Builder builder = new WorkerMetadata.Builder();
-    builder.setVirtualServerAddress(protoToAddress(protoWorkerMetadata.getVirtualAddress()));
-    builder.putAllMailBoxInfosMap(fromProtoMailboxMetadataMap(protoWorkerMetadata.getMailboxMetadataMap()));
-    builder.putAllCustomProperties(protoWorkerMetadata.getCustomPropertyMap());
-    return builder.build();
-  }
-
-  private static Map<Integer, MailboxMetadata> fromProtoMailboxMetadataMap(
-      Map<Integer, Worker.MailboxMetadata> mailboxMetadataMap) {
-    Map<Integer, MailboxMetadata> mailboxMap = new HashMap<>();
-    for (Map.Entry<Integer, Worker.MailboxMetadata> entry : mailboxMetadataMap.entrySet()) {
-      mailboxMap.put(entry.getKey(), fromProtoMailbox(entry.getValue()));
-    }
-    return mailboxMap;
+    VirtualServerAddress virtualAddress = VirtualServerAddress.parse(protoWorkerMetadata.getVirtualAddress());
+    Map<Integer, MailboxMetadata> mailboxMetadataMap = protoWorkerMetadata.getMailboxMetadataMap().entrySet().stream()
+        .collect(Collectors.toMap(Map.Entry::getKey, e -> fromProtoMailbox(e.getValue())));
+    return new WorkerMetadata(virtualAddress, mailboxMetadataMap, protoWorkerMetadata.getCustomPropertyMap());
   }
 
   private static MailboxMetadata fromProtoMailbox(Worker.MailboxMetadata protoMailboxMetadata) {
-    List<String> mailboxIds = new ArrayList<>();
-    List<VirtualServerAddress> virtualAddresses = new ArrayList<>();
-    for (int i = 0; i < protoMailboxMetadata.getMailboxIdCount(); i++) {
-      mailboxIds.add(protoMailboxMetadata.getMailboxId(i));
-      virtualAddresses.add(protoToAddress(protoMailboxMetadata.getVirtualAddress(i)));
-    }
-    MailboxMetadata mailboxMetadata =
-        new MailboxMetadata(mailboxIds, virtualAddresses, protoMailboxMetadata.getCustomPropertyMap());
-    return mailboxMetadata;
+    List<VirtualServerAddress> virtualAddresses =
+        protoMailboxMetadata.getVirtualAddressList().stream().map(VirtualServerAddress::parse)
+            .collect(Collectors.toList());
+    return new MailboxMetadata(protoMailboxMetadata.getMailboxIdList(), virtualAddresses);
   }
 
-  public static Worker.StageMetadata toProtoStageMetadata(List<Worker.WorkerMetadata> workerMetadataList,
-      Map<String, String> customProperties, QueryServerInstance serverInstance, List<Integer> workerIds) {
-    return Worker.StageMetadata.newBuilder().addAllWorkerMetadata(workerMetadataList)
-        .putAllCustomProperty(customProperties)
-        .setServerAddress(String.format("%s:%d", serverInstance.getHostname(), serverInstance.getQueryMailboxPort()))
-        .addAllWorkerIds(workerIds).build();
+  public static Map<String, String> fromProtoProperties(ByteString protoProperties)
+      throws InvalidProtocolBufferException {
+    return Worker.Properties.parseFrom(protoProperties).getPropertyMap();
   }
 
-  public static List<Worker.WorkerMetadata> toProtoWorkerMetadataList(DispatchablePlanFragment planFragment) {
-    List<WorkerMetadata> workerMetadataList = planFragment.getWorkerMetadataList();
-    List<Worker.WorkerMetadata> protoWorkerMetadataList = new ArrayList<>(workerMetadataList.size());
-    for (WorkerMetadata workerMetadata : workerMetadataList) {
-      protoWorkerMetadataList.add(toProtoWorkerMetadata(workerMetadata));
-    }
-    return protoWorkerMetadataList;
+  public static List<Worker.WorkerMetadata> toProtoWorkerMetadataList(List<WorkerMetadata> workerMetadataList) {
+    return workerMetadataList.stream().map(QueryPlanSerDeUtils::toProtoWorkerMetadata).collect(Collectors.toList());
   }
 
   private static Worker.WorkerMetadata toProtoWorkerMetadata(WorkerMetadata workerMetadata) {
-    Worker.WorkerMetadata.Builder builder = Worker.WorkerMetadata.newBuilder();
-    builder.setVirtualAddress(addressToProto(workerMetadata.getVirtualServerAddress()));
-    builder.putAllMailboxMetadata(toProtoMailboxMap(workerMetadata.getMailBoxInfosMap()));
-    builder.putAllCustomProperty(workerMetadata.getCustomProperties());
-    return builder.build();
+    Map<Integer, Worker.MailboxMetadata> protoMailboxMetadataMap =
+        workerMetadata.getMailboxMetadataMap().entrySet().stream()
+            .collect(Collectors.toMap(Map.Entry::getKey, e -> toProtoMailboxMetadata(e.getValue())));
+    return Worker.WorkerMetadata.newBuilder().setVirtualAddress(workerMetadata.getVirtualAddress().toString())
+        .putAllMailboxMetadata(protoMailboxMetadataMap).putAllCustomProperty(workerMetadata.getCustomProperties())
+        .build();
   }
 
-  private static Map<Integer, Worker.MailboxMetadata> toProtoMailboxMap(Map<Integer, MailboxMetadata> mailBoxInfosMap) {
-    Map<Integer, Worker.MailboxMetadata> mailboxMetadataMap = new HashMap<>();
-    for (Map.Entry<Integer, MailboxMetadata> entry : mailBoxInfosMap.entrySet()) {
-      mailboxMetadataMap.put(entry.getKey(), toProtoMailbox(entry.getValue()));
-    }
-    return mailboxMetadataMap;
+  private static Worker.MailboxMetadata toProtoMailboxMetadata(MailboxMetadata mailboxMetadata) {
+    List<String> virtualAddresses =
+        mailboxMetadata.getVirtualAddresses().stream().map(VirtualServerAddress::toString).collect(Collectors.toList());
+    return Worker.MailboxMetadata.newBuilder().addAllMailboxId(mailboxMetadata.getMailboxIds())
+        .addAllVirtualAddress(virtualAddresses).build();
   }
 
-  private static Worker.MailboxMetadata toProtoMailbox(MailboxMetadata mailboxMetadata) {
-    Worker.MailboxMetadata.Builder builder = Worker.MailboxMetadata.newBuilder();
-    for (int i = 0; i < mailboxMetadata.getMailBoxIdList().size(); i++) {
-      builder.addMailboxId(mailboxMetadata.getMailBoxId(i));
-      builder.addVirtualAddress(mailboxMetadata.getVirtualAddress(i).toString());
-    }
-    builder.putAllCustomProperty(mailboxMetadata.getCustomProperties());
-    return builder.build();
+  public static ByteString toProtoProperties(Map<String, String> properties) {
+    return Worker.Properties.newBuilder().putAllProperty(properties).build().toByteString();
   }
 }
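
To illustrate the new property handling (a sketch, not part of this commit; the helper name is hypothetical): custom properties and request metadata are now carried as a pre-serialized Worker.Properties ByteString, so the broker serializes them once and each server parses them once.

    import com.google.protobuf.ByteString;
    import com.google.protobuf.InvalidProtocolBufferException;
    import java.util.Map;
    import org.apache.pinot.query.runtime.plan.serde.QueryPlanSerDeUtils;

    class PropertiesSerDeExample {
      static Map<String, String> roundTrip(Map<String, String> properties)
          throws InvalidProtocolBufferException {
        ByteString serialized = QueryPlanSerDeUtils.toProtoProperties(properties); // done once on the broker side
        return QueryPlanSerDeUtils.fromProtoProperties(serialized);                // parsed on the server side
      }
    }
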
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/server/ServerPlanRequestContext.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/server/ServerPlanRequestContext.java
index 33a955f709..3c03fa1539 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/server/ServerPlanRequestContext.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/server/ServerPlanRequestContext.java
@@ -24,19 +24,19 @@ import org.apache.pinot.common.request.PinotQuery;
 import org.apache.pinot.core.query.executor.QueryExecutor;
 import org.apache.pinot.core.query.request.ServerQueryRequest;
 import org.apache.pinot.query.planner.plannode.PlanNode;
-import org.apache.pinot.query.runtime.plan.DistributedStagePlan;
+import org.apache.pinot.query.runtime.plan.StagePlan;
 import org.apache.pinot.query.runtime.plan.pipeline.PipelineBreakerResult;
 
 
 /**
- * Context class for converting a {@link org.apache.pinot.query.runtime.plan.DistributedStagePlan} into
+ * Context class for converting a {@link StagePlan} into
  * {@link PinotQuery} to execute on server.
  *
  * On the leaf-stage server node, the {@link PlanNode} is split into a {@link PinotQuery} part and an
  *     {@link org.apache.pinot.query.runtime.operator.OpChain} part.
  */
 public class ServerPlanRequestContext {
-  private final DistributedStagePlan _stagePlan;
+  private final StagePlan _stagePlan;
   private final QueryExecutor _leafQueryExecutor;
   private final ExecutorService _executorService;
   private final PipelineBreakerResult _pipelineBreakerResult;
@@ -45,7 +45,7 @@ public class ServerPlanRequestContext {
   private PlanNode _leafStageBoundaryNode;
   private List<ServerQueryRequest> _serverQueryRequests;
 
-  public ServerPlanRequestContext(DistributedStagePlan stagePlan, QueryExecutor leafQueryExecutor,
+  public ServerPlanRequestContext(StagePlan stagePlan, QueryExecutor leafQueryExecutor,
       ExecutorService executorService, PipelineBreakerResult pipelineBreakerResult) {
     _stagePlan = stagePlan;
     _leafQueryExecutor = leafQueryExecutor;
@@ -54,7 +54,7 @@ public class ServerPlanRequestContext {
     _pinotQuery = new PinotQuery();
   }
 
-  public DistributedStagePlan getStagePlan() {
+  public StagePlan getStagePlan() {
     return _stagePlan;
   }
 
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/server/ServerPlanRequestUtils.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/server/ServerPlanRequestUtils.java
index 832b2a4666..4c504f71d6 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/server/ServerPlanRequestUtils.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/server/ServerPlanRequestUtils.java
@@ -47,12 +47,11 @@ import org.apache.pinot.core.query.request.ServerQueryRequest;
 import org.apache.pinot.core.routing.TimeBoundaryInfo;
 import org.apache.pinot.query.planner.plannode.JoinNode;
 import org.apache.pinot.query.planner.plannode.PlanNode;
-import org.apache.pinot.query.routing.WorkerMetadata;
 import org.apache.pinot.query.runtime.operator.OpChain;
-import org.apache.pinot.query.runtime.plan.DistributedStagePlan;
 import org.apache.pinot.query.runtime.plan.OpChainExecutionContext;
 import org.apache.pinot.query.runtime.plan.PhysicalPlanVisitor;
 import org.apache.pinot.query.runtime.plan.StageMetadata;
+import org.apache.pinot.query.runtime.plan.StagePlan;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.config.table.TableType;
 import org.apache.pinot.spi.data.FieldSpec;
@@ -64,8 +63,6 @@ import org.apache.pinot.sql.parsers.rewriter.NonAggregationGroupByToDistinctQuer
 import org.apache.pinot.sql.parsers.rewriter.PredicateComparisonRewriter;
 import org.apache.pinot.sql.parsers.rewriter.QueryRewriter;
 import org.apache.pinot.sql.parsers.rewriter.QueryRewriterFactory;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 
 public class ServerPlanRequestUtils {
@@ -73,7 +70,6 @@ public class ServerPlanRequestUtils {
   }
 
   private static final int DEFAULT_LEAF_NODE_LIMIT = Integer.MAX_VALUE;
-  private static final Logger LOGGER = LoggerFactory.getLogger(ServerPlanRequestUtils.class);
   private static final List<String> QUERY_REWRITERS_CLASS_NAMES =
       ImmutableList.of(PredicateComparisonRewriter.class.getName(),
           NonAggregationGroupByToDistinctQueryRewriter.class.getName());
@@ -82,30 +78,29 @@ public class ServerPlanRequestUtils {
   private static final QueryOptimizer QUERY_OPTIMIZER = new QueryOptimizer();
 
   /**
-   * main entry point for compiling leaf-stage {@link DistributedStagePlan}.
+   * Main entry point for compiling leaf-stage {@link StagePlan}.
    *
    * @param executionContext the execution context used by the leaf-stage execution engine.
-   * @param distributedStagePlan the distribute stage plan on the leaf.
+   * @param stagePlan the distributed stage plan on the leaf.
    * @return an opChain that executes the leaf-stage, with the leaf-stage execution encapsulated within.
    */
-  public static OpChain compileLeafStage(OpChainExecutionContext executionContext,
-      DistributedStagePlan distributedStagePlan, HelixManager helixManager, ServerMetrics serverMetrics,
-      QueryExecutor leafQueryExecutor, ExecutorService executorService) {
+  public static OpChain compileLeafStage(OpChainExecutionContext executionContext, StagePlan stagePlan,
+      HelixManager helixManager, ServerMetrics serverMetrics, QueryExecutor leafQueryExecutor,
+      ExecutorService executorService) {
     long queryArrivalTimeMs = System.currentTimeMillis();
-    ServerPlanRequestContext serverContext = new ServerPlanRequestContext(distributedStagePlan, leafQueryExecutor,
-        executorService, executionContext.getPipelineBreakerResult());
+    ServerPlanRequestContext serverContext = new ServerPlanRequestContext(stagePlan, leafQueryExecutor, executorService,
+        executionContext.getPipelineBreakerResult());
     // 1. compile the PinotQuery
     constructPinotQueryPlan(serverContext, executionContext.getOpChainMetadata());
     // 2. convert PinotQuery into InstanceRequest list (one for each physical table)
     List<InstanceRequest> instanceRequestList =
-        ServerPlanRequestUtils.constructServerQueryRequests(executionContext, serverContext, distributedStagePlan,
-            helixManager.getHelixPropertyStore());
+        constructServerQueryRequests(executionContext, serverContext, helixManager.getHelixPropertyStore());
     serverContext.setServerQueryRequests(instanceRequestList.stream()
         .map(instanceRequest -> new ServerQueryRequest(instanceRequest, serverMetrics, queryArrivalTimeMs, true))
         .collect(Collectors.toList()));
     // compile the OpChain
     executionContext.setLeafStageContext(serverContext);
-    return PhysicalPlanVisitor.walkPlanNode(distributedStagePlan.getStageRoot(), executionContext);
+    return PhysicalPlanVisitor.walkPlanNode(stagePlan.getRootNode(), executionContext);
   }
 
   /**
@@ -117,18 +112,13 @@ public class ServerPlanRequestUtils {
    */
   private static void constructPinotQueryPlan(ServerPlanRequestContext serverContext,
       Map<String, String> requestMetadata) {
-    DistributedStagePlan stagePlan = serverContext.getStagePlan();
+    StagePlan stagePlan = serverContext.getStagePlan();
     PinotQuery pinotQuery = serverContext.getPinotQuery();
-    pinotQuery.setExplain(false);
     // attach leaf node limit if not set
     Integer leafNodeLimit = QueryOptionsUtils.getMultiStageLeafLimit(requestMetadata);
-    if (leafNodeLimit != null) {
-      pinotQuery.setLimit(leafNodeLimit);
-    } else {
-      pinotQuery.setLimit(DEFAULT_LEAF_NODE_LIMIT);
-    }
+    pinotQuery.setLimit(leafNodeLimit != null ? leafNodeLimit : DEFAULT_LEAF_NODE_LIMIT);
     // visit the plan and create PinotQuery and determine the leaf stage boundary PlanNode.
-    ServerPlanRequestVisitor.walkStageNode(stagePlan.getStageRoot(), serverContext);
+    ServerPlanRequestVisitor.walkStageNode(stagePlan.getRootNode(), serverContext);
   }
 
   /**
@@ -139,17 +129,16 @@ public class ServerPlanRequestUtils {
    * @return a list of server instance requests to be run.
    */
   public static List<InstanceRequest> constructServerQueryRequests(OpChainExecutionContext executionContext,
-      ServerPlanRequestContext serverContext, DistributedStagePlan distributedStagePlan,
-      ZkHelixPropertyStore<ZNRecord> helixPropertyStore) {
-    StageMetadata stageMetadata = distributedStagePlan.getStageMetadata();
-    WorkerMetadata workerMetadata = distributedStagePlan.getCurrentWorkerMetadata();
-    String rawTableName = StageMetadata.getTableName(stageMetadata);
-    int stageId = distributedStagePlan.getStageId();
-    Map<String, List<String>> tableToSegmentListMap = WorkerMetadata.getTableSegmentsMap(workerMetadata);
-    List<InstanceRequest> requests = new ArrayList<>();
-    for (Map.Entry<String, List<String>> tableEntry : tableToSegmentListMap.entrySet()) {
-      String tableType = tableEntry.getKey();
-      List<String> segmentList = tableEntry.getValue();
+      ServerPlanRequestContext serverContext, ZkHelixPropertyStore<ZNRecord> helixPropertyStore) {
+    int stageId = executionContext.getStageId();
+    StageMetadata stageMetadata = executionContext.getStageMetadata();
+    String rawTableName = stageMetadata.getTableName();
+    Map<String, List<String>> tableSegmentsMap = executionContext.getWorkerMetadata().getTableSegmentsMap();
+    assert tableSegmentsMap != null;
+    List<InstanceRequest> requests = new ArrayList<>(tableSegmentsMap.size());
+    for (Map.Entry<String, List<String>> entry : tableSegmentsMap.entrySet()) {
+      String tableType = entry.getKey();
+      List<String> segments = entry.getValue();
       // ZkHelixPropertyStore extends ZkCacheBaseDataAccessor, so it should not cause too much out-of-the-box
       // network traffic, but there's a chance to improve this:
       // TODO: use TableDataManager: it is already getting tableConfig and Schema when processing segments.
@@ -158,15 +147,15 @@ public class ServerPlanRequestUtils {
             TableNameBuilder.forType(TableType.OFFLINE).tableNameWithType(rawTableName));
         Schema schema = ZKMetadataProvider.getTableSchema(helixPropertyStore,
             TableNameBuilder.forType(TableType.OFFLINE).tableNameWithType(rawTableName));
-        requests.add(ServerPlanRequestUtils.compileInstanceRequest(executionContext, serverContext, stageId,
-            tableConfig, schema, StageMetadata.getTimeBoundary(stageMetadata), TableType.OFFLINE, segmentList));
+        requests.add(compileInstanceRequest(executionContext, serverContext, stageId, tableConfig, schema,
+            stageMetadata.getTimeBoundary(), TableType.OFFLINE, segments));
       } else if (TableType.REALTIME.name().equals(tableType)) {
         TableConfig tableConfig = ZKMetadataProvider.getTableConfig(helixPropertyStore,
             TableNameBuilder.forType(TableType.REALTIME).tableNameWithType(rawTableName));
         Schema schema = ZKMetadataProvider.getTableSchema(helixPropertyStore,
             TableNameBuilder.forType(TableType.REALTIME).tableNameWithType(rawTableName));
-        requests.add(ServerPlanRequestUtils.compileInstanceRequest(executionContext, serverContext, stageId,
-            tableConfig, schema, StageMetadata.getTimeBoundary(stageMetadata), TableType.REALTIME, segmentList));
+        requests.add(compileInstanceRequest(executionContext, serverContext, stageId, tableConfig, schema,
+            stageMetadata.getTimeBoundary(), TableType.REALTIME, segments));
       } else {
         throw new IllegalArgumentException("Unsupported table type key: " + tableType);
       }
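
A rough sketch of the new leaf-stage entry point (not part of this commit; the wrapper is hypothetical): the segment assignment is read from the current worker's metadata on the execution context rather than from the plan itself.

    import java.util.List;
    import java.util.Map;
    import java.util.concurrent.ExecutorService;
    import org.apache.helix.HelixManager;
    import org.apache.pinot.common.metrics.ServerMetrics;
    import org.apache.pinot.core.query.executor.QueryExecutor;
    import org.apache.pinot.query.runtime.operator.OpChain;
    import org.apache.pinot.query.runtime.plan.OpChainExecutionContext;
    import org.apache.pinot.query.runtime.plan.StagePlan;
    import org.apache.pinot.query.runtime.plan.server.ServerPlanRequestUtils;

    class LeafStageExample {
      static OpChain compile(OpChainExecutionContext context, StagePlan stagePlan, HelixManager helixManager,
          ServerMetrics serverMetrics, QueryExecutor leafQueryExecutor, ExecutorService executorService) {
        // The OFFLINE/REALTIME segment lists now come from the per-worker metadata, not from the plan.
        Map<String, List<String>> tableSegmentsMap = context.getWorkerMetadata().getTableSegmentsMap();
        if (tableSegmentsMap == null || tableSegmentsMap.isEmpty()) {
          throw new IllegalStateException("Not a leaf-stage worker");
        }
        return ServerPlanRequestUtils.compileLeafStage(context, stagePlan, helixManager, serverMetrics,
            leafQueryExecutor, executorService);
      }
    }
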
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/QueryDispatcher.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/QueryDispatcher.java
index 405f619a9b..06df28f561 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/QueryDispatcher.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/QueryDispatcher.java
@@ -20,6 +20,7 @@ package org.apache.pinot.query.service.dispatch;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
+import com.google.protobuf.ByteString;
 import io.grpc.Deadline;
 import java.util.ArrayList;
 import java.util.HashMap;
@@ -38,7 +39,6 @@ import java.util.concurrent.TimeoutException;
 import javax.annotation.Nullable;
 import org.apache.calcite.util.Pair;
 import org.apache.pinot.common.datablock.DataBlock;
-import org.apache.pinot.common.proto.Plan;
 import org.apache.pinot.common.proto.Worker;
 import org.apache.pinot.common.response.broker.ResultTable;
 import org.apache.pinot.common.utils.DataSchema;
@@ -111,41 +111,43 @@ public class QueryDispatcher {
   void submit(long requestId, DispatchableSubPlan dispatchableSubPlan, long timeoutMs, Map<String, String> queryOptions)
       throws Exception {
     Deadline deadline = Deadline.after(timeoutMs, TimeUnit.MILLISECONDS);
+
+    // Serialize the stage plans in parallel
     List<DispatchablePlanFragment> stagePlans = dispatchableSubPlan.getQueryStageList();
+    Set<QueryServerInstance> serverInstances = new HashSet<>();
     // Ignore the reduce stage (stage 0)
     int numStages = stagePlans.size() - 1;
-    Set<QueryServerInstance> serverInstances = new HashSet<>();
-    // Serialize the stage plans in parallel
-    Plan.StageNode[] stageRootNodes = new Plan.StageNode[numStages];
-    //noinspection unchecked
-    List<Worker.WorkerMetadata>[] stageWorkerMetadataLists = new List[numStages];
-    CompletableFuture<?>[] stagePlanSerializationStubs = new CompletableFuture[2 * numStages];
+    List<CompletableFuture<StageInfo>> stageInfoFutures = new ArrayList<>(numStages);
     for (int i = 0; i < numStages; i++) {
       DispatchablePlanFragment stagePlan = stagePlans.get(i + 1);
       serverInstances.addAll(stagePlan.getServerInstanceToWorkerIdMap().keySet());
-      int finalI = i;
-      stagePlanSerializationStubs[2 * i] = CompletableFuture.runAsync(() -> stageRootNodes[finalI] =
-              StageNodeSerDeUtils.serializeStageNode((AbstractPlanNode) stagePlan.getPlanFragment().getFragmentRoot()),
-          _executorService);
-      stagePlanSerializationStubs[2 * i + 1] = CompletableFuture.runAsync(
-          () -> stageWorkerMetadataLists[finalI] = QueryPlanSerDeUtils.toProtoWorkerMetadataList(stagePlan),
-          _executorService);
+      stageInfoFutures.add(CompletableFuture.supplyAsync(() -> {
+        ByteString rootNode =
+            StageNodeSerDeUtils.serializeStageNode((AbstractPlanNode) stagePlan.getPlanFragment().getFragmentRoot())
+                .toByteString();
+        ByteString customProperty = QueryPlanSerDeUtils.toProtoProperties(stagePlan.getCustomProperties());
+        return new StageInfo(rootNode, customProperty);
+      }, _executorService));
     }
+    List<StageInfo> stageInfos = new ArrayList<>(numStages);
     try {
-      CompletableFuture.allOf(stagePlanSerializationStubs)
-          .get(deadline.timeRemaining(TimeUnit.MILLISECONDS), TimeUnit.MILLISECONDS);
+      for (CompletableFuture<StageInfo> future : stageInfoFutures) {
+        stageInfos.add(future.get(deadline.timeRemaining(TimeUnit.MILLISECONDS), TimeUnit.MILLISECONDS));
+      }
     } finally {
-      for (CompletableFuture<?> future : stagePlanSerializationStubs) {
+      for (CompletableFuture<?> future : stageInfoFutures) {
         if (!future.isDone()) {
           future.cancel(true);
         }
       }
     }
+
     Map<String, String> requestMetadata = new HashMap<>();
     requestMetadata.put(CommonConstants.Query.Request.MetadataKeys.REQUEST_ID, Long.toString(requestId));
     requestMetadata.put(CommonConstants.Broker.Request.QueryOptionKey.TIMEOUT_MS,
         Long.toString(deadline.timeRemaining(TimeUnit.MILLISECONDS)));
     requestMetadata.putAll(queryOptions);
+    ByteString protoRequestMetadata = QueryPlanSerDeUtils.toProtoProperties(requestMetadata);
 
     // Submit the query plan to all servers in parallel
     int numServers = serverInstances.size();
@@ -159,13 +161,23 @@ public class QueryDispatcher {
             DispatchablePlanFragment stagePlan = stagePlans.get(stageId);
             List<Integer> workerIds = stagePlan.getServerInstanceToWorkerIdMap().get(serverInstance);
             if (workerIds != null) {
+              List<WorkerMetadata> stageWorkerMetadataList = stagePlan.getWorkerMetadataList();
+              List<WorkerMetadata> workerMetadataList = new ArrayList<>(workerIds.size());
+              for (int workerId : workerIds) {
+                workerMetadataList.add(stageWorkerMetadataList.get(workerId));
+              }
+              List<Worker.WorkerMetadata> protoWorkerMetadataList =
+                  QueryPlanSerDeUtils.toProtoWorkerMetadataList(workerMetadataList);
+              StageInfo stageInfo = stageInfos.get(i);
+              Worker.StageMetadata stageMetadata =
+                  Worker.StageMetadata.newBuilder().addAllWorkerMetadata(protoWorkerMetadataList)
+                      .setCustomProperty(stageInfo._customProperty).build();
               requestBuilder.addStagePlan(
-                  Worker.StagePlan.newBuilder().setStageId(stageId).setStageRoot(stageRootNodes[i]).setStageMetadata(
-                      QueryPlanSerDeUtils.toProtoStageMetadata(stageWorkerMetadataLists[i],
-                          stagePlan.getCustomProperties(), serverInstance, workerIds)).build());
+                  Worker.StagePlan.newBuilder().setStageId(stageId).setRootNode(stageInfo._rootNode)
+                      .setStageMetadata(stageMetadata).build());
             }
           }
-          requestBuilder.putAllMetadata(requestMetadata);
+          requestBuilder.setMetadata(protoRequestMetadata);
           getOrCreateDispatchClient(serverInstance).submit(requestBuilder.build(), serverInstance, deadline,
               dispatchCallbacks::offer);
         } catch (Throwable t) {
@@ -204,6 +216,16 @@ public class QueryDispatcher {
     }
   }
 
+  private static class StageInfo {
+    final ByteString _rootNode;
+    final ByteString _customProperty;
+
+    private StageInfo(ByteString rootNode, ByteString customProperty) {
+      _rootNode = rootNode;
+      _customProperty = customProperty;
+    }
+  }
+
   private void cancel(long requestId, DispatchableSubPlan dispatchableSubPlan) {
     List<DispatchablePlanFragment> stagePlans = dispatchableSubPlan.getQueryStageList();
     int numStages = stagePlans.size();
@@ -233,21 +255,19 @@ public class QueryDispatcher {
       Map<String, String> queryOptions, @Nullable Map<Integer, ExecutionStatsAggregator> statsAggregatorMap,
       MailboxService mailboxService) {
     // NOTE: Reduce stage is always stage 0
-    DispatchablePlanFragment dispatchablePlanFragment = dispatchableSubPlan.getQueryStageList().get(0);
-    PlanFragment planFragment = dispatchablePlanFragment.getPlanFragment();
+    DispatchablePlanFragment dispatchableStagePlan = dispatchableSubPlan.getQueryStageList().get(0);
+    PlanFragment planFragment = dispatchableStagePlan.getPlanFragment();
     PlanNode rootNode = planFragment.getFragmentRoot();
     Preconditions.checkState(rootNode instanceof MailboxReceiveNode,
         "Expecting mailbox receive node as root of reduce stage, got: %s", rootNode.getClass().getSimpleName());
     MailboxReceiveNode receiveNode = (MailboxReceiveNode) rootNode;
-    List<WorkerMetadata> workerMetadataList = dispatchablePlanFragment.getWorkerMetadataList();
+    List<WorkerMetadata> workerMetadataList = dispatchableStagePlan.getWorkerMetadataList();
     Preconditions.checkState(workerMetadataList.size() == 1, "Expecting single worker for reduce stage, got: %s",
         workerMetadataList.size());
-    StageMetadata stageMetadata = new StageMetadata.Builder().setWorkerMetadataList(workerMetadataList)
-        .addCustomProperties(dispatchablePlanFragment.getCustomProperties()).build();
+    StageMetadata stageMetadata = new StageMetadata(workerMetadataList, dispatchableStagePlan.getCustomProperties());
     OpChainExecutionContext opChainExecutionContext =
         new OpChainExecutionContext(mailboxService, requestId, planFragment.getFragmentId(),
-            workerMetadataList.get(0).getVirtualServerAddress(), System.currentTimeMillis() + timeoutMs, queryOptions,
-            stageMetadata, null);
+            System.currentTimeMillis() + timeoutMs, queryOptions, stageMetadata, workerMetadataList.get(0), null);
     MailboxReceiveOperator receiveOperator =
         new MailboxReceiveOperator(opChainExecutionContext, receiveNode.getDistributionType(),
             receiveNode.getSenderStageId());
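
A minimal sketch (not part of this commit; the helper is hypothetical) of how a per-server Worker.StagePlan is assembled: the root node and custom properties are serialized once per stage as ByteStrings, and only the worker metadata for that server's worker ids is attached.

    import com.google.protobuf.ByteString;
    import java.util.List;
    import org.apache.pinot.common.proto.Worker;
    import org.apache.pinot.query.routing.WorkerMetadata;
    import org.apache.pinot.query.runtime.plan.serde.QueryPlanSerDeUtils;

    class DispatchExample {
      static Worker.StagePlan toProtoStagePlan(int stageId, ByteString rootNode, ByteString customProperty,
          List<WorkerMetadata> workerMetadataForServer) {
        Worker.StageMetadata stageMetadata = Worker.StageMetadata.newBuilder()
            .addAllWorkerMetadata(QueryPlanSerDeUtils.toProtoWorkerMetadataList(workerMetadataForServer))
            .setCustomProperty(customProperty)
            .build();
        return Worker.StagePlan.newBuilder()
            .setStageId(stageId)
            .setRootNode(rootNode)
            .setStageMetadata(stageMetadata)
            .build();
      }
    }
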
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/server/QueryServer.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/server/QueryServer.java
index ecfa9b09f8..2e52c28a5a 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/server/QueryServer.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/server/QueryServer.java
@@ -31,8 +31,10 @@ import org.apache.pinot.common.exception.QueryException;
 import org.apache.pinot.common.proto.PinotQueryWorkerGrpc;
 import org.apache.pinot.common.proto.Worker;
 import org.apache.pinot.common.utils.NamedThreadFactory;
+import org.apache.pinot.query.routing.WorkerMetadata;
 import org.apache.pinot.query.runtime.QueryRunner;
-import org.apache.pinot.query.runtime.plan.DistributedStagePlan;
+import org.apache.pinot.query.runtime.plan.StageMetadata;
+import org.apache.pinot.query.runtime.plan.StagePlan;
 import org.apache.pinot.query.runtime.plan.serde.QueryPlanSerDeUtils;
 import org.apache.pinot.query.service.dispatch.QueryDispatcher;
 import org.apache.pinot.spi.utils.CommonConstants;
@@ -95,31 +97,43 @@ public class QueryServer extends PinotQueryWorkerGrpc.PinotQueryWorkerImplBase {
 
   @Override
   public void submit(Worker.QueryRequest request, StreamObserver<Worker.QueryResponse> responseObserver) {
-    Map<String, String> requestMetadata = request.getMetadataMap();
+    Map<String, String> requestMetadata;
+    try {
+      requestMetadata = QueryPlanSerDeUtils.fromProtoProperties(request.getMetadata());
+    } catch (Exception e) {
+      LOGGER.error("Caught exception while deserializing request metadata", e);
+      responseObserver.onNext(Worker.QueryResponse.newBuilder()
+          .putMetadata(CommonConstants.Query.Response.ServerResponseStatus.STATUS_ERROR,
+              QueryException.getTruncatedStackTrace(e)).build());
+      responseObserver.onCompleted();
+      return;
+    }
     long requestId = Long.parseLong(requestMetadata.get(CommonConstants.Query.Request.MetadataKeys.REQUEST_ID));
     long timeoutMs = Long.parseLong(requestMetadata.get(CommonConstants.Broker.Request.QueryOptionKey.TIMEOUT_MS));
     long deadlineMs = System.currentTimeMillis() + timeoutMs;
 
-    List<Worker.StagePlan> stagePlans = request.getStagePlanList();
-    int numStages = stagePlans.size();
+    List<Worker.StagePlan> protoStagePlans = request.getStagePlanList();
+    int numStages = protoStagePlans.size();
     CompletableFuture<?>[] stageSubmissionStubs = new CompletableFuture[numStages];
     for (int i = 0; i < numStages; i++) {
-      Worker.StagePlan stagePlan = stagePlans.get(i);
+      Worker.StagePlan protoStagePlan = protoStagePlans.get(i);
       stageSubmissionStubs[i] = CompletableFuture.runAsync(() -> {
-        List<DistributedStagePlan> workerPlans;
+        StagePlan stagePlan;
         try {
-          workerPlans = QueryPlanSerDeUtils.deserializeStagePlan(stagePlan);
+          stagePlan = QueryPlanSerDeUtils.fromProtoStagePlan(protoStagePlan);
         } catch (Exception e) {
           throw new RuntimeException(
-              String.format("Caught exception while deserializing stage plan for request: %d, stage id: %d", requestId,
-                  stagePlan.getStageId()), e);
+              String.format("Caught exception while deserializing stage plan for request: %d, stage: %d", requestId,
+                  protoStagePlan.getStageId()), e);
         }
-        int numWorkers = workerPlans.size();
+        StageMetadata stageMetadata = stagePlan.getStageMetadata();
+        List<WorkerMetadata> workerMetadataList = stageMetadata.getWorkerMetadataList();
+        int numWorkers = workerMetadataList.size();
         CompletableFuture<?>[] workerSubmissionStubs = new CompletableFuture[numWorkers];
         for (int j = 0; j < numWorkers; j++) {
-          DistributedStagePlan workerPlan = workerPlans.get(j);
+          WorkerMetadata workerMetadata = workerMetadataList.get(j);
           workerSubmissionStubs[j] =
-              CompletableFuture.runAsync(() -> _queryRunner.processQuery(workerPlan, requestMetadata),
+              CompletableFuture.runAsync(() -> _queryRunner.processQuery(workerMetadata, stagePlan, requestMetadata),
                   _querySubmissionExecutorService);
         }
         try {
@@ -127,8 +141,8 @@ public class QueryServer extends PinotQueryWorkerGrpc.PinotQueryWorkerImplBase {
               .get(deadlineMs - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
         } catch (Exception e) {
           throw new RuntimeException(
-              String.format("Caught exception while submitting request: %d, stage id: %d", requestId,
-                  stagePlan.getStageId()), e);
+              String.format("Caught exception while submitting request: %d, stage: %d", requestId,
+                  protoStagePlan.getStageId()), e);
         } finally {
           for (CompletableFuture<?> future : workerSubmissionStubs) {
             if (!future.isDone()) {
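
In simplified form (a sketch, not part of this commit; the deadline handling, error propagation, and executor fan-out shown above are omitted), the server-side flow is:

    import java.util.Map;
    import org.apache.pinot.common.proto.Worker;
    import org.apache.pinot.query.routing.WorkerMetadata;
    import org.apache.pinot.query.runtime.QueryRunner;
    import org.apache.pinot.query.runtime.plan.StagePlan;
    import org.apache.pinot.query.runtime.plan.serde.QueryPlanSerDeUtils;

    class QueryServerExample {
      static void process(QueryRunner queryRunner, Worker.QueryRequest request)
          throws Exception {
        // Request metadata arrives as one serialized Properties blob and is parsed once per request.
        Map<String, String> requestMetadata = QueryPlanSerDeUtils.fromProtoProperties(request.getMetadata());
        for (Worker.StagePlan protoStagePlan : request.getStagePlanList()) {
          // Each stage plan is deserialized once and shared by all of this server's workers for the stage.
          StagePlan stagePlan = QueryPlanSerDeUtils.fromProtoStagePlan(protoStagePlan);
          for (WorkerMetadata workerMetadata : stagePlan.getStageMetadata().getWorkerMetadataList()) {
            queryRunner.processQuery(workerMetadata, stagePlan, requestMetadata);
          }
        }
      }
    }
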
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/QueryServerEnclosure.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/QueryServerEnclosure.java
index b4b1dff3cc..1811218eda 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/QueryServerEnclosure.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/QueryServerEnclosure.java
@@ -27,8 +27,9 @@ import org.apache.helix.zookeeper.datamodel.ZNRecord;
 import org.apache.pinot.common.metrics.ServerMetrics;
 import org.apache.pinot.common.utils.SchemaUtils;
 import org.apache.pinot.core.data.manager.InstanceDataManager;
+import org.apache.pinot.query.routing.WorkerMetadata;
 import org.apache.pinot.query.runtime.QueryRunner;
-import org.apache.pinot.query.runtime.plan.DistributedStagePlan;
+import org.apache.pinot.query.runtime.plan.StagePlan;
 import org.apache.pinot.query.testutils.MockInstanceDataManagerFactory;
 import org.apache.pinot.query.testutils.QueryTestUtils;
 import org.apache.pinot.spi.data.Schema;
@@ -109,9 +110,9 @@ public class QueryServerEnclosure {
     _queryRunner.shutDown();
   }
 
-  public CompletableFuture<Void> processQuery(DistributedStagePlan distributedStagePlan,
+  public CompletableFuture<Void> processQuery(WorkerMetadata workerMetadata, StagePlan stagePlan,
       Map<String, String> requestMetadataMap) {
-    return CompletableFuture.runAsync(() -> _queryRunner.processQuery(distributedStagePlan, requestMetadataMap),
+    return CompletableFuture.runAsync(() -> _queryRunner.processQuery(workerMetadata, stagePlan, requestMetadataMap),
         _queryRunner.getExecutorService());
   }
 }
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/mailbox/MailboxServiceTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/mailbox/MailboxServiceTest.java
index 8e6b563ac4..3f20d33956 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/mailbox/MailboxServiceTest.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/mailbox/MailboxServiceTest.java
@@ -24,6 +24,7 @@ import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.pinot.common.utils.DataSchema;
 import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
+import org.apache.pinot.query.planner.physical.MailboxIdUtils;
 import org.apache.pinot.query.runtime.blocks.TransferableBlock;
 import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils;
 import org.apache.pinot.query.runtime.operator.OperatorTestUtil;
@@ -299,7 +300,8 @@ public class MailboxServiceTest {
     SendingMailbox sendingMailbox =
         _mailboxService1.getSendingMailbox("localhost", _mailboxService1.getPort(), mailboxId, Long.MAX_VALUE);
     ReceivingMailbox receivingMailbox = _mailboxService1.getReceivingMailbox(mailboxId);
-    receivingMailbox.registeredReader(() -> { });
+    receivingMailbox.registeredReader(() -> {
+    });
 
     // send a block
     sendingMailbox.send(OperatorTestUtil.block(DATA_SCHEMA, new Object[]{0}));
@@ -591,15 +593,16 @@ public class MailboxServiceTest {
     SendingMailbox sendingMailbox =
         _mailboxService2.getSendingMailbox("localhost", _mailboxService1.getPort(), mailboxId, Long.MAX_VALUE);
     ReceivingMailbox receivingMailbox = _mailboxService1.getReceivingMailbox(mailboxId);
-    receivingMailbox.registeredReader(() -> { });
+    receivingMailbox.registeredReader(() -> {
+    });
 
     // send a block
     sendingMailbox.send(OperatorTestUtil.block(DATA_SCHEMA, new Object[]{0}));
     // receiving-side early terminates after pulling the first block
     TestUtils.waitForCondition(aVoid -> {
-          TransferableBlock block = receivingMailbox.poll();
-          return block != null && block.getNumRows() == 1;
-        }, 1000L, "Failed to deliver mails");
+      TransferableBlock block = receivingMailbox.poll();
+      return block != null && block.getNumRows() == 1;
+    }, 1000L, "Failed to deliver mails");
     receivingMailbox.earlyTerminate();
 
     // send another block because early termination does not guarantee the next block is EOS
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/executor/OpChainSchedulerServiceTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/executor/OpChainSchedulerServiceTest.java
index e79f46e671..c34c858e76 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/executor/OpChainSchedulerServiceTest.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/executor/OpChainSchedulerServiceTest.java
@@ -18,17 +18,20 @@
  */
 package org.apache.pinot.query.runtime.executor;
 
-import java.util.Collections;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 import org.apache.pinot.common.utils.NamedThreadFactory;
 import org.apache.pinot.query.routing.VirtualServerAddress;
+import org.apache.pinot.query.routing.WorkerMetadata;
 import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils;
 import org.apache.pinot.query.runtime.operator.MultiStageOperator;
 import org.apache.pinot.query.runtime.operator.OpChain;
 import org.apache.pinot.query.runtime.plan.OpChainExecutionContext;
+import org.apache.pinot.query.runtime.plan.StageMetadata;
 import org.mockito.Mockito;
 import org.mockito.MockitoAnnotations;
 import org.testng.Assert;
@@ -67,9 +70,10 @@ public class OpChainSchedulerServiceTest {
   }
 
   private OpChain getChain(MultiStageOperator operator) {
-    VirtualServerAddress address = new VirtualServerAddress("localhost", 1234, 1);
-    OpChainExecutionContext context =
-        new OpChainExecutionContext(null, 123L, 1, address, Long.MAX_VALUE, Collections.emptyMap(), null, null);
+    WorkerMetadata workerMetadata =
+        new WorkerMetadata(new VirtualServerAddress("localhost", 123, 0), ImmutableMap.of(), ImmutableMap.of());
+    OpChainExecutionContext context = new OpChainExecutionContext(null, 123L, 1, Long.MAX_VALUE, ImmutableMap.of(),
+        new StageMetadata(ImmutableList.of(workerMetadata), ImmutableMap.of()), workerMetadata, null);
     return new OpChain(context, operator);
   }
 
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperatorTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperatorTest.java
index 9f5b8dfffe..1a2949b142 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperatorTest.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperatorTest.java
@@ -26,9 +26,9 @@ import java.util.stream.Stream;
 import org.apache.calcite.rel.RelDistribution;
 import org.apache.pinot.common.exception.QueryException;
 import org.apache.pinot.common.utils.DataSchema;
-import org.apache.pinot.query.mailbox.MailboxIdUtils;
 import org.apache.pinot.query.mailbox.MailboxService;
 import org.apache.pinot.query.mailbox.ReceivingMailbox;
+import org.apache.pinot.query.planner.physical.MailboxIdUtils;
 import org.apache.pinot.query.routing.MailboxMetadata;
 import org.apache.pinot.query.routing.VirtualServerAddress;
 import org.apache.pinot.query.routing.WorkerMetadata;
@@ -39,6 +39,7 @@ import org.apache.pinot.query.runtime.plan.StageMetadata;
 import org.mockito.Mock;
 import org.mockito.MockitoAnnotations;
 import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeClass;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
@@ -51,12 +52,14 @@ import static org.testng.Assert.assertTrue;
 
 
 public class MailboxReceiveOperatorTest {
-  private static final VirtualServerAddress RECEIVER_ADDRESS = new VirtualServerAddress("localhost", 123, 0);
   private static final DataSchema DATA_SCHEMA =
       new DataSchema(new String[]{"col1", "col2"}, new DataSchema.ColumnDataType[]{INT, INT});
   private static final String MAILBOX_ID_1 = MailboxIdUtils.toMailboxId(0, 1, 0, 0, 0);
   private static final String MAILBOX_ID_2 = MailboxIdUtils.toMailboxId(0, 1, 1, 0, 0);
 
+  private StageMetadata _stageMetadataBoth;
+  private StageMetadata _stageMetadata1;
+
   private AutoCloseable _mocks;
   @Mock
   private MailboxService _mailboxService;
@@ -64,40 +67,32 @@ public class MailboxReceiveOperatorTest {
   private ReceivingMailbox _mailbox1;
   @Mock
   private ReceivingMailbox _mailbox2;
-  private StageMetadata _stageMetadataBoth;
-  private StageMetadata _stageMetadata1;
 
-  @BeforeMethod
+  @BeforeClass
   public void setUp() {
+    VirtualServerAddress server1 = new VirtualServerAddress("localhost", 123, 0);
+    VirtualServerAddress server2 = new VirtualServerAddress("localhost", 123, 1);
+    _stageMetadataBoth = new StageMetadata(Stream.of(server1, server2).map(s -> new WorkerMetadata(s, ImmutableMap.of(0,
+        new MailboxMetadata(
+            ImmutableList.of(MailboxIdUtils.toPlanMailboxId(1, 0, 0, 0), MailboxIdUtils.toPlanMailboxId(1, 1, 0, 0)),
+            ImmutableList.of(server1, server2)), 1, new MailboxMetadata(
+            ImmutableList.of(MailboxIdUtils.toPlanMailboxId(1, 0, 0, 0), MailboxIdUtils.toPlanMailboxId(1, 1, 0, 0)),
+            ImmutableList.of(server1, server2))), ImmutableMap.of())).collect(Collectors.toList()), ImmutableMap.of());
+    _stageMetadata1 = new StageMetadata(ImmutableList.of(new WorkerMetadata(server1, ImmutableMap.of(0,
+        new MailboxMetadata(ImmutableList.of(MailboxIdUtils.toPlanMailboxId(1, 0, 0, 0)), ImmutableList.of(server1)), 1,
+        new MailboxMetadata(ImmutableList.of(MailboxIdUtils.toPlanMailboxId(1, 0, 0, 0)), ImmutableList.of(server1))),
+        ImmutableMap.of())), ImmutableMap.of());
+  }
+
+  @BeforeMethod
+  public void setUpMethod() {
     _mocks = MockitoAnnotations.openMocks(this);
     when(_mailboxService.getHostname()).thenReturn("localhost");
     when(_mailboxService.getPort()).thenReturn(123);
-    VirtualServerAddress server1 = new VirtualServerAddress("localhost", 123, 0);
-    VirtualServerAddress server2 = new VirtualServerAddress("localhost", 123, 1);
-    _stageMetadataBoth = new StageMetadata.Builder().setWorkerMetadataList(Stream.of(server1, server2).map(
-        s -> new WorkerMetadata.Builder().setVirtualServerAddress(s)
-            .addMailBoxInfoMap(0, new MailboxMetadata(
-                ImmutableList.of(org.apache.pinot.query.planner.physical.MailboxIdUtils.toPlanMailboxId(1, 0, 0, 0),
-                    org.apache.pinot.query.planner.physical.MailboxIdUtils.toPlanMailboxId(1, 1, 0, 0)),
-                ImmutableList.of(server1, server2), ImmutableMap.of()))
-            .addMailBoxInfoMap(1, new MailboxMetadata(
-                ImmutableList.of(org.apache.pinot.query.planner.physical.MailboxIdUtils.toPlanMailboxId(1, 0, 0, 0),
-                    org.apache.pinot.query.planner.physical.MailboxIdUtils.toPlanMailboxId(1, 1, 0, 0)),
-                ImmutableList.of(server1, server2), ImmutableMap.of()))
-            .build()).collect(Collectors.toList())).build();
-    _stageMetadata1 = new StageMetadata.Builder().setWorkerMetadataList(Stream.of(server1).map(
-        s -> new WorkerMetadata.Builder().setVirtualServerAddress(s)
-            .addMailBoxInfoMap(0, new MailboxMetadata(
-                ImmutableList.of(org.apache.pinot.query.planner.physical.MailboxIdUtils.toPlanMailboxId(1, 0, 0, 0)),
-                ImmutableList.of(server1), ImmutableMap.of()))
-            .addMailBoxInfoMap(1, new MailboxMetadata(
-                ImmutableList.of(org.apache.pinot.query.planner.physical.MailboxIdUtils.toPlanMailboxId(1, 0, 0, 0)),
-                ImmutableList.of(server1), ImmutableMap.of()))
-            .build()).collect(Collectors.toList())).build();
   }
 
   @AfterMethod
-  public void tearDown()
+  public void tearDownMethod()
       throws Exception {
     _mocks.close();
   }
@@ -105,7 +100,7 @@ public class MailboxReceiveOperatorTest {
   @Test(expectedExceptions = IllegalStateException.class, expectedExceptionsMessageRegExp = ".*RANGE_DISTRIBUTED.*")
   public void shouldThrowRangeDistributionNotSupported() {
     OpChainExecutionContext context =
-        OperatorTestUtil.getOpChainContext(_mailboxService, RECEIVER_ADDRESS, Long.MAX_VALUE, _stageMetadata1);
+        OperatorTestUtil.getOpChainContext(_mailboxService, Long.MAX_VALUE, _stageMetadata1);
     //noinspection resource
     new MailboxReceiveOperator(context, RelDistribution.Type.RANGE_DISTRIBUTED, 1);
   }
@@ -116,8 +111,7 @@ public class MailboxReceiveOperatorTest {
     when(_mailboxService.getReceivingMailbox(eq(MAILBOX_ID_1))).thenReturn(_mailbox1);
 
     OpChainExecutionContext context =
-        OperatorTestUtil.getOpChainContext(_mailboxService, RECEIVER_ADDRESS, System.currentTimeMillis() + 1000L,
-            _stageMetadata1);
+        OperatorTestUtil.getOpChainContext(_mailboxService, System.currentTimeMillis() + 1000L, _stageMetadata1);
     try (MailboxReceiveOperator receiveOp = new MailboxReceiveOperator(context, RelDistribution.Type.SINGLETON, 1)) {
       Thread.sleep(100L);
       TransferableBlock block = receiveOp.nextBlock();
@@ -132,7 +126,7 @@ public class MailboxReceiveOperatorTest {
     when(_mailbox1.poll()).thenReturn(TransferableBlockUtils.getEndOfStreamTransferableBlock());
 
     OpChainExecutionContext context =
-        OperatorTestUtil.getOpChainContext(_mailboxService, RECEIVER_ADDRESS, Long.MAX_VALUE, _stageMetadata1);
+        OperatorTestUtil.getOpChainContext(_mailboxService, Long.MAX_VALUE, _stageMetadata1);
     try (MailboxReceiveOperator receiveOp = new MailboxReceiveOperator(context, RelDistribution.Type.SINGLETON, 1)) {
       assertTrue(receiveOp.nextBlock().isEndOfStreamBlock());
     }
@@ -146,7 +140,7 @@ public class MailboxReceiveOperatorTest {
         TransferableBlockUtils.getEndOfStreamTransferableBlock());
 
     OpChainExecutionContext context =
-        OperatorTestUtil.getOpChainContext(_mailboxService, RECEIVER_ADDRESS, Long.MAX_VALUE, _stageMetadata1);
+        OperatorTestUtil.getOpChainContext(_mailboxService, Long.MAX_VALUE, _stageMetadata1);
     try (MailboxReceiveOperator receiveOp = new MailboxReceiveOperator(context, RelDistribution.Type.SINGLETON, 1)) {
       List<Object[]> actualRows = receiveOp.nextBlock().getContainer();
       assertEquals(actualRows.size(), 1);
@@ -163,7 +157,7 @@ public class MailboxReceiveOperatorTest {
         TransferableBlockUtils.getErrorTransferableBlock(new RuntimeException(errorMessage)));
 
     OpChainExecutionContext context =
-        OperatorTestUtil.getOpChainContext(_mailboxService, RECEIVER_ADDRESS, Long.MAX_VALUE, _stageMetadata1);
+        OperatorTestUtil.getOpChainContext(_mailboxService, Long.MAX_VALUE, _stageMetadata1);
     try (MailboxReceiveOperator receiveOp = new MailboxReceiveOperator(context, RelDistribution.Type.SINGLETON, 1)) {
       TransferableBlock block = receiveOp.nextBlock();
       assertTrue(block.isErrorBlock());
@@ -181,7 +175,7 @@ public class MailboxReceiveOperatorTest {
         TransferableBlockUtils.getEndOfStreamTransferableBlock());
 
     OpChainExecutionContext context =
-        OperatorTestUtil.getOpChainContext(_mailboxService, RECEIVER_ADDRESS, Long.MAX_VALUE, _stageMetadataBoth);
+        OperatorTestUtil.getOpChainContext(_mailboxService, Long.MAX_VALUE, _stageMetadataBoth);
     try (MailboxReceiveOperator receiveOp = new MailboxReceiveOperator(context, RelDistribution.Type.HASH_DISTRIBUTED,
         1)) {
       List<Object[]> actualRows = receiveOp.nextBlock().getContainer();
@@ -204,7 +198,7 @@ public class MailboxReceiveOperatorTest {
         TransferableBlockUtils.getEndOfStreamTransferableBlock());
 
     OpChainExecutionContext context =
-        OperatorTestUtil.getOpChainContext(_mailboxService, RECEIVER_ADDRESS, Long.MAX_VALUE, _stageMetadataBoth);
+        OperatorTestUtil.getOpChainContext(_mailboxService, Long.MAX_VALUE, _stageMetadataBoth);
     try (MailboxReceiveOperator receiveOp = new MailboxReceiveOperator(context, RelDistribution.Type.HASH_DISTRIBUTED,
         1)) {
       // Receive first block from server1
@@ -229,7 +223,7 @@ public class MailboxReceiveOperatorTest {
         TransferableBlockUtils.getEndOfStreamTransferableBlock());
 
     OpChainExecutionContext context =
-        OperatorTestUtil.getOpChainContext(_mailboxService, RECEIVER_ADDRESS, Long.MAX_VALUE, _stageMetadataBoth);
+        OperatorTestUtil.getOpChainContext(_mailboxService, Long.MAX_VALUE, _stageMetadataBoth);
     try (MailboxReceiveOperator receiveOp = new MailboxReceiveOperator(context, RelDistribution.Type.HASH_DISTRIBUTED,
         1)) {
       TransferableBlock block = receiveOp.nextBlock();
@@ -251,7 +245,7 @@ public class MailboxReceiveOperatorTest {
         TransferableBlockUtils.getEndOfStreamTransferableBlock());
 
     OpChainExecutionContext context =
-        OperatorTestUtil.getOpChainContext(_mailboxService, RECEIVER_ADDRESS, Long.MAX_VALUE, _stageMetadataBoth);
+        OperatorTestUtil.getOpChainContext(_mailboxService, Long.MAX_VALUE, _stageMetadataBoth);
     try (MailboxReceiveOperator receiveOp = new MailboxReceiveOperator(context, RelDistribution.Type.HASH_DISTRIBUTED,
         1)) {
       // Receive first block from server1
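
Note on the MailboxReceiveOperatorTest hunks above: the explicit receiver VirtualServerAddress is gone from the test helper, and the receiving worker is now resolved from the StageMetadata itself. A minimal sketch of the resulting call shape, assuming the helper signature shown in the OperatorTestUtil hunk further down (variable names are illustrative):

    // Sketch only: the context is built from stage metadata, no receiver address needed.
    OpChainExecutionContext context =
        OperatorTestUtil.getOpChainContext(_mailboxService, /* deadlineMs */ Long.MAX_VALUE, _stageMetadata1);
    try (MailboxReceiveOperator receiveOp =
        new MailboxReceiveOperator(context, RelDistribution.Type.SINGLETON, /* senderStageId */ 1)) {
      TransferableBlock block = receiveOp.nextBlock();  // polling behaviour is unchanged
    }
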
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MailboxSendOperatorTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MailboxSendOperatorTest.java
index 7a49dcf16a..86b2ac0000 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MailboxSendOperatorTest.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MailboxSendOperatorTest.java
@@ -18,7 +18,8 @@
  */
 package org.apache.pinot.query.runtime.operator;
 
-import java.util.Collections;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.TimeoutException;
@@ -50,7 +51,6 @@ public class MailboxSendOperatorTest {
   private static final int SENDER_STAGE_ID = 1;
 
   private AutoCloseable _mocks;
-
   @Mock
   private VirtualServerAddress _server;
   @Mock
@@ -61,8 +61,7 @@ public class MailboxSendOperatorTest {
   private BlockExchange _exchange;
 
   @BeforeMethod
-  public void setUp()
-      throws Exception {
+  public void setUpMethod() {
     _mocks = openMocks(this);
     when(_server.hostname()).thenReturn("mock");
     when(_server.port()).thenReturn(0);
@@ -70,7 +69,7 @@ public class MailboxSendOperatorTest {
   }
 
   @AfterMethod
-  public void tearDown()
+  public void tearDownMethod()
       throws Exception {
     _mocks.close();
   }
@@ -199,11 +198,11 @@ public class MailboxSendOperatorTest {
   }
 
   private MailboxSendOperator getMailboxSendOperator() {
-    StageMetadata stageMetadata = new StageMetadata.Builder().setWorkerMetadataList(
-        Collections.singletonList(new WorkerMetadata.Builder().setVirtualServerAddress(_server).build())).build();
+    WorkerMetadata workerMetadata = new WorkerMetadata(_server, ImmutableMap.of(), ImmutableMap.of());
+    StageMetadata stageMetadata = new StageMetadata(ImmutableList.of(workerMetadata), ImmutableMap.of());
     OpChainExecutionContext context =
-        new OpChainExecutionContext(_mailboxService, 0, SENDER_STAGE_ID, _server, Long.MAX_VALUE,
-            Collections.emptyMap(), stageMetadata, null);
+        new OpChainExecutionContext(_mailboxService, 0, SENDER_STAGE_ID, Long.MAX_VALUE, ImmutableMap.of(),
+            stageMetadata, workerMetadata, null);
     return new MailboxSendOperator(context, _sourceOperator, _exchange, null, null, false);
   }
 }
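
The getMailboxSendOperator() hunk above is the most compact summary of the new wiring: WorkerMetadata and StageMetadata are now plain constructors instead of builders, and OpChainExecutionContext takes the worker's own WorkerMetadata in place of a VirtualServerAddress. A minimal sketch with the argument roles spelled out; the roles are inferred from the surrounding hunks and the names are illustrative:

    WorkerMetadata workerMetadata = new WorkerMetadata(
        serverAddress,       // VirtualServerAddress of this worker
        ImmutableMap.of(),   // per-stage MailboxMetadata, empty for this mocked exchange
        ImmutableMap.of());  // custom worker properties (empty in these tests)
    StageMetadata stageMetadata =
        new StageMetadata(ImmutableList.of(workerMetadata), ImmutableMap.of() /* custom properties */);
    OpChainExecutionContext context = new OpChainExecutionContext(
        mailboxService,
        0,                   // requestId
        SENDER_STAGE_ID,     // stageId
        Long.MAX_VALUE,      // deadlineMs
        ImmutableMap.of(),   // opChain metadata, e.g. the trace flag
        stageMetadata,
        workerMetadata,      // replaces the old VirtualServerAddress argument
        null);               // trailing argument stays null throughout these tests
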
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/OpChainTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/OpChainTest.java
index fb937e6238..a74dec4e6f 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/OpChainTest.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/OpChainTest.java
@@ -22,15 +22,12 @@ import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Stack;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.atomic.AtomicReference;
-import java.util.stream.Collectors;
-import java.util.stream.Stream;
 import javax.annotation.Nullable;
 import org.apache.calcite.rel.RelDistribution;
 import org.apache.pinot.common.datatable.DataTable;
@@ -77,8 +74,17 @@ public class OpChainTest {
   private static int _numOperatorsInitialized = 0;
 
   private final List<TransferableBlock> _blockList = new ArrayList<>();
-  private final ExecutorService _executorService = Executors.newCachedThreadPool();
+  private final ExecutorService _executor = Executors.newCachedThreadPool();
   private final AtomicReference<LeafStageTransferableBlockOperator> _leafOpRef = new AtomicReference<>();
+  private final VirtualServerAddress _serverAddress = new VirtualServerAddress("localhost", 123, 0);
+  private final WorkerMetadata _workerMetadata = new WorkerMetadata(_serverAddress, ImmutableMap.of(0,
+      new MailboxMetadata(ImmutableList.of(MailboxIdUtils.toPlanMailboxId(0, 0, 0, 0)),
+          ImmutableList.of(_serverAddress)), 1,
+      new MailboxMetadata(ImmutableList.of(MailboxIdUtils.toPlanMailboxId(0, 0, 0, 0)),
+          ImmutableList.of(_serverAddress)), 2,
+      new MailboxMetadata(ImmutableList.of(MailboxIdUtils.toPlanMailboxId(0, 0, 0, 0)),
+          ImmutableList.of(_serverAddress))), ImmutableMap.of());
+  private final StageMetadata _stageMetadata = new StageMetadata(ImmutableList.of(_workerMetadata), ImmutableMap.of());
 
   private AutoCloseable _mocks;
   @Mock
@@ -94,22 +100,9 @@ public class OpChainTest {
   @Mock
   private BlockExchange _exchange;
 
-  private VirtualServerAddress _serverAddress;
-  private StageMetadata _receivingStageMetadata;
-
   @BeforeMethod
   public void setUpMethod() {
     _mocks = MockitoAnnotations.openMocks(this);
-    _serverAddress = new VirtualServerAddress("localhost", 123, 0);
-    _receivingStageMetadata = new StageMetadata.Builder().setWorkerMetadataList(Stream.of(_serverAddress).map(
-        s -> new WorkerMetadata.Builder().setVirtualServerAddress(s).addMailBoxInfoMap(0,
-            new MailboxMetadata(ImmutableList.of(MailboxIdUtils.toPlanMailboxId(0, 0, 0, 0)), ImmutableList.of(s),
-                ImmutableMap.of())).addMailBoxInfoMap(1,
-            new MailboxMetadata(ImmutableList.of(MailboxIdUtils.toPlanMailboxId(0, 0, 0, 0)), ImmutableList.of(s),
-                ImmutableMap.of())).addMailBoxInfoMap(2,
-            new MailboxMetadata(ImmutableList.of(MailboxIdUtils.toPlanMailboxId(0, 0, 0, 0)), ImmutableList.of(s),
-                ImmutableMap.of())).build()).collect(Collectors.toList())).build();
-
     when(_mailboxService1.getReceivingMailbox(any())).thenReturn(_mailbox1);
     when(_mailboxService2.getReceivingMailbox(any())).thenReturn(_mailbox2);
 
@@ -139,7 +132,7 @@ public class OpChainTest {
 
   @AfterClass
   public void tearDown() {
-    _executorService.shutdown();
+    _executor.shutdown();
   }
 
   @Test
@@ -208,9 +201,8 @@ public class OpChainTest {
 
     int receivedStageId = 2;
     int senderStageId = 1;
-    OpChainExecutionContext context =
-        new OpChainExecutionContext(_mailboxService1, 1, senderStageId, _serverAddress, Long.MAX_VALUE,
-            Collections.singletonMap(CommonConstants.Broker.Request.TRACE, "true"), _receivingStageMetadata, null);
+    OpChainExecutionContext context = new OpChainExecutionContext(_mailboxService1, 1, senderStageId, Long.MAX_VALUE,
+        ImmutableMap.of(CommonConstants.Broker.Request.TRACE, "true"), _stageMetadata, _workerMetadata, null);
 
     Stack<MultiStageOperator> operators =
         getFullOpchain(receivedStageId, senderStageId, context, dummyOperatorWaitTime);
@@ -223,8 +215,8 @@ public class OpChainTest {
     opChain.getStats().queued();
 
     OpChainExecutionContext secondStageContext =
-        new OpChainExecutionContext(_mailboxService2, 1, senderStageId + 1, _serverAddress, Long.MAX_VALUE,
-            Collections.singletonMap(CommonConstants.Broker.Request.TRACE, "true"), _receivingStageMetadata, null);
+        new OpChainExecutionContext(_mailboxService2, 1, senderStageId + 1, Long.MAX_VALUE,
+            ImmutableMap.of(CommonConstants.Broker.Request.TRACE, "true"), _stageMetadata, _workerMetadata, null);
 
     MailboxReceiveOperator secondStageReceiveOp =
         new MailboxReceiveOperator(secondStageContext, RelDistribution.Type.BROADCAST_DISTRIBUTED, senderStageId + 1);
@@ -249,8 +241,8 @@ public class OpChainTest {
     int receivedStageId = 2;
     int senderStageId = 1;
     OpChainExecutionContext context =
-        new OpChainExecutionContext(_mailboxService1, 1, senderStageId, _serverAddress, Long.MAX_VALUE,
-            Collections.emptyMap(), _receivingStageMetadata, null);
+        new OpChainExecutionContext(_mailboxService1, 1, senderStageId, Long.MAX_VALUE, ImmutableMap.of(),
+            _stageMetadata, _workerMetadata, null);
 
     Stack<MultiStageOperator> operators =
         getFullOpchain(receivedStageId, senderStageId, context, dummyOperatorWaitTime);
@@ -261,8 +253,8 @@ public class OpChainTest {
     opChain.getStats().queued();
 
     OpChainExecutionContext secondStageContext =
-        new OpChainExecutionContext(_mailboxService2, 1, senderStageId + 1, _serverAddress, Long.MAX_VALUE,
-            Collections.emptyMap(), _receivingStageMetadata, null);
+        new OpChainExecutionContext(_mailboxService2, 1, senderStageId + 1, Long.MAX_VALUE, ImmutableMap.of(),
+            _stageMetadata, _workerMetadata, null);
     MailboxReceiveOperator secondStageReceiveOp =
         new MailboxReceiveOperator(secondStageContext, RelDistribution.Type.BROADCAST_DISTRIBUTED, senderStageId);
 
@@ -296,19 +288,19 @@ public class OpChainTest {
     }
 
     QueryContext queryContext = QueryContextConverterUtils.getQueryContext("SELECT intCol FROM tbl");
-    List<BaseResultsBlock> dataBlocks = Collections.singletonList(
+    List<BaseResultsBlock> dataBlocks = ImmutableList.of(
         new SelectionResultsBlock(upStreamSchema, Arrays.asList(new Object[]{1}, new Object[]{2}), queryContext));
     InstanceResponseBlock metadataBlock = new InstanceResponseBlock(new MetadataResultsBlock());
     QueryExecutor queryExecutor = mockQueryExecutor(dataBlocks, metadataBlock);
     LeafStageTransferableBlockOperator leafOp =
-        new LeafStageTransferableBlockOperator(context, Collections.singletonList(mock(ServerQueryRequest.class)),
-            upStreamSchema, queryExecutor, _executorService);
+        new LeafStageTransferableBlockOperator(context, ImmutableList.of(mock(ServerQueryRequest.class)),
+            upStreamSchema, queryExecutor, _executor);
     _leafOpRef.set(leafOp);
 
     //Transform operator
     RexExpression.InputRef ref0 = new RexExpression.InputRef(0);
     TransformOperator transformOp =
-        new TransformOperator(context, leafOp, upStreamSchema, Collections.singletonList(ref0), upStreamSchema);
+        new TransformOperator(context, leafOp, upStreamSchema, ImmutableList.of(ref0), upStreamSchema);
 
     //Filter operator
     RexExpression booleanLiteral = new RexExpression.Literal(ColumnDataType.BOOLEAN, 1);
@@ -377,7 +369,7 @@ public class OpChainTest {
 
     @Override
     public List<MultiStageOperator> getChildOperators() {
-      return Collections.singletonList(_upstream);
+      return ImmutableList.of(_upstream);
     }
 
     @Override
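
The field initializers at the top of OpChainTest also show the slimmed-down MailboxMetadata: it is now just a list of plan mailbox ids plus the matching server addresses (the trailing map argument is gone), registered in WorkerMetadata under what appear to be the ids of the connected stages (0, 1 and 2 here). A sketch of one such entry, mirroring the _workerMetadata field above (illustrative only):

    VirtualServerAddress self = new VirtualServerAddress("localhost", 123, /* workerId */ 0);
    MailboxMetadata mailboxes = new MailboxMetadata(
        ImmutableList.of(MailboxIdUtils.toPlanMailboxId(0, 0, 0, 0)),  // plan-level mailbox ids
        ImmutableList.of(self));                                       // matching server addresses
    WorkerMetadata worker = new WorkerMetadata(self,
        ImmutableMap.of(0, mailboxes, 1, mailboxes, 2, mailboxes),     // one entry per connected stage
        ImmutableMap.of());
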
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/OperatorTestUtil.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/OperatorTestUtil.java
index 5f139e5545..3c132269c7 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/OperatorTestUtil.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/OperatorTestUtil.java
@@ -18,15 +18,16 @@
  */
 package org.apache.pinot.query.runtime.operator;
 
+import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import java.util.Arrays;
-import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import org.apache.pinot.common.datablock.DataBlock;
 import org.apache.pinot.common.utils.DataSchema;
 import org.apache.pinot.query.mailbox.MailboxService;
 import org.apache.pinot.query.routing.VirtualServerAddress;
+import org.apache.pinot.query.routing.WorkerMetadata;
 import org.apache.pinot.query.runtime.blocks.TransferableBlock;
 import org.apache.pinot.query.runtime.operator.utils.OperatorUtils;
 import org.apache.pinot.query.runtime.plan.OpChainExecutionContext;
@@ -38,8 +39,8 @@ import org.apache.pinot.spi.utils.CommonConstants;
 public class OperatorTestUtil {
   // simple key-value collision schema/data test set: "Aa" and "BB" have same hash code in java.
   private static final List<List<Object[]>> SIMPLE_KV_DATA_ROWS =
-      Arrays.asList(Arrays.asList(new Object[]{1, "Aa"}, new Object[]{2, "BB"}, new Object[]{3, "BB"}),
-          Arrays.asList(new Object[]{1, "AA"}, new Object[]{2, "Aa"}));
+      ImmutableList.of(ImmutableList.of(new Object[]{1, "Aa"}, new Object[]{2, "BB"}, new Object[]{3, "BB"}),
+          ImmutableList.of(new Object[]{1, "AA"}, new Object[]{2, "Aa"}));
   private static final MockDataBlockOperatorFactory MOCK_OPERATOR_FACTORY;
 
   public static final DataSchema SIMPLE_KV_DATA_SCHEMA = new DataSchema(new String[]{"foo", "bar"},
@@ -75,21 +76,24 @@ public class OperatorTestUtil {
     return new TransferableBlock(Arrays.asList(rows), schema, DataBlock.Type.ROW);
   }
 
-  public static OpChainExecutionContext getOpChainContext(MailboxService mailboxService,
-      VirtualServerAddress receiverAddress, long deadlineMs, StageMetadata stageMetadata) {
-    return new OpChainExecutionContext(mailboxService, 0, 0, receiverAddress, deadlineMs, Collections.emptyMap(),
-        stageMetadata, null);
+  public static OpChainExecutionContext getOpChainContext(MailboxService mailboxService, long deadlineMs,
+      StageMetadata stageMetadata) {
+    return new OpChainExecutionContext(mailboxService, 0, 0, deadlineMs, ImmutableMap.of(), stageMetadata,
+        stageMetadata.getWorkerMetadataList().get(0), null);
   }
 
   public static OpChainExecutionContext getDefaultContext() {
-    VirtualServerAddress virtualServerAddress = new VirtualServerAddress("mock", 80, 0);
-    return new OpChainExecutionContext(null, 1, 2, virtualServerAddress, Long.MAX_VALUE,
-        Collections.singletonMap(CommonConstants.Broker.Request.TRACE, "true"), null, null);
+    return getDefaultContext(ImmutableMap.of(CommonConstants.Broker.Request.TRACE, "true"));
   }
 
   public static OpChainExecutionContext getDefaultContextWithTracingDisabled() {
-    VirtualServerAddress virtualServerAddress = new VirtualServerAddress("mock", 80, 0);
-    return new OpChainExecutionContext(null, 1, 2, virtualServerAddress, Long.MAX_VALUE, Collections.emptyMap(), null,
-        null);
+    return getDefaultContext(ImmutableMap.of());
+  }
+
+  private static OpChainExecutionContext getDefaultContext(Map<String, String> opChainMetadata) {
+    WorkerMetadata workerMetadata =
+        new WorkerMetadata(new VirtualServerAddress("mock", 80, 0), ImmutableMap.of(), ImmutableMap.of());
+    return new OpChainExecutionContext(null, 1, 2, Long.MAX_VALUE, opChainMetadata,
+        new StageMetadata(ImmutableList.of(workerMetadata), ImmutableMap.of()), workerMetadata, null);
   }
 }
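
Net effect on OperatorTestUtil: callers no longer supply a VirtualServerAddress at all; getOpChainContext picks the first WorkerMetadata out of the stage metadata, and the default contexts build a throwaway worker internally. A short usage sketch (illustrative):

    // Callers only supply the mailbox service, a deadline and the stage metadata.
    OpChainExecutionContext ctx =
        OperatorTestUtil.getOpChainContext(mailboxService, System.currentTimeMillis() + 1_000L, stageMetadata);
    // Or, when no real routing is needed:
    OpChainExecutionContext traced = OperatorTestUtil.getDefaultContext();                      // tracing on
    OpChainExecutionContext untraced = OperatorTestUtil.getDefaultContextWithTracingDisabled();
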
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/SortedMailboxReceiveOperatorTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/SortedMailboxReceiveOperatorTest.java
index 4de9e3d4c6..1e71018215 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/SortedMailboxReceiveOperatorTest.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/SortedMailboxReceiveOperatorTest.java
@@ -30,10 +30,10 @@ import org.apache.calcite.rel.RelFieldCollation.Direction;
 import org.apache.calcite.rel.RelFieldCollation.NullDirection;
 import org.apache.pinot.common.exception.QueryException;
 import org.apache.pinot.common.utils.DataSchema;
-import org.apache.pinot.query.mailbox.MailboxIdUtils;
 import org.apache.pinot.query.mailbox.MailboxService;
 import org.apache.pinot.query.mailbox.ReceivingMailbox;
 import org.apache.pinot.query.planner.logical.RexExpression;
+import org.apache.pinot.query.planner.physical.MailboxIdUtils;
 import org.apache.pinot.query.routing.MailboxMetadata;
 import org.apache.pinot.query.routing.VirtualServerAddress;
 import org.apache.pinot.query.routing.WorkerMetadata;
@@ -44,6 +44,7 @@ import org.apache.pinot.query.runtime.plan.StageMetadata;
 import org.mockito.Mock;
 import org.mockito.MockitoAnnotations;
 import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeClass;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
@@ -56,7 +57,6 @@ import static org.testng.Assert.assertTrue;
 
 
 public class SortedMailboxReceiveOperatorTest {
-  private static final VirtualServerAddress RECEIVER_ADDRESS = new VirtualServerAddress("localhost", 123, 0);
   private static final DataSchema DATA_SCHEMA =
       new DataSchema(new String[]{"col1", "col2"}, new DataSchema.ColumnDataType[]{INT, INT});
   private static final List<RexExpression> COLLATION_KEYS = Collections.singletonList(new RexExpression.InputRef(0));
@@ -65,6 +65,9 @@ public class SortedMailboxReceiveOperatorTest {
   private static final String MAILBOX_ID_1 = MailboxIdUtils.toMailboxId(0, 1, 0, 0, 0);
   private static final String MAILBOX_ID_2 = MailboxIdUtils.toMailboxId(0, 1, 1, 0, 0);
 
+  private StageMetadata _stageMetadataBoth;
+  private StageMetadata _stageMetadata1;
+
   private AutoCloseable _mocks;
   @Mock
   private MailboxService _mailboxService;
@@ -73,40 +76,31 @@ public class SortedMailboxReceiveOperatorTest {
   @Mock
   private ReceivingMailbox _mailbox2;
 
-  private StageMetadata _stageMetadataBoth;
-  private StageMetadata _stageMetadata1;
+  @BeforeClass
+  public void setUp() {
+    VirtualServerAddress server1 = new VirtualServerAddress("localhost", 123, 0);
+    VirtualServerAddress server2 = new VirtualServerAddress("localhost", 123, 1);
+    _stageMetadataBoth = new StageMetadata(Stream.of(server1, server2).map(s -> new WorkerMetadata(s, ImmutableMap.of(0,
+        new MailboxMetadata(
+            ImmutableList.of(MailboxIdUtils.toPlanMailboxId(1, 0, 0, 0), MailboxIdUtils.toPlanMailboxId(1, 1, 0, 0)),
+            ImmutableList.of(server1, server2)), 1, new MailboxMetadata(
+            ImmutableList.of(MailboxIdUtils.toPlanMailboxId(1, 0, 0, 0), MailboxIdUtils.toPlanMailboxId(1, 1, 0, 0)),
+            ImmutableList.of(server1, server2))), ImmutableMap.of())).collect(Collectors.toList()), ImmutableMap.of());
+    _stageMetadata1 = new StageMetadata(ImmutableList.of(new WorkerMetadata(server1, ImmutableMap.of(0,
+        new MailboxMetadata(ImmutableList.of(MailboxIdUtils.toPlanMailboxId(1, 0, 0, 0)), ImmutableList.of(server1)), 1,
+        new MailboxMetadata(ImmutableList.of(MailboxIdUtils.toPlanMailboxId(1, 0, 0, 0)), ImmutableList.of(server1))),
+        ImmutableMap.of())), ImmutableMap.of());
+  }
 
   @BeforeMethod
-  public void setUp() {
+  public void setUpMethod() {
     _mocks = MockitoAnnotations.openMocks(this);
     when(_mailboxService.getHostname()).thenReturn("localhost");
     when(_mailboxService.getPort()).thenReturn(123);
-    VirtualServerAddress server1 = new VirtualServerAddress("localhost", 123, 0);
-    VirtualServerAddress server2 = new VirtualServerAddress("localhost", 123, 1);
-    _stageMetadataBoth = new StageMetadata.Builder().setWorkerMetadataList(Stream.of(server1, server2).map(
-        s -> new WorkerMetadata.Builder().setVirtualServerAddress(s)
-            .addMailBoxInfoMap(0, new MailboxMetadata(
-                ImmutableList.of(org.apache.pinot.query.planner.physical.MailboxIdUtils.toPlanMailboxId(1, 0, 0, 0),
-                    org.apache.pinot.query.planner.physical.MailboxIdUtils.toPlanMailboxId(1, 1, 0, 0)),
-                ImmutableList.of(server1, server2), ImmutableMap.of()))
-            .addMailBoxInfoMap(1, new MailboxMetadata(
-                ImmutableList.of(org.apache.pinot.query.planner.physical.MailboxIdUtils.toPlanMailboxId(1, 0, 0, 0),
-                    org.apache.pinot.query.planner.physical.MailboxIdUtils.toPlanMailboxId(1, 1, 0, 0)),
-                ImmutableList.of(server1, server2), ImmutableMap.of()))
-            .build()).collect(Collectors.toList())).build();
-    _stageMetadata1 = new StageMetadata.Builder().setWorkerMetadataList(Stream.of(server1).map(
-        s -> new WorkerMetadata.Builder().setVirtualServerAddress(s)
-            .addMailBoxInfoMap(0, new MailboxMetadata(
-                ImmutableList.of(org.apache.pinot.query.planner.physical.MailboxIdUtils.toPlanMailboxId(1, 0, 0, 0)),
-                ImmutableList.of(server1), ImmutableMap.of()))
-            .addMailBoxInfoMap(1, new MailboxMetadata(
-                ImmutableList.of(org.apache.pinot.query.planner.physical.MailboxIdUtils.toPlanMailboxId(1, 0, 0, 0)),
-                ImmutableList.of(server1), ImmutableMap.of()))
-            .build()).collect(Collectors.toList())).build();
   }
 
   @AfterMethod
-  public void tearDown()
+  public void tearDownMethod()
       throws Exception {
     _mocks.close();
   }
@@ -114,7 +108,7 @@ public class SortedMailboxReceiveOperatorTest {
   @Test(expectedExceptions = IllegalStateException.class, expectedExceptionsMessageRegExp = ".*RANGE_DISTRIBUTED.*")
   public void shouldThrowRangeDistributionNotSupported() {
     OpChainExecutionContext context =
-        OperatorTestUtil.getOpChainContext(_mailboxService, RECEIVER_ADDRESS, Long.MAX_VALUE, _stageMetadata1);
+        OperatorTestUtil.getOpChainContext(_mailboxService, Long.MAX_VALUE, _stageMetadata1);
     //noinspection resource
     new SortedMailboxReceiveOperator(context, RelDistribution.Type.RANGE_DISTRIBUTED, DATA_SCHEMA, COLLATION_KEYS,
         COLLATION_DIRECTIONS, COLLATION_NULL_DIRECTIONS, false, 1);
@@ -124,7 +118,7 @@ public class SortedMailboxReceiveOperatorTest {
   public void shouldThrowOnEmptyCollationKey() {
     when(_mailboxService.getReceivingMailbox(eq(MAILBOX_ID_1))).thenReturn(_mailbox1);
     OpChainExecutionContext context =
-        OperatorTestUtil.getOpChainContext(_mailboxService, RECEIVER_ADDRESS, Long.MAX_VALUE, _stageMetadata1);
+        OperatorTestUtil.getOpChainContext(_mailboxService, Long.MAX_VALUE, _stageMetadata1);
     //noinspection resource
     new SortedMailboxReceiveOperator(context, RelDistribution.Type.SINGLETON, DATA_SCHEMA, Collections.emptyList(),
         Collections.emptyList(), Collections.emptyList(), false, 1);
@@ -136,8 +130,7 @@ public class SortedMailboxReceiveOperatorTest {
     when(_mailboxService.getReceivingMailbox(eq(MAILBOX_ID_1))).thenReturn(_mailbox1);
 
     OpChainExecutionContext context =
-        OperatorTestUtil.getOpChainContext(_mailboxService, RECEIVER_ADDRESS, System.currentTimeMillis() + 1000L,
-            _stageMetadata1);
+        OperatorTestUtil.getOpChainContext(_mailboxService, System.currentTimeMillis() + 1000L, _stageMetadata1);
     try (SortedMailboxReceiveOperator receiveOp = new SortedMailboxReceiveOperator(context,
         RelDistribution.Type.SINGLETON, DATA_SCHEMA, COLLATION_KEYS, COLLATION_DIRECTIONS, COLLATION_NULL_DIRECTIONS,
         false, 1)) {
@@ -153,7 +146,7 @@ public class SortedMailboxReceiveOperatorTest {
     when(_mailboxService.getReceivingMailbox(eq(MAILBOX_ID_1))).thenReturn(_mailbox1);
     when(_mailbox1.poll()).thenReturn(TransferableBlockUtils.getEndOfStreamTransferableBlock());
     OpChainExecutionContext context =
-        OperatorTestUtil.getOpChainContext(_mailboxService, RECEIVER_ADDRESS, Long.MAX_VALUE, _stageMetadata1);
+        OperatorTestUtil.getOpChainContext(_mailboxService, Long.MAX_VALUE, _stageMetadata1);
     try (SortedMailboxReceiveOperator receiveOp = new SortedMailboxReceiveOperator(context,
         RelDistribution.Type.SINGLETON, DATA_SCHEMA, COLLATION_KEYS, COLLATION_DIRECTIONS, COLLATION_NULL_DIRECTIONS,
         false, 1)) {
@@ -168,7 +161,7 @@ public class SortedMailboxReceiveOperatorTest {
     when(_mailbox1.poll()).thenReturn(OperatorTestUtil.block(DATA_SCHEMA, row),
         TransferableBlockUtils.getEndOfStreamTransferableBlock());
     OpChainExecutionContext context =
-        OperatorTestUtil.getOpChainContext(_mailboxService, RECEIVER_ADDRESS, Long.MAX_VALUE, _stageMetadata1);
+        OperatorTestUtil.getOpChainContext(_mailboxService, Long.MAX_VALUE, _stageMetadata1);
     try (SortedMailboxReceiveOperator receiveOp = new SortedMailboxReceiveOperator(context,
         RelDistribution.Type.SINGLETON, DATA_SCHEMA, COLLATION_KEYS, COLLATION_DIRECTIONS, COLLATION_NULL_DIRECTIONS,
         false, 1)) {
@@ -186,7 +179,7 @@ public class SortedMailboxReceiveOperatorTest {
     when(_mailbox1.poll()).thenReturn(
         TransferableBlockUtils.getErrorTransferableBlock(new RuntimeException(errorMessage)));
     OpChainExecutionContext context =
-        OperatorTestUtil.getOpChainContext(_mailboxService, RECEIVER_ADDRESS, Long.MAX_VALUE, _stageMetadata1);
+        OperatorTestUtil.getOpChainContext(_mailboxService, Long.MAX_VALUE, _stageMetadata1);
     try (SortedMailboxReceiveOperator receiveOp = new SortedMailboxReceiveOperator(context,
         RelDistribution.Type.SINGLETON, DATA_SCHEMA, COLLATION_KEYS, COLLATION_DIRECTIONS, COLLATION_NULL_DIRECTIONS,
         false, 1)) {
@@ -205,7 +198,7 @@ public class SortedMailboxReceiveOperatorTest {
     when(_mailbox2.poll()).thenReturn(OperatorTestUtil.block(DATA_SCHEMA, row),
         TransferableBlockUtils.getEndOfStreamTransferableBlock());
     OpChainExecutionContext context =
-        OperatorTestUtil.getOpChainContext(_mailboxService, RECEIVER_ADDRESS, Long.MAX_VALUE, _stageMetadataBoth);
+        OperatorTestUtil.getOpChainContext(_mailboxService, Long.MAX_VALUE, _stageMetadataBoth);
     try (SortedMailboxReceiveOperator receiveOp = new SortedMailboxReceiveOperator(context,
         RelDistribution.Type.HASH_DISTRIBUTED, DATA_SCHEMA, COLLATION_KEYS, COLLATION_DIRECTIONS,
         COLLATION_NULL_DIRECTIONS, false, 1)) {
@@ -230,7 +223,7 @@ public class SortedMailboxReceiveOperatorTest {
     when(_mailbox2.poll()).thenReturn(OperatorTestUtil.block(DATA_SCHEMA, row),
         TransferableBlockUtils.getEndOfStreamTransferableBlock());
     OpChainExecutionContext context =
-        OperatorTestUtil.getOpChainContext(_mailboxService, RECEIVER_ADDRESS, Long.MAX_VALUE, _stageMetadataBoth);
+        OperatorTestUtil.getOpChainContext(_mailboxService, Long.MAX_VALUE, _stageMetadataBoth);
     try (SortedMailboxReceiveOperator receiveOp = new SortedMailboxReceiveOperator(context,
         RelDistribution.Type.HASH_DISTRIBUTED, DATA_SCHEMA, COLLATION_KEYS, COLLATION_DIRECTIONS,
         COLLATION_NULL_DIRECTIONS, false, 1)) {
@@ -255,7 +248,7 @@ public class SortedMailboxReceiveOperatorTest {
         OperatorTestUtil.block(DATA_SCHEMA, row4), OperatorTestUtil.block(DATA_SCHEMA, row5),
         TransferableBlockUtils.getEndOfStreamTransferableBlock());
     OpChainExecutionContext context =
-        OperatorTestUtil.getOpChainContext(_mailboxService, RECEIVER_ADDRESS, Long.MAX_VALUE, _stageMetadataBoth);
+        OperatorTestUtil.getOpChainContext(_mailboxService, Long.MAX_VALUE, _stageMetadataBoth);
     try (SortedMailboxReceiveOperator receiveOp = new SortedMailboxReceiveOperator(context,
         RelDistribution.Type.HASH_DISTRIBUTED, DATA_SCHEMA, COLLATION_KEYS, COLLATION_DIRECTIONS,
         COLLATION_NULL_DIRECTIONS, false, 1)) {
@@ -286,7 +279,7 @@ public class SortedMailboxReceiveOperatorTest {
         TransferableBlockUtils.getEndOfStreamTransferableBlock());
 
     OpChainExecutionContext context =
-        OperatorTestUtil.getOpChainContext(_mailboxService, RECEIVER_ADDRESS, Long.MAX_VALUE, _stageMetadataBoth);
+        OperatorTestUtil.getOpChainContext(_mailboxService, Long.MAX_VALUE, _stageMetadataBoth);
     try (SortedMailboxReceiveOperator receiveOp = new SortedMailboxReceiveOperator(context,
         RelDistribution.Type.HASH_DISTRIBUTED, dataSchema, collationKeys, collationDirections, collationNullDirections,
         false, 1)) {
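
A side note on the id helpers used in these constants: the MailboxIdUtils referenced here is now the planner-side class (org.apache.pinot.query.planner.physical), not the runtime mailbox one previously imported. Comparing the constants above, the four-argument toPlanMailboxId appears to carry sender stage, sender worker, receiver stage and receiver worker, while the five-argument toMailboxId prefixes the request id; this reading is inferred from the test values, not from the util's source:

    // MAILBOX_ID_1/2 above: request 0, sender stage 1, sender workers 0 and 1, receiver stage 0, worker 0.
    String mailboxId1 = MailboxIdUtils.toMailboxId(0, 1, 0, 0, 0);
    String mailboxId2 = MailboxIdUtils.toMailboxId(0, 1, 1, 0, 0);
    // The plan-level ids drop the request id and are what MailboxMetadata carries in StageMetadata.
    String planMailboxId = MailboxIdUtils.toPlanMailboxId(1, 0, 0, 0);
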
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/plan/pipeline/PipelineBreakerExecutorTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/plan/pipeline/PipelineBreakerExecutorTest.java
index e7fa0e7db2..94d5e2b873 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/plan/pipeline/PipelineBreakerExecutorTest.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/plan/pipeline/PipelineBreakerExecutorTest.java
@@ -20,22 +20,18 @@ package org.apache.pinot.query.runtime.plan.pipeline;
 
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
-import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
-import java.util.stream.Collectors;
-import java.util.stream.Stream;
 import org.apache.calcite.rel.RelDistribution;
 import org.apache.calcite.rel.core.JoinRelType;
 import org.apache.calcite.rel.logical.PinotRelExchangeType;
 import org.apache.pinot.common.utils.DataSchema;
-import org.apache.pinot.common.utils.NamedThreadFactory;
-import org.apache.pinot.query.mailbox.MailboxIdUtils;
 import org.apache.pinot.query.mailbox.MailboxService;
 import org.apache.pinot.query.mailbox.ReceivingMailbox;
+import org.apache.pinot.query.planner.physical.MailboxIdUtils;
 import org.apache.pinot.query.planner.plannode.JoinNode;
 import org.apache.pinot.query.planner.plannode.MailboxReceiveNode;
 import org.apache.pinot.query.routing.MailboxMetadata;
@@ -46,8 +42,8 @@ import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils;
 import org.apache.pinot.query.runtime.executor.ExecutorServiceUtils;
 import org.apache.pinot.query.runtime.executor.OpChainSchedulerService;
 import org.apache.pinot.query.runtime.operator.OperatorTestUtil;
-import org.apache.pinot.query.runtime.plan.DistributedStagePlan;
 import org.apache.pinot.query.runtime.plan.StageMetadata;
+import org.apache.pinot.query.runtime.plan.StagePlan;
 import org.mockito.Mock;
 import org.mockito.MockitoAnnotations;
 import org.testng.Assert;
@@ -61,12 +57,22 @@ import static org.mockito.Mockito.when;
 
 
 public class PipelineBreakerExecutorTest {
-  private static final VirtualServerAddress RECEIVER_ADDRESS = new VirtualServerAddress("localhost", 123, 0);
   private static final DataSchema DATA_SCHEMA =
       new DataSchema(new String[]{"col1", "col2"}, new DataSchema.ColumnDataType[]{INT, INT});
   private static final String MAILBOX_ID_1 = MailboxIdUtils.toMailboxId(0, 1, 0, 0, 0);
   private static final String MAILBOX_ID_2 = MailboxIdUtils.toMailboxId(0, 2, 0, 0, 0);
 
+  private final VirtualServerAddress _server = new VirtualServerAddress("localhost", 123, 0);
+  private final ExecutorService _executor = Executors.newCachedThreadPool();
+  private final OpChainSchedulerService _scheduler = new OpChainSchedulerService(_executor);
+  private final WorkerMetadata _workerMetadata = new WorkerMetadata(_server, ImmutableMap.of(0, new MailboxMetadata(
+          ImmutableList.of(MailboxIdUtils.toPlanMailboxId(1, 0, 0, 0), MailboxIdUtils.toPlanMailboxId(2, 0, 0, 0)),
+          ImmutableList.of(_server, _server)), 1,
+      new MailboxMetadata(ImmutableList.of(MailboxIdUtils.toPlanMailboxId(1, 0, 0, 0)), ImmutableList.of(_server)), 2,
+      new MailboxMetadata(ImmutableList.of(MailboxIdUtils.toPlanMailboxId(2, 0, 0, 0)), ImmutableList.of(_server))),
+      ImmutableMap.of());
+  private final StageMetadata _stageMetadata = new StageMetadata(ImmutableList.of(_workerMetadata), ImmutableMap.of());
+
   private AutoCloseable _mocks;
   @Mock
   private MailboxService _mailboxService;
@@ -75,31 +81,8 @@ public class PipelineBreakerExecutorTest {
   @Mock
   private ReceivingMailbox _mailbox2;
 
-  private VirtualServerAddress _server = new VirtualServerAddress("localhost", 123, 0);
-  private ExecutorService _executor = Executors.newCachedThreadPool(
-      new NamedThreadFactory("worker_on_asd_" + getClass().getSimpleName()));
-  private OpChainSchedulerService _scheduler = new OpChainSchedulerService(_executor);
-  private StageMetadata _stageMetadata1 = new StageMetadata.Builder().setWorkerMetadataList(Stream.of(_server).map(
-      s -> new WorkerMetadata.Builder().setVirtualServerAddress(s)
-          .addMailBoxInfoMap(0, new MailboxMetadata(
-              ImmutableList.of(org.apache.pinot.query.planner.physical.MailboxIdUtils.toPlanMailboxId(1, 0, 0, 0),
-                  org.apache.pinot.query.planner.physical.MailboxIdUtils.toPlanMailboxId(2, 0, 0, 0)),
-              ImmutableList.of(_server), ImmutableMap.of()))
-          .addMailBoxInfoMap(1, new MailboxMetadata(
-              ImmutableList.of(org.apache.pinot.query.planner.physical.MailboxIdUtils.toPlanMailboxId(1, 0, 0, 0)),
-              ImmutableList.of(_server), ImmutableMap.of()))
-          .addMailBoxInfoMap(2, new MailboxMetadata(
-              ImmutableList.of(org.apache.pinot.query.planner.physical.MailboxIdUtils.toPlanMailboxId(2, 0, 0, 0)),
-              ImmutableList.of(_server), ImmutableMap.of()))
-          .build()).collect(Collectors.toList())).build();
-
-  @AfterClass
-  public void tearDownClass() {
-    ExecutorServiceUtils.close(_executor);
-  }
-
   @BeforeMethod
-  public void setUp() {
+  public void setUpMethod() {
     _mocks = MockitoAnnotations.openMocks(this);
     when(_mailboxService.getHostname()).thenReturn("localhost");
     when(_mailboxService.getPort()).thenReturn(123);
@@ -109,18 +92,22 @@ public class PipelineBreakerExecutorTest {
   }
 
   @AfterMethod
-  public void tearDown()
+  public void tearDownMethod()
       throws Exception {
     _mocks.close();
   }
 
+  @AfterClass
+  public void tearDown() {
+    ExecutorServiceUtils.close(_executor);
+  }
+
   @Test
   public void shouldReturnBlocksUponNormalOperation() {
     MailboxReceiveNode mailboxReceiveNode =
         new MailboxReceiveNode(0, DATA_SCHEMA, 1, RelDistribution.Type.SINGLETON, PinotRelExchangeType.PIPELINE_BREAKER,
             null, null, false, false, null);
-    DistributedStagePlan distributedStagePlan =
-        new DistributedStagePlan(0, RECEIVER_ADDRESS, mailboxReceiveNode, _stageMetadata1);
+    StagePlan stagePlan = new StagePlan(0, mailboxReceiveNode, _stageMetadata);
 
     // when
     when(_mailboxService.getReceivingMailbox(MAILBOX_ID_1)).thenReturn(_mailbox1);
@@ -131,8 +118,8 @@ public class PipelineBreakerExecutorTest {
         TransferableBlockUtils.getEndOfStreamTransferableBlock(OperatorTestUtil.getDummyStats(0, 1, _server)));
 
     PipelineBreakerResult pipelineBreakerResult =
-        PipelineBreakerExecutor.executePipelineBreakers(_scheduler, _mailboxService, distributedStagePlan,
-            Collections.emptyMap(), 0, Long.MAX_VALUE);
+        PipelineBreakerExecutor.executePipelineBreakers(_scheduler, _mailboxService, _workerMetadata, stagePlan,
+            ImmutableMap.of(), 0, Long.MAX_VALUE);
 
     // then
     // should have single PB result, receive 2 data blocks, EOS block shouldn't be included
@@ -155,11 +142,10 @@ public class PipelineBreakerExecutorTest {
         new MailboxReceiveNode(0, DATA_SCHEMA, 2, RelDistribution.Type.SINGLETON, PinotRelExchangeType.PIPELINE_BREAKER,
             null, null, false, false, null);
     JoinNode joinNode =
-        new JoinNode(0, DATA_SCHEMA, DATA_SCHEMA, DATA_SCHEMA, JoinRelType.INNER, null, null, Collections.emptyList());
+        new JoinNode(0, DATA_SCHEMA, DATA_SCHEMA, DATA_SCHEMA, JoinRelType.INNER, null, null, ImmutableList.of());
     joinNode.addInput(mailboxReceiveNode1);
     joinNode.addInput(mailboxReceiveNode2);
-    DistributedStagePlan distributedStagePlan =
-        new DistributedStagePlan(0, RECEIVER_ADDRESS, joinNode, _stageMetadata1);
+    StagePlan stagePlan = new StagePlan(0, joinNode, _stageMetadata);
 
     // when
     when(_mailboxService.getReceivingMailbox(MAILBOX_ID_1)).thenReturn(_mailbox1);
@@ -172,8 +158,8 @@ public class PipelineBreakerExecutorTest {
         TransferableBlockUtils.getEndOfStreamTransferableBlock(OperatorTestUtil.getDummyStats(0, 2, _server)));
 
     PipelineBreakerResult pipelineBreakerResult =
-        PipelineBreakerExecutor.executePipelineBreakers(_scheduler, _mailboxService, distributedStagePlan,
-            Collections.emptyMap(), 0, Long.MAX_VALUE);
+        PipelineBreakerExecutor.executePipelineBreakers(_scheduler, _mailboxService, _workerMetadata, stagePlan,
+            ImmutableMap.of(), 0, Long.MAX_VALUE);
 
     // then
     // should have two PB result, receive 2 data blocks, one each, EOS block shouldn't be included
@@ -195,13 +181,12 @@ public class PipelineBreakerExecutorTest {
     MailboxReceiveNode incorrectlyConfiguredMailboxNode =
         new MailboxReceiveNode(0, DATA_SCHEMA, 3, RelDistribution.Type.SINGLETON, PinotRelExchangeType.PIPELINE_BREAKER,
             null, null, false, false, null);
-    DistributedStagePlan distributedStagePlan =
-        new DistributedStagePlan(0, RECEIVER_ADDRESS, incorrectlyConfiguredMailboxNode, _stageMetadata1);
+    StagePlan stagePlan = new StagePlan(0, incorrectlyConfiguredMailboxNode, _stageMetadata);
 
     // when
     PipelineBreakerResult pipelineBreakerResult =
-        PipelineBreakerExecutor.executePipelineBreakers(_scheduler, _mailboxService, distributedStagePlan,
-            Collections.emptyMap(), 0, Long.MAX_VALUE);
+        PipelineBreakerExecutor.executePipelineBreakers(_scheduler, _mailboxService, _workerMetadata, stagePlan,
+            ImmutableMap.of(), 0, Long.MAX_VALUE);
 
     // then
     // should return empty block list
@@ -219,8 +204,7 @@ public class PipelineBreakerExecutorTest {
     MailboxReceiveNode mailboxReceiveNode =
         new MailboxReceiveNode(0, DATA_SCHEMA, 1, RelDistribution.Type.SINGLETON, PinotRelExchangeType.PIPELINE_BREAKER,
             null, null, false, false, null);
-    DistributedStagePlan distributedStagePlan =
-        new DistributedStagePlan(0, RECEIVER_ADDRESS, mailboxReceiveNode, _stageMetadata1);
+    StagePlan stagePlan = new StagePlan(0, mailboxReceiveNode, _stageMetadata);
 
     // when
     when(_mailboxService.getReceivingMailbox(MAILBOX_ID_1)).thenReturn(_mailbox1);
@@ -231,8 +215,8 @@ public class PipelineBreakerExecutorTest {
     });
 
     PipelineBreakerResult pipelineBreakerResult =
-        PipelineBreakerExecutor.executePipelineBreakers(_scheduler, _mailboxService, distributedStagePlan,
-            Collections.emptyMap(), 0, System.currentTimeMillis() + 100);
+        PipelineBreakerExecutor.executePipelineBreakers(_scheduler, _mailboxService, _workerMetadata, stagePlan,
+            ImmutableMap.of(), 0, System.currentTimeMillis() + 100);
 
     // then
     // should contain only failure error blocks
@@ -253,11 +237,10 @@ public class PipelineBreakerExecutorTest {
         new MailboxReceiveNode(0, DATA_SCHEMA, 3, RelDistribution.Type.SINGLETON, PinotRelExchangeType.PIPELINE_BREAKER,
             null, null, false, false, null);
     JoinNode joinNode =
-        new JoinNode(0, DATA_SCHEMA, DATA_SCHEMA, DATA_SCHEMA, JoinRelType.INNER, null, null, Collections.emptyList());
+        new JoinNode(0, DATA_SCHEMA, DATA_SCHEMA, DATA_SCHEMA, JoinRelType.INNER, null, null, ImmutableList.of());
     joinNode.addInput(mailboxReceiveNode1);
     joinNode.addInput(incorrectlyConfiguredMailboxNode);
-    DistributedStagePlan distributedStagePlan =
-        new DistributedStagePlan(0, RECEIVER_ADDRESS, joinNode, _stageMetadata1);
+    StagePlan stagePlan = new StagePlan(0, joinNode, _stageMetadata);
 
     // when
     when(_mailboxService.getReceivingMailbox(MAILBOX_ID_1)).thenReturn(_mailbox1);
@@ -270,8 +253,8 @@ public class PipelineBreakerExecutorTest {
         TransferableBlockUtils.getEndOfStreamTransferableBlock());
 
     PipelineBreakerResult pipelineBreakerResult =
-        PipelineBreakerExecutor.executePipelineBreakers(_scheduler, _mailboxService, distributedStagePlan,
-            Collections.emptyMap(), 0, Long.MAX_VALUE);
+        PipelineBreakerExecutor.executePipelineBreakers(_scheduler, _mailboxService, _workerMetadata, stagePlan,
+            ImmutableMap.of(), 0, Long.MAX_VALUE);
 
     // then
     // should pass when one PB returns result, the other returns empty.
@@ -292,11 +275,10 @@ public class PipelineBreakerExecutorTest {
         new MailboxReceiveNode(0, DATA_SCHEMA, 2, RelDistribution.Type.SINGLETON, PinotRelExchangeType.PIPELINE_BREAKER,
             null, null, false, false, null);
     JoinNode joinNode =
-        new JoinNode(0, DATA_SCHEMA, DATA_SCHEMA, DATA_SCHEMA, JoinRelType.INNER, null, null, Collections.emptyList());
+        new JoinNode(0, DATA_SCHEMA, DATA_SCHEMA, DATA_SCHEMA, JoinRelType.INNER, null, null, ImmutableList.of());
     joinNode.addInput(mailboxReceiveNode1);
     joinNode.addInput(incorrectlyConfiguredMailboxNode);
-    DistributedStagePlan distributedStagePlan =
-        new DistributedStagePlan(0, RECEIVER_ADDRESS, joinNode, _stageMetadata1);
+    StagePlan stagePlan = new StagePlan(0, joinNode, _stageMetadata);
 
     // when
     when(_mailboxService.getReceivingMailbox(MAILBOX_ID_1)).thenReturn(_mailbox1);
@@ -309,8 +291,8 @@ public class PipelineBreakerExecutorTest {
         TransferableBlockUtils.getEndOfStreamTransferableBlock());
 
     PipelineBreakerResult pipelineBreakerResult =
-        PipelineBreakerExecutor.executePipelineBreakers(_scheduler, _mailboxService, distributedStagePlan,
-            Collections.emptyMap(), 0, Long.MAX_VALUE);
+        PipelineBreakerExecutor.executePipelineBreakers(_scheduler, _mailboxService, _workerMetadata, stagePlan,
+            ImmutableMap.of(), 0, Long.MAX_VALUE);
 
     // then
     // should fail even if one of the 2 PB doesn't contain error block from sender.
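
PipelineBreakerExecutorTest shows the other half of the rename: DistributedStagePlan becomes StagePlan, which no longer embeds a receiver address, and executePipelineBreakers now takes the WorkerMetadata it runs on behalf of as an explicit argument. A minimal sketch of the new call, following the hunks above (names are illustrative):

    // StagePlan is now (stageId, root plan node, stage metadata); no per-worker address inside it.
    StagePlan stagePlan = new StagePlan(0, mailboxReceiveNode, stageMetadata);
    PipelineBreakerResult result = PipelineBreakerExecutor.executePipelineBreakers(
        scheduler,           // OpChainSchedulerService
        mailboxService,
        workerMetadata,      // the worker this plan is executed for (new explicit argument)
        stagePlan,
        ImmutableMap.of(),   // metadata map (empty in these tests)
        0,                   // requestId
        Long.MAX_VALUE);     // deadlineMs
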
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/plan/serde/QueryPlanSerDeUtilsTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/plan/serde/QueryPlanSerDeUtilsTest.java
deleted file mode 100644
index 9ca24ebf48..0000000000
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/plan/serde/QueryPlanSerDeUtilsTest.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.pinot.query.runtime.plan.serde;
-
-import org.apache.pinot.query.routing.VirtualServerAddress;
-import org.mockito.Mockito;
-import org.testng.Assert;
-import org.testng.annotations.Test;
-
-
-public class QueryPlanSerDeUtilsTest {
-
-  @Test
-  public void shouldSerializeServer() {
-    // Given:
-    VirtualServerAddress server = Mockito.mock(VirtualServerAddress.class);
-    Mockito.when(server.workerId()).thenReturn(1);
-    Mockito.when(server.hostname()).thenReturn("Server_192.987.1.123");
-    Mockito.when(server.port()).thenReturn(80);
-
-    // When:
-    String serialized = QueryPlanSerDeUtils.addressToProto(server);
-
-    // Then:
-    Assert.assertEquals(serialized, "1@Server_192.987.1.123:80");
-  }
-
-  @Test
-  public void shouldDeserializeServerString() {
-    // Given:
-    String serverString = "1@Server_192.987.1.123:80";
-
-    // When:
-    VirtualServerAddress server = QueryPlanSerDeUtils.protoToAddress(serverString);
-
-    // Then:
-    Assert.assertEquals(server.workerId(), 1);
-    Assert.assertEquals(server.hostname(), "Server_192.987.1.123");
-    Assert.assertEquals(server.port(), 80);
-  }
-}
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/queries/QueryRunnerTestBase.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/queries/QueryRunnerTestBase.java
index 33b68f807e..f1315aa1bf 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/queries/QueryRunnerTestBase.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/queries/QueryRunnerTestBase.java
@@ -58,9 +58,9 @@ import org.apache.pinot.query.mailbox.MailboxService;
 import org.apache.pinot.query.planner.physical.DispatchablePlanFragment;
 import org.apache.pinot.query.planner.physical.DispatchableSubPlan;
 import org.apache.pinot.query.routing.QueryServerInstance;
-import org.apache.pinot.query.routing.VirtualServerAddress;
-import org.apache.pinot.query.runtime.plan.DistributedStagePlan;
+import org.apache.pinot.query.routing.WorkerMetadata;
 import org.apache.pinot.query.runtime.plan.StageMetadata;
+import org.apache.pinot.query.runtime.plan.StagePlan;
 import org.apache.pinot.query.service.dispatch.QueryDispatcher;
 import org.apache.pinot.spi.data.FieldSpec;
 import org.apache.pinot.spi.data.Schema;
@@ -154,29 +154,24 @@ public abstract class QueryRunnerTestBase extends QueryTestSet {
 
   protected List<CompletableFuture<?>> processDistributedStagePlans(DispatchableSubPlan dispatchableSubPlan,
       int stageId, Map<String, String> requestMetadataMap) {
-    Map<QueryServerInstance, List<Integer>> serverInstanceToWorkerIdMap =
-        dispatchableSubPlan.getQueryStageList().get(stageId).getServerInstanceToWorkerIdMap();
+    DispatchablePlanFragment dispatchableStagePlan = dispatchableSubPlan.getQueryStageList().get(stageId);
+    List<WorkerMetadata> stageWorkerMetadataList = dispatchableStagePlan.getWorkerMetadataList();
     List<CompletableFuture<?>> submissionStubs = new ArrayList<>();
-    for (Map.Entry<QueryServerInstance, List<Integer>> entry : serverInstanceToWorkerIdMap.entrySet()) {
-      QueryServerInstance server = entry.getKey();
-      for (int workerId : entry.getValue()) {
-        DistributedStagePlan distributedStagePlan =
-            constructDistributedStagePlan(dispatchableSubPlan, stageId, new VirtualServerAddress(server, workerId));
-        submissionStubs.add(_servers.get(server).processQuery(distributedStagePlan, requestMetadataMap));
+    for (Map.Entry<QueryServerInstance, List<Integer>> entry : dispatchableStagePlan.getServerInstanceToWorkerIdMap()
+        .entrySet()) {
+      QueryServerEnclosure serverEnclosure = _servers.get(entry.getKey());
+      List<WorkerMetadata> workerMetadataList =
+          entry.getValue().stream().map(stageWorkerMetadataList::get).collect(Collectors.toList());
+      StageMetadata stageMetadata = new StageMetadata(workerMetadataList, dispatchableStagePlan.getCustomProperties());
+      StagePlan stagePlan =
+          new StagePlan(stageId, dispatchableStagePlan.getPlanFragment().getFragmentRoot(), stageMetadata);
+      for (WorkerMetadata workerMetadata : workerMetadataList) {
+        submissionStubs.add(serverEnclosure.processQuery(workerMetadata, stagePlan, requestMetadataMap));
       }
     }
     return submissionStubs;
   }
 
-  protected static DistributedStagePlan constructDistributedStagePlan(DispatchableSubPlan dispatchableSubPlan,
-      int stageId, VirtualServerAddress serverAddress) {
-    return new DistributedStagePlan(stageId, serverAddress,
-        dispatchableSubPlan.getQueryStageList().get(stageId).getPlanFragment().getFragmentRoot(),
-        new StageMetadata.Builder().setWorkerMetadataList(
-                dispatchableSubPlan.getQueryStageList().get(stageId).getWorkerMetadataList())
-            .addCustomProperties(dispatchableSubPlan.getQueryStageList().get(stageId).getCustomProperties()).build());
-  }
-
   protected List<Object[]> queryH2(String sql)
       throws Exception {
     int firstSemi = sql.indexOf(';');
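
The QueryRunnerTestBase hunk spells out the dispatch pattern the new model implies: a single StagePlan is built per stage and server (root node plus a StageMetadata restricted to that server's workers), and processQuery is invoked once per WorkerMetadata instead of once per (server, workerId) pair with an address baked into the plan. Condensed from the loop above, with serverEnclosure, workerMetadataList and requestMetadataMap as in the hunk:

    // The StagePlan is shared by all workers of the stage on a server; only the WorkerMetadata differs per call.
    StageMetadata stageMetadata =
        new StageMetadata(workerMetadataList, dispatchableStagePlan.getCustomProperties());
    StagePlan stagePlan =
        new StagePlan(stageId, dispatchableStagePlan.getPlanFragment().getFragmentRoot(), stageMetadata);
    for (WorkerMetadata workerMetadata : workerMetadataList) {
      submissionStubs.add(serverEnclosure.processQuery(workerMetadata, stagePlan, requestMetadataMap));
    }
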
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/service/server/QueryServerTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/service/server/QueryServerTest.java
index 4e5a003427..679f46c60c 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/service/server/QueryServerTest.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/service/server/QueryServerTest.java
@@ -19,6 +19,7 @@
 package org.apache.pinot.query.service.server;
 
 import com.google.common.collect.Lists;
+import com.google.protobuf.ByteString;
 import io.grpc.Deadline;
 import io.grpc.ManagedChannel;
 import io.grpc.ManagedChannelBuilder;
@@ -26,6 +27,7 @@ import java.util.Comparator;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Random;
 import java.util.concurrent.TimeUnit;
 import org.apache.pinot.common.proto.PinotQueryWorkerGrpc;
@@ -49,12 +51,15 @@ import org.apache.pinot.query.testutils.QueryTestUtils;
 import org.apache.pinot.spi.utils.CommonConstants;
 import org.apache.pinot.spi.utils.EqualityUtils;
 import org.apache.pinot.util.TestUtils;
-import org.mockito.Mockito;
-import org.testng.Assert;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.argThat;
+import static org.mockito.Mockito.*;
+import static org.testng.Assert.assertTrue;
+
 
 public class QueryServerTest extends QueryTestSet {
   private static final Random RANDOM_REQUEST_ID_GEN = new Random();
@@ -70,10 +75,9 @@ public class QueryServerTest extends QueryTestSet {
   @BeforeClass
   public void setUp()
       throws Exception {
-
     for (int i = 0; i < QUERY_SERVER_COUNT; i++) {
       int availablePort = QueryTestUtils.getAvailablePort();
-      QueryRunner queryRunner = Mockito.mock(QueryRunner.class);
+      QueryRunner queryRunner = mock(QueryRunner.class);
       QueryServer queryServer = new QueryServer(availablePort, queryRunner);
       queryServer.start();
       _queryServerMap.put(availablePort, queryServer);
@@ -96,85 +100,81 @@ public class QueryServerTest extends QueryTestSet {
   }
 
   @Test
-  public void testException() {
-    DispatchableSubPlan dispatchableSubPlan = _queryEnvironment.planQuery("SELECT * FROM a");
+  public void testException()
+      throws Exception {
+    DispatchableSubPlan queryPlan = _queryEnvironment.planQuery("SELECT * FROM a");
     // only get one worker request out.
-    Worker.QueryRequest queryRequest = getQueryRequest(dispatchableSubPlan, 1);
-    QueryRunner mockRunner =
-        _queryRunnerMap.get(Integer.parseInt(queryRequest.getMetadataOrThrow(KEY_OF_SERVER_INSTANCE_PORT)));
-    Mockito.doThrow(new RuntimeException("foo")).when(mockRunner).processQuery(Mockito.any(), Mockito.anyMap());
+    Worker.QueryRequest queryRequest = getQueryRequest(queryPlan, 1);
+    Map<String, String> requestMetadata = QueryPlanSerDeUtils.fromProtoProperties(queryRequest.getMetadata());
+    QueryRunner mockRunner = _queryRunnerMap.get(Integer.parseInt(requestMetadata.get(KEY_OF_SERVER_INSTANCE_PORT)));
+    doThrow(new RuntimeException("foo")).when(mockRunner).processQuery(any(), any(), any());
     // submit the request for testing.
-    Worker.QueryResponse resp = submitRequest(queryRequest);
+    Worker.QueryResponse resp = submitRequest(queryRequest, requestMetadata);
     // reset the mock runner before assert.
-    Mockito.reset(mockRunner);
+    reset(mockRunner);
     // should contain error message pattern
     String errorMessage = resp.getMetadataMap().get(CommonConstants.Query.Response.ServerResponseStatus.STATUS_ERROR);
-    Assert.assertTrue(errorMessage.contains("foo"));
+    assertTrue(errorMessage.contains("foo"));
   }
 
   @Test(dataProvider = "testSql")
   public void testWorkerAcceptsWorkerRequestCorrect(String sql)
       throws Exception {
-    DispatchableSubPlan dispatchableSubPlan = _queryEnvironment.planQuery(sql);
-
-    for (int stageId = 0; stageId < dispatchableSubPlan.getQueryStageList().size(); stageId++) {
-      if (stageId > 0) { // we do not test reduce stage.
-        // only get one worker request out.
-        Worker.QueryRequest queryRequest = getQueryRequest(dispatchableSubPlan, stageId);
-
-        // submit the request for testing.
-        Worker.QueryResponse resp = submitRequest(queryRequest);
-        Assert.assertNotNull(resp.getMetadataMap().get(CommonConstants.Query.Response.ServerResponseStatus.STATUS_OK));
+    DispatchableSubPlan queryPlan = _queryEnvironment.planQuery(sql);
+    List<DispatchablePlanFragment> stagePlans = queryPlan.getQueryStageList();
+    int numStages = stagePlans.size();
+    // Ignore reduce stage (stage 0)
+    for (int stageId = 1; stageId < numStages; stageId++) {
+      // only get one worker request out.
+      Worker.QueryRequest queryRequest = getQueryRequest(queryPlan, stageId);
+      Map<String, String> requestMetadata = QueryPlanSerDeUtils.fromProtoProperties(queryRequest.getMetadata());
 
-        DispatchablePlanFragment dispatchablePlanFragment = dispatchableSubPlan.getQueryStageList().get(stageId);
+      // submit the request for testing.
+      Worker.QueryResponse resp = submitRequest(queryRequest, requestMetadata);
+      assertTrue(resp.getMetadataMap().containsKey(CommonConstants.Query.Response.ServerResponseStatus.STATUS_OK));
 
-        StageMetadata stageMetadata =
-            new StageMetadata.Builder().setWorkerMetadataList(dispatchablePlanFragment.getWorkerMetadataList())
-                .addCustomProperties(dispatchablePlanFragment.getCustomProperties()).build();
+      DispatchablePlanFragment dispatchableStagePlan = stagePlans.get(stageId);
+      List<WorkerMetadata> workerMetadataList = dispatchableStagePlan.getWorkerMetadataList();
+      StageMetadata stageMetadata = new StageMetadata(workerMetadataList, dispatchableStagePlan.getCustomProperties());
 
-        // ensure mock query runner received correctly deserialized payload.
-        QueryRunner mockRunner =
-            _queryRunnerMap.get(Integer.parseInt(queryRequest.getMetadataOrThrow(KEY_OF_SERVER_INSTANCE_PORT)));
-        String requestIdStr = queryRequest.getMetadataOrThrow(CommonConstants.Query.Request.MetadataKeys.REQUEST_ID);
+      // ensure mock query runner received correctly deserialized payload.
+      QueryRunner mockRunner = _queryRunnerMap.get(Integer.parseInt(requestMetadata.get(KEY_OF_SERVER_INSTANCE_PORT)));
+      String requestId = requestMetadata.get(CommonConstants.Query.Request.MetadataKeys.REQUEST_ID);
 
-        // since submitRequest is async, we need to wait for the mockRunner to receive the query payload.
-        int finalStageId = stageId;
-        TestUtils.waitForCondition(aVoid -> {
-          try {
-            Mockito.verify(mockRunner).processQuery(Mockito.argThat(distributedStagePlan -> {
-              PlanNode planNode =
-                  dispatchableSubPlan.getQueryStageList().get(finalStageId).getPlanFragment().getFragmentRoot();
-              return isStageNodesEqual(planNode, distributedStagePlan.getStageRoot()) && isStageMetadataEqual(
-                  stageMetadata, distributedStagePlan.getStageMetadata());
-            }), Mockito.argThat(requestMetadataMap -> requestIdStr.equals(
-                requestMetadataMap.get(CommonConstants.Query.Request.MetadataKeys.REQUEST_ID))));
-            return true;
-          } catch (Throwable t) {
-            return false;
-          }
-        }, 10000L, "Error verifying mock QueryRunner intercepted query payload!");
+      // since submitRequest is async, we need to wait for the mockRunner to receive the query payload.
+      TestUtils.waitForCondition(aVoid -> {
+        try {
+          verify(mockRunner, times(workerMetadataList.size())).processQuery(any(), argThat(stagePlan -> {
+            PlanNode planNode = dispatchableStagePlan.getPlanFragment().getFragmentRoot();
+            return isStageNodesEqual(planNode, stagePlan.getRootNode()) && isStageMetadataEqual(stageMetadata,
+                stagePlan.getStageMetadata());
+          }), argThat(requestMetadataMap -> requestId.equals(
+              requestMetadataMap.get(CommonConstants.Query.Request.MetadataKeys.REQUEST_ID))));
+          return true;
+        } catch (Throwable t) {
+          return false;
+        }
+      }, 10000L, "Error verifying mock QueryRunner intercepted query payload!");
 
-        // reset the mock runner.
-        Mockito.reset(mockRunner);
-      }
+      // reset the mock runner.
+      reset(mockRunner);
     }
   }
 
   private boolean isStageMetadataEqual(StageMetadata expected, StageMetadata actual) {
-    if (!EqualityUtils.isEqual(StageMetadata.getTableName(expected), StageMetadata.getTableName(actual))) {
-      return false;
-    }
-    TimeBoundaryInfo expectedTimeBoundaryInfo = StageMetadata.getTimeBoundary(expected);
-    TimeBoundaryInfo actualTimeBoundaryInfo = StageMetadata.getTimeBoundary(actual);
-    if (expectedTimeBoundaryInfo == null && actualTimeBoundaryInfo != null
-        || expectedTimeBoundaryInfo != null && actualTimeBoundaryInfo == null) {
+    if (!Objects.equals(expected.getTableName(), actual.getTableName())) {
       return false;
     }
-    if (expectedTimeBoundaryInfo != null && actualTimeBoundaryInfo != null && (
-        !EqualityUtils.isEqual(expectedTimeBoundaryInfo.getTimeColumn(), actualTimeBoundaryInfo.getTimeColumn())
-            || !EqualityUtils.isEqual(expectedTimeBoundaryInfo.getTimeValue(),
-            actualTimeBoundaryInfo.getTimeValue()))) {
-      return false;
+    TimeBoundaryInfo expectedTimeBoundaryInfo = expected.getTimeBoundary();
+    TimeBoundaryInfo actualTimeBoundaryInfo = actual.getTimeBoundary();
+    if (expectedTimeBoundaryInfo != null || actualTimeBoundaryInfo != null) {
+      if (expectedTimeBoundaryInfo == null || actualTimeBoundaryInfo == null) {
+        return false;
+      }
+      if (!expectedTimeBoundaryInfo.getTimeColumn().equals(actualTimeBoundaryInfo.getTimeColumn())
+          || !expectedTimeBoundaryInfo.getTimeValue().equals(actualTimeBoundaryInfo.getTimeValue())) {
+        return false;
+      }
     }
     List<WorkerMetadata> expectedWorkerMetadataList = expected.getWorkerMetadataList();
     List<WorkerMetadata> actualWorkerMetadataList = actual.getWorkerMetadataList();
@@ -190,13 +190,8 @@ public class QueryServerTest extends QueryTestSet {
   }
 
   private static boolean isWorkerMetadataEqual(WorkerMetadata expected, WorkerMetadata actual) {
-    if (!expected.getVirtualServerAddress().hostname().equals(actual.getVirtualServerAddress().hostname())
-        || expected.getVirtualServerAddress().port() != actual.getVirtualServerAddress().port()
-        || expected.getVirtualServerAddress().workerId() != actual.getVirtualServerAddress().workerId()) {
-      return false;
-    }
-    return EqualityUtils.isEqual(WorkerMetadata.getTableSegmentsMap(expected),
-        WorkerMetadata.getTableSegmentsMap(actual));
+    return expected.getVirtualAddress().equals(actual.getVirtualAddress()) && EqualityUtils.isEqual(
+        expected.getTableSegmentsMap(), actual.getTableSegmentsMap());
   }
 
   private static boolean isStageNodesEqual(PlanNode left, PlanNode right) {
@@ -216,11 +211,10 @@ public class QueryServerTest extends QueryTestSet {
     return true;
   }
 
-  private Worker.QueryResponse submitRequest(Worker.QueryRequest queryRequest) {
-    String host = queryRequest.getMetadataMap().get(KEY_OF_SERVER_INSTANCE_HOST);
-    int port = Integer.parseInt(queryRequest.getMetadataMap().get(KEY_OF_SERVER_INSTANCE_PORT));
-    long timeoutMs =
-        Long.parseLong(queryRequest.getMetadataMap().get(CommonConstants.Broker.Request.QueryOptionKey.TIMEOUT_MS));
+  private Worker.QueryResponse submitRequest(Worker.QueryRequest queryRequest, Map<String, String> requestMetadata) {
+    String host = requestMetadata.get(KEY_OF_SERVER_INSTANCE_HOST);
+    int port = Integer.parseInt(requestMetadata.get(KEY_OF_SERVER_INSTANCE_PORT));
+    long timeoutMs = Long.parseLong(requestMetadata.get(CommonConstants.Broker.Request.QueryOptionKey.TIMEOUT_MS));
     ManagedChannel channel = ManagedChannelBuilder.forAddress(host, port).usePlaintext().build();
     PinotQueryWorkerGrpc.PinotQueryWorkerBlockingStub stub = PinotQueryWorkerGrpc.newBlockingStub(channel);
     Worker.QueryResponse resp =
@@ -229,32 +223,35 @@ public class QueryServerTest extends QueryTestSet {
     return resp;
   }
 
-  private Worker.QueryRequest getQueryRequest(DispatchableSubPlan dispatchableSubPlan, int stageId) {
-    DispatchablePlanFragment planFragment = dispatchableSubPlan.getQueryStageList().get(stageId);
-    Map<QueryServerInstance, List<Integer>> serverInstanceToWorkerIdMap = planFragment.getServerInstanceToWorkerIdMap();
+  private Worker.QueryRequest getQueryRequest(DispatchableSubPlan queryPlan, int stageId) {
+    DispatchablePlanFragment stagePlan = queryPlan.getQueryStageList().get(stageId);
+    Plan.StageNode rootNode =
+        StageNodeSerDeUtils.serializeStageNode((AbstractPlanNode) stagePlan.getPlanFragment().getFragmentRoot());
+    List<Worker.WorkerMetadata> workerMetadataList =
+        QueryPlanSerDeUtils.toProtoWorkerMetadataList(stagePlan.getWorkerMetadataList());
+    ByteString customProperty = QueryPlanSerDeUtils.toProtoProperties(stagePlan.getCustomProperties());
+
     // this particular test set requires the request to have a single QueryServerInstance to dispatch to
     // as it is not testing the multi-tenancy dispatch (which is in the QueryDispatcherTest)
-    Map.Entry<QueryServerInstance, List<Integer>> entry = serverInstanceToWorkerIdMap.entrySet().iterator().next();
-    QueryServerInstance serverInstance = entry.getKey();
-    List<Integer> workerIds = entry.getValue();
-    Plan.StageNode stageRoot =
-        StageNodeSerDeUtils.serializeStageNode((AbstractPlanNode) planFragment.getPlanFragment().getFragmentRoot());
-    List<Worker.WorkerMetadata> protoWorkerMetadataList = QueryPlanSerDeUtils.toProtoWorkerMetadataList(planFragment);
+    QueryServerInstance serverInstance = stagePlan.getServerInstanceToWorkerIdMap().keySet().iterator().next();
     Worker.StageMetadata stageMetadata =
-        QueryPlanSerDeUtils.toProtoStageMetadata(protoWorkerMetadataList, planFragment.getCustomProperties(),
-            serverInstance, workerIds);
-    Worker.StagePlan stagePlan =
-        Worker.StagePlan.newBuilder().setStageId(stageId).setStageRoot(stageRoot).setStageMetadata(stageMetadata)
+        Worker.StageMetadata.newBuilder().addAllWorkerMetadata(workerMetadataList).setCustomProperty(customProperty)
             .build();
+    Worker.StagePlan protoStagePlan =
+        Worker.StagePlan.newBuilder().setStageId(stageId).setRootNode(rootNode.toByteString())
+            .setStageMetadata(stageMetadata).build();
+
+    Map<String, String> requestMetadata = new HashMap<>();
+    // the default configurations that must exist.
+    requestMetadata.put(CommonConstants.Query.Request.MetadataKeys.REQUEST_ID,
+        String.valueOf(RANDOM_REQUEST_ID_GEN.nextLong()));
+    requestMetadata.put(CommonConstants.Broker.Request.QueryOptionKey.TIMEOUT_MS,
+        String.valueOf(CommonConstants.Broker.DEFAULT_BROKER_TIMEOUT_MS));
+    // extra configurations we want to verify are also parsed out correctly.
+    requestMetadata.put(KEY_OF_SERVER_INSTANCE_HOST, serverInstance.getHostname());
+    requestMetadata.put(KEY_OF_SERVER_INSTANCE_PORT, Integer.toString(serverInstance.getQueryServicePort()));
 
-    return Worker.QueryRequest.newBuilder().addStagePlan(stagePlan)
-        // the default configurations that must exist.
-        .putMetadata(CommonConstants.Query.Request.MetadataKeys.REQUEST_ID,
-            String.valueOf(RANDOM_REQUEST_ID_GEN.nextLong()))
-        .putMetadata(CommonConstants.Broker.Request.QueryOptionKey.TIMEOUT_MS,
-            String.valueOf(CommonConstants.Broker.DEFAULT_BROKER_TIMEOUT_MS))
-        // extra configurations we want to test also parsed out correctly.
-        .putMetadata(KEY_OF_SERVER_INSTANCE_HOST, serverInstance.getHostname())
-        .putMetadata(KEY_OF_SERVER_INSTANCE_PORT, String.valueOf(serverInstance.getQueryServicePort())).build();
+    return Worker.QueryRequest.newBuilder().addStagePlan(protoStagePlan)
+        .setMetadata(QueryPlanSerDeUtils.toProtoProperties(requestMetadata)).build();
   }
 }
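
For reference, the core change exercised by this test is that request metadata and stage custom properties are no longer carried as per-key proto map entries but as a single serialized properties blob, converted with QueryPlanSerDeUtils.toProtoProperties(...) and decoded with QueryPlanSerDeUtils.fromProtoProperties(...). Below is a minimal round-trip sketch, not part of the commit: it only uses calls that appear in the diff above, while the import package names, the throws Exception clause, and the placeholder metadata keys are illustrative assumptions (the test itself uses the CommonConstants keys).

    import java.util.HashMap;
    import java.util.Map;

    import com.google.protobuf.ByteString;
    import org.apache.pinot.common.proto.Worker;
    import org.apache.pinot.query.runtime.plan.serde.QueryPlanSerDeUtils;

    public class RequestMetadataRoundTripSketch {
      public static void main(String[] args)
          throws Exception {
        // Build the request metadata as a plain Java map, like getQueryRequest() does above.
        Map<String, String> requestMetadata = new HashMap<>();
        requestMetadata.put("requestId", "123");    // placeholder key for illustration only
        requestMetadata.put("timeoutMs", "10000");  // placeholder key for illustration only

        // Serialize the whole map into one ByteString and attach it to the request as a single field.
        ByteString serialized = QueryPlanSerDeUtils.toProtoProperties(requestMetadata);
        Worker.QueryRequest queryRequest = Worker.QueryRequest.newBuilder().setMetadata(serialized).build();

        // The receiving side decodes it back into a map in one call, as the test does before submitRequest().
        Map<String, String> decoded = QueryPlanSerDeUtils.fromProtoProperties(queryRequest.getMetadata());
        if (!decoded.equals(requestMetadata)) {
          throw new AssertionError("Round-trip mismatch");
        }
      }
    }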

