You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ja...@apache.org on 2022/04/06 18:42:52 UTC

[pinot] 03/03: add pinot-query-runtime (#8412)

This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch multi_stage_query_engine
in repository https://gitbox.apache.org/repos/asf/pinot.git

commit 005d5b9965d635be503569edce492265a6676a09
Author: Rong Rong <ro...@apache.org>
AuthorDate: Wed Apr 6 01:03:53 2022 -0700

    add pinot-query-runtime (#8412)
    
    * add pinot-query-runtime
    
    - inter-stage exchange interface
    - worker executor interface
    - dispatcher and dispatch-receiving server interface
    - row-based runtime operator (incomplete)
    
    several components that are needed in order to access current pinot-server
    functionality
    - converter from stage plan into existing PinotQuery/QueryContext
    - after-execution exchange sender connected to InstanceResponseOperator
    
    * fix jdk8 compile
    
    * documentation for mailbox module
    
    * document the runtime
    
    * change stageId to Integer
    
    * change jobId to long type as well, to align with current pinot requestId
    
    * change the serializable format of dispatchable QueryPlan
    
    Work Items Done
    1. split worker.proto into the service proto and the message proto
    2. create a StageRoot proto object that can encode the input StageNodes as serialized bytes.
    3. modify ser/deser modules in query-runtime
    4. remove unnecessary plan node operations in stage plan
    
    Work Items TODO
    1. make a proto for StageNode; it should include all possible StageNode fields.
    2. make a serializer and deserializer (real ones to replace the current
       java.io.Serializable-based approach)
    
    * remove plan.proto
    
    will add it back once the serializable StageNode format is determined
    
    * fix serde
    
    * address diff comments
    
    Co-authored-by: Rong Rong <ro...@startree.ai>
---
 pinot-common/src/main/proto/mailbox.proto          |  55 ++++
 pinot-common/src/main/proto/worker.proto           |  74 ++++++
 pinot-distribution/pom.xml                         |   8 +
 .../{nodes/CalcNode.java => PlannerUtils.java}     |  26 +-
 .../org/apache/pinot/query/planner/QueryPlan.java  |  10 +-
 .../pinot/query/planner/RelToStageConverter.java   |  45 +++-
 .../apache/pinot/query/planner/StageMetadata.java  |   2 +-
 .../apache/pinot/query/planner/StagePlanner.java   |  24 +-
 .../query/planner/nodes/AbstractStageNode.java     |   6 +-
 .../apache/pinot/query/planner/nodes/CalcNode.java |  13 +-
 .../apache/pinot/query/planner/nodes/JoinNode.java |  77 ++----
 .../query/planner/nodes/MailboxReceiveNode.java    |  19 +-
 .../pinot/query/planner/nodes/MailboxSendNode.java |  22 +-
 .../pinot/query/planner/nodes/StageNode.java       |   2 +-
 .../pinot/query/planner/nodes/TableScanNode.java   |  26 +-
 .../partitioning/FieldSelectionKeySelector.java    |   4 +-
 .../query/planner/partitioning/KeySelector.java    |   5 +-
 .../apache/pinot/query/routing/WorkerManager.java  |   5 +-
 .../apache/pinot/query/QueryEnvironmentTest.java   |   5 +-
 .../pinot/query/QueryEnvironmentTestUtils.java     |   8 +-
 pinot-query-runtime/pom.xml                        |  83 ++++++
 .../pinot/query/mailbox/GrpcMailboxService.java    | 101 ++++++++
 .../pinot/query/mailbox/GrpcReceivingMailbox.java  |  88 +++++++
 .../pinot/query/mailbox/GrpcSendingMailbox.java    |  74 ++++++
 .../pinot/query/mailbox/MailboxIdentifier.java     |  42 +++-
 .../apache/pinot/query/mailbox/MailboxService.java |  70 ++++++
 .../pinot/query/mailbox/ReceivingMailbox.java      |  37 ++-
 .../apache/pinot/query/mailbox/SendingMailbox.java |  30 ++-
 .../query/mailbox/StringMailboxIdentifier.java     | 100 ++++++++
 .../java/org/apache/pinot/query/mailbox/Utils.java |  32 +--
 .../query/mailbox/channel/ChannelManager.java      |  62 +++++
 .../query/mailbox/channel/GrpcMailboxServer.java   |  72 ++++++
 .../channel/MailboxContentStreamObserver.java      | 114 +++++++++
 .../channel/MailboxStatusStreamObserver.java       |  88 +++++++
 .../apache/pinot/query/runtime/QueryRunner.java    | 118 +++++++++
 .../pinot/query/runtime/blocks/DataTableBlock.java |  73 ++++++
 .../query/runtime/blocks/DataTableBlockUtils.java  |  71 ++++++
 .../runtime/executor/WorkerQueryExecutor.java      | 122 +++++++++
 .../runtime/operator/BroadcastJoinOperator.java    | 165 ++++++++++++
 .../runtime/operator/MailboxReceiveOperator.java   | 137 ++++++++++
 .../runtime/operator/MailboxSendOperator.java      | 182 ++++++++++++++
 .../query/runtime/plan/DistributedStagePlan.java   |  76 ++++++
 .../runtime/plan/serde/QueryPlanSerDeUtils.java    | 108 ++++++++
 .../runtime/plan/serde/StageNodeSerDeUtils.java    |  56 +++++
 .../query/runtime/utils/ServerRequestUtils.java    | 109 ++++++++
 .../apache/pinot/query/service/QueryConfig.java    |  26 +-
 .../pinot/query/service/QueryDispatcher.java       | 143 +++++++++++
 .../apache/pinot/query/service/QueryServer.java    | 115 +++++++++
 .../apache/pinot/query/QueryServerEnclosure.java   | 192 ++++++++++++++
 .../query/mailbox/GrpcMailboxServiceTest.java      |  72 ++++++
 .../query/mailbox/GrpcMailboxServiceTestBase.java  |  48 ++++
 .../pinot/query/runtime/QueryRunnerTest.java       | 278 +++++++++++++++++++++
 .../pinot/query/service/QueryServerTest.java       | 107 ++++++++
 pom.xml                                            |  19 ++
 54 files changed, 3413 insertions(+), 233 deletions(-)

diff --git a/pinot-common/src/main/proto/mailbox.proto b/pinot-common/src/main/proto/mailbox.proto
new file mode 100644
index 0000000000..6e1cca9a92
--- /dev/null
+++ b/pinot-common/src/main/proto/mailbox.proto
@@ -0,0 +1,55 @@
+//
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+syntax = "proto3";
+
+package org.apache.pinot.common.proto;
+
+service PinotMailbox {
+  rpc open(stream MailboxContent) returns (stream MailboxStatus);
+}
+
+message MailboxStatus {
+  string mailboxId = 1;
+  map<string, string> metadata = 2;
+}
+
+message MailboxContent {
+  string mailboxId = 1;
+  bytes payload = 2;
+  map<string, string> metadata = 3;
+}
diff --git a/pinot-common/src/main/proto/worker.proto b/pinot-common/src/main/proto/worker.proto
new file mode 100644
index 0000000000..c64798fa63
--- /dev/null
+++ b/pinot-common/src/main/proto/worker.proto
@@ -0,0 +1,74 @@
+//
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+syntax = "proto3";
+
+package org.apache.pinot.common.proto;
+
+service PinotQueryWorker {
+  // Dispatch a QueryRequest to a PinotQueryWorker
+  rpc Submit(QueryRequest) returns (QueryResponse);
+}
+
+// QueryRequest is the dispatched content for a specific query stage on a specific worker.
+message QueryRequest {
+  map<string, string> metadata = 1;
+  StagePlan stagePlan = 2;
+}
+
+// QueryResponse is the dispatched response from worker, it doesn't contain actual data, only dispatch status.
+message QueryResponse {
+  map<string, string> metadata = 1;
+  bytes payload = 2;
+}
+
+message StagePlan {
+  int32 stageId = 1;
+  string instanceId = 2;
+  bytes serializedStageRoot = 3;
+  map<int32, StageMetadata> stageMetadata = 4;
+}
+
+message StageMetadata {
+  repeated string instances = 1;
+  repeated string dataSources = 2;
+  map<string, SegmentList> instanceToSegmentList = 3;
+}
+
+message SegmentList {
+  repeated string segments = 1;
+}
diff --git a/pinot-distribution/pom.xml b/pinot-distribution/pom.xml
index d5cdabf5de..2e501b3881 100644
--- a/pinot-distribution/pom.xml
+++ b/pinot-distribution/pom.xml
@@ -64,6 +64,14 @@
       <groupId>org.apache.pinot</groupId>
       <artifactId>pinot-controller</artifactId>
     </dependency>
+    <dependency>
+      <groupId>org.apache.pinot</groupId>
+      <artifactId>pinot-query-planner</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.pinot</groupId>
+      <artifactId>pinot-query-runtime</artifactId>
+    </dependency>
     <dependency>
       <groupId>org.apache.pinot</groupId>
       <artifactId>pinot-tools</artifactId>
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/nodes/CalcNode.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/PlannerUtils.java
similarity index 63%
copy from pinot-query-planner/src/main/java/org/apache/pinot/query/planner/nodes/CalcNode.java
copy to pinot-query-planner/src/main/java/org/apache/pinot/query/planner/PlannerUtils.java
index 4b4ca9165e..43ab79d6bc 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/nodes/CalcNode.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/PlannerUtils.java
@@ -16,25 +16,17 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pinot.query.planner.nodes;
+package org.apache.pinot.query.planner;
 
-import org.apache.calcite.rel.logical.LogicalCalc;
-
-
-public class CalcNode extends AbstractStageNode {
-  private final String _expression;
-
-  public CalcNode(LogicalCalc node, String currentStageId) {
-    super(currentStageId);
-    _expression = toExpression(node);
-  }
-
-  public String getExpression() {
-    return _expression;
+/**
+ * Utilities used by planner.
+ */
+public class PlannerUtils {
+  private PlannerUtils() {
+    // do not instantiate.
   }
 
-  private String toExpression(LogicalCalc node) {
-    // TODO: make it real.
-    return node.getDigest();
+  public static boolean isRootStage(int stageId) {
+    return stageId == 0;
   }
 }
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/QueryPlan.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/QueryPlan.java
index bdaba4ac70..cb075bb9f3 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/QueryPlan.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/QueryPlan.java
@@ -34,10 +34,10 @@ import org.apache.pinot.query.planner.nodes.StageNode;
  * </ul>
  */
 public class QueryPlan {
-  private Map<String, StageNode> _queryStageMap;
-  private Map<String, StageMetadata> _stageMetadataMap;
+  private Map<Integer, StageNode> _queryStageMap;
+  private Map<Integer, StageMetadata> _stageMetadataMap;
 
-  public QueryPlan(Map<String, StageNode> queryStageMap, Map<String, StageMetadata> stageMetadataMap) {
+  public QueryPlan(Map<Integer, StageNode> queryStageMap, Map<Integer, StageMetadata> stageMetadataMap) {
     _queryStageMap = queryStageMap;
     _stageMetadataMap = stageMetadataMap;
   }
@@ -46,7 +46,7 @@ public class QueryPlan {
    * Get the map between stageID and the stage plan root node.
    * @return stage plan map.
    */
-  public Map<String, StageNode> getQueryStageMap() {
+  public Map<Integer, StageNode> getQueryStageMap() {
     return _queryStageMap;
   }
 
@@ -54,7 +54,7 @@ public class QueryPlan {
    * Get the stage metadata information.
    * @return stage metadata info.
    */
-  public Map<String, StageMetadata> getStageMetadataMap() {
+  public Map<Integer, StageMetadata> getStageMetadataMap() {
     return _stageMetadataMap;
   }
 }
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/RelToStageConverter.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/RelToStageConverter.java
index d167521f20..e558694aab 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/RelToStageConverter.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/RelToStageConverter.java
@@ -18,14 +18,26 @@
  */
 package org.apache.pinot.query.planner;
 
+import com.google.common.base.Preconditions;
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
 import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.JoinRelType;
 import org.apache.calcite.rel.logical.LogicalCalc;
 import org.apache.calcite.rel.logical.LogicalJoin;
 import org.apache.calcite.rel.logical.LogicalTableScan;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeField;
+import org.apache.calcite.rex.RexCall;
+import org.apache.calcite.rex.RexInputRef;
+import org.apache.calcite.sql.SqlKind;
 import org.apache.pinot.query.planner.nodes.CalcNode;
 import org.apache.pinot.query.planner.nodes.JoinNode;
 import org.apache.pinot.query.planner.nodes.StageNode;
 import org.apache.pinot.query.planner.nodes.TableScanNode;
+import org.apache.pinot.query.planner.partitioning.FieldSelectionKeySelector;
+import org.apache.pinot.query.planner.partitioning.KeySelector;
 
 
 /**
@@ -45,7 +57,7 @@ public final class RelToStageConverter {
    * @param node relational node
    * @return stage node.
    */
-  public static StageNode toStageNode(RelNode node, String currentStageId) {
+  public static StageNode toStageNode(RelNode node, int currentStageId) {
     if (node instanceof LogicalCalc) {
       return convertLogicalCal((LogicalCalc) node, currentStageId);
     } else if (node instanceof LogicalTableScan) {
@@ -57,15 +69,34 @@ public final class RelToStageConverter {
     }
   }
 
-  private static StageNode convertLogicalTableScan(LogicalTableScan node, String currentStageId) {
-    return new TableScanNode(node, currentStageId);
+  private static StageNode convertLogicalTableScan(LogicalTableScan node, int currentStageId) {
+    String tableName = node.getTable().getQualifiedName().get(0);
+    List<String> columnNames = node.getRowType().getFieldList().stream()
+        .map(RelDataTypeField::getName).collect(Collectors.toList());
+    return new TableScanNode(currentStageId, tableName, columnNames);
   }
 
-  private static StageNode convertLogicalCal(LogicalCalc node, String currentStageId) {
-    return new CalcNode(node, currentStageId);
+  private static StageNode convertLogicalCal(LogicalCalc node, int currentStageId) {
+    // TODO: support actual calcNode
+    return new CalcNode(currentStageId, node.getDigest());
   }
 
-  private static StageNode convertLogicalJoin(LogicalJoin node, String currentStageId) {
-    return new JoinNode(node, currentStageId);
+  private static StageNode convertLogicalJoin(LogicalJoin node, int currentStageId) {
+    JoinRelType joinType = node.getJoinType();
+    RexCall joinCondition = (RexCall) node.getCondition();
+    Preconditions.checkState(
+        joinCondition.getOperator().getKind().equals(SqlKind.EQUALS) && joinCondition.getOperands().size() == 2,
+        "only equality JOIN is supported");
+    Preconditions.checkState(joinCondition.getOperands().get(0) instanceof RexInputRef, "only reference supported");
+    Preconditions.checkState(joinCondition.getOperands().get(1) instanceof RexInputRef, "only reference supported");
+    RelDataType leftRowType = node.getLeft().getRowType();
+    RelDataType rightRowType = node.getRight().getRowType();
+    int leftOperandIndex = ((RexInputRef) joinCondition.getOperands().get(0)).getIndex();
+    int rightOperandIndex = ((RexInputRef) joinCondition.getOperands().get(1)).getIndex();
+    KeySelector<Object[], Object> leftFieldSelectionKeySelector = new FieldSelectionKeySelector(leftOperandIndex);
+    KeySelector<Object[], Object> rightFieldSelectionKeySelector =
+          new FieldSelectionKeySelector(rightOperandIndex - leftRowType.getFieldNames().size());
+    return new JoinNode(currentStageId, joinType, Collections.singletonList(new JoinNode.JoinClause(
+        leftFieldSelectionKeySelector, rightFieldSelectionKeySelector)));
   }
 }
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/StageMetadata.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/StageMetadata.java
index 91eb5b77ca..90dd22a8aa 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/StageMetadata.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/StageMetadata.java
@@ -55,7 +55,7 @@ public class StageMetadata implements Serializable {
 
   public void attach(StageNode stageNode) {
     if (stageNode instanceof TableScanNode) {
-      _scannedTables.add(((TableScanNode) stageNode).getTableName().get(0));
+      _scannedTables.add(((TableScanNode) stageNode).getTableName());
     }
   }
 
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/StagePlanner.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/StagePlanner.java
index 175e77f6ef..f6ec38738c 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/StagePlanner.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/StagePlanner.java
@@ -41,8 +41,8 @@ public class StagePlanner {
   private final PlannerContext _plannerContext;
   private final WorkerManager _workerManager;
 
-  private Map<String, StageNode> _queryStageMap;
-  private Map<String, StageMetadata> _stageMetadataMap;
+  private Map<Integer, StageNode> _queryStageMap;
+  private Map<Integer, StageMetadata> _stageMetadataMap;
   private int _stageIdCounter;
 
   public StagePlanner(PlannerContext plannerContext, WorkerManager workerManager) {
@@ -60,7 +60,8 @@ public class StagePlanner {
     // clear the state
     _queryStageMap = new HashMap<>();
     _stageMetadataMap = new HashMap<>();
-    _stageIdCounter = 0;
+    // Stage ID starts with 1, 0 will be reserved for ROOT stage.
+    _stageIdCounter = 1;
 
     // walk the plan and create stages.
     StageNode globalStageRoot = walkRelPlan(relRoot, getNewStageId());
@@ -68,9 +69,10 @@ public class StagePlanner {
     // global root needs to send results back to the ROOT, a.k.a. the client response node.
     // the last stage is always a broadcast-gather.
     StageNode globalReceiverNode =
-        new MailboxReceiveNode("ROOT", globalStageRoot.getStageId(), RelDistribution.Type.BROADCAST_DISTRIBUTED);
-    StageNode globalSenderNode = new MailboxSendNode(globalStageRoot, globalReceiverNode.getStageId(),
+        new MailboxReceiveNode(0, globalStageRoot.getStageId(), RelDistribution.Type.BROADCAST_DISTRIBUTED);
+    StageNode globalSenderNode = new MailboxSendNode(globalStageRoot.getStageId(), globalReceiverNode.getStageId(),
         RelDistribution.Type.BROADCAST_DISTRIBUTED);
+    globalSenderNode.addInput(globalStageRoot);
     _queryStageMap.put(globalSenderNode.getStageId(), globalSenderNode);
     StageMetadata stageMetadata = _stageMetadataMap.get(globalSenderNode.getStageId());
     stageMetadata.attach(globalSenderNode);
@@ -81,7 +83,7 @@ public class StagePlanner {
     _stageMetadataMap.put(globalReceiverNode.getStageId(), globalReceivingStageMetadata);
 
     // assign workers to each stage.
-    for (Map.Entry<String, StageMetadata> e : _stageMetadataMap.entrySet()) {
+    for (Map.Entry<Integer, StageMetadata> e : _stageMetadataMap.entrySet()) {
       _workerManager.assignWorkerToStage(e.getKey(), e.getValue());
     }
 
@@ -89,7 +91,7 @@ public class StagePlanner {
   }
 
   // non-threadsafe
-  private StageNode walkRelPlan(RelNode node, String currentStageId) {
+  private StageNode walkRelPlan(RelNode node, int currentStageId) {
     if (isExchangeNode(node)) {
       // 1. exchangeNode always have only one input, get its input converted as a new stage root.
       StageNode nextStageRoot = walkRelPlan(node.getInput(0), getNewStageId());
@@ -97,7 +99,9 @@ public class StagePlanner {
 
       // 2. make an exchange sender and receiver node pair
       StageNode mailboxReceiver = new MailboxReceiveNode(currentStageId, nextStageRoot.getStageId(), exchangeType);
-      StageNode mailboxSender = new MailboxSendNode(nextStageRoot, mailboxReceiver.getStageId(), exchangeType);
+      StageNode mailboxSender = new MailboxSendNode(nextStageRoot.getStageId(), mailboxReceiver.getStageId(),
+          exchangeType);
+      mailboxSender.addInput(nextStageRoot);
 
       // 3. put the sender side as a completed stage.
       _queryStageMap.put(mailboxSender.getStageId(), mailboxSender);
@@ -120,7 +124,7 @@ public class StagePlanner {
     return (node instanceof LogicalExchange);
   }
 
-  private String getNewStageId() {
-    return String.valueOf(_stageIdCounter++);
+  private int getNewStageId() {
+    return _stageIdCounter++;
   }
 }
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/nodes/AbstractStageNode.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/nodes/AbstractStageNode.java
index 71701df762..b99075429a 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/nodes/AbstractStageNode.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/nodes/AbstractStageNode.java
@@ -24,10 +24,10 @@ import java.util.List;
 
 public abstract class AbstractStageNode implements StageNode {
 
-  protected final String _stageId;
+  protected final int _stageId;
   protected final List<StageNode> _inputs;
 
-  public AbstractStageNode(String stageId) {
+  public AbstractStageNode(int stageId) {
     _stageId = stageId;
     _inputs = new ArrayList<>();
   }
@@ -43,7 +43,7 @@ public abstract class AbstractStageNode implements StageNode {
   }
 
   @Override
-  public String getStageId() {
+  public int getStageId() {
     return _stageId;
   }
 }
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/nodes/CalcNode.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/nodes/CalcNode.java
index 4b4ca9165e..b188b8e2f7 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/nodes/CalcNode.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/nodes/CalcNode.java
@@ -18,23 +18,16 @@
  */
 package org.apache.pinot.query.planner.nodes;
 
-import org.apache.calcite.rel.logical.LogicalCalc;
-
 
 public class CalcNode extends AbstractStageNode {
   private final String _expression;
 
-  public CalcNode(LogicalCalc node, String currentStageId) {
-    super(currentStageId);
-    _expression = toExpression(node);
+  public CalcNode(int stageId, String expression) {
+    super(stageId);
+    _expression = expression;
   }
 
   public String getExpression() {
     return _expression;
   }
-
-  private String toExpression(LogicalCalc node) {
-    // TODO: make it real.
-    return node.getDigest();
-  }
 }
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/nodes/JoinNode.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/nodes/JoinNode.java
index 520af693d6..94af122c74 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/nodes/JoinNode.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/nodes/JoinNode.java
@@ -18,69 +18,46 @@
  */
 package org.apache.pinot.query.planner.nodes;
 
-import com.google.common.base.Preconditions;
+import java.io.Serializable;
+import java.util.List;
 import org.apache.calcite.rel.core.JoinRelType;
-import org.apache.calcite.rel.logical.LogicalJoin;
-import org.apache.calcite.rel.type.RelDataType;
-import org.apache.calcite.rex.RexCall;
-import org.apache.calcite.rex.RexInputRef;
-import org.apache.calcite.sql.SqlKind;
-import org.apache.pinot.query.planner.partitioning.FieldSelectionKeySelector;
+import org.apache.pinot.query.planner.partitioning.KeySelector;
 
 
 public class JoinNode extends AbstractStageNode {
-  private final JoinRelType _joinType;
-  private final int _leftOperandIndex;
-  private final int _rightOperandIndex;
-  private final FieldSelectionKeySelector _leftFieldSelectionKeySelector;
-  private final FieldSelectionKeySelector _rightFieldSelectionKeySelector;
+  private final JoinRelType _joinRelType;
+  private final List<JoinClause> _criteria;
 
-  private transient final RelDataType _leftRowType;
-  private transient final RelDataType _rightRowType;
-
-  public JoinNode(LogicalJoin node, String currentStageId) {
-    super(currentStageId);
-    _joinType = node.getJoinType();
-    RexCall joinCondition = (RexCall) node.getCondition();
-    Preconditions.checkState(
-        joinCondition.getOperator().getKind().equals(SqlKind.EQUALS) && joinCondition.getOperands().size() == 2,
-        "only equality JOIN is supported");
-    Preconditions.checkState(joinCondition.getOperands().get(0) instanceof RexInputRef, "only reference supported");
-    Preconditions.checkState(joinCondition.getOperands().get(1) instanceof RexInputRef, "only reference supported");
-    _leftRowType = node.getLeft().getRowType();
-    _rightRowType = node.getRight().getRowType();
-    _leftOperandIndex = ((RexInputRef) joinCondition.getOperands().get(0)).getIndex();
-    _rightOperandIndex = ((RexInputRef) joinCondition.getOperands().get(1)).getIndex();
-    _leftFieldSelectionKeySelector = new FieldSelectionKeySelector(_leftOperandIndex);
-    _rightFieldSelectionKeySelector =
-        new FieldSelectionKeySelector(_rightOperandIndex - _leftRowType.getFieldNames().size());
-  }
-
-  public JoinRelType getJoinType() {
-    return _joinType;
+  public JoinNode(int stageId, JoinRelType joinRelType, List<JoinClause> criteria
+  ) {
+    super(stageId);
+    _joinRelType = joinRelType;
+    _criteria = criteria;
   }
 
-  public RelDataType getLeftRowType() {
-    return _leftRowType;
+  public JoinRelType getJoinRelType() {
+    return _joinRelType;
   }
 
-  public RelDataType getRightRowType() {
-    return _rightRowType;
+  public List<JoinClause> getCriteria() {
+    return _criteria;
   }
 
-  public int getLeftOperandIndex() {
-    return _leftOperandIndex;
-  }
+  public static class JoinClause implements Serializable {
+    private final KeySelector<Object[], Object> _leftJoinKeySelector;
+    private final KeySelector<Object[], Object> _rightJoinKeySelector;
 
-  public int getRightOperandIndex() {
-    return _rightOperandIndex;
-  }
+    public JoinClause(KeySelector<Object[], Object> leftKeySelector, KeySelector<Object[], Object> rightKeySelector) {
+      _leftJoinKeySelector = leftKeySelector;
+      _rightJoinKeySelector = rightKeySelector;
+    }
 
-  public FieldSelectionKeySelector getLeftJoinKeySelector() {
-    return _leftFieldSelectionKeySelector;
-  }
+    public KeySelector<Object[], Object> getLeftJoinKeySelector() {
+      return _leftJoinKeySelector;
+    }
 
-  public FieldSelectionKeySelector getRightJoinKeySelector() {
-    return _rightFieldSelectionKeySelector;
+    public KeySelector<Object[], Object> getRightJoinKeySelector() {
+      return _rightJoinKeySelector;
+    }
   }
 }
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/nodes/MailboxReceiveNode.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/nodes/MailboxReceiveNode.java
index 947d449383..d8269346f3 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/nodes/MailboxReceiveNode.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/nodes/MailboxReceiveNode.java
@@ -18,33 +18,20 @@
  */
 package org.apache.pinot.query.planner.nodes;
 
-import java.util.Collections;
-import java.util.List;
 import org.apache.calcite.rel.RelDistribution;
 
 
 public class MailboxReceiveNode extends AbstractStageNode {
-
-  private final String _senderStageId;
+  private final int _senderStageId;
   private final RelDistribution.Type _exchangeType;
 
-  public MailboxReceiveNode(String stageId, String senderStageId, RelDistribution.Type exchangeType) {
+  public MailboxReceiveNode(int stageId, int senderStageId, RelDistribution.Type exchangeType) {
     super(stageId);
     _senderStageId = senderStageId;
     _exchangeType = exchangeType;
   }
 
-  @Override
-  public List<StageNode> getInputs() {
-    return Collections.emptyList();
-  }
-
-  @Override
-  public void addInput(StageNode stageNode) {
-    throw new UnsupportedOperationException("no input should be added to mailbox receive.");
-  }
-
-  public String getSenderStageId() {
+  public int getSenderStageId() {
     return _senderStageId;
   }
 
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/nodes/MailboxSendNode.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/nodes/MailboxSendNode.java
index 6db9578f35..ea39ad3493 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/nodes/MailboxSendNode.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/nodes/MailboxSendNode.java
@@ -18,34 +18,20 @@
  */
 package org.apache.pinot.query.planner.nodes;
 
-import java.util.Collections;
-import java.util.List;
 import org.apache.calcite.rel.RelDistribution;
 
 
 public class MailboxSendNode extends AbstractStageNode {
-  private final StageNode _stageRoot;
-  private final String _receiverStageId;
+  private final int _receiverStageId;
   private final RelDistribution.Type _exchangeType;
 
-  public MailboxSendNode(StageNode stageRoot, String receiverStageId, RelDistribution.Type exchangeType) {
-    super(stageRoot.getStageId());
-    _stageRoot = stageRoot;
+  public MailboxSendNode(int stageId, int receiverStageId, RelDistribution.Type exchangeType) {
+    super(stageId);
     _receiverStageId = receiverStageId;
     _exchangeType = exchangeType;
   }
 
-  @Override
-  public List<StageNode> getInputs() {
-    return Collections.singletonList(_stageRoot);
-  }
-
-  @Override
-  public void addInput(StageNode queryStageRoot) {
-    throw new UnsupportedOperationException("mailbox cannot be changed!");
-  }
-
-  public String getReceiverStageId() {
+  public int getReceiverStageId() {
     return _receiverStageId;
   }
 
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/nodes/StageNode.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/nodes/StageNode.java
index 8fcbb5e01a..cd34aca530 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/nodes/StageNode.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/nodes/StageNode.java
@@ -36,5 +36,5 @@ public interface StageNode extends Serializable {
 
   void addInput(StageNode stageNode);
 
-  String getStageId();
+  int getStageId();
 }
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/nodes/TableScanNode.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/nodes/TableScanNode.java
index 0bb6e0f704..8d78ec6d0f 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/nodes/TableScanNode.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/nodes/TableScanNode.java
@@ -18,36 +18,20 @@
  */
 package org.apache.pinot.query.planner.nodes;
 
-import java.util.Collections;
 import java.util.List;
-import java.util.stream.Collectors;
-import org.apache.calcite.rel.logical.LogicalTableScan;
-import org.apache.calcite.rel.type.RelDataTypeField;
 
 
 public class TableScanNode extends AbstractStageNode {
-  private final List<String> _tableName;
+  private final String _tableName;
   private final List<String> _tableScanColumns;
 
-  public TableScanNode(LogicalTableScan tableScan, String stageId) {
+  public TableScanNode(int stageId, String tableName, List<String> tableScanColumns) {
     super(stageId);
-    _tableName = tableScan.getTable().getQualifiedName();
-    // TODO: optimize this, table field is not directly usable as name.
-    _tableScanColumns =
-        tableScan.getRowType().getFieldList().stream().map(RelDataTypeField::getName).collect(Collectors.toList());
+    _tableName = tableName;
+    _tableScanColumns = tableScanColumns;
   }
 
-  @Override
-  public List<StageNode> getInputs() {
-    return Collections.emptyList();
-  }
-
-  @Override
-  public void addInput(StageNode queryStageRoot) {
-    throw new UnsupportedOperationException("TableScanNode cannot add input as it is a leaf node");
-  }
-
-  public List<String> getTableName() {
+  public String getTableName() {
     return _tableName;
   }
 
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/partitioning/FieldSelectionKeySelector.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/partitioning/FieldSelectionKeySelector.java
index 9e0c776fd5..0b846e555c 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/partitioning/FieldSelectionKeySelector.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/partitioning/FieldSelectionKeySelector.java
@@ -18,13 +18,11 @@
  */
 package org.apache.pinot.query.planner.partitioning;
 
-import java.io.Serializable;
-
 
 /**
  * The {@code FieldSelectionKeySelector} simply extract a column value out from a row array {@link Object[]}.
  */
-public class FieldSelectionKeySelector implements KeySelector<Object[], Object>, Serializable {
+public class FieldSelectionKeySelector implements KeySelector<Object[], Object> {
 
   private int _columnIndex;
 
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/partitioning/KeySelector.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/partitioning/KeySelector.java
index 79dc987d9d..eaefb77604 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/partitioning/KeySelector.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/partitioning/KeySelector.java
@@ -18,6 +18,9 @@
  */
 package org.apache.pinot.query.planner.partitioning;
 
+import java.io.Serializable;
+
+
 /**
  * The {@code KeySelector} provides a partitioning function to encode a specific input data type into a key.
  *
@@ -25,7 +28,7 @@ package org.apache.pinot.query.planner.partitioning;
  *
  * <p>Key selector should always produce the same selection hash key when the same input is provided.
  */
-public interface KeySelector<IN, OUT> {
+public interface KeySelector<IN, OUT> extends Serializable {
 
   /**
    * Extract the key out of an input data construct.
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/WorkerManager.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/WorkerManager.java
index a3102580b6..5b6ba6d804 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/WorkerManager.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/WorkerManager.java
@@ -27,6 +27,7 @@ import java.util.Map;
 import org.apache.pinot.core.routing.RoutingManager;
 import org.apache.pinot.core.routing.RoutingTable;
 import org.apache.pinot.core.transport.ServerInstance;
+import org.apache.pinot.query.planner.PlannerUtils;
 import org.apache.pinot.query.planner.StageMetadata;
 import org.apache.pinot.spi.config.table.TableType;
 import org.apache.pinot.spi.utils.CommonConstants;
@@ -56,14 +57,14 @@ public class WorkerManager {
     _routingManager = routingManager;
   }
 
-  public void assignWorkerToStage(String stageId, StageMetadata stageMetadata) {
+  public void assignWorkerToStage(int stageId, StageMetadata stageMetadata) {
     List<String> scannedTables = stageMetadata.getScannedTables();
     if (scannedTables.size() == 1) { // table scan stage, need to attach server as well as segment info.
       RoutingTable routingTable = getRoutingTable(scannedTables.get(0));
       Map<ServerInstance, List<String>> serverInstanceToSegmentsMap = routingTable.getServerInstanceToSegmentsMap();
       stageMetadata.setServerInstances(new ArrayList<>(serverInstanceToSegmentsMap.keySet()));
       stageMetadata.setServerInstanceToSegmentsMap(new HashMap<>(serverInstanceToSegmentsMap));
-    } else if (stageId.equalsIgnoreCase("ROOT")) {
+    } else if (PlannerUtils.isRootStage(stageId)) {
       // ROOT stage doesn't have a QueryServer as it is strictly only reducing results.
       // here we simply assign the worker instance with identical server/mailbox port number.
       stageMetadata.setServerInstances(Lists.newArrayList(new WorkerInstance(_hostName, _port, _port)));
diff --git a/pinot-query-planner/src/test/java/org/apache/pinot/query/QueryEnvironmentTest.java b/pinot-query-planner/src/test/java/org/apache/pinot/query/QueryEnvironmentTest.java
index 911469644b..60c7cd11af 100644
--- a/pinot-query-planner/src/test/java/org/apache/pinot/query/QueryEnvironmentTest.java
+++ b/pinot-query-planner/src/test/java/org/apache/pinot/query/QueryEnvironmentTest.java
@@ -35,6 +35,7 @@ import org.apache.pinot.core.routing.RoutingManager;
 import org.apache.pinot.core.transport.ServerInstance;
 import org.apache.pinot.query.catalog.PinotCatalog;
 import org.apache.pinot.query.context.PlannerContext;
+import org.apache.pinot.query.planner.PlannerUtils;
 import org.apache.pinot.query.planner.QueryPlan;
 import org.apache.pinot.query.planner.StageMetadata;
 import org.apache.pinot.query.routing.WorkerManager;
@@ -72,7 +73,7 @@ public class QueryEnvironmentTest {
     QueryPlan queryPlan = _queryEnvironment.planQuery(query);
     Assert.assertEquals(queryPlan.getQueryStageMap().size(), 4);
     Assert.assertEquals(queryPlan.getStageMetadataMap().size(), 4);
-    for (Map.Entry<String, StageMetadata> e : queryPlan.getStageMetadataMap().entrySet()) {
+    for (Map.Entry<Integer, StageMetadata> e : queryPlan.getStageMetadataMap().entrySet()) {
       List<String> tables = e.getValue().getScannedTables();
       if (tables.size() == 1) {
         // table scan stages; for tableA it should have 2 hosts, for tableB it should have only 1
@@ -80,7 +81,7 @@ public class QueryEnvironmentTest {
             e.getValue().getServerInstances().stream().map(ServerInstance::toString).collect(Collectors.toList()),
             tables.get(0).equals("a") ? ImmutableList.of("Server_localhost_1", "Server_localhost_2")
                 : ImmutableList.of("Server_localhost_1"));
-      } else if (!e.getKey().equals("ROOT")) {
+      } else if (!PlannerUtils.isRootStage(e.getKey())) {
         // join stage should have both servers used.
         Assert.assertEquals(
             e.getValue().getServerInstances().stream().map(ServerInstance::toString).collect(Collectors.toList()),
diff --git a/pinot-query-planner/src/test/java/org/apache/pinot/query/QueryEnvironmentTestUtils.java b/pinot-query-planner/src/test/java/org/apache/pinot/query/QueryEnvironmentTestUtils.java
index cc0db385db..7585f9bd7c 100644
--- a/pinot-query-planner/src/test/java/org/apache/pinot/query/QueryEnvironmentTestUtils.java
+++ b/pinot-query-planner/src/test/java/org/apache/pinot/query/QueryEnvironmentTestUtils.java
@@ -112,11 +112,15 @@ public class QueryEnvironmentTestUtils {
     return mock;
   }
 
-  public static String getTestStageByServerCount(QueryPlan queryPlan, int serverCount) {
-    List<String> stageIds = queryPlan.getStageMetadataMap().entrySet().stream()
-        .filter(e -> !e.getKey().equals("ROOT") && e.getValue().getServerInstances().size() == serverCount)
+  /**
+   * Returns the ID of a non-root stage that is assigned to exactly {@code serverCount} servers, or -1 if none is.
+   */
+  public static int getTestStageByServerCount(QueryPlan queryPlan, int serverCount) {
+    List<Integer> stageIds = queryPlan.getStageMetadataMap().entrySet().stream()
+        .filter(e -> !org.apache.pinot.query.planner.PlannerUtils.isRootStage(e.getKey())
+            && e.getValue().getServerInstances().size() == serverCount)
         .map(Map.Entry::getKey).collect(Collectors.toList());
-    return stageIds.size() > 0 ? stageIds.get(0) : null;
+    return stageIds.isEmpty() ? -1 : stageIds.get(0);
   }
 
   public static int getAvailablePort() {
diff --git a/pinot-query-runtime/pom.xml b/pinot-query-runtime/pom.xml
new file mode 100644
index 0000000000..3607e51e31
--- /dev/null
+++ b/pinot-query-runtime/pom.xml
@@ -0,0 +1,83 @@
+<?xml version="1.0"?>
+<!--
+
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+
+  <parent>
+    <artifactId>pinot</artifactId>
+    <groupId>org.apache.pinot</groupId>
+    <version>0.10.0-SNAPSHOT</version>
+  </parent>
+  <artifactId>pinot-query-runtime</artifactId>
+  <name>Pinot Query Runtime</name>
+  <url>https://pinot.apache.org/</url>
+
+  <properties>
+    <pinot.root>${basedir}/..</pinot.root>
+  </properties>
+
+  <dependencies>
+    <!-- Pinot dependencies -->
+    <dependency>
+      <groupId>org.apache.pinot</groupId>
+      <artifactId>pinot-core</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.pinot</groupId>
+      <artifactId>pinot-common</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.pinot</groupId>
+      <artifactId>pinot-query-planner</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>org.testng</groupId>
+      <artifactId>testng</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.pinot</groupId>
+      <artifactId>pinot-common</artifactId>
+      <version>${project.version}</version>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.pinot</groupId>
+      <artifactId>pinot-query-planner</artifactId>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.pinot</groupId>
+      <artifactId>pinot-yammer</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.mockito</groupId>
+      <artifactId>mockito-core</artifactId>
+      <scope>test</scope>
+    </dependency>
+  </dependencies>
+</project>
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/GrpcMailboxService.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/GrpcMailboxService.java
new file mode 100644
index 0000000000..30f0518a19
--- /dev/null
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/GrpcMailboxService.java
@@ -0,0 +1,116 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.mailbox;
+
+import io.grpc.ManagedChannel;
+import java.util.concurrent.ConcurrentHashMap;
+import org.apache.pinot.common.proto.Mailbox.MailboxContent;
+import org.apache.pinot.query.mailbox.channel.ChannelManager;
+
+
+/**
+ * GRPC-based implementation of {@link MailboxService}.
+ *
+ * <p>It maintains a collection of connected mailbox servers and clients to remote hosts, all indexed by the
+ * mailboxID in the format of: <code>"jobId:partitionKey:senderHost:senderPort:receiverHost:receiverPort"</code>
+ *
+ * <p>Connections are established/initiated from the sender side and only torn down from the sender side as well.
+ * In the event of an exception or a timeout, the connection is closed based on a mutually agreed upon timeout period
+ * after the last successful message sent/received.
+ *
+ * <p>Note that:
+ * <ul>
+ *   <li>the latter part of the mailboxID consists of the channelID.</li>
+ *   <li>the job_id should uniquely identify a send/receive pair; for example, if one bundle job requires
+ *   opening 2 mailboxes, they should use {job_id}_1 and {job_id}_2 to distinguish the 2 different mailboxes.</li>
+ * </ul>
+ */
+public class GrpcMailboxService implements MailboxService<MailboxContent> {
+  // provides the gRPC channels handed out by getChannel().
+  private final ChannelManager _channelManager;
+  private final String _hostname;
+  private final int _mailboxPort;
+
+  // registered mailboxes, keyed by mailbox ID; created lazily on first lookup.
+  private final ConcurrentHashMap<String, ReceivingMailbox<MailboxContent>> _receivingMailboxMap =
+      new ConcurrentHashMap<>();
+  private final ConcurrentHashMap<String, SendingMailbox<MailboxContent>> _sendingMailboxMap =
+      new ConcurrentHashMap<>();
+
+  /**
+   * Creates a gRPC mailbox service.
+   *
+   * @param hostname host name on which this mailbox service runs.
+   * @param mailboxPort port that receives inbound mailbox messages.
+   */
+  public GrpcMailboxService(String hostname, int mailboxPort) {
+    _hostname = hostname;
+    _mailboxPort = mailboxPort;
+    _channelManager = new ChannelManager(this);
+  }
+
+  @Override
+  public void start() {
+    _channelManager.init();
+  }
+
+  @Override
+  public void shutdown() {
+    _channelManager.shutdown();
+  }
+
+  @Override
+  public String getHostname() {
+    return _hostname;
+  }
+
+  @Override
+  public int getMailboxPort() {
+    return _mailboxPort;
+  }
+
+  /**
+   * Looks up the sending mailbox for {@code mailboxId}, creating it on first access.
+   *
+   * @param mailboxId the id of the mailbox.
+   * @return the sending mailbox; it may not have been initialized yet.
+   */
+  @Override
+  public SendingMailbox<MailboxContent> getSendingMailbox(String mailboxId) {
+    return _sendingMailboxMap.computeIfAbsent(mailboxId, (mId) -> new GrpcSendingMailbox(mId, this));
+  }
+
+  /**
+   * Looks up the receiving mailbox for {@code mailboxId}, creating it on first access.
+   *
+   * @param mailboxId the id of the mailbox.
+   * @return the receiving mailbox; it may not have been initialized yet.
+   */
+  @Override
+  public ReceivingMailbox<MailboxContent> getReceivingMailbox(String mailboxId) {
+    return _receivingMailboxMap.computeIfAbsent(mailboxId, (mId) -> new GrpcReceivingMailbox(mId, this));
+  }
+
+  /**
+   * Returns the gRPC channel whose channel ID is derived from {@code mailboxId}.
+   */
+  public ManagedChannel getChannel(String mailboxId) {
+    return _channelManager.getChannel(Utils.constructChannelId(mailboxId));
+  }
+}
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/GrpcReceivingMailbox.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/GrpcReceivingMailbox.java
new file mode 100644
index 0000000000..2ce0e54a4e
--- /dev/null
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/GrpcReceivingMailbox.java
@@ -0,0 +1,115 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.mailbox;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.pinot.common.proto.Mailbox.MailboxContent;
+import org.apache.pinot.query.mailbox.channel.MailboxContentStreamObserver;
+
+
+/**
+ * GRPC implementation of the {@link ReceivingMailbox}.
+ *
+ * <p>The mailbox is initialized once {@link #init(MailboxContentStreamObserver)} attaches the stream observer for
+ * the inbound stream; until then {@link #receive()} waits a bounded amount of time and may return {@code null}.
+ */
+public class GrpcReceivingMailbox implements ReceivingMailbox<MailboxContent> {
+  // how long receive() waits for initialization before giving up, in milliseconds.
+  private static final long DEFAULT_MAILBOX_INIT_TIMEOUT = 100L;
+  private final GrpcMailboxService _mailboxService;
+  private final String _mailboxId;
+  private final CountDownLatch _initializationLatch;
+  private final AtomicInteger _totalMsgReceived = new AtomicInteger(0);
+
+  // set at most once by init(); volatile for safe publication to the threads that call receive()/isClosed().
+  private volatile MailboxContentStreamObserver _contentStreamObserver;
+
+  public GrpcReceivingMailbox(String mailboxId, GrpcMailboxService mailboxService) {
+    _mailboxService = mailboxService;
+    _mailboxId = mailboxId;
+    _initializationLatch = new CountDownLatch(1);
+  }
+
+  /**
+   * Attaches the stream observer that receives content for this mailbox and marks the mailbox initialized.
+   *
+   * <p>Only the first call takes effect. Synchronized so that two concurrent callers cannot both pass the
+   * initialization check and overwrite each other's observer.
+   *
+   * @param streamObserver the observer that {@link #receive()} polls content from.
+   */
+  public synchronized void init(MailboxContentStreamObserver streamObserver) {
+    if (_initializationLatch.getCount() > 0) {
+      _contentStreamObserver = streamObserver;
+      _initializationLatch.countDown();
+    }
+  }
+
+  /**
+   * Polls the next content from the attached stream observer.
+   *
+   * @return the polled content, or {@code null} if the mailbox was not initialized within
+   *     {@code DEFAULT_MAILBOX_INIT_TIMEOUT} milliseconds (or if the observer returned {@code null}).
+   * @throws Exception if interrupted while waiting for initialization, or if polling the observer throws.
+   */
+  @Override
+  public MailboxContent receive()
+      throws Exception {
+    MailboxContent mailboxContent = null;
+    if (waitForInitialize()) {
+      mailboxContent = _contentStreamObserver.poll();
+      // only count actual messages, not empty polls.
+      if (mailboxContent != null) {
+        _totalMsgReceived.incrementAndGet();
+      }
+    }
+    return mailboxContent;
+  }
+
+  @Override
+  public boolean isInitialized() {
+    return _initializationLatch.getCount() <= 0;
+  }
+
+  /**
+   * Returns {@code true} once the mailbox is initialized and its stream observer reports completion.
+   */
+  @Override
+  public boolean isClosed() {
+    return isInitialized() && _contentStreamObserver.isCompleted();
+  }
+
+  // Waits at most DEFAULT_MAILBOX_INIT_TIMEOUT ms for init(); returns false on timeout.
+  // TODO: a caller that retries receive() on null effectively busy-waits; revisit the wait strategy.
+  private boolean waitForInitialize()
+      throws Exception {
+    if (_initializationLatch.getCount() > 0) {
+      return _initializationLatch.await(DEFAULT_MAILBOX_INIT_TIMEOUT, TimeUnit.MILLISECONDS);
+    } else {
+      return true;
+    }
+  }
+
+  @Override
+  public String getMailboxId() {
+    return _mailboxId;
+  }
+}
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/GrpcSendingMailbox.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/GrpcSendingMailbox.java
new file mode 100644
index 0000000000..2aff8a03e3
--- /dev/null
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/GrpcSendingMailbox.java
@@ -0,0 +1,99 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.mailbox;
+
+import io.grpc.ManagedChannel;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.pinot.common.proto.Mailbox.MailboxContent;
+import org.apache.pinot.common.proto.PinotMailboxGrpc;
+import org.apache.pinot.query.mailbox.channel.MailboxStatusStreamObserver;
+
+
+/**
+ * GRPC implementation of the {@link SendingMailbox}.
+ *
+ * <p>The outbound gRPC stream is opened lazily by {@link #init()}, which {@link #send(MailboxContent)} and
+ * {@link #complete()} invoke on first use.
+ */
+public class GrpcSendingMailbox implements SendingMailbox<MailboxContent> {
+  private final GrpcMailboxService _mailboxService;
+  private final String _mailboxId;
+  private final AtomicBoolean _initialized = new AtomicBoolean(false);
+  private final AtomicInteger _totalMsgSent = new AtomicInteger(0);
+
+  // assigned by init() before _initialized flips to true; volatile for safe publication to sending threads.
+  private volatile MailboxStatusStreamObserver _statusStreamObserver;
+
+  public GrpcSendingMailbox(String mailboxId, GrpcMailboxService mailboxService) {
+    _mailboxService = mailboxService;
+    _mailboxId = mailboxId;
+  }
+
+  /**
+   * Opens the outbound gRPC stream for this mailbox using the channel provided by the mailbox service.
+   *
+   * <p>Idempotent and synchronized, so concurrent first sends cannot open two streams for the same mailbox.
+   */
+  public synchronized void init()
+      throws UnsupportedOperationException {
+    if (_initialized.get()) {
+      return;
+    }
+    ManagedChannel channel = _mailboxService.getChannel(_mailboxId);
+    PinotMailboxGrpc.PinotMailboxStub stub = PinotMailboxGrpc.newStub(channel);
+    MailboxStatusStreamObserver statusStreamObserver = new MailboxStatusStreamObserver();
+    statusStreamObserver.init(stub.open(statusStreamObserver));
+    _statusStreamObserver = statusStreamObserver;
+    _initialized.set(true);
+  }
+
+  /**
+   * Sends {@code data} through this mailbox, opening the stream first if needed.
+   */
+  @Override
+  public void send(MailboxContent data)
+      throws UnsupportedOperationException {
+    if (!_initialized.get()) {
+      // lazy initialization; init() is a no-op if another thread got there first.
+      init();
+    }
+    _statusStreamObserver.send(data);
+    _totalMsgSent.incrementAndGet();
+  }
+
+  /**
+   * Completes delivery of this mailbox.
+   *
+   * <p>If nothing was sent yet, the stream is opened first so that there is a stream to complete; this presumably
+   * lets the receiving side observe an empty, completed stream instead of none at all.
+   */
+  @Override
+  public void complete() {
+    if (!_initialized.get()) {
+      init();
+    }
+    _statusStreamObserver.complete();
+  }
+
+  @Override
+  public String getMailboxId() {
+    return _mailboxId;
+  }
+}
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/partitioning/KeySelector.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/MailboxIdentifier.java
similarity index 54%
copy from pinot-query-planner/src/main/java/org/apache/pinot/query/planner/partitioning/KeySelector.java
copy to pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/MailboxIdentifier.java
index 79dc987d9d..ee72494cdc 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/partitioning/KeySelector.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/MailboxIdentifier.java
@@ -16,22 +16,42 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pinot.query.planner.partitioning;
+package org.apache.pinot.query.mailbox;
 
 /**
- * The {@code KeySelector} provides a partitioning function to encode a specific input data type into a key.
+ * {@link MailboxIdentifier} uniquely identifies the mailbox that pairs a sender and a receiver.
  *
- * <p>This key selector is used for computation such as GROUP BY or equality JOINs.
- *
- * <p>Key selector should always produce the same selection hash key when the same input is provided.
+ * <p>It consists of the job ID and the partition key, as well as the sender/receiver host and port of a channel.
  */
-public interface KeySelector<IN, OUT> {
+public interface MailboxIdentifier {
+
+  /**
+   * Gets the job identifier.
+   * @return job identifier.
+   */
+  String getJobId();
+
+  /**
+   * Gets the sender host.
+   * @return sender host.
+   */
+  String getFromHost();
+
+  /**
+   * Gets the sender port.
+   * @return sender port.
+   */
+  int getFromPort();
+
+  /**
+   * Gets the receiver host.
+   * @return receiver host.
+   */
+  String getToHost();
 
   /**
-   * Extract the key out of an input data construct.
-   *
-   * @param input input data.
-   * @return the key of the input data.
+   * Gets the receiver port.
+   * @return receiver port.
    */
-  OUT getKey(IN input);
+  int getToPort();
 }
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/MailboxService.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/MailboxService.java
new file mode 100644
index 0000000000..1943b12efb
--- /dev/null
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/MailboxService.java
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.mailbox;
+
+/**
+ * A service that handles the transfer of mailbox contents.
+ *
+ * <p>Mailboxes are addressed by a string mailbox ID; see {@link MailboxIdentifier} for the components it encodes.
+ *
+ * @param <T> type of content carried by the mailboxes of this service.
+ */
+public interface MailboxService<T> {
+
+  /**
+   * Starts the mailbox service.
+   */
+  void start();
+
+  /**
+   * Shuts down the mailbox service.
+   */
+  void shutdown();
+
+  /**
+   * Returns the receiving mailbox with the given ID.
+   *
+   * <p>The mailbox is created if it does not exist yet; the returned instance may not be initialized.
+   *
+   * @param mailboxId mailbox ID.
+   * @return the receiving mailbox.
+   */
+  ReceivingMailbox<T> getReceivingMailbox(String mailboxId);
+
+  /**
+   * Returns the sending mailbox with the given ID.
+   *
+   * @param mailboxId mailbox ID.
+   * @return the sending mailbox.
+   */
+  SendingMailbox<T> getSendingMailbox(String mailboxId);
+
+  /**
+   * Returns the name of the host this mailbox service runs on.
+   *
+   * @return the host name.
+   */
+  String getHostname();
+
+  /**
+   * Returns the port on which this service receives inbound mailbox messages.
+   *
+   * @return the mailbox port.
+   */
+  int getMailboxPort();
+}
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/partitioning/KeySelector.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/ReceivingMailbox.java
similarity index 55%
copy from pinot-query-planner/src/main/java/org/apache/pinot/query/planner/partitioning/KeySelector.java
copy to pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/ReceivingMailbox.java
index 79dc987d9d..9e17d827f2 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/partitioning/KeySelector.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/ReceivingMailbox.java
@@ -16,22 +16,41 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pinot.query.planner.partitioning;
+package org.apache.pinot.query.mailbox;
 
 /**
- * The {@code KeySelector} provides a partitioning function to encode a specific input data type into a key.
+ * The receiving side of a mailbox, which pairs a sender and a receiver.
  *
- * <p>This key selector is used for computation such as GROUP BY or equality JOINs.
+ * <p>Mailboxes should be instantiated on both sides of the MailboxServer.
  *
- * <p>Key selector should always produce the same selection hash key when the same input is provided.
+ * @param <T> type of data carried over the mailbox.
  */
-public interface KeySelector<IN, OUT> {
+public interface ReceivingMailbox<T> {
 
   /**
-   * Extract the key out of an input data construct.
+   * Gets the unique identifier of the mailbox.
    *
-   * @param input input data.
-   * @return the key of the input data.
+   * @return Mailbox ID.
    */
-  OUT getKey(IN input);
+  String getMailboxId();
+
+  /**
+   * Receives a data packet from the mailbox.
+   * @return data packet; implementations may return {@code null} when nothing is available.
+   * @throws Exception if receiving fails.
+   */
+  T receive()
+      throws Exception;
+
+  /**
+   * Checks whether the receiving mailbox is initialized.
+   * @return true if the mailbox is initialized.
+   */
+  boolean isInitialized();
+
+  /**
+   * Checks whether the mailbox is closed.
+   * @return true if the mailbox is closed.
+   */
+  boolean isClosed();
 }
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/partitioning/KeySelector.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/SendingMailbox.java
similarity index 58%
copy from pinot-query-planner/src/main/java/org/apache/pinot/query/planner/partitioning/KeySelector.java
copy to pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/SendingMailbox.java
index 79dc987d9d..11cee29f0a 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/partitioning/KeySelector.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/SendingMailbox.java
@@ -16,22 +16,34 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pinot.query.planner.partitioning;
+package org.apache.pinot.query.mailbox;
 
 /**
- * The {@code KeySelector} provides a partitioning function to encode a specific input data type into a key.
+ * {@code SendingMailbox} is the sending side of a mailbox, used to send data.
  *
- * <p>This key selector is used for computation such as GROUP BY or equality JOINs.
+ * Mailbox should be instantiated on both sides of the MailboxServer.
  *
- * <p>Key selector should always produce the same selection hash key when the same input is provided.
+ * @param <T> type of data carried over the mailbox.
  */
-public interface KeySelector<IN, OUT> {
+public interface SendingMailbox<T> {
 
   /**
-   * Extract the key out of an input data construct.
+   * Gets the unique identifier of this mailbox.
    *
-   * @param input input data.
-   * @return the key of the input data.
+   * @return Mailbox ID.
    */
-  OUT getKey(IN input);
+  String getMailboxId();
+
+  /**
+   * Sends a data packet through the mailbox.
+   * @param data the data packet to send.
+   * @throws UnsupportedOperationException as declared; the conditions are implementation-specific.
+   */
+  void send(T data)
+      throws UnsupportedOperationException;
+
+  /**
+   * Completes delivery of the current mailbox.
+   */
+  void complete();
 }
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/StringMailboxIdentifier.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/StringMailboxIdentifier.java
new file mode 100644
index 0000000000..21712273e2
--- /dev/null
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/StringMailboxIdentifier.java
@@ -0,0 +1,122 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.mailbox;
+
+import com.google.common.base.Joiner;
+import com.google.common.base.Preconditions;
+
+
+/**
+ * {@code StringMailboxIdentifier} is a {@link MailboxIdentifier} whose canonical form is the colon-delimited
+ * string <code>"jobId:fromHost:fromPort:toHost:toPort"</code>.
+ *
+ * <p>{@link #equals(Object)} and {@link #hashCode()} are based solely on that string form.
+ */
+public class StringMailboxIdentifier implements MailboxIdentifier {
+  private static final Joiner JOINER = Joiner.on(':');
+
+  private final String _mailboxIdString;
+  private final String _jobId;
+  private final String _fromHost;
+  private final int _fromPort;
+  private final String _toHost;
+  private final int _toPort;
+
+  /**
+   * Creates an identifier from its components; the string form joins them with {@code ':'}.
+   */
+  public StringMailboxIdentifier(String jobId, String fromHost, int fromPort, String toHost,
+      int toPort) {
+    _jobId = jobId;
+    _fromHost = fromHost;
+    _fromPort = fromPort;
+    _toHost = toHost;
+    _toPort = toPort;
+    _mailboxIdString = JOINER.join(_jobId, _fromHost, _fromPort, _toHost, _toPort);
+  }
+
+  /**
+   * Parses an identifier from its string form.
+   *
+   * @param mailboxId mailbox id in the form <code>"jobId:fromHost:fromPort:toHost:toPort"</code>.
+   * @throws IllegalStateException if the id does not have exactly 5 parts or is not in canonical form.
+   * @throws NumberFormatException if a port part is not an integer.
+   */
+  public StringMailboxIdentifier(String mailboxId) {
+    _mailboxIdString = mailboxId;
+    String[] splits = mailboxId.split(":");
+    Preconditions.checkState(splits.length == 5,
+        "Invalid mailbox id: %s, expected format: jobId:fromHost:fromPort:toHost:toPort", mailboxId);
+    _jobId = splits[0];
+    _fromHost = splits[1];
+    _fromPort = Integer.parseInt(splits[2]);
+    _toHost = splits[3];
+    _toPort = Integer.parseInt(splits[4]);
+
+    // Re-joining the parsed parts must reproduce the input exactly; this rejects non-canonical ids such as
+    // zero-padded ports ("007") or trailing separators, which String#split silently drops.
+    Preconditions.checkState(JOINER.join(_jobId, _fromHost, _fromPort, _toHost, _toPort).equals(_mailboxIdString),
+        "Mailbox id: %s is not in canonical form", mailboxId);
+  }
+
+  @Override
+  public String getJobId() {
+    return _jobId;
+  }
+
+  @Override
+  public String getFromHost() {
+    return _fromHost;
+  }
+
+  @Override
+  public int getFromPort() {
+    return _fromPort;
+  }
+
+  @Override
+  public String getToHost() {
+    return _toHost;
+  }
+
+  @Override
+  public int getToPort() {
+    return _toPort;
+  }
+
+  /**
+   * Returns the canonical string form; it parses back via {@link #StringMailboxIdentifier(String)} as long as
+   * no component contains {@code ':'}.
+   */
+  @Override
+  public String toString() {
+    return _mailboxIdString;
+  }
+
+  @Override
+  public int hashCode() {
+    return _mailboxIdString.hashCode();
+  }
+
+  @Override
+  public boolean equals(Object that) {
+    return (that instanceof StringMailboxIdentifier) && _mailboxIdString.equals(
+        ((StringMailboxIdentifier) that)._mailboxIdString);
+  }
+}
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/nodes/AbstractStageNode.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/Utils.java
similarity index 55%
copy from pinot-query-planner/src/main/java/org/apache/pinot/query/planner/nodes/AbstractStageNode.java
copy to pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/Utils.java
index 71701df762..90a4070544 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/nodes/AbstractStageNode.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/Utils.java
@@ -16,34 +16,28 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pinot.query.planner.nodes;
+package org.apache.pinot.query.mailbox;
 
-import java.util.ArrayList;
-import java.util.List;
+import com.google.common.base.Joiner;
 
 
-public abstract class AbstractStageNode implements StageNode {
+public final class Utils { // static helpers for mailbox-id / channel-id conversion
+  private static final Joiner JOINER = Joiner.on(':');
 
-  protected final String _stageId;
-  protected final List<StageNode> _inputs;
-
-  public AbstractStageNode(String stageId) {
-    _stageId = stageId;
-    _inputs = new ArrayList<>();
+  private Utils() {
+    // do not instantiate.
   }
 
-  @Override
-  public List<StageNode> getInputs() {
-    return _inputs;
+  public static String constructChannelId(String mailboxId) { // -> "toHost:toPort" of the parsed mailbox id
+    MailboxIdentifier mailboxIdentifier = toMailboxIdentifier(mailboxId);
+    return JOINER.join(mailboxIdentifier.getToHost(), mailboxIdentifier.getToPort());
   }
 
-  @Override
-  public void addInput(StageNode stageNode) {
-    _inputs.add(stageNode);
+  public static MailboxIdentifier toMailboxIdentifier(String mailboxId) { // parses via StringMailboxIdentifier
+    return new StringMailboxIdentifier(mailboxId);
   }
 
-  @Override
-  public String getStageId() {
-    return _stageId;
+  public static String fromMailboxIdentifier(MailboxIdentifier mailboxId) { // assumes toString() yields the id
+    return mailboxId.toString();
   }
 }
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/channel/ChannelManager.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/channel/ChannelManager.java
new file mode 100644
index 0000000000..54ff4bcfb7
--- /dev/null
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/channel/ChannelManager.java
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.mailbox.channel;
+
+import io.grpc.ManagedChannel;
+import io.grpc.ManagedChannelBuilder;
+import java.util.concurrent.ConcurrentHashMap;
+import org.apache.pinot.query.mailbox.GrpcMailboxService;
+
+
+/**
+ * {@code ChannelManager} manages Grpc send/receive channels.
+ *
+ * <p>Grpc channels are managed centralized per Pinot component. Channels should be reused across different
+ * query/job/stages.
+ *
+ * <p>The channelId is the address the channel connects to, in the format of:
+ * <code>"receiverHost:receiverPort"</code>.
+ */
+public class ChannelManager {
+  private static final int DEFAULT_MAX_INBOUND_MESSAGE_BYTES_SIZE = 128 * 1024 * 1024;
+
+  private final GrpcMailboxService _mailboxService;
+  private final GrpcMailboxServer _grpcMailboxServer;
+
+  private final ConcurrentHashMap<String, ManagedChannel> _channelMap = new ConcurrentHashMap<>();
+
+  public ChannelManager(GrpcMailboxService mailboxService) {
+    _mailboxService = mailboxService;
+    _grpcMailboxServer = new GrpcMailboxServer(_mailboxService, _mailboxService.getMailboxPort());
+  }
+
+  /**
+   * Starts the GRPC server that accepts incoming mailbox streams.
+   */
+  public void init() {
+    _grpcMailboxServer.start();
+  }
+
+  /**
+   * Shuts down the GRPC server and every outbound channel created by {@link #getChannel(String)}.
+   */
+  public void shutdown() {
+    try {
+      _grpcMailboxServer.shutdown();
+    } finally {
+      // Cached channels are not released anywhere else; shut them down even if the server shutdown fails.
+      for (ManagedChannel channel : _channelMap.values()) {
+        channel.shutdown();
+      }
+      _channelMap.clear();
+    }
+  }
+
+  /**
+   * Returns the channel for {@code channelId}, creating and caching a plaintext channel on first use.
+   *
+   * @param channelId target address in the form <code>"host:port"</code>.
+   * @return the shared channel to that address.
+   */
+  public ManagedChannel getChannel(String channelId) {
+    return _channelMap.computeIfAbsent(channelId, id -> {
+      // Parse the id only on a cache miss.
+      String[] channelParts = id.split(":");
+      return ManagedChannelBuilder.forAddress(channelParts[0], Integer.parseInt(channelParts[1]))
+          .maxInboundMessageSize(DEFAULT_MAX_INBOUND_MESSAGE_BYTES_SIZE).usePlaintext().build();
+    });
+  }
+}
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/channel/GrpcMailboxServer.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/channel/GrpcMailboxServer.java
new file mode 100644
index 0000000000..a260bbaeef
--- /dev/null
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/channel/GrpcMailboxServer.java
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.mailbox.channel;
+
+import io.grpc.Server;
+import io.grpc.ServerBuilder;
+import io.grpc.stub.StreamObserver;
+import java.io.IOException;
+import java.util.concurrent.TimeUnit;
+import org.apache.pinot.common.proto.Mailbox;
+import org.apache.pinot.common.proto.PinotMailboxGrpc;
+import org.apache.pinot.query.mailbox.GrpcMailboxService;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * {@code GrpcMailboxServer} manages GRPC-based mailboxes by creating a stream-stream GRPC server.
+ *
+ * <p>This GRPC server is responsible for constructing {@link StreamObserver} out of an initial "open" request
+ * sent by the sender of the sender/receiver pair.
+ */
+public class GrpcMailboxServer extends PinotMailboxGrpc.PinotMailboxImplBase {
+  private static final Logger LOGGER = LoggerFactory.getLogger(GrpcMailboxServer.class);
+  private static final long DEFAULT_GRPC_MAILBOX_SERVER_TIMEOUT = 10000L;
+
+  private final GrpcMailboxService _mailboxService;
+  private final Server _server;
+
+  public GrpcMailboxServer(GrpcMailboxService mailboxService, int port) {
+    _mailboxService = mailboxService;
+    _server = ServerBuilder.forPort(port).addService(this).build();
+    LOGGER.info("Initialized GrpcMailboxServer on port: {}", port);
+  }
+
+  /**
+   * Starts the underlying GRPC server; an {@link IOException} from the start is rethrown unchecked.
+   */
+  public void start() {
+    try {
+      _server.start();
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  /**
+   * Initiates an orderly shutdown and waits up to {@code DEFAULT_GRPC_MAILBOX_SERVER_TIMEOUT} ms for it to finish.
+   */
+  public void shutdown() {
+    try {
+      if (!_server.shutdown().awaitTermination(DEFAULT_GRPC_MAILBOX_SERVER_TIMEOUT, TimeUnit.MILLISECONDS)) {
+        LOGGER.warn("GrpcMailboxServer did not terminate within {}ms", DEFAULT_GRPC_MAILBOX_SERVER_TIMEOUT);
+      }
+    } catch (InterruptedException e) {
+      // Restore the interrupt status so callers can still observe the interruption.
+      Thread.currentThread().interrupt();
+      throw new RuntimeException(e);
+    }
+  }
+
+  /**
+   * Accepts a new mailbox stream and returns a {@link MailboxContentStreamObserver} to receive its content.
+   */
+  @Override
+  public StreamObserver<Mailbox.MailboxContent> open(StreamObserver<Mailbox.MailboxStatus> responseObserver) {
+    return new MailboxContentStreamObserver(_mailboxService, responseObserver);
+  }
+}
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/channel/MailboxContentStreamObserver.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/channel/MailboxContentStreamObserver.java
new file mode 100644
index 0000000000..8aeb560c8f
--- /dev/null
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/channel/MailboxContentStreamObserver.java
@@ -0,0 +1,146 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.mailbox.channel;
+
+import io.grpc.stub.StreamObserver;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.pinot.common.proto.Mailbox;
+import org.apache.pinot.query.mailbox.GrpcMailboxService;
+import org.apache.pinot.query.mailbox.GrpcReceivingMailbox;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * {@code MailboxContentStreamObserver} is the content streaming observer used to receive mailbox content.
+ *
+ * <p>When the observer onNext() is called (e.g. when a data packet has arrived at the receiving end), it puts the
+ * mailbox content into the receiving mailbox buffer, waiting while the buffer is full, and, if feedback is enabled,
+ * responds to the sender side with the remaining buffer size of the receiving mailbox.
+ *
+ * <p>The consumer drains the buffer via {@link #poll()}; an error reported on the stream is re-thrown from
+ * {@link #poll()} once all content buffered before it has been consumed.
+ */
+public class MailboxContentStreamObserver implements StreamObserver<Mailbox.MailboxContent> {
+  private static final Logger LOGGER = LoggerFactory.getLogger(MailboxContentStreamObserver.class);
+  private static final int DEFAULT_MAILBOX_QUEUE_CAPACITY = 5;
+  private static final long DEFAULT_MAILBOX_POLL_TIMEOUT = 1000L;
+  private final GrpcMailboxService _mailboxService;
+  private final StreamObserver<Mailbox.MailboxStatus> _responseObserver;
+  private final boolean _isEnabledFeedback;
+
+  private final AtomicBoolean _isCompleted = new AtomicBoolean(false);
+  private final ArrayBlockingQueue<Mailbox.MailboxContent> _receivingBuffer;
+  // Written before _isCompleted is set when the stream terminates with an error.
+  private volatile Throwable _error;
+
+  public MailboxContentStreamObserver(GrpcMailboxService mailboxService,
+      StreamObserver<Mailbox.MailboxStatus> responseObserver) {
+    this(mailboxService, responseObserver, false);
+  }
+
+  public MailboxContentStreamObserver(GrpcMailboxService mailboxService,
+      StreamObserver<Mailbox.MailboxStatus> responseObserver, boolean isEnabledFeedback) {
+    _mailboxService = mailboxService;
+    _responseObserver = responseObserver;
+    _receivingBuffer = new ArrayBlockingQueue<>(DEFAULT_MAILBOX_QUEUE_CAPACITY);
+    _isEnabledFeedback = isEnabledFeedback;
+  }
+
+  /**
+   * Waits for the next buffered mailbox content.
+   *
+   * @return the next content, or {@code null} once the stream has completed normally and the buffer is drained.
+   * @throws RuntimeException if the stream terminated with an error, or if the calling thread is interrupted (its
+   *         interrupt status is then restored).
+   */
+  public Mailbox.MailboxContent poll() {
+    while (!isCompleted()) {
+      try {
+        Mailbox.MailboxContent content = _receivingBuffer.poll(DEFAULT_MAILBOX_POLL_TIMEOUT, TimeUnit.MILLISECONDS);
+        if (content != null) {
+          return content;
+        }
+      } catch (InterruptedException e) {
+        // Ignoring the interrupt would keep this thread waiting until the stream completes, i.e. uncancellable.
+        Thread.currentThread().interrupt();
+        throw new RuntimeException("Interrupt occurred while waiting for mailbox content", e);
+      }
+    }
+    Throwable error = _error;
+    if (error != null) {
+      throw new RuntimeException("Mailbox content stream terminated with error", error);
+    }
+    return null;
+  }
+
+  /**
+   * @return {@code true} once the stream has terminated (normally or with an error) and the buffer is empty.
+   */
+  public boolean isCompleted() {
+    return _isCompleted.get() && _receivingBuffer.isEmpty();
+  }
+
+  @Override
+  public void onNext(Mailbox.MailboxContent mailboxContent) {
+    GrpcReceivingMailbox receivingMailbox =
+        (GrpcReceivingMailbox) _mailboxService.getReceivingMailbox(mailboxContent.getMailboxId());
+    receivingMailbox.init(this);
+    // when the receiving end receives a message put it in the mailbox queue. put() waits for free space instead of
+    // offer() silently dropping the content when the bounded buffer is full; note that this blocks the calling
+    // GRPC thread until the consumer polls.
+    try {
+      _receivingBuffer.put(mailboxContent);
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      throw new RuntimeException(
+          "Interrupted while buffering content for mailbox: " + mailboxContent.getMailboxId(), e);
+    }
+    if (_isEnabledFeedback) {
+      // TODO: this has race conditions with onCompleted() because sender blindly closes connection channels once
+      // it has finished sending all the data packets.
+      int remainingCapacity = _receivingBuffer.remainingCapacity() - 1;
+      Mailbox.MailboxStatus.Builder builder =
+          Mailbox.MailboxStatus.newBuilder().setMailboxId(mailboxContent.getMailboxId())
+              .putMetadata("buffer.size", String.valueOf(remainingCapacity));
+      if (mailboxContent.getMetadataMap().get("finished") != null) {
+        builder.putMetadata("finished", "true");
+      }
+      Mailbox.MailboxStatus status = builder.build();
+      // returns the buffer available size to sender for rate controller / throttling.
+      _responseObserver.onNext(status);
+    }
+  }
+
+  @Override
+  public void onError(Throwable e) {
+    LOGGER.error("Mailbox content stream terminated with error", e);
+    // Record the error before marking completion so poll() stops waiting and re-throws it to the consumer.
+    _error = e;
+    _isCompleted.set(true);
+  }
+
+  @Override
+  public void onCompleted() {
+    _isCompleted.set(true);
+    _responseObserver.onCompleted();
+  }
+}
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/channel/MailboxStatusStreamObserver.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/channel/MailboxStatusStreamObserver.java
new file mode 100644
index 0000000000..45233952e2
--- /dev/null
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/channel/MailboxStatusStreamObserver.java
@@ -0,0 +1,98 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.mailbox.channel;
+
+import io.grpc.stub.StreamObserver;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.pinot.common.proto.Mailbox;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * {@code MailboxStatusStreamObserver} is used by the SendingMailbox to send data over the wire.
+ *
+ * <p>Once {@link org.apache.pinot.query.mailbox.GrpcSendingMailbox#init()} is called, one instance of this class is
+ * created based on the opened GRPC connection returned {@link StreamObserver}. From this point, the sending mailbox
+ * can use the {@link MailboxStatusStreamObserver#send(Mailbox.MailboxContent)} API to send data packet to the receiving
+ * end.
+ */
+public class MailboxStatusStreamObserver implements StreamObserver<Mailbox.MailboxStatus> {
+  private static final Logger LOGGER = LoggerFactory.getLogger(MailboxStatusStreamObserver.class);
+  private static final int DEFAULT_MAILBOX_QUEUE_CAPACITY = 5;
+  // Until the receiver reports otherwise, assume its buffer has the default capacity available.
+  private final AtomicInteger _bufferSize = new AtomicInteger(DEFAULT_MAILBOX_QUEUE_CAPACITY);
+  private final AtomicBoolean _isCompleted = new AtomicBoolean(false);
+
+  private StreamObserver<Mailbox.MailboxContent> _mailboxContentStreamObserver;
+
+  public MailboxStatusStreamObserver() {
+  }
+
+  /**
+   * Binds the outbound content stream used by {@link #send} and {@link #complete()}; must be called before either.
+   */
+  public void init(StreamObserver<Mailbox.MailboxContent> mailboxContentStreamObserver) {
+    _mailboxContentStreamObserver = mailboxContentStreamObserver;
+  }
+
+  /**
+   * Sends one data packet to the receiving end.
+   */
+  public void send(Mailbox.MailboxContent mailboxContent) {
+    _mailboxContentStreamObserver.onNext(mailboxContent);
+  }
+
+  /**
+   * Completes the outbound content stream.
+   */
+  public void complete() {
+    _mailboxContentStreamObserver.onCompleted();
+  }
+
+  @Override
+  public void onNext(Mailbox.MailboxStatus mailboxStatus) {
+    // when received a mailbox status from the receiving end, sending end update the known buffer size available
+    // so we can make better throughput send judgement. here is a simple example.
+    // TODO: this feedback info is not used to throttle the send speed. it is currently being discarded.
+    if (mailboxStatus.getMetadataMap().containsKey("buffer.size")) {
+      _bufferSize.set(Integer.parseInt(mailboxStatus.getMetadataMap().get("buffer.size")));
+    } else {
+      _bufferSize.set(DEFAULT_MAILBOX_QUEUE_CAPACITY); // DEFAULT_AVAILABILITY;
+    }
+  }
+
+  @Override
+  public void onError(Throwable e) {
+    _isCompleted.set(true);
+    shutdown();
+    throw new RuntimeException(e);
+  }
+
+  private void shutdown() {
+    // nothing to release yet.
+  }
+
+  @Override
+  public void onCompleted() {
+    _isCompleted.set(true);
+    shutdown();
+  }
+}
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java
new file mode 100644
index 0000000000..2968ff9a23
--- /dev/null
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java
@@ -0,0 +1,141 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.runtime;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.ExecutorService;
+import org.apache.pinot.common.metrics.ServerMetrics;
+import org.apache.pinot.common.proto.Mailbox;
+import org.apache.pinot.common.utils.DataTable;
+import org.apache.pinot.core.data.manager.InstanceDataManager;
+import org.apache.pinot.core.query.executor.ServerQueryExecutorV1Impl;
+import org.apache.pinot.core.query.request.ServerQueryRequest;
+import org.apache.pinot.core.transport.ServerInstance;
+import org.apache.pinot.query.mailbox.GrpcMailboxService;
+import org.apache.pinot.query.mailbox.MailboxService;
+import org.apache.pinot.query.planner.StageMetadata;
+import org.apache.pinot.query.planner.nodes.MailboxSendNode;
+import org.apache.pinot.query.runtime.executor.WorkerQueryExecutor;
+import org.apache.pinot.query.runtime.operator.MailboxSendOperator;
+import org.apache.pinot.query.runtime.plan.DistributedStagePlan;
+import org.apache.pinot.query.runtime.utils.ServerRequestUtils;
+import org.apache.pinot.query.service.QueryConfig;
+import org.apache.pinot.spi.env.PinotConfiguration;
+import org.apache.pinot.spi.utils.CommonConstants;
+
+
+/**
+ * {@link QueryRunner} accepts a {@link DistributedStagePlan} and runs it.
+ */
+public class QueryRunner {
+  // This is a temporary workaround until the 2 types of executor are merged.
+  private ServerQueryExecutorV1Impl _serverExecutor;
+  private WorkerQueryExecutor _workerExecutor;
+  private MailboxService<Mailbox.MailboxContent> _mailboxService;
+  private String _hostname;
+  private int _port;
+
+  /**
+   * Initializes the query executor.
+   * <p>Should be called only once and before calling any other method.
+   *
+   * @throws NullPointerException if {@code QueryConfig.KEY_OF_QUERY_RUNNER_HOSTNAME} is not configured.
+   */
+  public void init(PinotConfiguration config, InstanceDataManager instanceDataManager, ServerMetrics serverMetrics) {
+    // Fail fast with a descriptive message instead of an anonymous NPE from startsWith() below.
+    String instanceName = Objects.requireNonNull(config.getProperty(QueryConfig.KEY_OF_QUERY_RUNNER_HOSTNAME),
+        "Missing required config: " + QueryConfig.KEY_OF_QUERY_RUNNER_HOSTNAME);
+    // Strip the Helix server-instance prefix, if present.
+    _hostname = instanceName.startsWith(CommonConstants.Helix.PREFIX_OF_SERVER_INSTANCE) ? instanceName.substring(
+        CommonConstants.Helix.SERVER_INSTANCE_PREFIX_LENGTH) : instanceName;
+    _port = config.getProperty(QueryConfig.KEY_OF_QUERY_RUNNER_PORT, QueryConfig.DEFAULT_QUERY_RUNNER_PORT);
+    try {
+      _mailboxService = new GrpcMailboxService(_hostname, _port);
+      _serverExecutor = new ServerQueryExecutorV1Impl();
+      _serverExecutor.init(config, instanceDataManager, serverMetrics);
+      _workerExecutor = new WorkerQueryExecutor();
+      _workerExecutor.init(config, serverMetrics, _mailboxService, _hostname, _port);
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  /**
+   * Starts the mailbox service, then the server and worker executors.
+   */
+  public void start() {
+    _mailboxService.start();
+    _serverExecutor.start();
+    _workerExecutor.start();
+  }
+
+  /**
+   * Shuts down the worker and server executors, then the mailbox service (reverse of {@link #start()}).
+   */
+  public void shutDown() {
+    _workerExecutor.shutDown();
+    _serverExecutor.shutDown();
+    _mailboxService.shutdown();
+  }
+
+  /**
+   * Runs one stage of a distributed plan on this server.
+   *
+   * <p>A leaf stage (one with segments assigned to this server) is executed by the server query executor and the
+   * resulting {@link DataTable} is sent to the receiving stage through a {@link MailboxSendOperator}; any other
+   * stage is delegated to the worker executor.
+   */
+  public void processQuery(DistributedStagePlan distributedStagePlan, ExecutorService executorService,
+      Map<String, String> requestMetadataMap) {
+    if (isLeafStage(distributedStagePlan)) {
+      // TODO: make server query request return via mailbox, this is a hack to gather the non-streaming data table
+      // and package it here for return. But we should really use a MailboxSendOperator directly put into the
+      // server executor.
+      ServerQueryRequest serverQueryRequest =
+          ServerRequestUtils.constructServerQueryRequest(distributedStagePlan, requestMetadataMap);
+
+      // send the data table via mailbox in one-off fashion (e.g. no block-level split, one data table/partition key)
+      DataTable dataTable = _serverExecutor.processQuery(serverQueryRequest, executorService, null);
+
+      MailboxSendNode sendNode = (MailboxSendNode) distributedStagePlan.getStageRoot();
+      StageMetadata receivingStageMetadata = distributedStagePlan.getMetadataMap().get(sendNode.getReceiverStageId());
+      MailboxSendOperator mailboxSendOperator =
+          new MailboxSendOperator(_mailboxService, dataTable, receivingStageMetadata.getServerInstances(),
+              sendNode.getExchangeType(), _hostname, _port, serverQueryRequest.getRequestId(),
+              sendNode.getStageId());
+      mailboxSendOperator.nextBlock();
+    } else {
+      _workerExecutor.processQuery(distributedStagePlan, requestMetadataMap, executorService);
+    }
+  }
+
+  /**
+   * Returns whether this server has at least one segment assigned for the plan's stage, i.e. whether it runs
+   * that stage as a leaf stage.
+   */
+  private boolean isLeafStage(DistributedStagePlan distributedStagePlan) {
+    int stageId = distributedStagePlan.getStageId();
+    ServerInstance serverInstance = distributedStagePlan.getServerInstance();
+    StageMetadata stageMetadata = distributedStagePlan.getMetadataMap().get(stageId);
+    List<String> segments = stageMetadata.getServerInstanceToSegmentsMap().get(serverInstance);
+    return segments != null && !segments.isEmpty();
+  }
+}
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/blocks/DataTableBlock.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/blocks/DataTableBlock.java
new file mode 100644
index 0000000000..a8cc63c624
--- /dev/null
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/blocks/DataTableBlock.java
@@ -0,0 +1,83 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.runtime.blocks;
+
+import java.io.IOException;
+import org.apache.pinot.common.utils.DataTable;
+import org.apache.pinot.core.common.Block;
+import org.apache.pinot.core.common.BlockDocIdSet;
+import org.apache.pinot.core.common.BlockDocIdValueSet;
+import org.apache.pinot.core.common.BlockMetadata;
+import org.apache.pinot.core.common.BlockValSet;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * A {@code DataTableBlock} is a row-based data block backed by a {@link DataTable}.
+ *
+ * <p>Only {@link #getDataTable()} and {@link #toBytes()} are supported; the other {@link Block} accessors throw
+ * {@link UnsupportedOperationException}.
+ */
+public class DataTableBlock implements Block {
+  private static final Logger LOGGER = LoggerFactory.getLogger(DataTableBlock.class);
+
+  private final DataTable _dataTable;
+
+  public DataTableBlock(DataTable dataTable) {
+    _dataTable = dataTable;
+  }
+
+  /**
+   * @return the backing data table.
+   */
+  public DataTable getDataTable() {
+    return _dataTable;
+  }
+
+  @Override
+  public BlockValSet getBlockValueSet() {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public BlockDocIdValueSet getBlockDocIdValueSet() {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public BlockDocIdSet getBlockDocIdSet() {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public BlockMetadata getMetadata() {
+    throw new UnsupportedOperationException();
+  }
+
+  /**
+   * Serializes the backing {@link DataTable} via {@link DataTable#toBytes()}.
+   *
+   * @throws IOException propagated from {@link DataTable#toBytes()}.
+   */
+  public byte[] toBytes()
+      throws IOException {
+    return _dataTable.toBytes();
+  }
+}
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/blocks/DataTableBlockUtils.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/blocks/DataTableBlockUtils.java
new file mode 100644
index 0000000000..c429e2e014
--- /dev/null
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/blocks/DataTableBlockUtils.java
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.runtime.blocks;
+
+import org.apache.pinot.common.exception.QueryException;
+import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.common.utils.DataTable;
+import org.apache.pinot.core.common.datatable.DataTableBuilder;
+
+
+public final class DataTableBlockUtils {
+  private DataTableBlockUtils() {
+    // do not instantiate.
+  }
+
+  // used to indicate a datatable block status
+  private static final DataSchema EMPTY_SCHEMA = new DataSchema(new String[0], new DataSchema.ColumnDataType[0]);
+  private static final DataTable EMPTY_DATATABLE = new DataTableBuilder(EMPTY_SCHEMA).build();
+  private static final DataTableBlock END_OF_STREAM_DATATABLE_BLOCK = new DataTableBlock(EMPTY_DATATABLE);
+
+  public static DataTableBlock getEndOfStreamDataTableBlock() {
+    return END_OF_STREAM_DATATABLE_BLOCK;
+  }
+
+  public static DataTable getEndOfStreamDataTable() {
+    return EMPTY_DATATABLE;
+  }
+
+  public static DataTable getErrorDataTable(Exception e) {
+    DataTable errorDataTable = new DataTableBuilder(EMPTY_SCHEMA).build();
+    errorDataTable.addException(QueryException.UNKNOWN_ERROR_CODE, e.getMessage());
+    return errorDataTable;
+  }
+
+  public static DataTableBlock getErrorDatatableBlock(Exception e) {
+    return new DataTableBlock(getErrorDataTable(e));
+  }
+
+  public static DataTable getEmptyDataTable(DataSchema dataSchema) {
+    if (dataSchema != null) {
+      return new DataTableBuilder(dataSchema).build();
+    } else {
+      return EMPTY_DATATABLE;
+    }
+  }
+
+  public static DataTableBlock getEmptyDataTableBlock(DataSchema dataSchema) {
+    return new DataTableBlock(getEmptyDataTable(dataSchema));
+  }
+
+  public static boolean isEndOfStream(DataTableBlock dataTableBlock) {
+    DataSchema dataSchema = dataTableBlock.getDataTable().getDataSchema();
+    return dataSchema.getColumnNames().length == 0 && dataSchema.getColumnDataTypes().length == 0;
+  }
+}
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/WorkerQueryExecutor.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/WorkerQueryExecutor.java
new file mode 100644
index 0000000000..85c0f108b4
--- /dev/null
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/WorkerQueryExecutor.java
@@ -0,0 +1,122 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.runtime.executor;
+
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import org.apache.calcite.rel.RelDistribution;
+import org.apache.pinot.common.metrics.ServerMetrics;
+import org.apache.pinot.common.proto.Mailbox;
+import org.apache.pinot.core.operator.BaseOperator;
+import org.apache.pinot.core.query.request.context.ThreadTimer;
+import org.apache.pinot.core.transport.ServerInstance;
+import org.apache.pinot.core.util.trace.TraceRunnable;
+import org.apache.pinot.query.mailbox.MailboxService;
+import org.apache.pinot.query.planner.StageMetadata;
+import org.apache.pinot.query.planner.nodes.JoinNode;
+import org.apache.pinot.query.planner.nodes.MailboxReceiveNode;
+import org.apache.pinot.query.planner.nodes.MailboxSendNode;
+import org.apache.pinot.query.planner.nodes.StageNode;
+import org.apache.pinot.query.runtime.blocks.DataTableBlock;
+import org.apache.pinot.query.runtime.blocks.DataTableBlockUtils;
+import org.apache.pinot.query.runtime.operator.BroadcastJoinOperator;
+import org.apache.pinot.query.runtime.operator.MailboxReceiveOperator;
+import org.apache.pinot.query.runtime.operator.MailboxSendOperator;
+import org.apache.pinot.query.runtime.plan.DistributedStagePlan;
+import org.apache.pinot.spi.env.PinotConfiguration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * WorkerQueryExecutor is the v2 of the {@link org.apache.pinot.core.query.executor.QueryExecutor} API.
+ *
+ * It provides not only execution interface for {@link org.apache.pinot.core.query.request.ServerQueryRequest} but
+ * also a more general {@link DistributedStagePlan}.
+ */
+public class WorkerQueryExecutor {
+  private static final Logger LOGGER = LoggerFactory.getLogger(WorkerQueryExecutor.class);
+  private PinotConfiguration _config;
+  private ServerMetrics _serverMetrics;
+  private MailboxService<Mailbox.MailboxContent> _mailboxService;
+  private String _hostName;
+  private int _port;
+
+  public void init(PinotConfiguration config, ServerMetrics serverMetrics,
+      MailboxService<Mailbox.MailboxContent> mailboxService, String hostName, int port) {
+    _config = config;
+    _serverMetrics = serverMetrics;
+    _mailboxService = mailboxService;
+    _hostName = hostName;
+    _port = port;
+  }
+
+  public synchronized void start() {
+    LOGGER.info("Worker query executor started");
+  }
+
+  public synchronized void shutDown() {
+    LOGGER.info("Worker query executor shut down");
+  }
+
+  // TODO: split this execution from PhysicalPlanner
+  public void processQuery(DistributedStagePlan queryRequest, Map<String, String> requestMetadataMap,
+      ExecutorService executorService) {
+    long requestId = Long.parseLong(requestMetadataMap.get("REQUEST_ID"));
+    StageNode stageRoot = queryRequest.getStageRoot();
+    BaseOperator<DataTableBlock> rootOperator = getOperator(requestId, stageRoot, queryRequest.getMetadataMap());
+    executorService.submit(new TraceRunnable() {
+      @Override
+      public void runJob() {
+        ThreadTimer executionThreadTimer = new ThreadTimer();
+        while (!DataTableBlockUtils.isEndOfStream(rootOperator.nextBlock())) {
+          LOGGER.debug("Result Block acquired");
+        }
+        LOGGER.info("Execution time:" + executionThreadTimer.getThreadTimeNs());
+      }
+    });
+  }
+
+  // TODO: split this PhysicalPlanner into a separate module
+  private BaseOperator<DataTableBlock> getOperator(long requestId, StageNode stageNode,
+      Map<Integer, StageMetadata> metadataMap) {
+    // TODO: optimize this into a framework. (physical planner)
+    if (stageNode instanceof MailboxSendNode) {
+      MailboxSendNode sendNode = (MailboxSendNode) stageNode;
+      BaseOperator<DataTableBlock> nextOperator = getOperator(requestId, sendNode.getInputs().get(0), metadataMap);
+      StageMetadata receivingStageMetadata = metadataMap.get(sendNode.getReceiverStageId());
+      return new MailboxSendOperator(_mailboxService, nextOperator, receivingStageMetadata.getServerInstances(),
+          sendNode.getExchangeType(), _hostName, _port, requestId, sendNode.getStageId());
+    } else if (stageNode instanceof MailboxReceiveNode) {
+      MailboxReceiveNode receiveNode = (MailboxReceiveNode) stageNode;
+      List<ServerInstance> sendingInstances = metadataMap.get(receiveNode.getSenderStageId()).getServerInstances();
+      return new MailboxReceiveOperator(_mailboxService, RelDistribution.Type.ANY, sendingInstances, _hostName, _port,
+          requestId, receiveNode.getSenderStageId());
+    } else if (stageNode instanceof JoinNode) {
+      JoinNode joinNode = (JoinNode) stageNode;
+      BaseOperator<DataTableBlock> leftOperator = getOperator(requestId, joinNode.getInputs().get(0), metadataMap);
+      BaseOperator<DataTableBlock> rightOperator = getOperator(requestId, joinNode.getInputs().get(1), metadataMap);
+      return new BroadcastJoinOperator(leftOperator, rightOperator, joinNode.getCriteria());
+    } else {
+      throw new UnsupportedOperationException(
+          String.format("Stage node type %s is not supported!", stageNode.getClass().getSimpleName()));
+    }
+  }
+}
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/BroadcastJoinOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/BroadcastJoinOperator.java
new file mode 100644
index 0000000000..db5bb5289d
--- /dev/null
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/BroadcastJoinOperator.java
@@ -0,0 +1,165 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.runtime.operator;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import javax.annotation.Nullable;
+import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.common.utils.DataTable;
+import org.apache.pinot.core.common.Operator;
+import org.apache.pinot.core.operator.BaseOperator;
+import org.apache.pinot.core.query.selection.SelectionOperatorUtils;
+import org.apache.pinot.query.planner.nodes.JoinNode;
+import org.apache.pinot.query.planner.partitioning.KeySelector;
+import org.apache.pinot.query.runtime.blocks.DataTableBlock;
+import org.apache.pinot.query.runtime.blocks.DataTableBlockUtils;
+
+
+/**
+ * This {@code BroadcastJoinOperator} implements a basic broadcast hash join.
+ *
+ * <p>It treats the right table as the broadcast side and materializes it into a hash table. Then, for each row of the
+ * left table, it looks up the matching row(s) in the hash table and emits a joined row (left columns followed by right
+ * columns). Only matched rows are emitted, i.e. inner-join semantics. Only the first join clause is used.
+ *
+ * <p>For each data block received from the left table, it generates one joined data block.
+ */
+public class BroadcastJoinOperator extends BaseOperator<DataTableBlock> {
+  private static final String OPERATOR_NAME = "BroadcastJoinOperator";
+  private static final String EXPLAIN_NAME = "BROADCAST_JOIN";
+
+  private final HashMap<Object, List<Object[]>> _broadcastHashTable;
+  private final BaseOperator<DataTableBlock> _leftTableOperator;
+  private final BaseOperator<DataTableBlock> _rightTableOperator;
+
+  private DataSchema _leftTableSchema;
+  // Stays null until the right side has produced at least one data block.
+  private DataSchema _rightTableSchema;
+  private int _resultRowSize;
+  private boolean _isHashTableBuilt;
+  private KeySelector<Object[], Object> _leftKeySelector;
+  private KeySelector<Object[], Object> _rightKeySelector;
+
+  public BroadcastJoinOperator(BaseOperator<DataTableBlock> leftTableOperator,
+      BaseOperator<DataTableBlock> rightTableOperator, List<JoinNode.JoinClause> criteria) {
+    // TODO: this assumes right table is broadcast.
+    _leftKeySelector = criteria.get(0).getLeftJoinKeySelector();
+    _rightKeySelector = criteria.get(0).getRightJoinKeySelector();
+    _leftTableOperator = leftTableOperator;
+    _rightTableOperator = rightTableOperator;
+    _isHashTableBuilt = false;
+    _broadcastHashTable = new HashMap<>();
+  }
+
+  @Override
+  public String getOperatorName() {
+    return OPERATOR_NAME;
+  }
+
+  @Override
+  public List<Operator> getChildOperators() {
+    // WorkerExecutor doesn't use getChildOperators, returns null here.
+    return null;
+  }
+
+  @Nullable
+  @Override
+  public String toExplainString() {
+    return EXPLAIN_NAME;
+  }
+
+  /**
+   * Builds the hash table on first call, then joins the next left block against it. Failures while joining are
+   * returned as an error block.
+   */
+  @Override
+  protected DataTableBlock getNextBlock() {
+    buildBroadcastHashTable();
+    try {
+      return new DataTableBlock(buildJoinedDataTable(_leftTableOperator.nextBlock()));
+    } catch (Exception e) {
+      return DataTableBlockUtils.getErrorDatatableBlock(e);
+    }
+  }
+
+  /**
+   * Drains the right operator until end-of-stream, grouping its rows by the right join key. Runs only once.
+   */
+  private void buildBroadcastHashTable() {
+    if (!_isHashTableBuilt) {
+      DataTableBlock rightBlock = _rightTableOperator.nextBlock();
+      while (!DataTableBlockUtils.isEndOfStream(rightBlock)) {
+        DataTable dataTable = rightBlock.getDataTable();
+        _rightTableSchema = dataTable.getDataSchema();
+        int numRows = dataTable.getNumberOfRows();
+        // put all the rows into corresponding hash collections keyed by the key selector function.
+        for (int rowId = 0; rowId < numRows; rowId++) {
+          Object[] objects = SelectionOperatorUtils.extractRowFromDataTable(dataTable, rowId);
+          List<Object[]> hashCollection =
+              _broadcastHashTable.computeIfAbsent(_rightKeySelector.getKey(objects), k -> new ArrayList<>());
+          hashCollection.add(objects);
+        }
+        rightBlock = _rightTableOperator.nextBlock();
+      }
+      _isHashTableBuilt = true;
+    }
+  }
+
+  /**
+   * Joins every row of the given left block against the broadcast hash table.
+   *
+   * @return the joined rows, or the end-of-stream table when the left side is exhausted or the right side was empty
+   */
+  private DataTable buildJoinedDataTable(DataTableBlock block)
+      throws Exception {
+    if (DataTableBlockUtils.isEndOfStream(block)) {
+      return DataTableBlockUtils.getEndOfStreamDataTable();
+    }
+    if (_rightTableSchema == null) {
+      // The right side produced no data blocks: no left row can match and the joined schema cannot be derived (it
+      // would dereference a null right schema). The inner-join result is empty, so end the stream. Remaining left
+      // blocks are not consumed.
+      return DataTableBlockUtils.getEndOfStreamDataTable();
+    }
+    List<Object[]> rows = new ArrayList<>();
+    DataTable dataTable = block.getDataTable();
+    _leftTableSchema = dataTable.getDataSchema();
+    _resultRowSize = _leftTableSchema.size() + _rightTableSchema.size();
+    int numRows = dataTable.getNumberOfRows();
+    for (int rowId = 0; rowId < numRows; rowId++) {
+      Object[] leftRow = SelectionOperatorUtils.extractRowFromDataTable(dataTable, rowId);
+      List<Object[]> hashCollection =
+          _broadcastHashTable.getOrDefault(_leftKeySelector.getKey(leftRow), Collections.emptyList());
+      for (Object[] rightRow : hashCollection) {
+        rows.add(joinRow(leftRow, rightRow));
+      }
+    }
+    return SelectionOperatorUtils.getDataTableFromRows(rows, computeSchema());
+  }
+
+  /**
+   * Concatenates a left row and a right row into a new row of {@code _resultRowSize} columns.
+   */
+  private Object[] joinRow(Object[] leftRow, Object[] rightRow) {
+    Object[] resultRow = new Object[_resultRowSize];
+    System.arraycopy(leftRow, 0, resultRow, 0, leftRow.length);
+    System.arraycopy(rightRow, 0, resultRow, leftRow.length, rightRow.length);
+    return resultRow;
+  }
+
+  /**
+   * Builds the joined schema: left columns followed by right columns.
+   */
+  private DataSchema computeSchema() {
+    String[] columnNames = new String[_resultRowSize];
+    DataSchema.ColumnDataType[] columnDataTypes = new DataSchema.ColumnDataType[_resultRowSize];
+    int idx = 0;
+    for (int index = 0; index < _leftTableSchema.size(); index++) {
+      columnNames[idx] = _leftTableSchema.getColumnName(index);
+      columnDataTypes[idx++] = _leftTableSchema.getColumnDataType(index);
+    }
+    for (int index = 0; index < _rightTableSchema.size(); index++) {
+      columnNames[idx] = _rightTableSchema.getColumnName(index);
+      columnDataTypes[idx++] = _rightTableSchema.getColumnDataType(index);
+    }
+    return new DataSchema(columnNames, columnDataTypes);
+  }
+}
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperator.java
new file mode 100644
index 0000000000..d15a9fc1db
--- /dev/null
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperator.java
@@ -0,0 +1,137 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.runtime.operator;
+
+import java.nio.ByteBuffer;
+import java.util.List;
+import javax.annotation.Nullable;
+import org.apache.calcite.rel.RelDistribution;
+import org.apache.pinot.common.proto.Mailbox;
+import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.common.utils.DataTable;
+import org.apache.pinot.core.common.Operator;
+import org.apache.pinot.core.common.datatable.DataTableFactory;
+import org.apache.pinot.core.operator.BaseOperator;
+import org.apache.pinot.core.transport.ServerInstance;
+import org.apache.pinot.query.mailbox.MailboxService;
+import org.apache.pinot.query.mailbox.ReceivingMailbox;
+import org.apache.pinot.query.mailbox.StringMailboxIdentifier;
+import org.apache.pinot.query.runtime.blocks.DataTableBlock;
+import org.apache.pinot.query.runtime.blocks.DataTableBlockUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * This {@code MailboxReceiveOperator} receives data from a {@link ReceivingMailbox} and serves it out from the
+ * {@link BaseOperator#getNextBlock()} API.
+ *
+ * <p>Each call polls the receiving mailboxes of all sending instances until one yields a non-empty data table, all
+ * mailboxes are closed, or {@code DEFAULT_TIMEOUT_NANO} elapses; the last two cases return an end-of-stream block.
+ */
+public class MailboxReceiveOperator extends BaseOperator<DataTableBlock> {
+  private static final Logger LOGGER = LoggerFactory.getLogger(MailboxReceiveOperator.class);
+  private static final String OPERATOR_NAME = "MailboxReceiveOperator";
+  private static final String EXPLAIN_NAME = "MAILBOX_RECEIVE";
+  private static final long DEFAULT_TIMEOUT_NANO = 10_000_000_000L;
+
+  private final MailboxService<Mailbox.MailboxContent> _mailboxService;
+  private final RelDistribution.Type _exchangeType;
+  private final List<ServerInstance> _sendingStageInstances;
+  private final String _hostName;
+  private final int _port;
+  private final long _jobId;
+  private final int _stageId;
+
+  public MailboxReceiveOperator(MailboxService<Mailbox.MailboxContent> mailboxService,
+      RelDistribution.Type exchangeType, List<ServerInstance> sendingStageInstances, String hostName, int port,
+      long jobId, int stageId) {
+    _mailboxService = mailboxService;
+    _exchangeType = exchangeType;
+    _sendingStageInstances = sendingStageInstances;
+    _hostName = hostName;
+    _port = port;
+    _jobId = jobId;
+    _stageId = stageId;
+  }
+
+  @Override
+  public String getOperatorName() {
+    return OPERATOR_NAME;
+  }
+
+  @Override
+  public List<Operator> getChildOperators() {
+    // WorkerExecutor doesn't use getChildOperators, returns null here.
+    return null;
+  }
+
+  @Nullable
+  @Override
+  public String toExplainString() {
+    return EXPLAIN_NAME;
+  }
+
+  @Override
+  protected DataTableBlock getNextBlock() {
+    // TODO: do a round robin check against all MailboxContentStreamObservers and find which one that has data.
+    boolean hasOpenedMailbox = true;
+    long timeoutWatermark = System.nanoTime() + DEFAULT_TIMEOUT_NANO;
+    while (hasOpenedMailbox && System.nanoTime() < timeoutWatermark) {
+      hasOpenedMailbox = false;
+      for (ServerInstance sendingInstance : _sendingStageInstances) {
+        try {
+          ReceivingMailbox<Mailbox.MailboxContent> receivingMailbox =
+              _mailboxService.getReceivingMailbox(toMailboxId(sendingInstance));
+          // TODO this is not threadsafe.
+          // make sure only one thread is checking receiving mailbox and calling receive() then close()
+          if (!receivingMailbox.isClosed()) {
+            hasOpenedMailbox = true;
+            Mailbox.MailboxContent mailboxContent = receivingMailbox.receive();
+            if (mailboxContent != null) {
+              ByteBuffer byteBuffer = mailboxContent.getPayload().asReadOnlyByteBuffer();
+              if (byteBuffer.hasRemaining()) {
+                DataTable dataTable = DataTableFactory.getDataTable(byteBuffer);
+                if (dataTable.getNumberOfRows() > 0) {
+                  // here we only return data table block when it is not empty.
+                  return new DataTableBlock(dataTable);
+                }
+              }
+            }
+          }
+        } catch (Exception e) {
+          LOGGER.error("Error receiving data from mailbox {}", sendingInstance, e);
+        }
+      }
+    }
+    // The loop only exits with a mailbox still open when the deadline was reached. Re-reading the clock here instead
+    // could report a timeout even though every mailbox had already closed.
+    if (hasOpenedMailbox) {
+      LOGGER.error("Timed out after polling mailboxes: {}", _sendingStageInstances);
+    }
+    // TODO: we need to at least return one data table with schema if there's no error.
+    // we need to condition this on whether there's already things being returned or not.
+    return DataTableBlockUtils.getEndOfStreamDataTableBlock();
+  }
+
+  public RelDistribution.Type getExchangeType() {
+    return _exchangeType;
+  }
+
+  /**
+   * Builds the id of the mailbox from {@code serverInstance} (sender) to this host/port (receiver) for this job/stage.
+   */
+  private String toMailboxId(ServerInstance serverInstance) {
+    return new StringMailboxIdentifier(String.format("%s_%s", _jobId, _stageId), serverInstance.getHostname(),
+        serverInstance.getGrpcPort(), _hostName, _port).toString();
+  }
+}
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java
new file mode 100644
index 0000000000..3971dfb325
--- /dev/null
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java
@@ -0,0 +1,182 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.runtime.operator;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableSet;
+import com.google.protobuf.ByteString;
+import java.io.IOException;
+import java.util.List;
+import java.util.Set;
+import javax.annotation.Nullable;
+import org.apache.calcite.rel.RelDistribution;
+import org.apache.pinot.common.proto.Mailbox;
+import org.apache.pinot.common.utils.DataTable;
+import org.apache.pinot.core.common.Operator;
+import org.apache.pinot.core.operator.BaseOperator;
+import org.apache.pinot.core.transport.ServerInstance;
+import org.apache.pinot.query.mailbox.MailboxService;
+import org.apache.pinot.query.mailbox.SendingMailbox;
+import org.apache.pinot.query.mailbox.StringMailboxIdentifier;
+import org.apache.pinot.query.runtime.blocks.DataTableBlock;
+import org.apache.pinot.query.runtime.blocks.DataTableBlockUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * This {@code MailboxSendOperator} is created to send {@link DataTableBlock}s to the receiving end.
+ *
+ * <p>Each call to {@link #getNextBlock()} takes one block from the upstream operator (or the fixed {@link DataTable}
+ * given at construction), sends it to the receiving stage instances according to the exchange type, and completes the
+ * sending mailbox once an end-of-stream block has been sent.
+ */
+public class MailboxSendOperator extends BaseOperator<DataTableBlock> {
+  private static final Logger LOGGER = LoggerFactory.getLogger(MailboxSendOperator.class);
+  private static final String OPERATOR_NAME = "MailboxSendOperator";
+  private static final String EXPLAIN_NAME = "MAILBOX_SEND";
+  private static final Set<RelDistribution.Type> SUPPORTED_EXCHANGE_TYPE =
+      ImmutableSet.of(RelDistribution.Type.SINGLETON, RelDistribution.Type.RANDOM_DISTRIBUTED,
+          RelDistribution.Type.BROADCAST_DISTRIBUTED);
+
+  private final List<ServerInstance> _receivingStageInstances;
+  private final RelDistribution.Type _exchangeType;
+  private final String _serverHostName;
+  private final int _serverPort;
+  private final long _jobId;
+  private final int _stageId;
+  private final MailboxService<Mailbox.MailboxContent> _mailboxService;
+  // Exactly one of these two is non-null, depending on which constructor was used.
+  private final BaseOperator<DataTableBlock> _dataTableBlockBaseOperator;
+  private final DataTable _dataTable;
+
+  public MailboxSendOperator(MailboxService<Mailbox.MailboxContent> mailboxService,
+      BaseOperator<DataTableBlock> dataTableBlockBaseOperator, List<ServerInstance> receivingStageInstances,
+      RelDistribution.Type exchangeType, String hostName, int port, long jobId, int stageId) {
+    _mailboxService = mailboxService;
+    _dataTableBlockBaseOperator = dataTableBlockBaseOperator;
+    _dataTable = null;
+    _receivingStageInstances = receivingStageInstances;
+    _exchangeType = exchangeType;
+    _serverHostName = hostName;
+    _serverPort = port;
+    _jobId = jobId;
+    _stageId = stageId;
+    checkExchangeTypeSupported(_exchangeType);
+  }
+
+  /**
+   * This is a temporary interface for connecting with server API. remove/merge with InstanceResponseOperator once
+   * we create a {@link org.apache.pinot.core.query.executor.ServerQueryExecutorV1Impl} that can handle the
+   * creation of MailboxSendOperator we should not use this API.
+   *
+   * <p>With this constructor every {@link #getNextBlock()} call sends {@code dataTable} as end-of-stream and returns
+   * {@code null}.
+   */
+  public MailboxSendOperator(MailboxService<Mailbox.MailboxContent> mailboxService, DataTable dataTable,
+      List<ServerInstance> receivingStageInstances, RelDistribution.Type exchangeType, String hostName, int port,
+      long jobId, int stageId) {
+    _mailboxService = mailboxService;
+    _dataTableBlockBaseOperator = null;
+    _dataTable = dataTable;
+    _receivingStageInstances = receivingStageInstances;
+    _exchangeType = exchangeType;
+    _serverHostName = hostName;
+    _serverPort = port;
+    _jobId = jobId;
+    _stageId = stageId;
+    // Validate here as well so both constructors reject unsupported exchange types up front.
+    checkExchangeTypeSupported(_exchangeType);
+  }
+
+  private static void checkExchangeTypeSupported(RelDistribution.Type exchangeType) {
+    Preconditions.checkState(SUPPORTED_EXCHANGE_TYPE.contains(exchangeType), "Exchange type '%s' is not supported yet",
+        exchangeType);
+  }
+
+  @Override
+  public String getOperatorName() {
+    return OPERATOR_NAME;
+  }
+
+  @Override
+  public List<Operator> getChildOperators() {
+    // WorkerExecutor doesn't use getChildOperators, returns null here.
+    return null;
+  }
+
+  @Nullable
+  @Override
+  public String toExplainString() {
+    return EXPLAIN_NAME;
+  }
+
+  /**
+   * Sends the next block to the receiving instances.
+   *
+   * @return the block pulled from the upstream operator, or {@code null} when constructed with a fixed data table
+   */
+  @Override
+  protected DataTableBlock getNextBlock() {
+    DataTable dataTable;
+    DataTableBlock dataTableBlock = null;
+    if (_dataTableBlockBaseOperator != null) {
+      dataTableBlock = _dataTableBlockBaseOperator.nextBlock();
+      dataTable = dataTableBlock.getDataTable();
+    } else {
+      dataTable = _dataTable;
+    }
+    boolean isEndOfStream = dataTableBlock == null || DataTableBlockUtils.isEndOfStream(dataTableBlock);
+    try {
+      switch (_exchangeType) {
+        // TODO: random and singleton distribution should've been selected in planning phase.
+        case SINGLETON:
+        case RANDOM_DISTRIBUTED:
+          // TODO: make random distributed actually random, this impl only sends data to the first instances.
+          for (ServerInstance serverInstance : _receivingStageInstances) {
+            trySendDataTableBlock(serverInstance, dataTable, isEndOfStream);
+            // we no longer need to send data to the rest of the receiving instances, but we still need to transfer
+            // the dataTable over indicating that we are a potential sender. thus next time a random server is selected
+            // it might still be useful.
+            dataTable = DataTableBlockUtils.getEmptyDataTable(dataTable.getDataSchema());
+          }
+          break;
+        case BROADCAST_DISTRIBUTED:
+          for (ServerInstance serverInstance : _receivingStageInstances) {
+            trySendDataTableBlock(serverInstance, dataTable, isEndOfStream);
+          }
+          break;
+        case HASH_DISTRIBUTED:
+        case RANGE_DISTRIBUTED:
+        case ROUND_ROBIN_DISTRIBUTED:
+        case ANY:
+        default:
+          throw new UnsupportedOperationException("Unsupported mailbox exchange type: " + _exchangeType);
+      }
+    } catch (Exception e) {
+      LOGGER.error("Exception occur while sending data via mailbox", e);
+    }
+    return dataTableBlock;
+  }
+
+  /**
+   * Sends to a single instance, logging failures instead of propagating them so that one unreachable receiver does not
+   * prevent the remaining receivers from getting their data and end-of-stream marker.
+   */
+  private void trySendDataTableBlock(ServerInstance serverInstance, DataTable dataTable, boolean isEndOfStream) {
+    try {
+      sendDataTableBlock(serverInstance, dataTable, isEndOfStream);
+    } catch (Exception e) {
+      LOGGER.error("Exception occur while sending data via mailbox to: {}", serverInstance, e);
+    }
+  }
+
+  /**
+   * Sends {@code dataTable} to {@code serverInstance} and completes that mailbox when this is the end of the stream.
+   */
+  private void sendDataTableBlock(ServerInstance serverInstance, DataTable dataTable, boolean isEndOfStream)
+      throws IOException {
+    String mailboxId = toMailboxId(serverInstance);
+    SendingMailbox<Mailbox.MailboxContent> sendingMailbox = _mailboxService.getSendingMailbox(mailboxId);
+    sendingMailbox.send(toMailboxContent(mailboxId, dataTable, isEndOfStream));
+    if (isEndOfStream) {
+      sendingMailbox.complete();
+    }
+  }
+
+  /**
+   * Serializes {@code dataTable} into a mailbox message, tagging it with {@code finished=true} at end of stream.
+   */
+  private Mailbox.MailboxContent toMailboxContent(String mailboxId, DataTable dataTable, boolean isEndOfStream)
+      throws IOException {
+    Mailbox.MailboxContent.Builder builder = Mailbox.MailboxContent.newBuilder().setMailboxId(mailboxId)
+        .setPayload(ByteString.copyFrom(dataTable.toBytes()));
+    if (isEndOfStream) {
+      builder.putMetadata("finished", "true");
+    }
+    return builder.build();
+  }
+
+  /**
+   * Builds the id of the mailbox from this host/port (sender) to {@code serverInstance} (receiver) for this job/stage.
+   */
+  private String toMailboxId(ServerInstance serverInstance) {
+    return new StringMailboxIdentifier(String.format("%s_%s", _jobId, _stageId), _serverHostName, _serverPort,
+        serverInstance.getHostname(), serverInstance.getGrpcPort()).toString();
+  }
+}
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/DistributedStagePlan.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/DistributedStagePlan.java
new file mode 100644
index 0000000000..f9ecf7f089
--- /dev/null
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/DistributedStagePlan.java
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.runtime.plan;
+
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.pinot.core.transport.ServerInstance;
+import org.apache.pinot.query.planner.StageMetadata;
+import org.apache.pinot.query.planner.nodes.StageNode;
+
+
+/**
+ * {@code DistributedStagePlan} is the in-memory (deserialized) form of a
+ * {@link org.apache.pinot.common.proto.Worker.StagePlan}.
+ *
+ * <p>It can be seen as an extended {@link org.apache.pinot.core.query.request.ServerQueryRequest}: besides the stage
+ * id, it carries the server instance the stage is assigned to, the root {@link StageNode} of the stage, and a map of
+ * {@link StageMetadata} keyed by stage id.
+ */
+public class DistributedStagePlan {
+  private final int _stageId;
+  private final Map<Integer, StageMetadata> _metadataMap;
+  private ServerInstance _serverInstance;
+  private StageNode _stageRoot;
+
+  /**
+   * Creates a plan for {@code stageId} with an empty, mutable metadata map; the server instance and the stage root
+   * are left unset and are expected to be filled in through the setters.
+   */
+  public DistributedStagePlan(int stageId) {
+    this(stageId, null, null, new HashMap<>());
+  }
+
+  /**
+   * Creates a fully populated plan. {@code metadataMap} is stored by reference, not copied.
+   */
+  public DistributedStagePlan(int stageId, ServerInstance serverInstance, StageNode stageRoot,
+      Map<Integer, StageMetadata> metadataMap) {
+    _stageId = stageId;
+    _metadataMap = metadataMap;
+    _serverInstance = serverInstance;
+    _stageRoot = stageRoot;
+  }
+
+  public int getStageId() {
+    return _stageId;
+  }
+
+  public ServerInstance getServerInstance() {
+    return _serverInstance;
+  }
+
+  public void setServerInstance(ServerInstance serverInstance) {
+    _serverInstance = serverInstance;
+  }
+
+  public StageNode getStageRoot() {
+    return _stageRoot;
+  }
+
+  public void setStageRoot(StageNode stageRoot) {
+    _stageRoot = stageRoot;
+  }
+
+  /** Returns the live (mutable) stage-id-to-metadata map held by this plan. */
+  public Map<Integer, StageMetadata> getMetadataMap() {
+    return _metadataMap;
+  }
+}
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/serde/QueryPlanSerDeUtils.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/serde/QueryPlanSerDeUtils.java
new file mode 100644
index 0000000000..1b4ecbbdc0
--- /dev/null
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/serde/QueryPlanSerDeUtils.java
@@ -0,0 +1,108 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.runtime.plan.serde;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.pinot.common.proto.Worker;
+import org.apache.pinot.core.transport.ServerInstance;
+import org.apache.pinot.query.planner.StageMetadata;
+import org.apache.pinot.query.routing.WorkerInstance;
+import org.apache.pinot.query.runtime.plan.DistributedStagePlan;
+
+
+/**
+ * This utility class serializes/deserializes between {@link Worker.StagePlan} elements and Planner elements.
+ *
+ * <p>Server instances are encoded as {@code <hostname>_<port>_<grpcPort>} strings (see
+ * {@link #instanceToString(ServerInstance)} and {@link #stringToInstance(String)}).
+ */
+public class QueryPlanSerDeUtils {
+
+  private QueryPlanSerDeUtils() {
+    // do not instantiate.
+  }
+
+  /**
+   * Converts a {@link Worker.StagePlan} back into a {@link DistributedStagePlan}.
+   */
+  public static DistributedStagePlan deserialize(Worker.StagePlan stagePlan) {
+    DistributedStagePlan distributedStagePlan = new DistributedStagePlan(stagePlan.getStageId());
+    distributedStagePlan.setServerInstance(stringToInstance(stagePlan.getInstanceId()));
+    distributedStagePlan.setStageRoot(StageNodeSerDeUtils.deserializeStageRoot(stagePlan.getSerializedStageRoot()));
+    Map<Integer, Worker.StageMetadata> metadataMap = stagePlan.getStageMetadataMap();
+    distributedStagePlan.getMetadataMap().putAll(protoMapToStageMetadataMap(metadataMap));
+    return distributedStagePlan;
+  }
+
+  /**
+   * Converts a {@link DistributedStagePlan} into its {@link Worker.StagePlan} protobuf representation.
+   */
+  public static Worker.StagePlan serialize(DistributedStagePlan distributedStagePlan) {
+    return Worker.StagePlan.newBuilder()
+        .setStageId(distributedStagePlan.getStageId())
+        .setInstanceId(instanceToString(distributedStagePlan.getServerInstance()))
+        .setSerializedStageRoot(StageNodeSerDeUtils.serializeStageRoot(distributedStagePlan.getStageRoot()))
+        .putAllStageMetadata(stageMetadataMapToProtoMap(distributedStagePlan.getMetadataMap())).build();
+  }
+
+  /**
+   * Parses a {@code <hostname>_<port>_<grpcPort>} string, as produced by {@link #instanceToString(ServerInstance)},
+   * back into a server instance.
+   *
+   * <p>The string is parsed from the right, so a hostname that itself contains {@code '_'} is kept intact.
+   *
+   * @throws IllegalArgumentException if the string does not have the expected shape or a port is not an integer
+   */
+  public static ServerInstance stringToInstance(String serverInstanceString) {
+    int grpcPortSeparator = serverInstanceString.lastIndexOf('_');
+    // lastIndexOf with a negative fromIndex returns -1, so a missing separator is caught by the check below.
+    int portSeparator = serverInstanceString.lastIndexOf('_', grpcPortSeparator - 1);
+    if (portSeparator <= 0) {
+      throw new IllegalArgumentException(
+          "Invalid server instance string, expected <hostname>_<port>_<grpcPort>: " + serverInstanceString);
+    }
+    String hostname = serverInstanceString.substring(0, portSeparator);
+    int port = Integer.parseInt(serverInstanceString.substring(portSeparator + 1, grpcPortSeparator));
+    int grpcPort = Integer.parseInt(serverInstanceString.substring(grpcPortSeparator + 1));
+    return new WorkerInstance(hostname, port, grpcPort);
+  }
+
+  /**
+   * Encodes a server instance as {@code <hostname>_<port>_<grpcPort>}.
+   */
+  public static String instanceToString(ServerInstance serverInstance) {
+    return StringUtils.join(serverInstance.getHostname(), '_', serverInstance.getPort(), '_',
+        serverInstance.getGrpcPort());
+  }
+
+  /**
+   * Converts a map of protobuf stage metadata, keyed by stage id, into planner {@link StageMetadata}.
+   */
+  public static Map<Integer, StageMetadata> protoMapToStageMetadataMap(Map<Integer, Worker.StageMetadata> protoMap) {
+    Map<Integer, StageMetadata> metadataMap = new HashMap<>();
+    for (Map.Entry<Integer, Worker.StageMetadata> e : protoMap.entrySet()) {
+      metadataMap.put(e.getKey(), fromWorkerStageMetadata(e.getValue()));
+    }
+    return metadataMap;
+  }
+
+  private static StageMetadata fromWorkerStageMetadata(Worker.StageMetadata workerStageMetadata) {
+    StageMetadata stageMetadata = new StageMetadata();
+    stageMetadata.getScannedTables().addAll(workerStageMetadata.getDataSourcesList());
+    for (String serverInstanceString : workerStageMetadata.getInstancesList()) {
+      stageMetadata.getServerInstances().add(stringToInstance(serverInstanceString));
+    }
+    for (Map.Entry<String, Worker.SegmentList> e : workerStageMetadata.getInstanceToSegmentListMap().entrySet()) {
+      stageMetadata.getServerInstanceToSegmentsMap().put(stringToInstance(e.getKey()), e.getValue().getSegmentsList());
+    }
+    return stageMetadata;
+  }
+
+  /**
+   * Converts planner {@link StageMetadata}, keyed by stage id, into its protobuf representation.
+   */
+  public static Map<Integer, Worker.StageMetadata> stageMetadataMapToProtoMap(Map<Integer, StageMetadata> metadataMap) {
+    Map<Integer, Worker.StageMetadata> protoMap = new HashMap<>();
+    for (Map.Entry<Integer, StageMetadata> e : metadataMap.entrySet()) {
+      protoMap.put(e.getKey(), toWorkerStageMetadata(e.getValue()));
+    }
+    return protoMap;
+  }
+
+  private static Worker.StageMetadata toWorkerStageMetadata(StageMetadata stageMetadata) {
+    Worker.StageMetadata.Builder builder = Worker.StageMetadata.newBuilder();
+    builder.addAllDataSources(stageMetadata.getScannedTables());
+    for (ServerInstance serverInstance : stageMetadata.getServerInstances()) {
+      builder.addInstances(instanceToString(serverInstance));
+    }
+    for (Map.Entry<ServerInstance, List<String>> e : stageMetadata.getServerInstanceToSegmentsMap().entrySet()) {
+      builder.putInstanceToSegmentList(instanceToString(e.getKey()),
+          Worker.SegmentList.newBuilder().addAllSegments(e.getValue()).build());
+    }
+    return builder.build();
+  }
+}
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/serde/StageNodeSerDeUtils.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/serde/StageNodeSerDeUtils.java
new file mode 100644
index 0000000000..80370128cf
--- /dev/null
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/serde/StageNodeSerDeUtils.java
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.runtime.plan.serde;
+
+import com.google.common.base.Preconditions;
+import com.google.protobuf.ByteString;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import org.apache.pinot.query.planner.nodes.StageNode;
+
+
+/**
+ * Serializes/deserializes the root {@link StageNode} of a stage to/from a protobuf {@link ByteString}.
+ *
+ * <p>Java native serialization is used, so every node reachable from the root must be {@link java.io.Serializable}.
+ */
+public class StageNodeSerDeUtils {
+  private StageNodeSerDeUtils() {
+    // do not instantiate.
+  }
+
+  /**
+   * Deserializes a stage root produced by {@link #serializeStageRoot(StageNode)}.
+   *
+   * <p>SECURITY NOTE(review): these bytes are taken from the stage plan of an incoming worker request, and Java
+   * native deserialization of untrusted input can lead to arbitrary code execution. This should be replaced by a
+   * dedicated (e.g. protobuf) format, or at least guarded by a class allow-list / serialization filter.
+   *
+   * @param serializeStagePlan serialized stage root
+   * @return the deserialized stage root
+   * @throws IllegalStateException if the decoded object is not a {@link StageNode}
+   * @throws RuntimeException wrapping any I/O or class-resolution failure
+   */
+  public static StageNode deserializeStageRoot(ByteString serializeStagePlan) {
+    Object o;
+    try (ByteArrayInputStream bs = new ByteArrayInputStream(serializeStagePlan.toByteArray());
+        ObjectInputStream is = new ObjectInputStream(bs)) {
+      o = is.readObject();
+    } catch (IOException | ClassNotFoundException e) {
+      throw new RuntimeException("Failed to deserialize stage root", e);
+    }
+    // Checked outside the try block so the IllegalStateException is not re-wrapped into a generic RuntimeException.
+    Preconditions.checkState(o instanceof StageNode, "invalid worker query request object");
+    return (StageNode) o;
+  }
+
+  /**
+   * Serializes {@code stageRoot} (and every node reachable from it) with Java native serialization.
+   *
+   * @throws RuntimeException wrapping any I/O failure, e.g. a non-serializable node
+   */
+  public static ByteString serializeStageRoot(StageNode stageRoot) {
+    try (ByteArrayOutputStream bs = new ByteArrayOutputStream();
+        ObjectOutputStream os = new ObjectOutputStream(bs)) {
+      os.writeObject(stageRoot);
+      // ObjectOutputStream buffers internally; flush so all bytes reach the backing array before it is read.
+      os.flush();
+      return ByteString.copyFrom(bs.toByteArray());
+    } catch (IOException e) {
+      throw new RuntimeException("Failed to serialize stage root", e);
+    }
+  }
+}
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/utils/ServerRequestUtils.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/utils/ServerRequestUtils.java
new file mode 100644
index 0000000000..306faa2598
--- /dev/null
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/utils/ServerRequestUtils.java
@@ -0,0 +1,109 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.runtime.utils;
+
+import java.util.Map;
+import java.util.stream.Collectors;
+import org.apache.pinot.common.metrics.PinotMetricUtils;
+import org.apache.pinot.common.metrics.ServerMetrics;
+import org.apache.pinot.common.request.BrokerRequest;
+import org.apache.pinot.common.request.DataSource;
+import org.apache.pinot.common.request.InstanceRequest;
+import org.apache.pinot.common.request.PinotQuery;
+import org.apache.pinot.common.request.QuerySource;
+import org.apache.pinot.common.utils.request.RequestUtils;
+import org.apache.pinot.core.query.request.ServerQueryRequest;
+import org.apache.pinot.query.planner.nodes.CalcNode;
+import org.apache.pinot.query.planner.nodes.MailboxReceiveNode;
+import org.apache.pinot.query.planner.nodes.MailboxSendNode;
+import org.apache.pinot.query.planner.nodes.StageNode;
+import org.apache.pinot.query.planner.nodes.TableScanNode;
+import org.apache.pinot.query.runtime.plan.DistributedStagePlan;
+
+
+/**
+ * {@code ServerRequestUtils} converts the {@link DistributedStagePlan} into a {@link ServerQueryRequest}.
+ *
+ * <p>In order to reuse the current pinot {@link org.apache.pinot.core.query.executor.ServerQueryExecutorV1Impl}, a
+ * conversion step is needed so that the V2 query plan can be converted into a compatible format to run V1 executor.
+ */
+public class ServerRequestUtils {
+
+  private ServerRequestUtils() {
+    // do not instantiate.
+  }
+
+  // TODO: This is a hack, make an actual ServerQueryRequest converter.
+  public static ServerQueryRequest constructServerQueryRequest(DistributedStagePlan distributedStagePlan,
+      Map<String, String> requestMetadataMap) {
+    InstanceRequest instanceRequest = new InstanceRequest();
+    instanceRequest.setRequestId(Long.parseLong(requestMetadataMap.get("REQUEST_ID")));
+    instanceRequest.setBrokerId("unknown");
+    instanceRequest.setEnableTrace(false);
+    instanceRequest.setSearchSegments(
+        distributedStagePlan.getMetadataMap().get(distributedStagePlan.getStageId()).getServerInstanceToSegmentsMap()
+            .get(distributedStagePlan.getServerInstance()));
+    instanceRequest.setQuery(constructBrokerRequest(distributedStagePlan));
+    return new ServerQueryRequest(instanceRequest, new ServerMetrics(PinotMetricUtils.getPinotMetricsRegistry()),
+        System.currentTimeMillis());
+  }
+
+  // TODO: this is a hack, create a broker request object should not be needed because we rewrite the entire
+  // query into stages already.
+  public static BrokerRequest constructBrokerRequest(DistributedStagePlan distributedStagePlan) {
+    PinotQuery pinotQuery = constructPinotQuery(distributedStagePlan);
+    BrokerRequest brokerRequest = new BrokerRequest();
+    brokerRequest.setPinotQuery(pinotQuery);
+    // Set table name in broker request because it is used for access control, query routing etc.
+    DataSource dataSource = pinotQuery.getDataSource();
+    if (dataSource != null) {
+      QuerySource querySource = new QuerySource();
+      querySource.setTableName(dataSource.getTableName());
+      brokerRequest.setQuerySource(querySource);
+    }
+    return brokerRequest;
+  }
+
+  public static PinotQuery constructPinotQuery(DistributedStagePlan distributedStagePlan) {
+    PinotQuery pinotQuery = new PinotQuery();
+    pinotQuery.setExplain(false);
+    walkStageTree(distributedStagePlan.getStageRoot(), pinotQuery);
+    return pinotQuery;
+  }
+
+  private static void walkStageTree(StageNode node, PinotQuery pinotQuery) {
+    if (node instanceof CalcNode) {
+      // TODO: add conversion for CalcNode, specifically filter/alias/...
+    } else if (node instanceof TableScanNode) {
+      TableScanNode tableScanNode = (TableScanNode) node;
+      DataSource dataSource = new DataSource();
+      dataSource.setTableName(tableScanNode.getTableName());
+      pinotQuery.setDataSource(dataSource);
+      pinotQuery.setSelectList(tableScanNode.getTableScanColumns().stream().map(RequestUtils::getIdentifierExpression)
+          .collect(Collectors.toList()));
+    } else if (node instanceof MailboxSendNode || node instanceof MailboxReceiveNode) {
+      // ignore for now. continue to child.
+    } else {
+      throw new UnsupportedOperationException("Unsupported logical plan node: " + node);
+    }
+    for (StageNode child : node.getInputs()) {
+      walkStageTree(child, pinotQuery);
+    }
+  }
+}
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/partitioning/FieldSelectionKeySelector.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/QueryConfig.java
similarity index 55%
copy from pinot-query-planner/src/main/java/org/apache/pinot/query/planner/partitioning/FieldSelectionKeySelector.java
copy to pinot-query-runtime/src/main/java/org/apache/pinot/query/service/QueryConfig.java
index 9e0c776fd5..8de432f3ea 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/partitioning/FieldSelectionKeySelector.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/QueryConfig.java
@@ -16,24 +16,22 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pinot.query.planner.partitioning;
-
-import java.io.Serializable;
-
+package org.apache.pinot.query.service;
 
 /**
- * The {@code FieldSelectionKeySelector} simply extract a column value out from a row array {@link Object[]}.
+ * Configuration for setting up query runtime.
+ *
+ * <p>Defines configuration keys ({@code KEY_OF_*}) together with their default values ({@code DEFAULT_*}).
  */
-public class FieldSelectionKeySelector implements KeySelector<Object[], Object>, Serializable {
+public class QueryConfig {
+  // Config key for the query server port.
+  public static final String KEY_OF_QUERY_SERVER_PORT = "pinot.query.server.port";
+  // NOTE(review): -1 presumably means "not configured"; confirm how readers of this key handle it.
+  public static final int DEFAULT_QUERY_SERVER_PORT = -1;
 
-  private int _columnIndex;
-
-  public FieldSelectionKeySelector(int columnIndex) {
-    _columnIndex = columnIndex;
-  }
+  // Config key for the query runner host name.
+  public static final String KEY_OF_QUERY_RUNNER_HOSTNAME = "pinot.query.runner.hostname";
+  public static final String DEFAULT_QUERY_RUNNER_HOSTNAME = "localhost";
+  // query runner port is the mailbox port.
+  public static final String KEY_OF_QUERY_RUNNER_PORT = "pinot.query.runner.port";
+  // NOTE(review): -1 presumably means "not configured"; confirm how readers of this key handle it.
+  public static final int DEFAULT_QUERY_RUNNER_PORT = -1;
 
-  @Override
-  public Object getKey(Object[] input) {
-    return input[_columnIndex];
+  private QueryConfig() {
+    // do not instantiate.
   }
 }
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/QueryDispatcher.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/QueryDispatcher.java
new file mode 100644
index 0000000000..3200b317bb
--- /dev/null
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/QueryDispatcher.java
@@ -0,0 +1,143 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.service;
+
+import io.grpc.ManagedChannelBuilder;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeoutException;
+import org.apache.calcite.rel.RelDistribution;
+import org.apache.pinot.common.proto.Mailbox;
+import org.apache.pinot.common.proto.PinotQueryWorkerGrpc;
+import org.apache.pinot.common.proto.Worker;
+import org.apache.pinot.common.utils.DataTable;
+import org.apache.pinot.core.transport.ServerInstance;
+import org.apache.pinot.query.mailbox.MailboxService;
+import org.apache.pinot.query.planner.QueryPlan;
+import org.apache.pinot.query.planner.StageMetadata;
+import org.apache.pinot.query.planner.nodes.MailboxReceiveNode;
+import org.apache.pinot.query.runtime.blocks.DataTableBlock;
+import org.apache.pinot.query.runtime.blocks.DataTableBlockUtils;
+import org.apache.pinot.query.runtime.operator.MailboxReceiveOperator;
+import org.apache.pinot.query.runtime.plan.DistributedStagePlan;
+import org.apache.pinot.query.runtime.plan.serde.QueryPlanSerDeUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * {@code QueryDispatcher} dispatches the stages of a {@link QueryPlan} to the workers that execute them, and can run
+ * the reduce stage locally to collect the final results.
+ */
+public class QueryDispatcher {
+  private static final Logger LOGGER = LoggerFactory.getLogger(QueryDispatcher.class);
+
+  // gRPC dispatch clients, cached per "<host>_<port>" and reused across queries.
+  private final Map<String, DispatchClient> _dispatchClientMap = new ConcurrentHashMap<>();
+
+  public QueryDispatcher() {
+  }
+
+  /**
+   * Submits every distributed stage of {@code queryPlan}, then runs the reduce stage locally and collects its output.
+   *
+   * @param requestId id of the request, forwarded to every worker
+   * @param queryPlan the plan to execute; the stage rooted at a {@link MailboxReceiveNode} is the reduce stage
+   * @param mailboxService mailbox service on which the reduce stage receives data
+   * @param timeoutNano maximum time, in nanoseconds, to wait for the end-of-stream block of the reduce stage
+   * @return every data table received by the reduce stage, including the one of the end-of-stream block
+   * @throws IllegalStateException if the plan has no reduce stage
+   * @throws TimeoutException if the end-of-stream block is not received within {@code timeoutNano}
+   * @throws Exception if submitting a stage fails
+   */
+  public List<DataTable> submitAndReduce(long requestId, QueryPlan queryPlan,
+      MailboxService<Mailbox.MailboxContent> mailboxService, long timeoutNano)
+      throws Exception {
+    // submit all the distributed stages.
+    int reduceStageId = submit(requestId, queryPlan);
+    if (reduceStageId == -1) {
+      throw new IllegalStateException("Query plan has no reduce stage rooted at a MailboxReceiveNode");
+    }
+
+    // run reduce stage.
+    MailboxReceiveNode reduceNode = (MailboxReceiveNode) queryPlan.getQueryStageMap().get(reduceStageId);
+    MailboxReceiveOperator mailboxReceiveOperator = createReduceStageOperator(mailboxService,
+        queryPlan.getStageMetadataMap().get(reduceNode.getSenderStageId()).getServerInstances(),
+        requestId, reduceNode.getSenderStageId(), mailboxService.getHostname(),
+        mailboxService.getMailboxPort());
+
+    List<DataTable> queryResults = new ArrayList<>();
+    long deadlineNano = System.nanoTime() + timeoutNano;
+    // Compare via subtraction: nanoTime values (and now + timeout) may overflow, so "now < deadline" is unreliable.
+    while (System.nanoTime() - deadlineNano < 0) {
+      DataTableBlock dataTableBlock = mailboxReceiveOperator.nextBlock();
+      queryResults.add(dataTableBlock.getDataTable());
+      if (DataTableBlockUtils.isEndOfStream(dataTableBlock)) {
+        return queryResults;
+      }
+    }
+    // Returning here would hand back a partial result that looks complete, so fail explicitly instead.
+    throw new TimeoutException(
+        String.format("Timed out after %d ns waiting for the results of request %d", timeoutNano, requestId));
+  }
+
+  /**
+   * Sends every non-reduce stage of {@code queryPlan} to each of the server instances assigned to it.
+   *
+   * @return id of the reduce stage (the stage rooted at a {@link MailboxReceiveNode}), or -1 if there is none
+   * @throws RuntimeException if a worker responds with an {@code ERROR} metadata entry
+   */
+  public int submit(long requestId, QueryPlan queryPlan)
+      throws Exception {
+    int reduceStageId = -1;
+    for (Map.Entry<Integer, StageMetadata> stage : queryPlan.getStageMetadataMap().entrySet()) {
+      int stageId = stage.getKey();
+      // stage rooting at a mailbox receive node means reduce stage.
+      if (queryPlan.getQueryStageMap().get(stageId) instanceof MailboxReceiveNode) {
+        reduceStageId = stageId;
+      } else {
+        List<ServerInstance> serverInstances = stage.getValue().getServerInstances();
+        for (ServerInstance serverInstance : serverInstances) {
+          String host = serverInstance.getHostname();
+          int port = serverInstance.getPort();
+          DispatchClient client = getOrCreateDispatchClient(host, port);
+          Worker.QueryResponse response = client.submit(Worker.QueryRequest.newBuilder()
+              .setStagePlan(QueryPlanSerDeUtils.serialize(constructDistributedStagePlan(queryPlan, stageId,
+                  serverInstance)))
+              .putMetadata("REQUEST_ID", String.valueOf(requestId))
+              .putMetadata("SERVER_INSTANCE_HOST", serverInstance.getHostname())
+              .putMetadata("SERVER_INSTANCE_PORT", String.valueOf(serverInstance.getGrpcPort())).build());
+          if (response.containsMetadata("ERROR")) {
+            throw new RuntimeException(
+                String.format("Unable to execute query plan at stage %s on server %s: ERROR: %s", stageId,
+                    serverInstance, response));
+          }
+        }
+      }
+    }
+    return reduceStageId;
+  }
+
+  /**
+   * Creates the operator that receives, on {@code hostname}:{@code port}, the blocks sent by
+   * {@code sendingInstances} for stage {@code stageId} of job {@code jobId}.
+   */
+  protected MailboxReceiveOperator createReduceStageOperator(MailboxService<Mailbox.MailboxContent> mailboxService,
+      List<ServerInstance> sendingInstances, long jobId, int stageId, String hostname, int port) {
+    return new MailboxReceiveOperator(mailboxService, RelDistribution.Type.ANY, sendingInstances, hostname, port,
+        jobId, stageId);
+  }
+
+  /**
+   * Builds the {@link DistributedStagePlan} of {@code stageId} for {@code serverInstance}. The whole stage metadata
+   * map of the plan is shared (not copied).
+   */
+  public static DistributedStagePlan constructDistributedStagePlan(QueryPlan queryPlan, int stageId,
+      ServerInstance serverInstance) {
+    return new DistributedStagePlan(stageId, serverInstance, queryPlan.getQueryStageMap().get(stageId),
+        queryPlan.getStageMetadataMap());
+  }
+
+  private DispatchClient getOrCreateDispatchClient(String host, int port) {
+    String key = String.format("%s_%d", host, port);
+    return _dispatchClientMap.computeIfAbsent(key, k -> new DispatchClient(host, port));
+  }
+
+  /**
+   * Blocking gRPC client for a single worker, using a plaintext channel.
+   *
+   * <p>NOTE(review): the underlying channel is never shut down.
+   */
+  public static class DispatchClient {
+    private final PinotQueryWorkerGrpc.PinotQueryWorkerBlockingStub _blockingStub;
+
+    public DispatchClient(String host, int port) {
+      ManagedChannelBuilder<?> managedChannelBuilder = ManagedChannelBuilder.forAddress(host, port).usePlaintext();
+      _blockingStub = PinotQueryWorkerGrpc.newBlockingStub(managedChannelBuilder.build());
+    }
+
+    public Worker.QueryResponse submit(Worker.QueryRequest request) {
+      return _blockingStub.submit(request);
+    }
+  }
+}
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/QueryServer.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/QueryServer.java
new file mode 100644
index 0000000000..0a74ee820b
--- /dev/null
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/QueryServer.java
@@ -0,0 +1,115 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.service;
+
+import io.grpc.Context;
+import io.grpc.Server;
+import io.grpc.ServerBuilder;
+import io.grpc.Status;
+import io.grpc.stub.StreamObserver;
+import java.io.IOException;
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import org.apache.pinot.common.proto.PinotQueryWorkerGrpc;
+import org.apache.pinot.common.proto.Worker;
+import org.apache.pinot.common.utils.NamedThreadFactory;
+import org.apache.pinot.core.query.scheduler.resources.ResourceManager;
+import org.apache.pinot.core.transport.grpc.GrpcQueryServer;
+import org.apache.pinot.query.runtime.QueryRunner;
+import org.apache.pinot.query.runtime.plan.DistributedStagePlan;
+import org.apache.pinot.query.runtime.plan.serde.QueryPlanSerDeUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * {@link QueryServer} is the GRPC server that accepts query plan requests sent from {@link QueryDispatcher}.
+ *
+ * <p>It is a separate server from {@link GrpcQueryServer}. Each received stage plan is acknowledged right away and
+ * then handed to the {@link QueryRunner}.
+ */
+public class QueryServer extends PinotQueryWorkerGrpc.PinotQueryWorkerImplBase {
+  private static final Logger LOGGER = LoggerFactory.getLogger(QueryServer.class);
+
+  private final Server _server;
+  private final QueryRunner _queryRunner;
+  private final ExecutorService _executorService;
+
+  /**
+   * Creates (but does not start) the server.
+   *
+   * @param port port the gRPC server listens on
+   * @param queryRunner runner that executes the received stage plans
+   */
+  public QueryServer(int port, QueryRunner queryRunner) {
+    _server = ServerBuilder.forPort(port).addService(this).build();
+    _executorService = Executors.newFixedThreadPool(ResourceManager.DEFAULT_QUERY_WORKER_THREADS,
+        new NamedThreadFactory("query_worker_on_" + port + "_port"));
+    _queryRunner = queryRunner;
+    LOGGER.info("Initialized QueryWorker on port: {} with numWorkerThreads: {}", port,
+        ResourceManager.DEFAULT_QUERY_WORKER_THREADS);
+  }
+
+  /** Starts the query runner, then the gRPC server. */
+  public void start() {
+    LOGGER.info("Starting QueryWorker");
+    try {
+      _queryRunner.start();
+      _server.start();
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  /**
+   * Shuts down the query runner and the gRPC server (waiting for the latter to terminate), then releases the worker
+   * thread pool owned by this server.
+   */
+  public void shutdown() {
+    LOGGER.info("Shutting down QueryWorker");
+    try {
+      _queryRunner.shutDown();
+      _server.shutdown().awaitTermination();
+    } catch (InterruptedException e) {
+      // Restore the interrupt flag so callers up the stack can still observe the interruption.
+      Thread.currentThread().interrupt();
+      throw new RuntimeException(e);
+    } finally {
+      // The pool is created in the constructor, so it must be released here or its threads leak.
+      _executorService.shutdown();
+    }
+  }
+
+  /**
+   * Deserializes the stage plan carried by {@code request}, acknowledges it, and then processes it with the
+   * {@link QueryRunner}. Deserialization failures are reported to the caller as {@code INVALID_ARGUMENT}.
+   */
+  @Override
+  public void submit(Worker.QueryRequest request, StreamObserver<Worker.QueryResponse> responseObserver) {
+    // Deserialize the request
+    DistributedStagePlan distributedStagePlan;
+    Map<String, String> requestMetadataMap;
+    try {
+      distributedStagePlan = QueryPlanSerDeUtils.deserialize(request.getStagePlan());
+      requestMetadataMap = request.getMetadataMap();
+    } catch (Exception e) {
+      LOGGER.error("Caught exception while deserializing the request: {}", request, e);
+      responseObserver.onError(Status.INVALID_ARGUMENT.withDescription("Bad request").withCause(e).asException());
+      return;
+    }
+
+    // return dispatch successful.
+    // TODO: return meaningful value here.
+    responseObserver.onNext(Worker.QueryResponse.newBuilder().putMetadata("OK", "OK").build());
+    responseObserver.onCompleted();
+
+    // Fork a context that carries the current values but is not cancelled together with this RPC's context, so
+    // processing is not aborted now that the call has completed. Note that Context#run executes the runnable
+    // synchronously on the calling (gRPC handler) thread.
+    Context ctx = Context.current().fork();
+    ctx.run(() -> {
+      // Process the query
+      try {
+        // TODO: break this into parsing and execution, so that responseObserver can return upon parsing complete.
+        _queryRunner.processQuery(distributedStagePlan, _executorService, requestMetadataMap);
+      } catch (Exception e) {
+        LOGGER.error("Caught exception while processing request", e);
+        throw new RuntimeException(e);
+      }
+    });
+  }
+}
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/QueryServerEnclosure.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/QueryServerEnclosure.java
new file mode 100644
index 0000000000..0e60a4a533
--- /dev/null
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/QueryServerEnclosure.java
@@ -0,0 +1,192 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.stream.Collectors;
+import org.apache.commons.io.FileUtils;
+import org.apache.pinot.common.metrics.ServerMetrics;
+import org.apache.pinot.common.utils.NamedThreadFactory;
+import org.apache.pinot.core.data.manager.InstanceDataManager;
+import org.apache.pinot.core.data.manager.offline.ImmutableSegmentDataManager;
+import org.apache.pinot.query.runtime.QueryRunner;
+import org.apache.pinot.query.runtime.plan.DistributedStagePlan;
+import org.apache.pinot.query.service.QueryConfig;
+import org.apache.pinot.segment.local.data.manager.SegmentDataManager;
+import org.apache.pinot.segment.local.data.manager.TableDataManager;
+import org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentLoader;
+import org.apache.pinot.segment.local.segment.creator.impl.SegmentIndexCreationDriverImpl;
+import org.apache.pinot.segment.local.segment.readers.GenericRowRecordReader;
+import org.apache.pinot.segment.spi.ImmutableSegment;
+import org.apache.pinot.segment.spi.creator.SegmentGeneratorConfig;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.data.readers.GenericRow;
+import org.apache.pinot.spi.data.readers.RecordReader;
+import org.apache.pinot.spi.env.PinotConfiguration;
+import org.apache.pinot.spi.utils.ReadMode;
+import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.matches;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+
+/**
+ * Query server enclosure for testing Pinot query planner & runtime.
+ *
+ * This enclosure simulates a deployable component of Pinot that serves
+ *   - regular Pinot query server (that serves segment-based queries)
+ *   - intermediate stage queries (such as JOIN operator that awaits data scanned from left/right tables)
+ *
+ * Inside this construct it runs a regular pinot QueryExecutor as well as the new runtime - WorkerExecutor
+ * Depending on the query request type it gets routed to either one of the two for execution.
+ *
+ * It also runs a GRPC Mailbox service that runs the new transport layer protocol as the backbone for all
+ * multi-stage query communication.
+ *
+ * Lifecycle: construct (builds and loads the test segments), {@link #start()}, run queries via
+ * {@link #processQuery}, then {@link #shutDown()} to release the runner, the executor, the segments and their
+ * index directories.
+ */
+public class QueryServerEnclosure {
+  private static final int NUM_ROWS = 5;
+  private static final int DEFAULT_EXECUTOR_THREAD_NUM = 5;
+  private static final String[] STRING_FIELD_LIST = new String[]{"foo", "bar", "alice", "bob", "charlie"};
+  private static final int[] INT_FIELD_LIST = new int[]{1, 2, 42};
+
+  private final ExecutorService _testExecutor;
+  private final int _queryRunnerPort;
+  private final Map<String, Object> _runnerConfig = new HashMap<>();
+  private final Map<String, List<ImmutableSegment>> _segmentMap = new HashMap<>();
+  private final InstanceDataManager _instanceDataManager;
+  private final Map<String, TableDataManager> _tableDataManagers = new HashMap<>();
+  private final Map<String, File> _indexDirs;
+  // created once here and initialized in start(); previously a second instance was created in start().
+  private final QueryRunner _queryRunner;
+
+  /**
+   * Builds and loads the segments for every table and prepares (but does not start) the query runner.
+   *
+   * @param tables table names hosted by this server
+   * @param indexDirs per-table index directory; each is wiped before its segments are built
+   * @param segments per-table list of segment names to build
+   * @throws RuntimeException wrapping any failure while building segments or preparing the runner
+   */
+  public QueryServerEnclosure(List<String> tables, Map<String, File> indexDirs, Map<String, List<String>> segments) {
+    _indexDirs = indexDirs;
+    try {
+      for (String tableName : tables) {
+        File indexDir = indexDirs.get(tableName);
+        // start from an empty directory so leftovers from a previous run are not picked up.
+        FileUtils.deleteQuietly(indexDir);
+        List<ImmutableSegment> segmentList = new ArrayList<>();
+        for (String segmentName : segments.get(tableName)) {
+          segmentList.add(buildSegment(indexDir, tableName, segmentName));
+        }
+        _segmentMap.put(tableName, segmentList);
+      }
+      _instanceDataManager = mockInstanceDataManager();
+      _queryRunnerPort = QueryEnvironmentTestUtils.getAvailablePort();
+      _runnerConfig.put(QueryConfig.KEY_OF_QUERY_RUNNER_PORT, _queryRunnerPort);
+      _runnerConfig.put(QueryConfig.KEY_OF_QUERY_RUNNER_HOSTNAME,
+          String.format("Server_%s", QueryConfig.DEFAULT_QUERY_RUNNER_HOSTNAME));
+      _queryRunner = new QueryRunner();
+      _testExecutor = Executors.newFixedThreadPool(DEFAULT_EXECUTOR_THREAD_NUM,
+          new NamedThreadFactory("test_query_server_enclosure_on_" + _queryRunnerPort + "_port"));
+    } catch (Exception e) {
+      throw new RuntimeException("Test Failed!", e);
+    }
+  }
+
+  /** Returns a Mockito mock of {@link ServerMetrics}. */
+  public ServerMetrics mockServiceMetrics() {
+    return mock(ServerMetrics.class);
+  }
+
+  /**
+   * Builds a mocked {@link InstanceDataManager} that hands out, per table, a mocked {@link TableDataManager}
+   * serving the segments built for that table.
+   *
+   * NOTE(review): the table name is used unquoted as a regex prefix ({@code "<table>.*"}), so a table name that is a
+   * prefix of another (or contains regex metacharacters) would also match other names — fine for the single-letter
+   * test tables used here.
+   */
+  public InstanceDataManager mockInstanceDataManager() {
+    InstanceDataManager instanceDataManager = mock(InstanceDataManager.class);
+    for (Map.Entry<String, List<ImmutableSegment>> e : _segmentMap.entrySet()) {
+      TableDataManager tableDataManager = mockTableDataManager(e.getValue());
+      _tableDataManagers.put(e.getKey(), tableDataManager);
+    }
+    for (Map.Entry<String, TableDataManager> e : _tableDataManagers.entrySet()) {
+      when(instanceDataManager.getTableDataManager(matches(String.format("%s.*", e.getKey())))).thenReturn(
+          e.getValue());
+    }
+    return instanceDataManager;
+  }
+
+  /**
+   * Returns a mocked {@link TableDataManager} whose {@code acquireSegments} returns data managers for all of
+   * {@code segmentList}, whatever arguments it is called with.
+   */
+  public TableDataManager mockTableDataManager(List<ImmutableSegment> segmentList) {
+    List<SegmentDataManager> tableSegmentDataManagers =
+        segmentList.stream().map(ImmutableSegmentDataManager::new).collect(Collectors.toList());
+    TableDataManager tableDataManager = mock(TableDataManager.class);
+    when(tableDataManager.acquireSegments(any(), any())).thenReturn(tableSegmentDataManagers);
+    return tableDataManager;
+  }
+
+  /**
+   * Generates {@link #NUM_ROWS} synthetic rows (string columns col1/col2, int column col3 and a "ts" column set to
+   * the current time), builds an OFFLINE segment named {@code segmentName} under {@code indexDir} and loads it
+   * with mmap.
+   */
+  public ImmutableSegment buildSegment(File indexDir, String tableName, String segmentName)
+      throws Exception {
+    List<GenericRow> rows = new ArrayList<>(NUM_ROWS);
+    for (int i = 0; i < NUM_ROWS; i++) {
+      GenericRow row = new GenericRow();
+      row.putValue("col1", STRING_FIELD_LIST[i % STRING_FIELD_LIST.length]);
+      row.putValue("col2", STRING_FIELD_LIST[(i + 2) % STRING_FIELD_LIST.length]);
+      row.putValue("col3", INT_FIELD_LIST[i % INT_FIELD_LIST.length]);
+      row.putValue("ts", System.currentTimeMillis());
+      rows.add(row);
+    }
+
+    TableConfig tableConfig =
+        new TableConfigBuilder(TableType.OFFLINE).setTableName(tableName).setTimeColumnName("ts").build();
+    // NOTE(review): SCHEMA_BUILDER is a shared static builder that is mutated here; safe only while segments are
+    // built sequentially.
+    Schema schema = QueryEnvironmentTestUtils.SCHEMA_BUILDER.setSchemaName(tableName).build();
+    SegmentGeneratorConfig config = new SegmentGeneratorConfig(tableConfig, schema);
+    config.setOutDir(indexDir.getPath());
+    config.setTableName(tableName);
+    config.setSegmentName(segmentName);
+
+    SegmentIndexCreationDriverImpl driver = new SegmentIndexCreationDriverImpl();
+    try (RecordReader recordReader = new GenericRowRecordReader(rows)) {
+      driver.init(config, recordReader);
+    }
+    driver.build();
+    return ImmutableSegmentLoader.load(new File(indexDir, segmentName), ReadMode.mmap);
+  }
+
+  /** Returns the port the query runner was configured with. */
+  public int getPort() {
+    return _queryRunnerPort;
+  }
+
+  /** Initializes the query runner with this enclosure's config and mocked data managers, then starts it. */
+  public void start()
+      throws Exception {
+    PinotConfiguration configuration = new PinotConfiguration(_runnerConfig);
+    _queryRunner.init(configuration, _instanceDataManager, mockServiceMetrics());
+    _queryRunner.start();
+  }
+
+  /**
+   * Shuts down the query runner and the test executor, then destroys every segment and deletes the index
+   * directories. Local resources are released even if the runner fails to shut down.
+   */
+  public void shutDown() {
+    try {
+      _queryRunner.shutDown();
+    } finally {
+      // previously never shut down, leaking the pool's worker threads after every test class.
+      _testExecutor.shutdownNow();
+      for (Map.Entry<String, List<ImmutableSegment>> e : _segmentMap.entrySet()) {
+        for (ImmutableSegment segment : e.getValue()) {
+          segment.destroy();
+        }
+        FileUtils.deleteQuietly(_indexDirs.get(e.getKey()));
+      }
+    }
+  }
+
+  /** Executes a stage plan on this server's query runner using the enclosure's test executor. */
+  public void processQuery(DistributedStagePlan distributedStagePlan, Map<String, String> requestMetadataMap) {
+    _queryRunner.processQuery(distributedStagePlan, _testExecutor, requestMetadataMap);
+  }
+}
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/mailbox/GrpcMailboxServiceTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/mailbox/GrpcMailboxServiceTest.java
new file mode 100644
index 0000000000..f1863bc339
--- /dev/null
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/mailbox/GrpcMailboxServiceTest.java
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.mailbox;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableMap;
+import com.google.protobuf.ByteString;
+import java.io.IOException;
+import java.util.Map;
+import org.apache.pinot.common.proto.Mailbox;
+import org.apache.pinot.core.common.datatable.DataTableBuilder;
+import org.apache.pinot.query.runtime.blocks.DataTableBlock;
+import org.apache.pinot.util.TestUtils;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+
+public class GrpcMailboxServiceTest extends GrpcMailboxServiceTestBase {
+
+  @Test
+  public void testHappyPath()
+      throws Exception {
+    Preconditions.checkState(_mailboxServices.size() >= 2);
+    Map.Entry<Integer, GrpcMailboxService> sender = _mailboxServices.firstEntry();
+    Map.Entry<Integer, GrpcMailboxService> receiver = _mailboxServices.lastEntry();
+    String mailboxId =
+        String.format("happyPath:localhost:%d:localhost:%d", sender.getKey(), receiver.getKey());
+    SendingMailbox<Mailbox.MailboxContent> sendingMailbox = sender.getValue().getSendingMailbox(mailboxId);
+    ReceivingMailbox<Mailbox.MailboxContent> receivingMailbox = receiver.getValue().getReceivingMailbox(mailboxId);
+
+    // create mock object
+    Mailbox.MailboxContent testContent = getTestMailboxContent(mailboxId);
+    sendingMailbox.send(testContent);
+
+    // wait for receiving mailbox to be created.
+    TestUtils.waitForCondition(aVoid -> {
+      return receivingMailbox.isInitialized();
+    }, 5000L, "Receiving mailbox initialize failed!");
+
+    Mailbox.MailboxContent receivedContent = receivingMailbox.receive();
+    Assert.assertEquals(receivedContent, testContent);
+
+    sendingMailbox.complete();
+
+    TestUtils.waitForCondition(aVoid -> {
+      return receivingMailbox.isClosed();
+    }, 5000L, "Receiving mailbox is not closed properly!");
+  }
+
+  private Mailbox.MailboxContent getTestMailboxContent(String mailboxId)
+      throws IOException {
+    return Mailbox.MailboxContent.newBuilder().setMailboxId(mailboxId)
+        .putAllMetadata(ImmutableMap.of("key", "value", "finished", "true"))
+        .setPayload(ByteString.copyFrom(new DataTableBlock(DataTableBuilder.getEmptyDataTable()).toBytes())).build();
+  }
+}
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/mailbox/GrpcMailboxServiceTestBase.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/mailbox/GrpcMailboxServiceTestBase.java
new file mode 100644
index 0000000000..8a5badd36a
--- /dev/null
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/mailbox/GrpcMailboxServiceTestBase.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.mailbox;
+
+import java.util.TreeMap;
+import org.apache.pinot.query.QueryEnvironmentTestUtils;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+
+
+/**
+ * Base class for mailbox tests: starts {@link #MAILBOX_TEST_SIZE} {@link GrpcMailboxService}s on free local ports
+ * before the test class and shuts them all down afterwards. Services are kept in a {@link TreeMap} keyed by port,
+ * so subclasses can pick deterministic endpoints via {@code firstEntry()} / {@code lastEntry()}.
+ */
+public abstract class GrpcMailboxServiceTestBase {
+  protected static final int MAILBOX_TEST_SIZE = 2;
+  protected TreeMap<Integer, GrpcMailboxService> _mailboxServices = new TreeMap<>();
+
+  @BeforeClass
+  public void setUp()
+      throws Exception {
+    int started = 0;
+    while (started < MAILBOX_TEST_SIZE) {
+      int port = QueryEnvironmentTestUtils.getAvailablePort();
+      GrpcMailboxService service = new GrpcMailboxService("localhost", port);
+      service.start();
+      _mailboxServices.put(port, service);
+      started++;
+    }
+  }
+
+  @AfterClass
+  public void tearDown() {
+    _mailboxServices.values().forEach(GrpcMailboxService::shutdown);
+  }
+}
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTest.java
new file mode 100644
index 0000000000..9de2db834f
--- /dev/null
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTest.java
@@ -0,0 +1,278 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.runtime;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
+import java.io.File;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import org.apache.calcite.rel.RelDistribution;
+import org.apache.commons.io.FileUtils;
+import org.apache.pinot.common.utils.DataTable;
+import org.apache.pinot.core.transport.ServerInstance;
+import org.apache.pinot.query.QueryEnvironment;
+import org.apache.pinot.query.QueryEnvironmentTestUtils;
+import org.apache.pinot.query.QueryServerEnclosure;
+import org.apache.pinot.query.mailbox.GrpcMailboxService;
+import org.apache.pinot.query.planner.QueryPlan;
+import org.apache.pinot.query.planner.nodes.MailboxReceiveNode;
+import org.apache.pinot.query.routing.WorkerInstance;
+import org.apache.pinot.query.runtime.blocks.DataTableBlock;
+import org.apache.pinot.query.runtime.blocks.DataTableBlockUtils;
+import org.apache.pinot.query.runtime.operator.MailboxReceiveOperator;
+import org.apache.pinot.query.runtime.plan.DistributedStagePlan;
+import org.apache.pinot.query.service.QueryConfig;
+import org.apache.pinot.query.service.QueryDispatcher;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import static org.apache.pinot.core.query.selection.SelectionOperatorUtils.extractRowFromDataTable;
+
+
+public class QueryRunnerTest {
+  private static final Random RANDOM_REQUEST_ID_GEN = new Random();
+  private static final File INDEX_DIR_S1_A = new File(FileUtils.getTempDirectory(), "QueryRunnerTest_server1_tableA");
+  private static final File INDEX_DIR_S1_B = new File(FileUtils.getTempDirectory(), "QueryRunnerTest_server1_tableB");
+  private static final File INDEX_DIR_S2_A = new File(FileUtils.getTempDirectory(), "QueryRunnerTest_server2_tableA");
+  private static final File INDEX_DIR_S1_C = new File(FileUtils.getTempDirectory(), "QueryRunnerTest_server1_tableC");
+  private static final File INDEX_DIR_S2_C = new File(FileUtils.getTempDirectory(), "QueryRunnerTest_server2_tableC");
+
+  private QueryEnvironment _queryEnvironment;
+  private String _reducerHostname;
+  private int _reducerGrpcPort;
+  private Map<ServerInstance, QueryServerEnclosure> _servers = new HashMap<>();
+  private GrpcMailboxService _mailboxService;
+
+  @BeforeClass
+  public void setUp()
+      throws Exception {
+    QueryServerEnclosure server1 = new QueryServerEnclosure(Lists.newArrayList("a", "b", "c"),
+        ImmutableMap.of("a", INDEX_DIR_S1_A, "b", INDEX_DIR_S1_B, "c", INDEX_DIR_S1_C),
+        QueryEnvironmentTestUtils.SERVER1_SEGMENTS);
+    QueryServerEnclosure server2 = new QueryServerEnclosure(Lists.newArrayList("a", "c"),
+        ImmutableMap.of("a", INDEX_DIR_S2_A, "c", INDEX_DIR_S2_C), QueryEnvironmentTestUtils.SERVER2_SEGMENTS);
+
+    _reducerGrpcPort = QueryEnvironmentTestUtils.getAvailablePort();
+    _reducerHostname = String.format("Broker_%s", QueryConfig.DEFAULT_QUERY_RUNNER_HOSTNAME);
+    Map<String, Object> reducerConfig = new HashMap<>();
+    reducerConfig.put(QueryConfig.KEY_OF_QUERY_RUNNER_PORT, _reducerGrpcPort);
+    reducerConfig.put(QueryConfig.KEY_OF_QUERY_RUNNER_HOSTNAME, _reducerHostname);
+    _mailboxService = new GrpcMailboxService(_reducerHostname, _reducerGrpcPort);
+    _mailboxService.start();
+
+    _queryEnvironment =
+        QueryEnvironmentTestUtils.getQueryEnvironment(_reducerGrpcPort, server1.getPort(), server2.getPort());
+    server1.start();
+    server2.start();
+    // this doesn't test the QueryServer functionality so the server port can be the same as the mailbox port.
+    // this is only use for test identifier purpose.
+    _servers.put(new WorkerInstance("localhost", server1.getPort(), server1.getPort()), server1);
+    _servers.put(new WorkerInstance("localhost", server2.getPort(), server2.getPort()), server2);
+  }
+
+  @AfterClass
+  public void tearDown() {
+    for (QueryServerEnclosure server : _servers.values()) {
+      server.shutDown();
+    }
+    _mailboxService.shutdown();
+  }
+
+  @Test
+  public void testRunningTableScanOnlyQuery()
+      throws Exception {
+    QueryPlan queryPlan = _queryEnvironment.planQuery("SELECT * FROM b");
+    int stageRoodId = QueryEnvironmentTestUtils.getTestStageByServerCount(queryPlan, 1);
+    Map<String, String> requestMetadataMap =
+        ImmutableMap.of("REQUEST_ID", String.valueOf(RANDOM_REQUEST_ID_GEN.nextLong()));
+
+    ServerInstance serverInstance = queryPlan.getStageMetadataMap().get(stageRoodId).getServerInstances().get(0);
+    DistributedStagePlan distributedStagePlan =
+        QueryDispatcher.constructDistributedStagePlan(queryPlan, stageRoodId, serverInstance);
+
+    MailboxReceiveOperator mailboxReceiveOperator =
+        createReduceStageOperator(queryPlan.getStageMetadataMap().get(stageRoodId).getServerInstances(),
+            Long.parseLong(requestMetadataMap.get("REQUEST_ID")), stageRoodId, _reducerGrpcPort);
+
+    // execute this single stage.
+    _servers.get(serverInstance).processQuery(distributedStagePlan, requestMetadataMap);
+
+    DataTableBlock dataTableBlock;
+    // get the block back and it should have 5 rows
+    dataTableBlock = mailboxReceiveOperator.nextBlock();
+    Assert.assertEquals(dataTableBlock.getDataTable().getNumberOfRows(), 5);
+    // next block should be null as all servers finished sending.
+    dataTableBlock = mailboxReceiveOperator.nextBlock();
+    Assert.assertTrue(DataTableBlockUtils.isEndOfStream(dataTableBlock));
+  }
+
+  @Test
+  public void testRunningTableScanMultipleServer()
+      throws Exception {
+    QueryPlan queryPlan = _queryEnvironment.planQuery("SELECT * FROM a");
+    int stageRoodId = QueryEnvironmentTestUtils.getTestStageByServerCount(queryPlan, 2);
+    Map<String, String> requestMetadataMap =
+        ImmutableMap.of("REQUEST_ID", String.valueOf(RANDOM_REQUEST_ID_GEN.nextLong()));
+
+    for (ServerInstance serverInstance : queryPlan.getStageMetadataMap().get(stageRoodId).getServerInstances()) {
+      DistributedStagePlan distributedStagePlan =
+          QueryDispatcher.constructDistributedStagePlan(queryPlan, stageRoodId, serverInstance);
+
+      // execute this single stage.
+      _servers.get(serverInstance).processQuery(distributedStagePlan, requestMetadataMap);
+    }
+
+    MailboxReceiveOperator mailboxReceiveOperator =
+        createReduceStageOperator(queryPlan.getStageMetadataMap().get(stageRoodId).getServerInstances(),
+            Long.parseLong(requestMetadataMap.get("REQUEST_ID")), stageRoodId, _reducerGrpcPort);
+
+    int count = 0;
+    int rowCount = 0;
+    DataTableBlock dataTableBlock;
+    while (count < 2) { // we have 2 servers sending data.
+      dataTableBlock = mailboxReceiveOperator.nextBlock();
+      rowCount += dataTableBlock.getDataTable().getNumberOfRows();
+      count++;
+    }
+    // assert that all table A segments returned successfully.
+    Assert.assertEquals(rowCount, 15);
+    // assert that the next block is null (e.g. finished receiving).
+    dataTableBlock = mailboxReceiveOperator.nextBlock();
+    Assert.assertTrue(DataTableBlockUtils.isEndOfStream(dataTableBlock));
+  }
+
+  @Test
+  public void testJoin()
+      throws Exception {
+    QueryPlan queryPlan = _queryEnvironment.planQuery("SELECT * FROM a JOIN b on a.col1 = b.col2");
+    Map<String, String> requestMetadataMap =
+        ImmutableMap.of("REQUEST_ID", String.valueOf(RANDOM_REQUEST_ID_GEN.nextLong()));
+    MailboxReceiveOperator mailboxReceiveOperator = null;
+    for (int stageId : queryPlan.getStageMetadataMap().keySet()) {
+      if (queryPlan.getQueryStageMap().get(stageId) instanceof MailboxReceiveNode) {
+        MailboxReceiveNode reduceNode = (MailboxReceiveNode) queryPlan.getQueryStageMap().get(stageId);
+        mailboxReceiveOperator = createReduceStageOperator(
+            queryPlan.getStageMetadataMap().get(reduceNode.getSenderStageId()).getServerInstances(),
+            Long.parseLong(requestMetadataMap.get("REQUEST_ID")), reduceNode.getSenderStageId(), _reducerGrpcPort);
+      } else {
+        for (ServerInstance serverInstance : queryPlan.getStageMetadataMap().get(stageId).getServerInstances()) {
+          DistributedStagePlan distributedStagePlan =
+              QueryDispatcher.constructDistributedStagePlan(queryPlan, stageId, serverInstance);
+          _servers.get(serverInstance).processQuery(distributedStagePlan, requestMetadataMap);
+        }
+      }
+    }
+    Preconditions.checkNotNull(mailboxReceiveOperator);
+
+    int count = 0;
+    int rowCount = 0;
+    List<Object[]> resultRows = new ArrayList<>();
+    DataTableBlock dataTableBlock;
+    while (count < 2) { // we have 2 servers sending data.
+      dataTableBlock = mailboxReceiveOperator.nextBlock();
+      if (dataTableBlock.getDataTable() != null) {
+        DataTable dataTable = dataTableBlock.getDataTable();
+        int numRows = dataTable.getNumberOfRows();
+        for (int rowId = 0; rowId < numRows; rowId++) {
+          resultRows.add(extractRowFromDataTable(dataTable, rowId));
+        }
+        rowCount += numRows;
+      }
+      count++;
+    }
+
+    // Assert that each of the 5 categories from left table is joined with right table.
+    // Specifically table A has 15 rows (10 on server1 and 5 on server2) and table B has 5 rows (all on server1),
+    // thus the final JOIN result will be 15 x 1 = 15.
+    Assert.assertEquals(rowCount, 15);
+
+    // assert that the next block is null (e.g. finished receiving).
+    dataTableBlock = mailboxReceiveOperator.nextBlock();
+    Assert.assertTrue(DataTableBlockUtils.isEndOfStream(dataTableBlock));
+  }
+
+  @Test
+  public void testMultipleJoin()
+      throws Exception {
+    QueryPlan queryPlan =
+        _queryEnvironment.planQuery("SELECT * FROM a JOIN b ON a.col1 = b.col2 " + "JOIN c ON a.col3 = c.col3");
+    Map<String, String> requestMetadataMap =
+        ImmutableMap.of("REQUEST_ID", String.valueOf(RANDOM_REQUEST_ID_GEN.nextLong()));
+    MailboxReceiveOperator mailboxReceiveOperator = null;
+    for (int stageId : queryPlan.getStageMetadataMap().keySet()) {
+      if (queryPlan.getQueryStageMap().get(stageId) instanceof MailboxReceiveNode) {
+        MailboxReceiveNode reduceNode = (MailboxReceiveNode) queryPlan.getQueryStageMap().get(stageId);
+        mailboxReceiveOperator = createReduceStageOperator(
+            queryPlan.getStageMetadataMap().get(reduceNode.getSenderStageId()).getServerInstances(),
+            Long.parseLong(requestMetadataMap.get("REQUEST_ID")), reduceNode.getSenderStageId(), _reducerGrpcPort);
+      } else {
+        for (ServerInstance serverInstance : queryPlan.getStageMetadataMap().get(stageId).getServerInstances()) {
+          DistributedStagePlan distributedStagePlan =
+              QueryDispatcher.constructDistributedStagePlan(queryPlan, stageId, serverInstance);
+          _servers.get(serverInstance).processQuery(distributedStagePlan, requestMetadataMap);
+        }
+      }
+    }
+    Preconditions.checkNotNull(mailboxReceiveOperator);
+
+    int count = 0;
+    int rowCount = 0;
+    List<Object[]> resultRows = new ArrayList<>();
+    DataTableBlock dataTableBlock;
+    while (count < 2) { // we have 2 servers sending data.
+      dataTableBlock = mailboxReceiveOperator.nextBlock();
+      if (dataTableBlock.getDataTable() != null) {
+        DataTable dataTable = dataTableBlock.getDataTable();
+        int numRows = dataTable.getNumberOfRows();
+        for (int rowId = 0; rowId < numRows; rowId++) {
+          resultRows.add(extractRowFromDataTable(dataTable, rowId));
+        }
+        rowCount += numRows;
+      }
+      count++;
+    }
+
+    // Assert that each of the 5 categories from left table is joined with right table.
+    // Specifically table A has 15 rows (10 on server1 and 5 on server2) and table B has 5 rows (all on server1),
+    // thus the final JOIN result will be 15 x 1 = 15.
+    // Next join with table C which has (5 on server1 and 10 on server2), since data is identical. each of the row of
+    // the A JOIN B will have identical value of col3 as table C.col3 has. Since the values are cycling between
+    // (1, 2, 42, 1, 2). we will have 6 1s, 6 2s, and 3 42s, total result count will be 36 + 36 + 9 = 81
+    Assert.assertEquals(rowCount, 81);
+
+    // assert that the next block is null (e.g. finished receiving).
+    dataTableBlock = mailboxReceiveOperator.nextBlock();
+    Assert.assertTrue(DataTableBlockUtils.isEndOfStream(dataTableBlock));
+  }
+
+  protected MailboxReceiveOperator createReduceStageOperator(List<ServerInstance> sendingInstances, long jobId,
+      int stageId, int port) {
+    MailboxReceiveOperator mailboxReceiveOperator =
+        new MailboxReceiveOperator(_mailboxService, RelDistribution.Type.ANY, sendingInstances, "localhost", port,
+            jobId, stageId);
+    return mailboxReceiveOperator;
+  }
+}
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/service/QueryServerTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/service/QueryServerTest.java
new file mode 100644
index 0000000000..539d5d9a53
--- /dev/null
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/service/QueryServerTest.java
@@ -0,0 +1,107 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.service;
+
+import com.google.common.collect.Lists;
+import io.grpc.ManagedChannel;
+import io.grpc.ManagedChannelBuilder;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.pinot.common.proto.PinotQueryWorkerGrpc;
+import org.apache.pinot.common.proto.Worker;
+import org.apache.pinot.core.transport.ServerInstance;
+import org.apache.pinot.query.QueryEnvironment;
+import org.apache.pinot.query.QueryEnvironmentTestUtils;
+import org.apache.pinot.query.planner.QueryPlan;
+import org.apache.pinot.query.routing.WorkerInstance;
+import org.apache.pinot.query.runtime.QueryRunner;
+import org.apache.pinot.query.runtime.plan.serde.QueryPlanSerDeUtils;
+import org.mockito.Mockito;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+
+/**
+ * Tests the dispatch path of {@link QueryServer}: starts {@link #QUERY_SERVER_COUNT} servers backed by a mocked
+ * {@link QueryRunner} and submits a serialized stage plan to one of them over gRPC.
+ */
+public class QueryServerTest {
+  private static final int QUERY_SERVER_COUNT = 2;
+  private final Map<Integer, QueryServer> _queryServerMap = new HashMap<>();
+  private final Map<Integer, ServerInstance> _queryServerInstanceMap = new HashMap<>();
+
+  private QueryEnvironment _queryEnvironment;
+
+  @BeforeClass
+  public void setUp()
+      throws Exception {
+
+    for (int i = 0; i < QUERY_SERVER_COUNT; i++) {
+      int availablePort = QueryEnvironmentTestUtils.getAvailablePort();
+      QueryServer queryServer = new QueryServer(availablePort, Mockito.mock(QueryRunner.class));
+      queryServer.start();
+      _queryServerMap.put(availablePort, queryServer);
+      // this only tests the QueryServer functionality so the server port can be the same as the mailbox port.
+      // this is only used for test identification purposes.
+      _queryServerInstanceMap.put(availablePort, new WorkerInstance("localhost", availablePort, availablePort));
+    }
+
+    List<Integer> portList = Lists.newArrayList(_queryServerMap.keySet());
+
+    // reducer port doesn't matter, we are testing the worker instance not GRPC.
+    _queryEnvironment = QueryEnvironmentTestUtils.getQueryEnvironment(1, portList.get(0), portList.get(1));
+  }
+
+  @AfterClass
+  public void tearDown() {
+    for (QueryServer worker : _queryServerMap.values()) {
+      worker.shutdown();
+    }
+  }
+
+  @Test
+  public void testWorkerAcceptsWorkerRequestCorrect()
+      throws Exception {
+    QueryPlan queryPlan = _queryEnvironment.planQuery("SELECT * FROM a JOIN b ON a.col1 = b.col2");
+
+    int singleServerStageId = QueryEnvironmentTestUtils.getTestStageByServerCount(queryPlan, 1);
+
+    Worker.QueryRequest queryRequest = getQueryRequest(queryPlan, singleServerStageId);
+
+    // submit the request for testing.
+    submitRequest(queryRequest);
+  }
+
+  /**
+   * Sends the request to the host/port recorded in its metadata and checks the dispatch acknowledgement. The gRPC
+   * channel is always shut down afterwards so repeated calls do not leak connections or threads.
+   */
+  private void submitRequest(Worker.QueryRequest queryRequest) {
+    String host = queryRequest.getMetadataMap().get("SERVER_INSTANCE_HOST");
+    int port = Integer.parseInt(queryRequest.getMetadataMap().get("SERVER_INSTANCE_PORT"));
+    ManagedChannel channel = ManagedChannelBuilder.forAddress(host, port).usePlaintext().build();
+    try {
+      PinotQueryWorkerGrpc.PinotQueryWorkerBlockingStub stub = PinotQueryWorkerGrpc.newBlockingStub(channel);
+      Worker.QueryResponse resp = stub.submit(queryRequest);
+      // TODO: validate meaningful return value
+      // the server currently acknowledges every accepted dispatch with the metadata entry ("OK", "OK").
+      Assert.assertEquals(resp.getMetadataMap().get("OK"), "OK");
+    } finally {
+      channel.shutdownNow();
+    }
+  }
+
+  /** Serializes the plan of {@code stageId} for its first server and records that server's address as metadata. */
+  private Worker.QueryRequest getQueryRequest(QueryPlan queryPlan, int stageId) {
+    ServerInstance serverInstance = queryPlan.getStageMetadataMap().get(stageId).getServerInstances().get(0);
+
+    return Worker.QueryRequest.newBuilder().setStagePlan(QueryPlanSerDeUtils.serialize(
+            QueryDispatcher.constructDistributedStagePlan(queryPlan, stageId, serverInstance)))
+        .putMetadata("SERVER_INSTANCE_HOST", serverInstance.getHostname())
+        .putMetadata("SERVER_INSTANCE_PORT", String.valueOf(serverInstance.getPort())).build();
+  }
+}
diff --git a/pom.xml b/pom.xml
index d8a42ccfd3..dc78d0f67b 100644
--- a/pom.xml
+++ b/pom.xml
@@ -58,6 +58,7 @@
     <module>pinot-compatibility-verifier</module>
 
     <module>pinot-query-planner</module>
+    <module>pinot-query-runtime</module>
   </modules>
 
   <licenses>
@@ -413,6 +414,24 @@
         <version>${project.version}</version>
         <type>test-jar</type>
       </dependency>
+
+      <dependency>
+        <groupId>org.apache.pinot</groupId>
+        <artifactId>pinot-query-planner</artifactId>
+        <version>${project.version}</version>
+      </dependency>
+      <dependency>
+        <groupId>org.apache.pinot</groupId>
+        <artifactId>pinot-query-planner</artifactId>
+        <version>${project.version}</version>
+        <type>test-jar</type>
+      </dependency>
+      <dependency>
+        <groupId>org.apache.pinot</groupId>
+        <artifactId>pinot-query-runtime</artifactId>
+        <version>${project.version}</version>
+      </dependency>
+
       <dependency>
         <groupId>nl.jqno.equalsverifier</groupId>
         <artifactId>equalsverifier</artifactId>


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org