You are viewing a plain-text version of this content. The canonical version is available in the mailing-list archive (the original hyperlink was lost in this plain-text rendering).
Posted to commits@pinot.apache.org by ro...@apache.org on 2023/06/01 05:16:44 UTC

[pinot] branch master updated: [multistage] pipeline breaker framework and dynamic-broadcast pipeline breaker (#10779)

This is an automated email from the ASF dual-hosted git repository.

rongr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 714ea4ed64 [multistage] pipeline breaker framework and dynamic-broadcast pipeline breaker (#10779)
714ea4ed64 is described below

commit 714ea4ed645407d0e3b9509bc702fa5967b0046a
Author: Rong Rong <ro...@apache.org>
AuthorDate: Wed May 31 22:16:35 2023 -0700

    [multistage] pipeline breaker framework and dynamic-broadcast pipeline breaker (#10779)
    
    Pipeline breaker
    ---
    1. Introduce `PipelineBreaker` package
        - `PipelineBreaker` runs before the actual `OpChain` of a stage (PlanFragment + Stage/Worker metadata); it computes and stores all pending "sub-plan" results before the actual `OpChain` of the stage is **CONSTRUCTED**.
        - `PipelineBreaker` is not another `OpChain`; the actual computation happens elsewhere.
        - Currently, `PipelineBreaker` only supports `MailboxReceive`.
    2. Introduce compilation support for accepting data-result placeholders in `PhysicalPlanVisitor` (and `ServerPlanRequestVisitor`)
        - This way, the `OpChain` is generated with the results of the `PipelineBreaker` as contextual input.
    3. Also cleaned up the scheduler:
        - There is no need for two identical schedulers (leaf vs. intermediate), as they are all cached.
        - The pipeline executor is now a static method, so it can easily be plugged into different executor services.
    
    Dynamic broadcast SEMI JOIN
    ---
    1. Created dynamic broadcast runtime based on #10772
    2. Modified physical plan visitor to support semi join using dynamic broadcast
    
    
    ---------
    
    Co-authored-by: Rong Rong <ro...@startree.ai>
---
 .../apache/pinot/query/mailbox/MailboxIdUtils.java |   8 +-
 .../apache/pinot/query/runtime/QueryRunner.java    | 114 +++++----
 .../pinot/query/runtime/operator/OpChainId.java    |   4 +
 .../query/runtime/plan/DistributedStagePlan.java   |   8 +
 .../runtime/plan/OpChainExecutionContext.java      |   8 +-
 ...equestContext.java => PhysicalPlanContext.java} |  14 +-
 .../query/runtime/plan/PhysicalPlanVisitor.java    |  30 +--
 .../plan/pipeline/PipelineBreakerContext.java      |  62 +++++
 .../plan/pipeline/PipelineBreakerExecutor.java     | 101 ++++++++
 .../plan/pipeline/PipelineBreakerOperator.java     | 118 +++++++++
 .../plan/pipeline/PipelineBreakerResult.java       |  43 ++++
 .../plan/pipeline/PipelineBreakerVisitor.java      |  50 ++++
 .../plan/server/ServerPlanRequestContext.java      |  11 +-
 .../ServerPlanRequestUtils.java}                   | 264 ++++++++++-----------
 .../plan/server/ServerPlanRequestVisitor.java      | 191 +++++++++++++++
 .../apache/pinot/query/QueryServerEnclosure.java   |  15 +-
 .../pinot/query/runtime/QueryRunnerTest.java       |   3 +
 .../runtime/operator/LiteralValueOperatorTest.java |   4 +-
 .../pinot/query/service/QueryServerTest.java       |   4 -
 .../service/dispatch/QueryDispatcherTest.java      |   3 -
 .../src/test/resources/queries/QueryHints.json     |  20 ++
 21 files changed, 846 insertions(+), 229 deletions(-)

diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/MailboxIdUtils.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/MailboxIdUtils.java
index f65de98118..02944c665a 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/MailboxIdUtils.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/MailboxIdUtils.java
@@ -45,8 +45,12 @@ public class MailboxIdUtils {
     return new OpChainId(Long.parseLong(parts[0]), Integer.parseInt(parts[4]), Integer.parseInt(parts[3]));
   }
 
-  public static List<String> toMailboxIds(long requestId, MailboxMetadata senderMailBoxMetadatas) {
-    return senderMailBoxMetadatas.getMailBoxIdList().stream()
+  public static List<String> toMailboxIds(long requestId, MailboxMetadata senderMailBoxMetadata) {
+    return toMailboxIds(requestId, senderMailBoxMetadata.getMailBoxIdList());
+  }
+
+  public static List<String> toMailboxIds(long requestId, List<String> mailboxMetadataIdList) {
+    return mailboxMetadataIdList.stream()
         .map(mailboxIdFromBroker -> Long.toString(requestId) + SEPARATOR + mailboxIdFromBroker)
         .collect(Collectors.toList());
   }
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java
index 94640ffae7..6fa02a216a 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java
@@ -42,7 +42,6 @@ import org.apache.pinot.core.query.scheduler.resources.ResourceManager;
 import org.apache.pinot.query.mailbox.MailboxService;
 import org.apache.pinot.query.planner.plannode.MailboxSendNode;
 import org.apache.pinot.query.planner.plannode.PlanNode;
-import org.apache.pinot.query.routing.VirtualServerAddress;
 import org.apache.pinot.query.routing.WorkerMetadata;
 import org.apache.pinot.query.runtime.executor.OpChainSchedulerService;
 import org.apache.pinot.query.runtime.executor.RoundRobinScheduler;
@@ -52,11 +51,13 @@ import org.apache.pinot.query.runtime.operator.MultiStageOperator;
 import org.apache.pinot.query.runtime.operator.OpChain;
 import org.apache.pinot.query.runtime.plan.DistributedStagePlan;
 import org.apache.pinot.query.runtime.plan.OpChainExecutionContext;
+import org.apache.pinot.query.runtime.plan.PhysicalPlanContext;
 import org.apache.pinot.query.runtime.plan.PhysicalPlanVisitor;
-import org.apache.pinot.query.runtime.plan.PlanRequestContext;
-import org.apache.pinot.query.runtime.plan.ServerRequestPlanVisitor;
 import org.apache.pinot.query.runtime.plan.StageMetadata;
+import org.apache.pinot.query.runtime.plan.pipeline.PipelineBreakerExecutor;
+import org.apache.pinot.query.runtime.plan.pipeline.PipelineBreakerResult;
 import org.apache.pinot.query.runtime.plan.server.ServerPlanRequestContext;
+import org.apache.pinot.query.runtime.plan.server.ServerPlanRequestUtils;
 import org.apache.pinot.query.service.QueryConfig;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.config.table.TableType;
@@ -83,14 +84,11 @@ public class QueryRunner {
   private MailboxService _mailboxService;
   private String _hostname;
   private int _port;
-  private VirtualServerAddress _rootServer;
 
   private ExecutorService _queryWorkerIntermExecutorService;
   private ExecutorService _queryWorkerLeafExecutorService;
-  private ExecutorService _queryRunnerExecutorService;
 
-  private OpChainSchedulerService _intermScheduler;
-  private OpChainSchedulerService _leafScheduler;
+  private OpChainSchedulerService _scheduler;
 
   /**
    * Initializes the query executor.
@@ -102,8 +100,6 @@ public class QueryRunner {
     _hostname = instanceName.startsWith(CommonConstants.Helix.PREFIX_OF_SERVER_INSTANCE) ? instanceName.substring(
         CommonConstants.Helix.SERVER_INSTANCE_PREFIX_LENGTH) : instanceName;
     _port = config.getProperty(QueryConfig.KEY_OF_QUERY_RUNNER_PORT, QueryConfig.DEFAULT_QUERY_RUNNER_PORT);
-    // always use 0 for root server ID as all data is processed by one node at the global root
-    _rootServer = new VirtualServerAddress(_hostname, _port, 0);
     _helixManager = helixManager;
     try {
       long releaseMs = config.getProperty(QueryConfig.KEY_OF_SCHEDULER_RELEASE_TIMEOUT_MS,
@@ -112,12 +108,9 @@ public class QueryRunner {
           new NamedThreadFactory("query_intermediate_worker_on_" + _port + "_port"));
       _queryWorkerLeafExecutorService = Executors.newFixedThreadPool(ResourceManager.DEFAULT_QUERY_WORKER_THREADS,
           new NamedThreadFactory("query_leaf_worker_on_" + _port + "_port"));
-      _queryRunnerExecutorService = Executors.newCachedThreadPool(
-          new NamedThreadFactory("query_runner_on_" + _port + "_port"));
-      _intermScheduler = new OpChainSchedulerService(new RoundRobinScheduler(releaseMs),
+      _scheduler = new OpChainSchedulerService(new RoundRobinScheduler(releaseMs),
           getQueryWorkerIntermExecutorService());
-      _leafScheduler = new OpChainSchedulerService(new RoundRobinScheduler(releaseMs), getQueryRunnerExecutorService());
-      _mailboxService = new MailboxService(_hostname, _port, config, _intermScheduler::onDataAvailable);
+      _mailboxService = new MailboxService(_hostname, _port, config, _scheduler::onDataAvailable);
       _serverExecutor = new ServerQueryExecutorV1Impl();
       _serverExecutor.init(config.subset(PINOT_V1_SERVER_QUERY_CONFIG_PREFIX), instanceDataManager, serverMetrics);
     } catch (Exception e) {
@@ -130,39 +123,70 @@ public class QueryRunner {
     _helixPropertyStore = _helixManager.getHelixPropertyStore();
     _mailboxService.start();
     _serverExecutor.start();
-    _leafScheduler.startAsync().awaitRunning(30, TimeUnit.SECONDS);
-    _intermScheduler.startAsync().awaitRunning(30, TimeUnit.SECONDS);
+    _scheduler.startAsync().awaitRunning(30, TimeUnit.SECONDS);
   }
 
   public void shutDown()
       throws TimeoutException {
     _serverExecutor.shutDown();
     _mailboxService.shutdown();
-    _leafScheduler.stopAsync().awaitTerminated(30, TimeUnit.SECONDS);
-    _intermScheduler.stopAsync().awaitTerminated(30, TimeUnit.SECONDS);
+    _scheduler.stopAsync().awaitTerminated(30, TimeUnit.SECONDS);
   }
 
-  public void processQuery(DistributedStagePlan distributedStagePlan, Map<String, String> requestMetadataMap) {
+  /**
+   * Execute a {@link DistributedStagePlan}.
+   *
+   * <p>This execution entry point should be asynchronously called by the request handler and caller should not wait
+   * for results/exceptions.</p>
+   */
+  public void processQuery(DistributedStagePlan distributedStagePlan, Map<String, String> requestMetadataMap)
+      throws Exception {
     long requestId = Long.parseLong(requestMetadataMap.get(QueryConfig.KEY_OF_BROKER_REQUEST_ID));
     long timeoutMs = Long.parseLong(requestMetadataMap.get(QueryConfig.KEY_OF_BROKER_REQUEST_TIMEOUT_MS));
     boolean isTraceEnabled =
         Boolean.parseBoolean(requestMetadataMap.getOrDefault(CommonConstants.Broker.Request.TRACE, "false"));
     long deadlineMs = System.currentTimeMillis() + timeoutMs;
-    if (isLeafStage(distributedStagePlan)) {
-      OpChain rootOperator = compileLeafStage(distributedStagePlan, requestMetadataMap, timeoutMs, deadlineMs,
-          requestId);
-      _leafScheduler.register(rootOperator);
+
+    // run pre-stage execution for all pipeline breakers
+    PipelineBreakerResult pipelineBreakerResult;
+    try {
+      pipelineBreakerResult = PipelineBreakerExecutor.executePipelineBreakers(_scheduler, _mailboxService,
+          distributedStagePlan, timeoutMs, deadlineMs, requestId, isTraceEnabled);
+    } catch (Exception e) {
+      LOGGER.error("Error executing pre-stage pipeline breaker for: {}:{}", requestId,
+          distributedStagePlan.getStageId(), e);
+      _scheduler.cancel(requestId);
+      throw e;
+    }
+
+    // run OpChain
+    if (DistributedStagePlan.isLeafStage(distributedStagePlan)) {
+      try {
+        OpChain rootOperator = compileLeafStage(distributedStagePlan, requestMetadataMap, pipelineBreakerResult,
+            timeoutMs, deadlineMs, requestId);
+        _scheduler.register(rootOperator);
+      } catch (Exception e) {
+        LOGGER.error("Error executing leaf stage for: {}:{}", requestId, distributedStagePlan.getStageId(), e);
+        _scheduler.cancel(requestId);
+        throw e;
+      }
     } else {
-      PlanNode stageRoot = distributedStagePlan.getStageRoot();
-      OpChain rootOperator = PhysicalPlanVisitor.build(stageRoot,
-          new PlanRequestContext(_mailboxService, requestId, stageRoot.getPlanFragmentId(), timeoutMs, deadlineMs,
-              distributedStagePlan.getServer(), distributedStagePlan.getStageMetadata(), isTraceEnabled));
-      _intermScheduler.register(rootOperator);
+      try {
+        PlanNode stageRoot = distributedStagePlan.getStageRoot();
+        OpChain rootOperator = PhysicalPlanVisitor.walkPlanNode(stageRoot,
+            new PhysicalPlanContext(_mailboxService, requestId, stageRoot.getPlanFragmentId(), timeoutMs, deadlineMs,
+                distributedStagePlan.getServer(), distributedStagePlan.getStageMetadata(), null, isTraceEnabled));
+        _scheduler.register(rootOperator);
+      } catch (Exception e) {
+        LOGGER.error("Error executing intermediate stage for: {}:{}", requestId, distributedStagePlan.getStageId(), e);
+        _scheduler.cancel(requestId);
+        throw e;
+      }
     }
   }
 
   public void cancel(long requestId) {
-    _intermScheduler.cancel(requestId);
+    _scheduler.cancel(requestId);
   }
 
   @VisibleForTesting
@@ -175,17 +199,13 @@ public class QueryRunner {
     return _queryWorkerIntermExecutorService;
   }
 
-  public ExecutorService getQueryRunnerExecutorService() {
-    return _queryRunnerExecutorService;
-  }
-
   private OpChain compileLeafStage(DistributedStagePlan distributedStagePlan, Map<String, String> requestMetadataMap,
-      long timeoutMs, long deadlineMs, long requestId) {
+      PipelineBreakerResult pipelineBreakerResult, long timeoutMs, long deadlineMs, long requestId) {
     boolean isTraceEnabled =
         Boolean.parseBoolean(requestMetadataMap.getOrDefault(CommonConstants.Broker.Request.TRACE, "false"));
     List<ServerPlanRequestContext> serverPlanRequestContexts =
-        constructServerQueryRequests(distributedStagePlan, requestMetadataMap, _helixPropertyStore, _mailboxService,
-            deadlineMs);
+        constructServerQueryRequests(distributedStagePlan, requestMetadataMap, pipelineBreakerResult,
+            _helixPropertyStore, _mailboxService, deadlineMs);
     List<ServerQueryRequest> serverQueryRequests = new ArrayList<>(serverPlanRequestContexts.size());
     for (ServerPlanRequestContext requestContext : serverPlanRequestContexts) {
       serverQueryRequests.add(new ServerQueryRequest(requestContext.getInstanceRequest(),
@@ -207,8 +227,8 @@ public class QueryRunner {
   }
 
   private static List<ServerPlanRequestContext> constructServerQueryRequests(DistributedStagePlan distributedStagePlan,
-      Map<String, String> requestMetadataMap, ZkHelixPropertyStore<ZNRecord> helixPropertyStore,
-      MailboxService mailboxService, long deadlineMs) {
+      Map<String, String> requestMetadataMap, PipelineBreakerResult pipelineBreakerResult,
+      ZkHelixPropertyStore<ZNRecord> helixPropertyStore, MailboxService mailboxService, long deadlineMs) {
     StageMetadata stageMetadata = distributedStagePlan.getStageMetadata();
     WorkerMetadata workerMetadata = distributedStagePlan.getCurrentWorkerMetadata();
     String rawTableName = StageMetadata.getTableName(stageMetadata);
@@ -224,17 +244,17 @@ public class QueryRunner {
             TableNameBuilder.forType(TableType.OFFLINE).tableNameWithType(rawTableName));
         Schema schema = ZKMetadataProvider.getTableSchema(helixPropertyStore,
             TableNameBuilder.forType(TableType.OFFLINE).tableNameWithType(rawTableName));
-        requests.add(ServerRequestPlanVisitor.build(mailboxService, distributedStagePlan, requestMetadataMap,
-            tableConfig, schema, StageMetadata.getTimeBoundary(stageMetadata), TableType.OFFLINE,
-            tableEntry.getValue(), deadlineMs));
+        requests.add(ServerPlanRequestUtils.build(mailboxService, distributedStagePlan, requestMetadataMap,
+            pipelineBreakerResult, tableConfig, schema, StageMetadata.getTimeBoundary(stageMetadata),
+            TableType.OFFLINE, tableEntry.getValue(), deadlineMs));
       } else if (TableType.REALTIME.name().equals(tableType)) {
         TableConfig tableConfig = ZKMetadataProvider.getTableConfig(helixPropertyStore,
             TableNameBuilder.forType(TableType.REALTIME).tableNameWithType(rawTableName));
         Schema schema = ZKMetadataProvider.getTableSchema(helixPropertyStore,
             TableNameBuilder.forType(TableType.REALTIME).tableNameWithType(rawTableName));
-        requests.add(ServerRequestPlanVisitor.build(mailboxService, distributedStagePlan, requestMetadataMap,
-            tableConfig, schema, StageMetadata.getTimeBoundary(stageMetadata), TableType.REALTIME,
-            tableEntry.getValue(), deadlineMs));
+        requests.add(ServerPlanRequestUtils.build(mailboxService, distributedStagePlan, requestMetadataMap,
+            pipelineBreakerResult, tableConfig, schema, StageMetadata.getTimeBoundary(stageMetadata),
+            TableType.REALTIME, tableEntry.getValue(), deadlineMs));
       } else {
         throw new IllegalArgumentException("Unsupported table type key: " + tableType);
       }
@@ -254,10 +274,4 @@ public class QueryRunner {
     }
     return result;
   }
-
-  private boolean isLeafStage(DistributedStagePlan distributedStagePlan) {
-    WorkerMetadata workerMetadata = distributedStagePlan.getCurrentWorkerMetadata();
-    Map<String, List<String>> segments = WorkerMetadata.getTableSegmentsMap(workerMetadata);
-    return segments != null && segments.size() > 0;
-  }
 }
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/OpChainId.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/OpChainId.java
index 50589cfca5..dae9c2f755 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/OpChainId.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/OpChainId.java
@@ -36,6 +36,10 @@ public class OpChainId {
     return _requestId;
   }
 
+  public int getVirtualServerId() {
+    return _virtualServerId;
+  }
+
   @Override
   public String toString() {
     return String.format("%s_%s_%s", _requestId, _virtualServerId, _stageId);
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/DistributedStagePlan.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/DistributedStagePlan.java
index f8a5f5118b..2aa269e6aa 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/DistributedStagePlan.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/DistributedStagePlan.java
@@ -18,6 +18,8 @@
  */
 package org.apache.pinot.query.runtime.plan;
 
+import java.util.List;
+import java.util.Map;
 import org.apache.pinot.query.planner.plannode.PlanNode;
 import org.apache.pinot.query.routing.VirtualServerAddress;
 import org.apache.pinot.query.routing.WorkerMetadata;
@@ -78,4 +80,10 @@ public class DistributedStagePlan {
   public WorkerMetadata getCurrentWorkerMetadata() {
     return _stageMetadata.getWorkerMetadataList().get(_server.workerId());
   }
+
+  public static boolean isLeafStage(DistributedStagePlan distributedStagePlan) {
+    WorkerMetadata workerMetadata = distributedStagePlan.getCurrentWorkerMetadata();
+    Map<String, List<String>> segments = WorkerMetadata.getTableSegmentsMap(workerMetadata);
+    return segments != null && segments.size() > 0;
+  }
 }
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/OpChainExecutionContext.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/OpChainExecutionContext.java
index e16795891b..5da20c4e70 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/OpChainExecutionContext.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/OpChainExecutionContext.java
@@ -57,10 +57,10 @@ public class OpChainExecutionContext {
     _traceEnabled = traceEnabled;
   }
 
-  public OpChainExecutionContext(PlanRequestContext planRequestContext) {
-    this(planRequestContext.getMailboxService(), planRequestContext.getRequestId(), planRequestContext.getStageId(),
-        planRequestContext.getServer(), planRequestContext.getTimeoutMs(), planRequestContext.getDeadlineMs(),
-        planRequestContext.getStageMetadata(), planRequestContext.isTraceEnabled());
+  public OpChainExecutionContext(PhysicalPlanContext physicalPlanContext) {
+    this(physicalPlanContext.getMailboxService(), physicalPlanContext.getRequestId(), physicalPlanContext.getStageId(),
+        physicalPlanContext.getServer(), physicalPlanContext.getTimeoutMs(), physicalPlanContext.getDeadlineMs(),
+        physicalPlanContext.getStageMetadata(), physicalPlanContext.isTraceEnabled());
   }
 
   public MailboxService getMailboxService() {
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/PlanRequestContext.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/PhysicalPlanContext.java
similarity index 82%
rename from pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/PlanRequestContext.java
rename to pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/PhysicalPlanContext.java
index d0233c1578..2da5772930 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/PlanRequestContext.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/PhysicalPlanContext.java
@@ -22,9 +22,10 @@ import java.util.ArrayList;
 import java.util.List;
 import org.apache.pinot.query.mailbox.MailboxService;
 import org.apache.pinot.query.routing.VirtualServerAddress;
+import org.apache.pinot.query.runtime.plan.pipeline.PipelineBreakerResult;
 
 
-public class PlanRequestContext {
+public class PhysicalPlanContext {
   protected final MailboxService _mailboxService;
   protected final long _requestId;
   protected final int _stageId;
@@ -33,12 +34,14 @@ public class PlanRequestContext {
   private final long _deadlineMs;
   protected final VirtualServerAddress _server;
   protected final StageMetadata _stageMetadata;
+  protected final PipelineBreakerResult _pipelineBreakerResult;
   protected final List<String> _receivingMailboxIds = new ArrayList<>();
   private final OpChainExecutionContext _opChainExecutionContext;
   private final boolean _traceEnabled;
 
-  public PlanRequestContext(MailboxService mailboxService, long requestId, int stageId, long timeoutMs, long deadlineMs,
-      VirtualServerAddress server, StageMetadata stageMetadata, boolean traceEnabled) {
+  public PhysicalPlanContext(MailboxService mailboxService, long requestId, int stageId, long timeoutMs,
+      long deadlineMs, VirtualServerAddress server, StageMetadata stageMetadata,
+      PipelineBreakerResult pipelineBreakerResult, boolean traceEnabled) {
     _mailboxService = mailboxService;
     _requestId = requestId;
     _stageId = stageId;
@@ -46,6 +49,7 @@ public class PlanRequestContext {
     _deadlineMs = deadlineMs;
     _server = server;
     _stageMetadata = stageMetadata;
+    _pipelineBreakerResult = pipelineBreakerResult;
     _traceEnabled = traceEnabled;
     _opChainExecutionContext = new OpChainExecutionContext(this);
   }
@@ -74,6 +78,10 @@ public class PlanRequestContext {
     return _stageMetadata;
   }
 
+  public PipelineBreakerResult getPipelineBreakerResult() {
+    return _pipelineBreakerResult;
+  }
+
   public MailboxService getMailboxService() {
     return _mailboxService;
   }
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/PhysicalPlanVisitor.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/PhysicalPlanVisitor.java
index 8b98fa517a..68fcd82f67 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/PhysicalPlanVisitor.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/PhysicalPlanVisitor.java
@@ -56,19 +56,19 @@ import org.apache.pinot.query.runtime.operator.WindowAggregateOperator;
  * this works only for the intermediate stage nodes, leaf stage nodes are expected to compile into
  * v1 operators at this point in time.
  *
- * <p>This class should be used statically via {@link #build(PlanNode, PlanRequestContext)}
+ * <p>This class should be used statically via {@link #walkPlanNode(PlanNode, PhysicalPlanContext)}
  */
-public class PhysicalPlanVisitor implements PlanNodeVisitor<MultiStageOperator, PlanRequestContext> {
+public class PhysicalPlanVisitor implements PlanNodeVisitor<MultiStageOperator, PhysicalPlanContext> {
 
   private static final PhysicalPlanVisitor INSTANCE = new PhysicalPlanVisitor();
 
-  public static OpChain build(PlanNode node, PlanRequestContext context) {
+  public static OpChain walkPlanNode(PlanNode node, PhysicalPlanContext context) {
     MultiStageOperator root = node.visit(INSTANCE, context);
     return new OpChain(context.getOpChainExecutionContext(), root, context.getReceivingMailboxIds());
   }
 
   @Override
-  public MultiStageOperator visitMailboxReceive(MailboxReceiveNode node, PlanRequestContext context) {
+  public MultiStageOperator visitMailboxReceive(MailboxReceiveNode node, PhysicalPlanContext context) {
     if (node.isSortOnReceiver()) {
       SortedMailboxReceiveOperator sortedMailboxReceiveOperator =
           new SortedMailboxReceiveOperator(context.getOpChainExecutionContext(), node.getExchangeType(),
@@ -86,7 +86,7 @@ public class PhysicalPlanVisitor implements PlanNodeVisitor<MultiStageOperator,
   }
 
   @Override
-  public MultiStageOperator visitMailboxSend(MailboxSendNode node, PlanRequestContext context) {
+  public MultiStageOperator visitMailboxSend(MailboxSendNode node, PhysicalPlanContext context) {
     MultiStageOperator nextOperator = node.getInputs().get(0).visit(this, context);
     return new MailboxSendOperator(context.getOpChainExecutionContext(), nextOperator, node.getExchangeType(),
         node.getPartitionKeySelector(), node.getCollationKeys(), node.getCollationDirections(), node.isSortOnSender(),
@@ -94,14 +94,14 @@ public class PhysicalPlanVisitor implements PlanNodeVisitor<MultiStageOperator,
   }
 
   @Override
-  public MultiStageOperator visitAggregate(AggregateNode node, PlanRequestContext context) {
+  public MultiStageOperator visitAggregate(AggregateNode node, PhysicalPlanContext context) {
     MultiStageOperator nextOperator = node.getInputs().get(0).visit(this, context);
     return new AggregateOperator(context.getOpChainExecutionContext(), nextOperator, node.getDataSchema(),
         node.getAggCalls(), node.getGroupSet(), node.getInputs().get(0).getDataSchema());
   }
 
   @Override
-  public MultiStageOperator visitWindow(WindowNode node, PlanRequestContext context) {
+  public MultiStageOperator visitWindow(WindowNode node, PhysicalPlanContext context) {
     MultiStageOperator nextOperator = node.getInputs().get(0).visit(this, context);
     return new WindowAggregateOperator(context.getOpChainExecutionContext(), nextOperator, node.getGroupSet(),
         node.getOrderSet(), node.getOrderSetDirection(), node.getOrderSetNullDirection(), node.getAggCalls(),
@@ -110,7 +110,7 @@ public class PhysicalPlanVisitor implements PlanNodeVisitor<MultiStageOperator,
   }
 
   @Override
-  public MultiStageOperator visitSetOp(SetOpNode setOpNode, PlanRequestContext context) {
+  public MultiStageOperator visitSetOp(SetOpNode setOpNode, PhysicalPlanContext context) {
     List<MultiStageOperator> inputs = new ArrayList<>();
     for (PlanNode input : setOpNode.getInputs()) {
       MultiStageOperator visited = input.visit(this, context);
@@ -132,19 +132,19 @@ public class PhysicalPlanVisitor implements PlanNodeVisitor<MultiStageOperator,
   }
 
   @Override
-  public MultiStageOperator visitExchange(ExchangeNode exchangeNode, PlanRequestContext context) {
+  public MultiStageOperator visitExchange(ExchangeNode exchangeNode, PhysicalPlanContext context) {
     throw new UnsupportedOperationException("ExchangeNode should not be visited");
   }
 
   @Override
-  public MultiStageOperator visitFilter(FilterNode node, PlanRequestContext context) {
+  public MultiStageOperator visitFilter(FilterNode node, PhysicalPlanContext context) {
     MultiStageOperator nextOperator = node.getInputs().get(0).visit(this, context);
     return new FilterOperator(context.getOpChainExecutionContext(), nextOperator, node.getDataSchema(),
         node.getCondition());
   }
 
   @Override
-  public MultiStageOperator visitJoin(JoinNode node, PlanRequestContext context) {
+  public MultiStageOperator visitJoin(JoinNode node, PhysicalPlanContext context) {
     PlanNode left = node.getInputs().get(0);
     PlanNode right = node.getInputs().get(1);
 
@@ -156,14 +156,14 @@ public class PhysicalPlanVisitor implements PlanNodeVisitor<MultiStageOperator,
   }
 
   @Override
-  public MultiStageOperator visitProject(ProjectNode node, PlanRequestContext context) {
+  public MultiStageOperator visitProject(ProjectNode node, PhysicalPlanContext context) {
     MultiStageOperator nextOperator = node.getInputs().get(0).visit(this, context);
     return new TransformOperator(context.getOpChainExecutionContext(), nextOperator, node.getDataSchema(),
         node.getProjects(), node.getInputs().get(0).getDataSchema());
   }
 
   @Override
-  public MultiStageOperator visitSort(SortNode node, PlanRequestContext context) {
+  public MultiStageOperator visitSort(SortNode node, PhysicalPlanContext context) {
     MultiStageOperator nextOperator = node.getInputs().get(0).visit(this, context);
     boolean isInputSorted = nextOperator instanceof SortedMailboxReceiveOperator;
     return new SortOperator(context.getOpChainExecutionContext(), nextOperator, node.getCollationKeys(),
@@ -172,12 +172,12 @@ public class PhysicalPlanVisitor implements PlanNodeVisitor<MultiStageOperator,
   }
 
   @Override
-  public MultiStageOperator visitTableScan(TableScanNode node, PlanRequestContext context) {
+  public MultiStageOperator visitTableScan(TableScanNode node, PhysicalPlanContext context) {
     throw new UnsupportedOperationException("Stage node of type TableScanNode is not supported!");
   }
 
   @Override
-  public MultiStageOperator visitValue(ValueNode node, PlanRequestContext context) {
+  public MultiStageOperator visitValue(ValueNode node, PhysicalPlanContext context) {
     return new LiteralValueOperator(context.getOpChainExecutionContext(), node.getDataSchema(), node.getLiteralRows());
   }
 }
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/pipeline/PipelineBreakerContext.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/pipeline/PipelineBreakerContext.java
new file mode 100644
index 0000000000..110c8e22e5
--- /dev/null
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/pipeline/PipelineBreakerContext.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.runtime.plan.pipeline;
+
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.pinot.query.planner.plannode.MailboxReceiveNode;
+import org.apache.pinot.query.planner.plannode.PlanNode;
+
+
+/**
+ * This class used to record the pipeline breaker operator that needs to be run before the main opChain.
+ */
+public class PipelineBreakerContext {
+  private final Map<Integer, PlanNode> _pipelineBreakerMap = new HashMap<>();
+  private final Map<PlanNode, Integer> _planNodeObjectToIdMap = new HashMap<>();
+
+  private final boolean _isLeafStage;
+  private int _currentNodeId = 0;
+
+  public PipelineBreakerContext(boolean isLeafStage) {
+    _isLeafStage = isLeafStage;
+  }
+
+  public void addPipelineBreaker(MailboxReceiveNode mailboxReceiveNode) {
+    int nodeId = _planNodeObjectToIdMap.get(mailboxReceiveNode);
+    _pipelineBreakerMap.put(nodeId, mailboxReceiveNode);
+  }
+
+  public void visitedNewPlanNode(PlanNode planNode) {
+    _planNodeObjectToIdMap.put(planNode, _currentNodeId);
+    _currentNodeId++;
+  }
+
+  public Map<PlanNode, Integer> getNodeIdMap() {
+    return _planNodeObjectToIdMap;
+  }
+
+  public Map<Integer, PlanNode> getPipelineBreakerMap() {
+    return _pipelineBreakerMap;
+  }
+
+  public boolean isLeafStage() {
+    return _isLeafStage;
+  }
+}
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/pipeline/PipelineBreakerExecutor.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/pipeline/PipelineBreakerExecutor.java
new file mode 100644
index 0000000000..b000d8ee74
--- /dev/null
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/pipeline/PipelineBreakerExecutor.java
@@ -0,0 +1,101 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.runtime.plan.pipeline;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.pinot.core.common.Operator;
+import org.apache.pinot.query.mailbox.MailboxService;
+import org.apache.pinot.query.planner.plannode.MailboxReceiveNode;
+import org.apache.pinot.query.planner.plannode.PlanNode;
+import org.apache.pinot.query.runtime.blocks.TransferableBlock;
+import org.apache.pinot.query.runtime.executor.OpChainSchedulerService;
+import org.apache.pinot.query.runtime.operator.OpChain;
+import org.apache.pinot.query.runtime.plan.DistributedStagePlan;
+import org.apache.pinot.query.runtime.plan.PhysicalPlanContext;
+import org.apache.pinot.query.runtime.plan.PhysicalPlanVisitor;
+
+
+/**
+ * Utility class to run pipeline breaker execution and collects the results.
+ */
+public class PipelineBreakerExecutor {
+  private PipelineBreakerExecutor() {
+    // do not instantiate.
+  }
+
+  /**
+   * Execute a pipeline breaker and collect the results (synchronously)
+   *
+   * Currently, pipeline breaker executor can only execute mailbox receive pipeline breaker.
+   */
+  public static PipelineBreakerResult executePipelineBreakers(OpChainSchedulerService scheduler,
+      MailboxService mailboxService, DistributedStagePlan distributedStagePlan, long timeoutMs, long deadlineMs,
+      long requestId, boolean isTraceEnabled)
+      throws Exception {
+    PipelineBreakerContext pipelineBreakerContext = new PipelineBreakerContext(
+        DistributedStagePlan.isLeafStage(distributedStagePlan));
+    PipelineBreakerVisitor.visitPlanRoot(distributedStagePlan.getStageRoot(), pipelineBreakerContext);
+    if (pipelineBreakerContext.getPipelineBreakerMap().size() > 0) {
+      PlanNode stageRoot = distributedStagePlan.getStageRoot();
+      // TODO: This PlanRequestContext needs to indicate it is a pre-stage opChain and only listens to pre-stage OpChain
+      //     receive-mail callbacks.
+      // see also: MailboxIdUtils TODOs, de-couple mailbox id from query information
+      PhysicalPlanContext physicalPlanContext =
+          new PhysicalPlanContext(mailboxService, requestId, stageRoot.getPlanFragmentId(), timeoutMs, deadlineMs,
+              distributedStagePlan.getServer(), distributedStagePlan.getStageMetadata(), null, isTraceEnabled);
+      Map<Integer, List<TransferableBlock>> resultMap =
+          PipelineBreakerExecutor.execute(scheduler, pipelineBreakerContext, physicalPlanContext);
+      return new PipelineBreakerResult(pipelineBreakerContext.getNodeIdMap(), resultMap);
+    } else {
+      return null;
+    }
+  }
+
+  private static Map<Integer, List<TransferableBlock>> execute(OpChainSchedulerService scheduler,
+      PipelineBreakerContext context, PhysicalPlanContext physicalPlanContext)
+      throws Exception {
+    Map<Integer, Operator<TransferableBlock>> pipelineWorkerMap = new HashMap<>();
+    for (Map.Entry<Integer, PlanNode> e : context.getPipelineBreakerMap().entrySet()) {
+      int key = e.getKey();
+      PlanNode planNode = e.getValue();
+      // TODO: supprot other pipeline breaker node type as well.
+      if (!(planNode instanceof MailboxReceiveNode)) {
+        throw new UnsupportedOperationException("Only MailboxReceiveNode is supported to run as pipeline breaker now");
+      }
+      OpChain tempOpChain = PhysicalPlanVisitor.walkPlanNode(planNode, physicalPlanContext);
+      pipelineWorkerMap.put(key, tempOpChain.getRoot());
+    }
+    return runMailboxReceivePipelineBreaker(scheduler, pipelineWorkerMap, physicalPlanContext);
+  }
+
+  private static Map<Integer, List<TransferableBlock>> runMailboxReceivePipelineBreaker(
+      OpChainSchedulerService scheduler, Map<Integer, Operator<TransferableBlock>> pipelineWorkerMap,
+      PhysicalPlanContext physicalPlanContext)
+      throws Exception {
+    PipelineBreakerOperator pipelineBreakerOperator = new PipelineBreakerOperator(
+        physicalPlanContext.getOpChainExecutionContext(), pipelineWorkerMap);
+    OpChain pipelineBreakerOpChain = new OpChain(physicalPlanContext.getOpChainExecutionContext(),
+        pipelineBreakerOperator,
+        physicalPlanContext.getReceivingMailboxIds());
+    scheduler.register(pipelineBreakerOpChain);
+    return pipelineBreakerOperator.getResult();
+  }
+}
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/pipeline/PipelineBreakerOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/pipeline/PipelineBreakerOperator.java
new file mode 100644
index 0000000000..61bc9c03b9
--- /dev/null
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/pipeline/PipelineBreakerOperator.java
@@ -0,0 +1,118 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.runtime.plan.pipeline;
+
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Deque;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import javax.annotation.Nullable;
+import org.apache.pinot.common.exception.QueryException;
+import org.apache.pinot.core.common.Operator;
+import org.apache.pinot.query.runtime.blocks.TransferableBlock;
+import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils;
+import org.apache.pinot.query.runtime.operator.MultiStageOperator;
+import org.apache.pinot.query.runtime.plan.OpChainExecutionContext;
+
+
+public class PipelineBreakerOperator extends MultiStageOperator {
+  private static final String EXPLAIN_NAME = "PIPELINE_BREAKER";
+  private final Deque<Map.Entry<Integer, Operator<TransferableBlock>>> _workerEntries;
+  private final Map<Integer, List<TransferableBlock>> _resultMap;
+  private final CountDownLatch _workerDoneLatch;
+  private TransferableBlock _errorBlock;
+
+
+  public PipelineBreakerOperator(OpChainExecutionContext context,
+      Map<Integer, Operator<TransferableBlock>> pipelineWorkerMap) {
+    super(context);
+    _resultMap = new HashMap<>();
+    _workerEntries = new ArrayDeque<>();
+    _workerEntries.addAll(pipelineWorkerMap.entrySet());
+    _workerDoneLatch = new CountDownLatch(1);
+  }
+
+  public Map<Integer, List<TransferableBlock>> getResult()
+      throws Exception {
+    boolean isWorkerDone =
+        _workerDoneLatch.await(_context.getDeadlineMs() - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
+    if (isWorkerDone && _errorBlock == null) {
+      return _resultMap;
+    } else {
+      throw new RuntimeException("Unable to construct pipeline breaker results due to timeout.");
+    }
+  }
+
+  @Nullable
+  @Override
+  public String toExplainString() {
+    return EXPLAIN_NAME;
+  }
+
+  @Override
+  protected TransferableBlock getNextBlock() {
+    if (System.currentTimeMillis() > _context.getDeadlineMs()) {
+      _errorBlock = TransferableBlockUtils.getErrorTransferableBlock(QueryException.EXECUTION_TIMEOUT_ERROR);
+      _workerDoneLatch.countDown();
+      return _errorBlock;
+    }
+
+    // Poll from every mailbox operator in round-robin fashion:
+    // - Return the first content block
+    // - If no content block found but there are mailboxes not finished, return no-op block
+    // - If all content blocks are already returned, return end-of-stream block
+    int numWorkers = _workerEntries.size();
+    for (int i = 0; i < numWorkers; i++) {
+      Map.Entry<Integer, Operator<TransferableBlock>> worker = _workerEntries.remove();
+      TransferableBlock block = worker.getValue().nextBlock();
+
+      // Release the mailbox when the block is end-of-stream
+      if (block != null && block.isSuccessfulEndOfStreamBlock()) {
+        continue;
+      }
+
+      // Add the worker back to the queue if the block is not end-of-stream
+      _workerEntries.add(worker);
+      if (block != null) {
+        if (block.isErrorBlock()) {
+          _errorBlock = block;
+          _workerDoneLatch.countDown();
+          return _errorBlock;
+        }
+        List<TransferableBlock> blockList = _resultMap.computeIfAbsent(worker.getKey(), (k) -> new ArrayList<>());
+        // TODO: only data block is handled, we also need to handle metadata block from upstream in the future.
+        if (!block.isEndOfStreamBlock()) {
+          blockList.add(block);
+        }
+      }
+    }
+
+    if (_workerEntries.isEmpty()) {
+      // NOTIFY results are ready.
+      _workerDoneLatch.countDown();
+      return TransferableBlockUtils.getEndOfStreamTransferableBlock();
+    } else {
+      return TransferableBlockUtils.getNoOpTransferableBlock();
+    }
+  }
+}
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/pipeline/PipelineBreakerResult.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/pipeline/PipelineBreakerResult.java
new file mode 100644
index 0000000000..7ebd1d0c8d
--- /dev/null
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/pipeline/PipelineBreakerResult.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.runtime.plan.pipeline;
+
+import java.util.List;
+import java.util.Map;
+import org.apache.pinot.query.planner.plannode.PlanNode;
+import org.apache.pinot.query.runtime.blocks.TransferableBlock;
+
+
+/**
+ * Immutable holder for the outcome of running the pipeline breakers of a stage.
+ */
+public class PipelineBreakerResult {
+  // pipeline breaker node id -> blocks collected for that node
+  private final Map<Integer, List<TransferableBlock>> _resultMap;
+  // plan node -> node id
+  private final Map<PlanNode, Integer> _nodeIdMap;
+
+  /**
+   * @param nodeIdMap plan node to node id mapping
+   * @param resultMap collected blocks keyed by pipeline breaker node id
+   */
+  public PipelineBreakerResult(Map<PlanNode, Integer> nodeIdMap, Map<Integer, List<TransferableBlock>> resultMap) {
+    this._resultMap = resultMap;
+    this._nodeIdMap = nodeIdMap;
+  }
+
+  /** Returns the plan node to node id mapping (as given, not copied). */
+  public Map<PlanNode, Integer> getNodeIdMap() {
+    return this._nodeIdMap;
+  }
+
+  /** Returns the collected blocks keyed by pipeline breaker node id (as given, not copied). */
+  public Map<Integer, List<TransferableBlock>> getResultMap() {
+    return this._resultMap;
+  }
+}
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/pipeline/PipelineBreakerVisitor.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/pipeline/PipelineBreakerVisitor.java
new file mode 100644
index 0000000000..03ea0b2115
--- /dev/null
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/pipeline/PipelineBreakerVisitor.java
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.runtime.plan.pipeline;
+
+import org.apache.pinot.query.planner.plannode.DefaultPostOrderTraversalVisitor;
+import org.apache.pinot.query.planner.plannode.MailboxReceiveNode;
+import org.apache.pinot.query.planner.plannode.PlanNode;
+import org.apache.pinot.query.planner.plannode.PlanNodeVisitor;
+
+
+/**
+ * Plan visitor (built on {@link DefaultPostOrderTraversalVisitor}) that registers visited plan nodes in a
+ * {@link PipelineBreakerContext} and marks mailbox receive nodes of leaf stages as pipeline breakers.
+ */
+public class PipelineBreakerVisitor extends DefaultPostOrderTraversalVisitor<Void, PipelineBreakerContext> {
+  private static final PlanNodeVisitor<Void, PipelineBreakerContext> INSTANCE = new PipelineBreakerVisitor();
+
+  /**
+   * Visits the plan rooted at {@code root}, recording node ids and pipeline breakers into {@code context}.
+   */
+  public static void visitPlanRoot(PlanNode root, PipelineBreakerContext context) {
+    root.visit(INSTANCE, context);
+  }
+
+  @Override
+  public Void process(PlanNode planNode, PipelineBreakerContext context) {
+    context.visitedNewPlanNode(planNode);
+    return null;
+  }
+
+  @Override
+  public Void visitMailboxReceive(MailboxReceiveNode node, PipelineBreakerContext context) {
+    // The node must be registered (given an id) before it can be recorded as a pipeline breaker.
+    // NOTE: this override does not descend into the receive node's inputs.
+    process(node, context);
+    // TODO: actually implement pipeline breaker attribute in PlanNode
+    // currently all mailbox receive node from leaf stage is considered as pipeline breaker node.
+    if (!context.isLeafStage()) {
+      return null;
+    }
+    context.addPipelineBreaker(node);
+    return null;
+  }
+}
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/server/ServerPlanRequestContext.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/server/ServerPlanRequestContext.java
index 87bf3302b5..5c7bc12503 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/server/ServerPlanRequestContext.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/server/ServerPlanRequestContext.java
@@ -23,8 +23,9 @@ import org.apache.pinot.common.request.PinotQuery;
 import org.apache.pinot.core.routing.TimeBoundaryInfo;
 import org.apache.pinot.query.mailbox.MailboxService;
 import org.apache.pinot.query.routing.VirtualServerAddress;
-import org.apache.pinot.query.runtime.plan.PlanRequestContext;
+import org.apache.pinot.query.runtime.plan.PhysicalPlanContext;
 import org.apache.pinot.query.runtime.plan.StageMetadata;
+import org.apache.pinot.query.runtime.plan.pipeline.PipelineBreakerResult;
 import org.apache.pinot.spi.config.table.TableType;
 
 
@@ -32,7 +33,7 @@ import org.apache.pinot.spi.config.table.TableType;
  * Context class for converting a {@link org.apache.pinot.query.runtime.plan.DistributedStagePlan} into
  * {@link PinotQuery} to execute on server.
  */
-public class ServerPlanRequestContext extends PlanRequestContext {
+public class ServerPlanRequestContext extends PhysicalPlanContext {
   protected TableType _tableType;
   protected TimeBoundaryInfo _timeBoundaryInfo;
 
@@ -40,9 +41,11 @@ public class ServerPlanRequestContext extends PlanRequestContext {
   protected InstanceRequest _instanceRequest;
 
   public ServerPlanRequestContext(MailboxService mailboxService, long requestId, int stageId, long timeoutMs,
-      long deadlineMs, VirtualServerAddress server, StageMetadata stageMetadata, PinotQuery pinotQuery,
+      long deadlineMs, VirtualServerAddress server, StageMetadata stageMetadata,
+      PipelineBreakerResult pipelineBreakerResult, PinotQuery pinotQuery,
       TableType tableType, TimeBoundaryInfo timeBoundaryInfo, boolean traceEnabled) {
-    super(mailboxService, requestId, stageId, timeoutMs, deadlineMs, server, stageMetadata, traceEnabled);
+    super(mailboxService, requestId, stageId, timeoutMs, deadlineMs, server, stageMetadata, pipelineBreakerResult,
+        traceEnabled);
     _pinotQuery = pinotQuery;
     _tableType = tableType;
     _timeBoundaryInfo = timeBoundaryInfo;
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/ServerRequestPlanVisitor.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/server/ServerPlanRequestUtils.java
similarity index 57%
rename from pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/ServerRequestPlanVisitor.java
rename to pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/server/ServerPlanRequestUtils.java
index a4bf0450ea..53846d257f 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/ServerRequestPlanVisitor.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/server/ServerPlanRequestUtils.java
@@ -16,48 +16,37 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pinot.query.runtime.plan;
+package org.apache.pinot.query.runtime.plan.server;
 
+import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableList;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.stream.Collectors;
 import org.apache.pinot.common.request.BrokerRequest;
 import org.apache.pinot.common.request.DataSource;
 import org.apache.pinot.common.request.Expression;
 import org.apache.pinot.common.request.InstanceRequest;
 import org.apache.pinot.common.request.PinotQuery;
 import org.apache.pinot.common.request.QuerySource;
+import org.apache.pinot.common.utils.DataSchema;
 import org.apache.pinot.common.utils.config.QueryOptionsUtils;
 import org.apache.pinot.common.utils.request.RequestUtils;
 import org.apache.pinot.core.query.optimizer.QueryOptimizer;
 import org.apache.pinot.core.routing.TimeBoundaryInfo;
 import org.apache.pinot.query.mailbox.MailboxService;
-import org.apache.pinot.query.parser.CalciteRexExpressionParser;
-import org.apache.pinot.query.planner.plannode.AggregateNode;
-import org.apache.pinot.query.planner.plannode.ExchangeNode;
-import org.apache.pinot.query.planner.plannode.FilterNode;
+import org.apache.pinot.query.planner.partitioning.FieldSelectionKeySelector;
 import org.apache.pinot.query.planner.plannode.JoinNode;
-import org.apache.pinot.query.planner.plannode.MailboxReceiveNode;
-import org.apache.pinot.query.planner.plannode.MailboxSendNode;
-import org.apache.pinot.query.planner.plannode.PlanNode;
-import org.apache.pinot.query.planner.plannode.PlanNodeVisitor;
-import org.apache.pinot.query.planner.plannode.ProjectNode;
-import org.apache.pinot.query.planner.plannode.SetOpNode;
-import org.apache.pinot.query.planner.plannode.SortNode;
-import org.apache.pinot.query.planner.plannode.TableScanNode;
-import org.apache.pinot.query.planner.plannode.ValueNode;
-import org.apache.pinot.query.planner.plannode.WindowNode;
-import org.apache.pinot.query.runtime.plan.server.ServerPlanRequestContext;
+import org.apache.pinot.query.runtime.plan.DistributedStagePlan;
+import org.apache.pinot.query.runtime.plan.pipeline.PipelineBreakerResult;
 import org.apache.pinot.query.service.QueryConfig;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.data.FieldSpec;
 import org.apache.pinot.spi.data.Schema;
 import org.apache.pinot.spi.utils.CommonConstants;
-import org.apache.pinot.spi.utils.builder.TableNameBuilder;
 import org.apache.pinot.sql.FilterKind;
 import org.apache.pinot.sql.parsers.rewriter.NonAggregationGroupByToDistinctQueryRewriter;
 import org.apache.pinot.sql.parsers.rewriter.PredicateComparisonRewriter;
@@ -67,18 +56,9 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 
-/**
- * Plan visitor for direct leaf-stage server request.
- *
- * This should be merged with logics in {@link org.apache.pinot.core.plan.maker.InstancePlanMakerImplV2} in the future
- * to directly produce operator chain.
- *
- * As of now, the reason why we use the plan visitor for server request is for additional support such as dynamic
- * filtering and other auxiliary functionalities.
- */
-public class ServerRequestPlanVisitor implements PlanNodeVisitor<Void, ServerPlanRequestContext> {
+public class ServerPlanRequestUtils {
   private static final int DEFAULT_LEAF_NODE_LIMIT = 10_000_000;
-  private static final Logger LOGGER = LoggerFactory.getLogger(ServerRequestPlanVisitor.class);
+  private static final Logger LOGGER = LoggerFactory.getLogger(ServerPlanRequestUtils.class);
   private static final List<String> QUERY_REWRITERS_CLASS_NAMES =
       ImmutableList.of(PredicateComparisonRewriter.class.getName(),
           NonAggregationGroupByToDistinctQueryRewriter.class.getName());
@@ -86,11 +66,14 @@ public class ServerRequestPlanVisitor implements PlanNodeVisitor<Void, ServerPla
       new ArrayList<>(QueryRewriterFactory.getQueryRewriters(QUERY_REWRITERS_CLASS_NAMES));
   private static final QueryOptimizer QUERY_OPTIMIZER = new QueryOptimizer();
 
-  private static final ServerRequestPlanVisitor INSTANCE = new ServerRequestPlanVisitor();
+  private ServerPlanRequestUtils() {
+    // do not instantiate.
+  }
 
   public static ServerPlanRequestContext build(MailboxService mailboxService, DistributedStagePlan stagePlan,
-      Map<String, String> requestMetadataMap, TableConfig tableConfig, Schema schema, TimeBoundaryInfo timeBoundaryInfo,
-      TableType tableType, List<String> segmentList, long deadlineMs) {
+      Map<String, String> requestMetadataMap, PipelineBreakerResult pipelineBreakerResult,
+      TableConfig tableConfig, Schema schema, TimeBoundaryInfo timeBoundaryInfo, TableType tableType,
+      List<String> segmentList, long deadlineMs) {
     // Before-visit: construct the ServerPlanRequestContext baseline
     // Making a unique requestId for leaf stages otherwise it causes problem on stats/metrics/tracing.
     long requestId = (Long.parseLong(requestMetadataMap.get(QueryConfig.KEY_OF_BROKER_REQUEST_ID)) << 16) + (
@@ -108,10 +91,11 @@ public class ServerRequestPlanVisitor implements PlanNodeVisitor<Void, ServerPla
     pinotQuery.setExplain(false);
     ServerPlanRequestContext context =
         new ServerPlanRequestContext(mailboxService, requestId, stagePlan.getStageId(), timeoutMs, deadlineMs,
-            stagePlan.getServer(), stagePlan.getStageMetadata(), pinotQuery, tableType, timeBoundaryInfo, traceEnabled);
+            stagePlan.getServer(), stagePlan.getStageMetadata(), pipelineBreakerResult, pinotQuery, tableType,
+            timeBoundaryInfo, traceEnabled);
 
     // visit the plan and create query physical plan.
-    ServerRequestPlanVisitor.walkStageNode(stagePlan.getStageRoot(), context);
+    ServerPlanRequestVisitor.walkStageNode(stagePlan.getStageRoot(), context);
 
     // Post-visit: finalize context.
     // 1. global rewrite/optimize
@@ -148,6 +132,9 @@ public class ServerRequestPlanVisitor implements PlanNodeVisitor<Void, ServerPla
     return context;
   }
 
+  /**
+   * Helper method to update query options.
+   */
   private static void updateQueryOptions(PinotQuery pinotQuery, Map<String, String> requestMetadataMap, long timeoutMs,
       boolean traceEnabled) {
     Map<String, String> queryOptions = new HashMap<>();
@@ -161,113 +148,6 @@ public class ServerRequestPlanVisitor implements PlanNodeVisitor<Void, ServerPla
     pinotQuery.setQueryOptions(queryOptions);
   }
 
-  private static void walkStageNode(PlanNode node, ServerPlanRequestContext context) {
-    node.visit(INSTANCE, context);
-  }
-
-  @Override
-  public Void visitAggregate(AggregateNode node, ServerPlanRequestContext context) {
-    visitChildren(node, context);
-    // set group-by list
-    context.getPinotQuery()
-        .setGroupByList(CalciteRexExpressionParser.convertGroupByList(node.getGroupSet(), context.getPinotQuery()));
-    // set agg list
-    context.getPinotQuery().setSelectList(
-        CalciteRexExpressionParser.addSelectList(context.getPinotQuery().getGroupByList(), node.getAggCalls(),
-            context.getPinotQuery()));
-    return null;
-  }
-
-  @Override
-  public Void visitWindow(WindowNode node, ServerPlanRequestContext context) {
-    throw new UnsupportedOperationException("Window not yet supported!");
-  }
-
-  @Override
-  public Void visitSetOp(SetOpNode node, ServerPlanRequestContext context) {
-    visitChildren(node, context);
-    return null;
-  }
-
-  @Override
-  public Void visitExchange(ExchangeNode exchangeNode, ServerPlanRequestContext context) {
-    throw new UnsupportedOperationException("Exchange not yet supported!");
-  }
-
-  @Override
-  public Void visitFilter(FilterNode node, ServerPlanRequestContext context) {
-    visitChildren(node, context);
-    context.getPinotQuery()
-        .setFilterExpression(CalciteRexExpressionParser.toExpression(node.getCondition(), context.getPinotQuery()));
-    return null;
-  }
-
-  @Override
-  public Void visitJoin(JoinNode node, ServerPlanRequestContext context) {
-    visitChildren(node, context);
-    return null;
-  }
-
-  @Override
-  public Void visitMailboxReceive(MailboxReceiveNode node, ServerPlanRequestContext context) {
-    visitChildren(node, context);
-    return null;
-  }
-
-  @Override
-  public Void visitMailboxSend(MailboxSendNode node, ServerPlanRequestContext context) {
-    visitChildren(node, context);
-    return null;
-  }
-
-  @Override
-  public Void visitProject(ProjectNode node, ServerPlanRequestContext context) {
-    visitChildren(node, context);
-    context.getPinotQuery()
-        .setSelectList(CalciteRexExpressionParser.overwriteSelectList(node.getProjects(), context.getPinotQuery()));
-    return null;
-  }
-
-  @Override
-  public Void visitSort(SortNode node, ServerPlanRequestContext context) {
-    visitChildren(node, context);
-    PinotQuery pinotQuery = context.getPinotQuery();
-    if (node.getCollationKeys().size() > 0) {
-      pinotQuery.setOrderByList(CalciteRexExpressionParser.convertOrderByList(node, pinotQuery));
-    }
-    if (node.getFetch() > 0) {
-      pinotQuery.setLimit(node.getFetch());
-    }
-    if (node.getOffset() > 0) {
-      pinotQuery.setOffset(node.getOffset());
-    }
-    return null;
-  }
-
-  @Override
-  public Void visitTableScan(TableScanNode node, ServerPlanRequestContext context) {
-    DataSource dataSource = new DataSource();
-    String tableNameWithType = TableNameBuilder.forType(context.getTableType())
-        .tableNameWithType(TableNameBuilder.extractRawTableName(node.getTableName()));
-    dataSource.setTableName(tableNameWithType);
-    context.getPinotQuery().setDataSource(dataSource);
-    context.getPinotQuery().setSelectList(
-        node.getTableScanColumns().stream().map(RequestUtils::getIdentifierExpression).collect(Collectors.toList()));
-    return null;
-  }
-
-  @Override
-  public Void visitValue(ValueNode node, ServerPlanRequestContext context) {
-    visitChildren(node, context);
-    return null;
-  }
-
-  private void visitChildren(PlanNode node, ServerPlanRequestContext context) {
-    for (PlanNode child : node.getInputs()) {
-      child.visit(this, context);
-    }
-  }
-
   /**
    * Helper method to attach the time boundary to the given PinotQuery.
    */
@@ -289,4 +169,153 @@ public class ServerRequestPlanVisitor implements PlanNodeVisitor<Void, ServerPla
       pinotQuery.setFilterExpression(timeFilterExpression);
     }
   }
+
+  /**
+   * Attaches a dynamic (semi-join) filter to the given PinotQuery.
+   *
+   * <p>For every join-key pair an {@code IN} predicate is built. Its first operand is the left key expression,
+   * taken from the query's current select list at the left key's column index; the remaining operands are the
+   * distinct values of the right key column in {@code dataContainer}. The predicates are AND-ed together with any
+   * filter already set on the query.
+   *
+   * @param pinotQuery query whose filter expression is rewritten in place
+   * @param joinKeys join keys; both key selectors are cast to {@link FieldSelectionKeySelector}
+   * @param dataContainer rows collected from the dynamic side of the join
+   * @param dataSchema schema of the rows in {@code dataContainer}
+   */
+  static void attachDynamicFilter(PinotQuery pinotQuery, JoinNode.JoinKeys joinKeys, List<Object[]> dataContainer,
+      DataSchema dataSchema) {
+    FieldSelectionKeySelector leftSelector = (FieldSelectionKeySelector) joinKeys.getLeftJoinKeySelector();
+    FieldSelectionKeySelector rightSelector = (FieldSelectionKeySelector) joinKeys.getRightJoinKeySelector();
+    List<Expression> expressions = new ArrayList<>();
+    for (int i = 0; i < leftSelector.getColumnIndices().size(); i++) {
+      Expression leftExpr = pinotQuery.getSelectList().get(leftSelector.getColumnIndices().get(i));
+      int rightIdx = rightSelector.getColumnIndices().get(i);
+      Expression inFilterExpr = RequestUtils.getFunctionExpression(FilterKind.IN.name());
+      List<Expression> operands = new ArrayList<>(dataContainer.size() + 1);
+      operands.add(leftExpr);
+      operands.addAll(computeInOperands(dataContainer, dataSchema, rightIdx));
+      inFilterExpr.getFunctionCall().setOperands(operands);
+      expressions.add(inFilterExpr);
+    }
+    attachFilterExpression(pinotQuery, FilterKind.AND, expressions);
+  }
+
+  /**
+   * Builds the literal operands of an {@code IN} predicate from one column of {@code dataContainer}.
+   *
+   * <p>Values are sorted ascending and de-duplicated; duplicates do not change the result of {@code IN} but would
+   * inflate the filter expression.
+   *
+   * @param dataContainer rows to read the values from
+   * @param dataSchema schema of the rows; the column's stored type decides how values are cast
+   * @param colIdx index of the column inside each row
+   * @return sorted, distinct literal expressions
+   * @throws IllegalStateException if the stored type is not INT, LONG, FLOAT, DOUBLE or STRING
+   */
+  private static List<Expression> computeInOperands(List<Object[]> dataContainer, DataSchema dataSchema, int colIdx) {
+    final DataSchema.ColumnDataType columnDataType = dataSchema.getColumnDataType(colIdx);
+    final FieldSpec.DataType storedType = columnDataType.getStoredType().toDataType();
+    final int numRows = dataContainer.size();
+    List<Expression> expressions = new ArrayList<>(numRows);
+    // every case sorts the values so duplicates become adjacent and can be skipped in a single pass
+    switch (storedType) {
+      case INT:
+        int[] arrInt = new int[numRows];
+        for (int rowIdx = 0; rowIdx < numRows; rowIdx++) {
+          arrInt[rowIdx] = (int) dataContainer.get(rowIdx)[colIdx];
+        }
+        Arrays.sort(arrInt);
+        for (int rowIdx = 0; rowIdx < numRows; rowIdx++) {
+          if (rowIdx == 0 || arrInt[rowIdx] != arrInt[rowIdx - 1]) {
+            expressions.add(RequestUtils.getLiteralExpression(arrInt[rowIdx]));
+          }
+        }
+        break;
+      case LONG:
+        long[] arrLong = new long[numRows];
+        for (int rowIdx = 0; rowIdx < numRows; rowIdx++) {
+          arrLong[rowIdx] = (long) dataContainer.get(rowIdx)[colIdx];
+        }
+        Arrays.sort(arrLong);
+        for (int rowIdx = 0; rowIdx < numRows; rowIdx++) {
+          if (rowIdx == 0 || arrLong[rowIdx] != arrLong[rowIdx - 1]) {
+            expressions.add(RequestUtils.getLiteralExpression(arrLong[rowIdx]));
+          }
+        }
+        break;
+      case FLOAT:
+        float[] arrFloat = new float[numRows];
+        for (int rowIdx = 0; rowIdx < numRows; rowIdx++) {
+          arrFloat[rowIdx] = (float) dataContainer.get(rowIdx)[colIdx];
+        }
+        Arrays.sort(arrFloat);
+        // Float.compare agrees with the ordering of Arrays.sort (NaNs equal, -0.0f before 0.0f)
+        for (int rowIdx = 0; rowIdx < numRows; rowIdx++) {
+          if (rowIdx == 0 || Float.compare(arrFloat[rowIdx], arrFloat[rowIdx - 1]) != 0) {
+            expressions.add(RequestUtils.getLiteralExpression(arrFloat[rowIdx]));
+          }
+        }
+        break;
+      case DOUBLE:
+        double[] arrDouble = new double[numRows];
+        for (int rowIdx = 0; rowIdx < numRows; rowIdx++) {
+          arrDouble[rowIdx] = (double) dataContainer.get(rowIdx)[colIdx];
+        }
+        Arrays.sort(arrDouble);
+        // Double.compare agrees with the ordering of Arrays.sort (NaNs equal, -0.0 before 0.0)
+        for (int rowIdx = 0; rowIdx < numRows; rowIdx++) {
+          if (rowIdx == 0 || Double.compare(arrDouble[rowIdx], arrDouble[rowIdx - 1]) != 0) {
+            expressions.add(RequestUtils.getLiteralExpression(arrDouble[rowIdx]));
+          }
+        }
+        break;
+      case STRING:
+        String[] arrString = new String[numRows];
+        for (int rowIdx = 0; rowIdx < numRows; rowIdx++) {
+          arrString[rowIdx] = (String) dataContainer.get(rowIdx)[colIdx];
+        }
+        Arrays.sort(arrString);
+        for (int rowIdx = 0; rowIdx < numRows; rowIdx++) {
+          if (rowIdx == 0 || !arrString[rowIdx].equals(arrString[rowIdx - 1])) {
+            expressions.add(RequestUtils.getLiteralExpression(arrString[rowIdx]));
+          }
+        }
+        break;
+      default:
+        throw new IllegalStateException("Unsupported stored data type for dynamic filter: " + storedType);
+    }
+    return expressions;
+  }
+
+  /**
+   * Combines {@code exprs} with the query's existing filter expression (if any) using {@code attachKind}.
+   *
+   * <p>If a single expression results it becomes the filter directly; if there is nothing to attach and no existing
+   * filter, the query is left unchanged.
+   *
+   * @param pinotQuery query whose filter expression is updated in place
+   * @param attachKind {@link FilterKind#AND} or {@link FilterKind#OR}
+   * @param exprs expressions to attach; may be empty
+   * @throws IllegalStateException if {@code attachKind} is neither AND nor OR
+   */
+  private static void attachFilterExpression(PinotQuery pinotQuery, FilterKind attachKind, List<Expression> exprs) {
+    Preconditions.checkState(attachKind == FilterKind.AND || attachKind == FilterKind.OR);
+    Expression filterExpression = pinotQuery.getFilterExpression();
+    List<Expression> arrayList = new ArrayList<>(exprs);
+    if (filterExpression != null) {
+      arrayList.add(filterExpression);
+    }
+    if (arrayList.isEmpty()) {
+      // nothing to attach and no existing filter: keep the query as-is
+      return;
+    }
+    if (arrayList.size() > 1) {
+      Expression attachFilterExpression = RequestUtils.getFunctionExpression(attachKind.name());
+      attachFilterExpression.getFunctionCall().setOperands(arrayList);
+      pinotQuery.setFilterExpression(attachFilterExpression);
+    } else {
+      pinotQuery.setFilterExpression(arrayList.get(0));
+    }
+  }
 }
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/server/ServerPlanRequestVisitor.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/server/ServerPlanRequestVisitor.java
new file mode 100644
index 0000000000..9b1f97426c
--- /dev/null
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/server/ServerPlanRequestVisitor.java
@@ -0,0 +1,225 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.runtime.plan.server;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.stream.Collectors;
+import org.apache.pinot.common.datablock.DataBlock;
+import org.apache.pinot.common.request.DataSource;
+import org.apache.pinot.common.request.Expression;
+import org.apache.pinot.common.request.PinotQuery;
+import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.common.utils.request.RequestUtils;
+import org.apache.pinot.query.parser.CalciteRexExpressionParser;
+import org.apache.pinot.query.planner.plannode.AggregateNode;
+import org.apache.pinot.query.planner.plannode.ExchangeNode;
+import org.apache.pinot.query.planner.plannode.FilterNode;
+import org.apache.pinot.query.planner.plannode.JoinNode;
+import org.apache.pinot.query.planner.plannode.MailboxReceiveNode;
+import org.apache.pinot.query.planner.plannode.MailboxSendNode;
+import org.apache.pinot.query.planner.plannode.PlanNode;
+import org.apache.pinot.query.planner.plannode.PlanNodeVisitor;
+import org.apache.pinot.query.planner.plannode.ProjectNode;
+import org.apache.pinot.query.planner.plannode.SetOpNode;
+import org.apache.pinot.query.planner.plannode.SortNode;
+import org.apache.pinot.query.planner.plannode.TableScanNode;
+import org.apache.pinot.query.planner.plannode.ValueNode;
+import org.apache.pinot.query.planner.plannode.WindowNode;
+import org.apache.pinot.query.runtime.blocks.TransferableBlock;
+import org.apache.pinot.spi.utils.builder.TableNameBuilder;
+
+
+/**
+ * Plan visitor for direct leaf-stage server request.
+ *
+ * This should be merged with logics in {@link org.apache.pinot.core.plan.maker.InstancePlanMakerImplV2} in the future
+ * to directly produce operator chain.
+ *
+ * As of now, the reason why we use the plan visitor for server request is for additional support such as dynamic
+ * filtering and other auxiliary functionalities.
+ */
+public class ServerPlanRequestVisitor implements PlanNodeVisitor<Void, ServerPlanRequestContext> {
+  private static final ServerPlanRequestVisitor INSTANCE = new ServerPlanRequestVisitor();
+
+  /**
+   * Visits {@code node} with the shared visitor instance, populating the PinotQuery held by {@code context}.
+   */
+  static void walkStageNode(PlanNode node, ServerPlanRequestContext context) {
+    node.visit(INSTANCE, context);
+  }
+
+  @Override
+  public Void visitAggregate(AggregateNode node, ServerPlanRequestContext context) {
+    visitChildren(node, context);
+    // set group-by list
+    context.getPinotQuery()
+        .setGroupByList(CalciteRexExpressionParser.convertGroupByList(node.getGroupSet(), context.getPinotQuery()));
+    // set agg list
+    context.getPinotQuery().setSelectList(
+        CalciteRexExpressionParser.addSelectList(context.getPinotQuery().getGroupByList(), node.getAggCalls(),
+            context.getPinotQuery()));
+    return null;
+  }
+
+  @Override
+  public Void visitWindow(WindowNode node, ServerPlanRequestContext context) {
+    throw new UnsupportedOperationException("Window not yet supported!");
+  }
+
+  @Override
+  public Void visitSetOp(SetOpNode node, ServerPlanRequestContext context) {
+    visitChildren(node, context);
+    return null;
+  }
+
+  @Override
+  public Void visitExchange(ExchangeNode exchangeNode, ServerPlanRequestContext context) {
+    throw new UnsupportedOperationException("Exchange not yet supported!");
+  }
+
+  /**
+   * Converts the filter condition and combines it with any filter expression already attached by the children.
+   */
+  @Override
+  public Void visitFilter(FilterNode node, ServerPlanRequestContext context) {
+    visitChildren(node, context);
+    PinotQuery pinotQuery = context.getPinotQuery();
+    Expression filterExpression = CalciteRexExpressionParser.toExpression(node.getCondition(), pinotQuery);
+    Expression existingFilter = pinotQuery.getFilterExpression();
+    if (existingFilter == null) {
+      pinotQuery.setFilterExpression(filterExpression);
+    } else {
+      // keep a filter already set by a child (e.g. a semi-join's dynamic IN filter) by AND-ing it with this one
+      Expression andExpression = RequestUtils.getFunctionExpression("AND");
+      List<Expression> operands = new ArrayList<>(2);
+      operands.add(existingFilter);
+      operands.add(filterExpression);
+      andExpression.getFunctionCall().setOperands(operands);
+      pinotQuery.setFilterExpression(andExpression);
+    }
+    return null;
+  }
+
+  /**
+   * Rewrites a join whose dynamic side was pre-computed by the pipeline breaker into a filter on its static side.
+   *
+   * <p>Only the static side is visited. The dynamic side's ROW blocks are read from the pipeline breaker result
+   * and converted into IN filters on the join keys; if they hold no rows, the query limit is set to 0 instead.
+   */
+  @Override
+  public Void visitJoin(JoinNode node, ServerPlanRequestContext context) {
+    // visit only the static side, turn the dynamic side into a lookup from the pipeline breaker resultDataContainer
+    PlanNode staticSide = node.getInputs().get(0);
+    PlanNode dynamicSide = node.getInputs().get(1);
+    if (staticSide instanceof MailboxReceiveNode) {
+      // NOTE(review): after this swap attachDynamicFilter still applies the left join keys to the static side and
+      // the right join keys to the dynamic side; confirm a MailboxReceiveNode is never the left input here.
+      dynamicSide = node.getInputs().get(0);
+      staticSide = node.getInputs().get(1);
+    }
+    staticSide.visit(this, context);
+    Integer resultMapId = context.getPipelineBreakerResult().getNodeIdMap().get(dynamicSide);
+    if (resultMapId == null) {
+      throw new IllegalStateException("No pipeline breaker result id registered for the dynamic side of the join");
+    }
+    List<TransferableBlock> transferableBlocks = context.getPipelineBreakerResult().getResultMap().get(resultMapId);
+    if (transferableBlocks == null) {
+      throw new IllegalStateException("Missing pipeline breaker result for id: " + resultMapId);
+    }
+    List<Object[]> resultDataContainer = new ArrayList<>();
+    DataSchema dataSchema = dynamicSide.getDataSchema();
+    for (TransferableBlock block : transferableBlocks) {
+      // only ROW blocks are read; blocks of any other type are skipped
+      if (block.getType() == DataBlock.Type.ROW) {
+        resultDataContainer.addAll(block.getContainer());
+      }
+    }
+
+    if (!resultDataContainer.isEmpty()) {
+      // rewrite SEMI-JOIN as filter clause.
+      ServerPlanRequestUtils.attachDynamicFilter(context.getPinotQuery(), node.getJoinKeys(), resultDataContainer,
+          dataSchema);
+    } else {
+      // do not pull any data out, this is constant false filter.
+      context.getPinotQuery().setLimit(0);
+    }
+    return null;
+  }
+
+  @Override
+  public Void visitMailboxReceive(MailboxReceiveNode node, ServerPlanRequestContext context) {
+    visitChildren(node, context);
+    return null;
+  }
+
+  @Override
+  public Void visitMailboxSend(MailboxSendNode node, ServerPlanRequestContext context) {
+    visitChildren(node, context);
+    return null;
+  }
+
+  @Override
+  public Void visitProject(ProjectNode node, ServerPlanRequestContext context) {
+    visitChildren(node, context);
+    context.getPinotQuery()
+        .setSelectList(CalciteRexExpressionParser.overwriteSelectList(node.getProjects(), context.getPinotQuery()));
+    return null;
+  }
+
+  @Override
+  public Void visitSort(SortNode node, ServerPlanRequestContext context) {
+    visitChildren(node, context);
+    PinotQuery pinotQuery = context.getPinotQuery();
+    if (node.getCollationKeys().size() > 0) {
+      pinotQuery.setOrderByList(CalciteRexExpressionParser.convertOrderByList(node, pinotQuery));
+    }
+    if (node.getFetch() > 0) {
+      pinotQuery.setLimit(node.getFetch());
+    }
+    if (node.getOffset() > 0) {
+      pinotQuery.setOffset(node.getOffset());
+    }
+    return null;
+  }
+
+  @Override
+  public Void visitTableScan(TableScanNode node, ServerPlanRequestContext context) {
+    DataSource dataSource = new DataSource();
+    String tableNameWithType = TableNameBuilder.forType(context.getTableType())
+        .tableNameWithType(TableNameBuilder.extractRawTableName(node.getTableName()));
+    dataSource.setTableName(tableNameWithType);
+    context.getPinotQuery().setDataSource(dataSource);
+    context.getPinotQuery().setSelectList(
+        node.getTableScanColumns().stream().map(RequestUtils::getIdentifierExpression).collect(Collectors.toList()));
+    return null;
+  }
+
+  @Override
+  public Void visitValue(ValueNode node, ServerPlanRequestContext context) {
+    visitChildren(node, context);
+    return null;
+  }
+
+  private void visitChildren(PlanNode node, ServerPlanRequestContext context) {
+    for (PlanNode child : node.getInputs()) {
+      child.visit(this, context);
+    }
+  }
+}
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/QueryServerEnclosure.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/QueryServerEnclosure.java
index 75401ca07f..5042805948 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/QueryServerEnclosure.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/QueryServerEnclosure.java
@@ -20,12 +20,16 @@ package org.apache.pinot.query;
 
 import java.util.HashMap;
 import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import org.apache.helix.HelixManager;
 import org.apache.helix.store.zk.ZkHelixPropertyStore;
 import org.apache.helix.zookeeper.datamodel.ZNRecord;
 import org.apache.pinot.common.metrics.ServerMetrics;
+import org.apache.pinot.common.utils.NamedThreadFactory;
 import org.apache.pinot.common.utils.SchemaUtils;
 import org.apache.pinot.core.data.manager.InstanceDataManager;
+import org.apache.pinot.core.query.scheduler.resources.ResourceManager;
 import org.apache.pinot.query.runtime.QueryRunner;
 import org.apache.pinot.query.runtime.plan.DistributedStagePlan;
 import org.apache.pinot.query.service.QueryConfig;
@@ -65,6 +69,9 @@ public class QueryServerEnclosure {
   private final HelixManager _helixManager;
 
   private final QueryRunner _queryRunner;
+  private final ExecutorService _executor = Executors.newFixedThreadPool(ResourceManager.DEFAULT_QUERY_RUNNER_THREADS,
+      new NamedThreadFactory("QueryServerTest_Server"));
+
 
   public QueryServerEnclosure(MockInstanceDataManagerFactory factory) {
     try {
@@ -124,6 +131,14 @@ public class QueryServerEnclosure {
   }
 
   public void processQuery(DistributedStagePlan distributedStagePlan, Map<String, String> requestMetadataMap) {
-    _queryRunner.processQuery(distributedStagePlan, requestMetadataMap);
+    // execute() rather than submit(): a failure then reaches the worker thread's uncaught-exception handler instead
+    // of being silently captured in a Future that nobody reads
+    _executor.execute(() -> {
+      try {
+        _queryRunner.processQuery(distributedStagePlan, requestMetadataMap);
+      } catch (Exception e) {
+        throw new RuntimeException("Error executing query!", e);
+      }
+    });
   }
 }
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTest.java
index ca35b62a9c..88081cb342 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTest.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTest.java
@@ -215,6 +215,9 @@ public class QueryRunnerTest extends QueryRunnerTestBase {
   @DataProvider(name = "testDataWithSqlToFinalRowCount")
   private Object[][] provideTestSqlAndRowCount() {
     return new Object[][]{
+        new Object[]{"SELECT /*+ joinOptions(join_strategy='dynamic_broadcast') */ col1 FROM a "
+            + " WHERE a.col1 IN (SELECT b.col2 FROM b WHERE b.col3 < 10) AND a.col3 > 0", 9},
+
         // using join clause
         new Object[]{"SELECT * FROM a JOIN b USING (col1)", 15},
 
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/LiteralValueOperatorTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/LiteralValueOperatorTest.java
index 8276c647aa..83dea7c82d 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/LiteralValueOperatorTest.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/LiteralValueOperatorTest.java
@@ -25,7 +25,7 @@ import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
 import org.apache.pinot.query.planner.logical.RexExpression;
 import org.apache.pinot.query.routing.VirtualServerAddress;
 import org.apache.pinot.query.runtime.blocks.TransferableBlock;
-import org.apache.pinot.query.runtime.plan.PlanRequestContext;
+import org.apache.pinot.query.runtime.plan.PhysicalPlanContext;
 import org.apache.pinot.spi.data.FieldSpec.DataType;
 import org.mockito.Mock;
 import org.mockito.Mockito;
@@ -41,7 +41,7 @@ public class LiteralValueOperatorTest {
   private AutoCloseable _mocks;
 
   @Mock
-  private PlanRequestContext _context;
+  private PhysicalPlanContext _context;
 
   @Mock
   private VirtualServerAddress _serverAddress;
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/service/QueryServerTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/service/QueryServerTest.java
index ca24f97043..b099e5f807 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/service/QueryServerTest.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/service/QueryServerTest.java
@@ -67,9 +67,6 @@ public class QueryServerTest extends QueryTestSet {
   private static final ExecutorService INTERM_WORKER_EXECUTOR_SERVICE =
       Executors.newFixedThreadPool(ResourceManager.DEFAULT_QUERY_WORKER_THREADS,
           new NamedThreadFactory("QueryDispatcherTest_IntermWorker"));
-  private static final ExecutorService RUNNER_EXECUTOR_SERVICE =
-      Executors.newFixedThreadPool(ResourceManager.DEFAULT_QUERY_RUNNER_THREADS,
-          new NamedThreadFactory("QueryServerTest_Runner"));
 
   private final Map<Integer, QueryServer> _queryServerMap = new HashMap<>();
   private final Map<Integer, QueryRunner> _queryRunnerMap = new HashMap<>();
@@ -85,7 +82,6 @@ public class QueryServerTest extends QueryTestSet {
       QueryRunner queryRunner = Mockito.mock(QueryRunner.class);
       Mockito.when(queryRunner.getQueryWorkerLeafExecutorService()).thenReturn(LEAF_WORKER_EXECUTOR_SERVICE);
       Mockito.when(queryRunner.getQueryWorkerIntermExecutorService()).thenReturn(INTERM_WORKER_EXECUTOR_SERVICE);
-      Mockito.when(queryRunner.getQueryRunnerExecutorService()).thenReturn(RUNNER_EXECUTOR_SERVICE);
       QueryServer queryServer = new QueryServer(availablePort, queryRunner);
       queryServer.start();
       _queryServerMap.put(availablePort, queryServer);
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/service/dispatch/QueryDispatcherTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/service/dispatch/QueryDispatcherTest.java
index 69d2dda026..98a54b5f6e 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/service/dispatch/QueryDispatcherTest.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/service/dispatch/QueryDispatcherTest.java
@@ -55,8 +55,6 @@ public class QueryDispatcherTest extends QueryTestSet {
       ResourceManager.DEFAULT_QUERY_WORKER_THREADS, new NamedThreadFactory("QueryDispatcherTest_LeafWorker"));
   private static final ExecutorService INTERM_WORKER_EXECUTOR_SERVICE = Executors.newFixedThreadPool(
       ResourceManager.DEFAULT_QUERY_WORKER_THREADS, new NamedThreadFactory("QueryDispatcherTest_IntermWorker"));
-  private static final ExecutorService RUNNER_EXECUTOR_SERVICE = Executors.newFixedThreadPool(
-      ResourceManager.DEFAULT_QUERY_RUNNER_THREADS, new NamedThreadFactory("QueryDispatcherTest_Runner"));
 
   private final Map<Integer, QueryServer> _queryServerMap = new HashMap<>();
   private final Map<Integer, QueryRunner> _queryRunnerMap = new HashMap<>();
@@ -73,7 +71,6 @@ public class QueryDispatcherTest extends QueryTestSet {
       QueryRunner queryRunner = Mockito.mock(QueryRunner.class);
       Mockito.when(queryRunner.getQueryWorkerLeafExecutorService()).thenReturn(LEAF_WORKER_EXECUTOR_SERVICE);
       Mockito.when(queryRunner.getQueryWorkerIntermExecutorService()).thenReturn(INTERM_WORKER_EXECUTOR_SERVICE);
-      Mockito.when(queryRunner.getQueryRunnerExecutorService()).thenReturn(RUNNER_EXECUTOR_SERVICE);
       QueryServer queryServer = new QueryServer(availablePort, queryRunner);
       queryServer = Mockito.spy(queryServer);
       queryServer.start();
diff --git a/pinot-query-runtime/src/test/resources/queries/QueryHints.json b/pinot-query-runtime/src/test/resources/queries/QueryHints.json
index 464dac98f5..2bc442fc09 100644
--- a/pinot-query-runtime/src/test/resources/queries/QueryHints.json
+++ b/pinot-query-runtime/src/test/resources/queries/QueryHints.json
@@ -62,6 +62,26 @@
       {
         "description": "Colocated JOIN with partition column and group by non-partitioned column",
         "sql": "SELECT /*+ joinOptions(is_colocated_by_join_keys='true'), aggOptions(is_partitioned_by_group_by_keys='false') */ {tbl1}.name, SUM({tbl2}.num) FROM {tbl1} JOIN {tbl2} ON {tbl1}.num = {tbl2}.num GROUP BY {tbl1}.name"
+      },
+      {
+        "description": "Colocated, Dynamic broadcast SEMI-JOIN with partition column",
+        "sql": "SELECT /*+ joinOptions(join_strategy='dynamic_broadcast', is_colocated_by_join_keys='true') */ {tbl1}.num, {tbl1}.name FROM {tbl1} WHERE {tbl1}.num IN (SELECT {tbl2}.num FROM {tbl2} WHERE {tbl2}.val IN ('xxx', 'yyy'))"
+      },
+      {
+        "description": "Colocated, Dynamic broadcast SEMI-JOIN with partition column and group by partition column",
+        "sql": "SELECT /*+ joinOptions(join_strategy='dynamic_broadcast', is_colocated_by_join_keys='true'), aggOptions(is_partitioned_by_group_by_keys='true') */ {tbl1}.num, COUNT({tbl1}.name) FROM {tbl1} WHERE {tbl1}.num IN (SELECT {tbl2}.num FROM {tbl2} WHERE {tbl2}.val IN ('xxx', 'yyy')) GROUP BY {tbl1}.num, {tbl1}.name"
+      },
+      {
+        "description": "Colocated, Dynamic broadcast SEMI-JOIN with partition column and group by non-partitioned column",
+        "sql": "SELECT /*+ joinOptions(join_strategy='dynamic_broadcast', is_colocated_by_join_keys='true') */ {tbl1}.name, COUNT(*) FROM {tbl1} WHERE {tbl1}.num IN (SELECT {tbl2}.num FROM {tbl2} WHERE {tbl2}.val IN ('xxx', 'yyy')) GROUP BY {tbl1}.name"
+      },
+      {
+        "description": "Dynamic broadcast SEMI-JOIN with empty right table result",
+        "sql": "SELECT /*+ joinOptions(join_strategy='dynamic_broadcast') */ {tbl1}.name, COUNT(*) FROM {tbl1} WHERE {tbl1}.num IN (SELECT {tbl2}.num FROM {tbl2} WHERE {tbl2}.val = 'non-exist') GROUP BY {tbl1}.name"
+      },
+      {
+        "description": "Colocated, Dynamic broadcast SEMI-JOIN with partially empty right table result for some servers",
+        "sql": "SELECT /*+ joinOptions(join_strategy='dynamic_broadcast', is_colocated_by_join_keys='true') */ {tbl1}.name, COUNT(*) FROM {tbl1} WHERE {tbl1}.num IN (SELECT {tbl2}.num FROM {tbl2} WHERE {tbl2}.val = 'z') GROUP BY {tbl1}.name"
       }
     ]
   }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org