Posted to commits@pinot.apache.org by ro...@apache.org on 2023/06/22 22:45:25 UTC

[pinot] branch master updated: [multistage][bugfix] fix pipeline breaker error block populate (#10957)

This is an automated email from the ASF dual-hosted git repository.

rongr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 24c5d8fe85 [multistage][bugfix] fix pipeline breaker error block populate (#10957)
24c5d8fe85 is described below

commit 24c5d8fe8521b91e68dd07294a3f57b067c0d525
Author: Rong Rong <ro...@apache.org>
AuthorDate: Thu Jun 22 15:45:17 2023 -0700

    [multistage][bugfix] fix pipeline breaker error block populate (#10957)
    
    * [clean up] make pipeline breaker executor always return without exception
    * [clean up] refactor server plan request util entry point
    * [test] add QueryRunner test as well as PB unit-tests
    
    ---------
    
    Co-authored-by: Rong Rong <ro...@startree.ai>
---
 .../apache/pinot/query/runtime/QueryRunner.java    |  87 ++----
 .../plan/pipeline/PipelineBreakerContext.java      |  16 +-
 .../plan/pipeline/PipelineBreakerExecutor.java     |  36 ++-
 .../plan/pipeline/PipelineBreakerOperator.java     |   2 +-
 .../plan/pipeline/PipelineBreakerResult.java       |   3 +
 .../plan/pipeline/PipelineBreakerVisitor.java      |   2 +-
 .../plan/server/ServerPlanRequestUtils.java        |  51 +++-
 .../pinot/query/runtime/QueryRunnerTest.java       |   6 +
 .../plan/pipeline/PipelineBreakerExecutorTest.java | 311 +++++++++++++++++++++
 9 files changed, 427 insertions(+), 87 deletions(-)

diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java
index aa16f0b220..196df534d8 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java
@@ -31,7 +31,6 @@ import org.apache.helix.HelixManager;
 import org.apache.helix.store.zk.ZkHelixPropertyStore;
 import org.apache.helix.zookeeper.datamodel.ZNRecord;
 import org.apache.pinot.common.exception.QueryException;
-import org.apache.pinot.common.metadata.ZKMetadataProvider;
 import org.apache.pinot.common.metrics.ServerMetrics;
 import org.apache.pinot.common.utils.NamedThreadFactory;
 import org.apache.pinot.core.data.manager.InstanceDataManager;
@@ -42,7 +41,6 @@ import org.apache.pinot.core.query.scheduler.resources.ResourceManager;
 import org.apache.pinot.query.mailbox.MailboxService;
 import org.apache.pinot.query.planner.plannode.MailboxSendNode;
 import org.apache.pinot.query.planner.plannode.PlanNode;
-import org.apache.pinot.query.routing.WorkerMetadata;
 import org.apache.pinot.query.runtime.executor.OpChainSchedulerService;
 import org.apache.pinot.query.runtime.executor.RoundRobinScheduler;
 import org.apache.pinot.query.runtime.operator.LeafStageTransferableBlockOperator;
@@ -53,19 +51,14 @@ import org.apache.pinot.query.runtime.plan.DistributedStagePlan;
 import org.apache.pinot.query.runtime.plan.OpChainExecutionContext;
 import org.apache.pinot.query.runtime.plan.PhysicalPlanContext;
 import org.apache.pinot.query.runtime.plan.PhysicalPlanVisitor;
-import org.apache.pinot.query.runtime.plan.StageMetadata;
 import org.apache.pinot.query.runtime.plan.pipeline.PipelineBreakerExecutor;
 import org.apache.pinot.query.runtime.plan.pipeline.PipelineBreakerResult;
 import org.apache.pinot.query.runtime.plan.server.ServerPlanRequestContext;
 import org.apache.pinot.query.runtime.plan.server.ServerPlanRequestUtils;
 import org.apache.pinot.query.service.QueryConfig;
-import org.apache.pinot.spi.config.table.TableConfig;
-import org.apache.pinot.spi.config.table.TableType;
-import org.apache.pinot.spi.data.Schema;
 import org.apache.pinot.spi.env.PinotConfiguration;
 import org.apache.pinot.spi.metrics.PinotMetricUtils;
 import org.apache.pinot.spi.utils.CommonConstants;
-import org.apache.pinot.spi.utils.builder.TableNameBuilder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -139,8 +132,7 @@ public class QueryRunner {
    * <p>This execution entry point should be asynchronously called by the request handler and caller should not wait
    * for results/exceptions.</p>
    */
-  public void processQuery(DistributedStagePlan distributedStagePlan, Map<String, String> requestMetadataMap)
-      throws Exception {
+  public void processQuery(DistributedStagePlan distributedStagePlan, Map<String, String> requestMetadataMap) {
     long requestId = Long.parseLong(requestMetadataMap.get(QueryConfig.KEY_OF_BROKER_REQUEST_ID));
     long timeoutMs = Long.parseLong(requestMetadataMap.get(QueryConfig.KEY_OF_BROKER_REQUEST_TIMEOUT_MS));
     boolean isTraceEnabled =
@@ -148,22 +140,14 @@ public class QueryRunner {
     long deadlineMs = System.currentTimeMillis() + timeoutMs;
 
     // run pre-stage execution for all pipeline breakers
-    PipelineBreakerResult pipelineBreakerResult;
-    try {
-      pipelineBreakerResult = PipelineBreakerExecutor.executePipelineBreakers(_scheduler, _mailboxService,
-          distributedStagePlan, deadlineMs, requestId, isTraceEnabled);
-    } catch (Exception e) {
-      LOGGER.error("Error executing pre-stage pipeline breaker for: {}:{}", requestId,
-          distributedStagePlan.getStageId(), e);
-      _scheduler.cancel(requestId);
-      throw e;
-    }
+    PipelineBreakerResult pipelineBreakerResult = PipelineBreakerExecutor.executePipelineBreakers(_scheduler,
+        _mailboxService, distributedStagePlan, deadlineMs, requestId, isTraceEnabled);
 
     // run OpChain
     if (DistributedStagePlan.isLeafStage(distributedStagePlan)) {
       try {
-        OpChain rootOperator = compileLeafStage(distributedStagePlan, requestMetadataMap, pipelineBreakerResult,
-            timeoutMs, deadlineMs, requestId);
+        OpChain rootOperator = compileLeafStage(requestId, distributedStagePlan, requestMetadataMap,
+            pipelineBreakerResult, deadlineMs, isTraceEnabled);
         _scheduler.register(rootOperator);
       } catch (Exception e) {
         LOGGER.error("Error executing leaf stage for: {}:{}", requestId, distributedStagePlan.getStageId(), e);
@@ -172,10 +156,8 @@ public class QueryRunner {
       }
     } else {
       try {
-        PlanNode stageRoot = distributedStagePlan.getStageRoot();
-        OpChain rootOperator = PhysicalPlanVisitor.walkPlanNode(stageRoot,
-            new PhysicalPlanContext(_mailboxService, requestId, stageRoot.getPlanFragmentId(), deadlineMs,
-                distributedStagePlan.getServer(), distributedStagePlan.getStageMetadata(), null, isTraceEnabled));
+        OpChain rootOperator = compileIntermediateStage(requestId, distributedStagePlan, requestMetadataMap,
+            pipelineBreakerResult, deadlineMs, isTraceEnabled);
         _scheduler.register(rootOperator);
       } catch (Exception e) {
         LOGGER.error("Error executing intermediate stage for: {}:{}", requestId, distributedStagePlan.getStageId(), e);
@@ -199,15 +181,24 @@ public class QueryRunner {
     return _queryWorkerIntermExecutorService;
   }
 
-  private OpChain compileLeafStage(DistributedStagePlan distributedStagePlan, Map<String, String> requestMetadataMap,
-      PipelineBreakerResult pipelineBreakerResult, long timeoutMs, long deadlineMs, long requestId) {
-    boolean isTraceEnabled =
-        Boolean.parseBoolean(requestMetadataMap.getOrDefault(CommonConstants.Broker.Request.TRACE, "false"));
+  private OpChain compileIntermediateStage(long requestId, DistributedStagePlan distributedStagePlan,
+      Map<String, String> requestMetadataMap, PipelineBreakerResult pipelineBreakerResult, long deadlineMs,
+      boolean isTraceEnabled) {
+    PlanNode stageRoot = distributedStagePlan.getStageRoot();
+    return PhysicalPlanVisitor.walkPlanNode(stageRoot,
+        new PhysicalPlanContext(_mailboxService, requestId, stageRoot.getPlanFragmentId(), deadlineMs,
+            distributedStagePlan.getServer(), distributedStagePlan.getStageMetadata(), pipelineBreakerResult,
+            isTraceEnabled));
+  }
+
+  private OpChain compileLeafStage(long requestId, DistributedStagePlan distributedStagePlan,
+      Map<String, String> requestMetadataMap, PipelineBreakerResult pipelineBreakerResult, long deadlineMs,
+      boolean isTraceEnabled) {
     PhysicalPlanContext planContext = new PhysicalPlanContext(_mailboxService, requestId,
         distributedStagePlan.getStageId(), deadlineMs, distributedStagePlan.getServer(),
         distributedStagePlan.getStageMetadata(), pipelineBreakerResult, isTraceEnabled);
-    List<ServerPlanRequestContext> serverPlanRequestContexts =
-        constructServerQueryRequests(planContext, distributedStagePlan, requestMetadataMap, _helixPropertyStore);
+    List<ServerPlanRequestContext> serverPlanRequestContexts = ServerPlanRequestUtils.constructServerQueryRequests(
+        planContext, distributedStagePlan, requestMetadataMap, _helixPropertyStore);
     List<ServerQueryRequest> serverQueryRequests = new ArrayList<>(serverPlanRequestContexts.size());
     for (ServerPlanRequestContext requestContext : serverPlanRequestContexts) {
       serverQueryRequests.add(new ServerQueryRequest(requestContext.getInstanceRequest(),
@@ -225,40 +216,6 @@ public class QueryRunner {
     return new OpChain(opChainExecutionContext, mailboxSendOperator, Collections.emptyList());
   }
 
-  private static List<ServerPlanRequestContext> constructServerQueryRequests(PhysicalPlanContext planContext,
-      DistributedStagePlan distributedStagePlan, Map<String, String> requestMetadataMap,
-      ZkHelixPropertyStore<ZNRecord> helixPropertyStore) {
-    StageMetadata stageMetadata = distributedStagePlan.getStageMetadata();
-    WorkerMetadata workerMetadata = distributedStagePlan.getCurrentWorkerMetadata();
-    String rawTableName = StageMetadata.getTableName(stageMetadata);
-    Map<String, List<String>> tableToSegmentListMap = WorkerMetadata.getTableSegmentsMap(workerMetadata);
-    List<ServerPlanRequestContext> requests = new ArrayList<>();
-    for (Map.Entry<String, List<String>> tableEntry : tableToSegmentListMap.entrySet()) {
-      String tableType = tableEntry.getKey();
-      // ZkHelixPropertyStore extends from ZkCacheBaseDataAccessor so it should not cause too much out-of-the-box
-      // network traffic. but there's chance to improve this:
-      // TODO: use TableDataManager: it is already getting tableConfig and Schema when processing segments.
-      if (TableType.OFFLINE.name().equals(tableType)) {
-        TableConfig tableConfig = ZKMetadataProvider.getTableConfig(helixPropertyStore,
-            TableNameBuilder.forType(TableType.OFFLINE).tableNameWithType(rawTableName));
-        Schema schema = ZKMetadataProvider.getTableSchema(helixPropertyStore,
-            TableNameBuilder.forType(TableType.OFFLINE).tableNameWithType(rawTableName));
-        requests.add(ServerPlanRequestUtils.build(planContext, distributedStagePlan, requestMetadataMap, tableConfig,
-            schema, StageMetadata.getTimeBoundary(stageMetadata), TableType.OFFLINE, tableEntry.getValue()));
-      } else if (TableType.REALTIME.name().equals(tableType)) {
-        TableConfig tableConfig = ZKMetadataProvider.getTableConfig(helixPropertyStore,
-            TableNameBuilder.forType(TableType.REALTIME).tableNameWithType(rawTableName));
-        Schema schema = ZKMetadataProvider.getTableSchema(helixPropertyStore,
-            TableNameBuilder.forType(TableType.REALTIME).tableNameWithType(rawTableName));
-        requests.add(ServerPlanRequestUtils.build(planContext, distributedStagePlan, requestMetadataMap, tableConfig,
-            schema, StageMetadata.getTimeBoundary(stageMetadata), TableType.REALTIME, tableEntry.getValue()));
-      } else {
-        throw new IllegalArgumentException("Unsupported table type key: " + tableType);
-      }
-    }
-    return requests;
-  }
-
   private InstanceResponseBlock processServerQueryRequest(ServerQueryRequest request) {
     InstanceResponseBlock result;
     try {
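
For context: after this change executePipelineBreakers() never throws, so the caller-side try/catch
in processQuery() goes away and failures surface through the result map instead. A minimal
caller-side sketch of that contract, using only the public types touched by this patch (the
wrapping class and helper name here are hypothetical, not part of the commit):

    import java.util.List;
    import org.apache.pinot.query.mailbox.MailboxService;
    import org.apache.pinot.query.runtime.blocks.TransferableBlock;
    import org.apache.pinot.query.runtime.executor.OpChainSchedulerService;
    import org.apache.pinot.query.runtime.plan.DistributedStagePlan;
    import org.apache.pinot.query.runtime.plan.pipeline.PipelineBreakerExecutor;
    import org.apache.pinot.query.runtime.plan.pipeline.PipelineBreakerResult;

    public class PipelineBreakerCallerSketch {
      // Hypothetical helper: returns true when any PB node ended with an error block.
      static boolean preStageFailed(OpChainSchedulerService scheduler, MailboxService mailboxService,
          DistributedStagePlan plan, long deadlineMs, long requestId, boolean isTraceEnabled) {
        // no longer declares `throws Exception`; errors come back inside the result
        PipelineBreakerResult result = PipelineBreakerExecutor.executePipelineBreakers(
            scheduler, mailboxService, plan, deadlineMs, requestId, isTraceEnabled);
        if (result == null) {
          return false; // the stage contains no pipeline breaker at all
        }
        for (List<TransferableBlock> blocks : result.getResultMap().values()) {
          // on failure, each PB node holds a single unsuccessful end-of-stream (error) block
          if (!blocks.isEmpty()) {
            TransferableBlock last = blocks.get(blocks.size() - 1);
            if (last.isEndOfStreamBlock() && !last.isSuccessfulEndOfStreamBlock()) {
              return true;
            }
          }
        }
        return false;
      }
    }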
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/pipeline/PipelineBreakerContext.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/pipeline/PipelineBreakerContext.java
index 110c8e22e5..b8b50746aa 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/pipeline/PipelineBreakerContext.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/pipeline/PipelineBreakerContext.java
@@ -25,19 +25,15 @@ import org.apache.pinot.query.planner.plannode.PlanNode;
 
 
 /**
- * This class used to record the pipeline breaker operator that needs to be run before the main opChain.
+ * This class is used by {@link PipelineBreakerVisitor} as context to detect the {@link PlanNode} that needs to be run
+ * before the main opChain starts.
  */
-public class PipelineBreakerContext {
-  private final Map<Integer, PlanNode> _pipelineBreakerMap = new HashMap<>();
+class PipelineBreakerContext {
   private final Map<PlanNode, Integer> _planNodeObjectToIdMap = new HashMap<>();
+  private final Map<Integer, PlanNode> _pipelineBreakerMap = new HashMap<>();
 
-  private final boolean _isLeafStage;
   private int _currentNodeId = 0;
 
-  public PipelineBreakerContext(boolean isLeafStage) {
-    _isLeafStage = isLeafStage;
-  }
-
   public void addPipelineBreaker(MailboxReceiveNode mailboxReceiveNode) {
     int nodeId = _planNodeObjectToIdMap.get(mailboxReceiveNode);
     _pipelineBreakerMap.put(nodeId, mailboxReceiveNode);
@@ -55,8 +51,4 @@ public class PipelineBreakerContext {
   public Map<Integer, PlanNode> getPipelineBreakerMap() {
     return _pipelineBreakerMap;
   }
-
-  public boolean isLeafStage() {
-    return _isLeafStage;
-  }
 }
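
With the leaf-stage flag removed, PipelineBreakerContext is a plain accumulator populated by the
visitor. A short sketch of the traversal handshake inside the
org.apache.pinot.query.runtime.plan.pipeline package (both classes are package-private now, and
`stageRoot` is assumed to be a PlanNode already in scope):

    // assign node IDs and detect PB nodes in one post-order traversal
    PipelineBreakerContext context = new PipelineBreakerContext();
    PipelineBreakerVisitor.visitPlanRoot(stageRoot, context);
    // PB nodes are keyed by the node IDs assigned during the traversal
    Map<Integer, PlanNode> pipelineBreakers = context.getPipelineBreakerMap();
    if (!pipelineBreakers.isEmpty()) {
      // pre-stage execution is required before the main OpChain can be scheduled
    }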
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/pipeline/PipelineBreakerExecutor.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/pipeline/PipelineBreakerExecutor.java
index 0202127314..fa33192a64 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/pipeline/PipelineBreakerExecutor.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/pipeline/PipelineBreakerExecutor.java
@@ -19,6 +19,7 @@
 package org.apache.pinot.query.runtime.plan.pipeline;
 
 import java.io.IOException;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -29,6 +30,7 @@ import org.apache.pinot.query.mailbox.MailboxService;
 import org.apache.pinot.query.planner.plannode.MailboxReceiveNode;
 import org.apache.pinot.query.planner.plannode.PlanNode;
 import org.apache.pinot.query.runtime.blocks.TransferableBlock;
+import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils;
 import org.apache.pinot.query.runtime.executor.OpChainSchedulerService;
 import org.apache.pinot.query.runtime.operator.OpChain;
 import org.apache.pinot.query.runtime.plan.DistributedStagePlan;
@@ -48,18 +50,26 @@ public class PipelineBreakerExecutor {
   }
 
   /**
-   * Execute a pipeline breaker and collect the results (synchronously)
+   * Execute a pipeline breaker and collect the results (synchronously). Currently, the pipeline breaker executor
+   *    can only execute mailbox-receive pipeline breakers.
   *
-   * Currently, pipeline breaker executor can only execute mailbox receive pipeline breaker.
+   * @param scheduler scheduler service to run the pipeline breaker main thread.
+   * @param mailboxService mailbox service to attach the {@link MailboxReceiveNode} to.
+   * @param distributedStagePlan the distributed stage plan to run the pipeline breaker on.
+   * @param deadlineMs execution deadline.
+   * @param requestId request ID.
+   * @param isTraceEnabled whether to enable tracing.
+   * @return the pipeline breaker result:
+   *   - If an exception occurs, it is wrapped in an error {@link TransferableBlock} and assigned to each PB node.
+   *   - Otherwise, normal stats are attached to each PB node and downstream execution returns with stats attached.
    */
   public static PipelineBreakerResult executePipelineBreakers(OpChainSchedulerService scheduler,
       MailboxService mailboxService, DistributedStagePlan distributedStagePlan, long deadlineMs,
-      long requestId, boolean isTraceEnabled)
-      throws Exception {
-    PipelineBreakerContext pipelineBreakerContext = new PipelineBreakerContext(
-        DistributedStagePlan.isLeafStage(distributedStagePlan));
+      long requestId, boolean isTraceEnabled) {
+    PipelineBreakerContext pipelineBreakerContext = new PipelineBreakerContext();
     PipelineBreakerVisitor.visitPlanRoot(distributedStagePlan.getStageRoot(), pipelineBreakerContext);
     if (pipelineBreakerContext.getPipelineBreakerMap().size() > 0) {
+      try {
       PlanNode stageRoot = distributedStagePlan.getStageRoot();
       // TODO: This PlanRequestContext needs to indicate it is a pre-stage opChain and only listens to pre-stage OpChain
       //     receive-mail callbacks.
@@ -70,6 +80,19 @@ public class PipelineBreakerExecutor {
       Map<Integer, List<TransferableBlock>> resultMap =
           PipelineBreakerExecutor.execute(scheduler, pipelineBreakerContext, physicalPlanContext);
       return new PipelineBreakerResult(pipelineBreakerContext.getNodeIdMap(), resultMap);
+      } catch (Exception e) {
+        LOGGER.error("Unable to create pipeline breaker results for Req: " + requestId + ", Stage: "
+            + distributedStagePlan.getStageId(), e);
+        // Create all error blocks for all pipeline breaker nodes.
+        TransferableBlock errorBlock = TransferableBlockUtils.getErrorTransferableBlock(e);
+        Map<Integer, List<TransferableBlock>> resultMap = new HashMap<>();
+        for (int key : pipelineBreakerContext.getNodeIdMap().values()) {
+          if (pipelineBreakerContext.getPipelineBreakerMap().containsKey(key)) {
+            resultMap.put(key, Collections.singletonList(errorBlock));
+          }
+        }
+        return new PipelineBreakerResult(pipelineBreakerContext.getNodeIdMap(), resultMap);
+      }
     } else {
       return null;
     }
@@ -82,7 +105,6 @@ public class PipelineBreakerExecutor {
     for (Map.Entry<Integer, PlanNode> e : context.getPipelineBreakerMap().entrySet()) {
       int key = e.getKey();
       PlanNode planNode = e.getValue();
-      // TODO: supprot other pipeline breaker node type as well.
       if (!(planNode instanceof MailboxReceiveNode)) {
         throw new UnsupportedOperationException("Only MailboxReceiveNode is supported to run as pipeline breaker now");
       }
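
The catch-path above leans on one property of error blocks that the new unit tests also assert: an
error block is an end-of-stream block, but not a successful one, so any operator draining a PB
result will terminate on it. A standalone illustration (not part of the patch):

    import org.apache.pinot.query.runtime.blocks.TransferableBlock;
    import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils;

    public class ErrorBlockSketch {
      public static void main(String[] args) {
        // wrap an exception the same way the new catch-path does
        TransferableBlock errorBlock =
            TransferableBlockUtils.getErrorTransferableBlock(new RuntimeException("boom"));
        // it terminates the stream ...
        System.out.println(errorBlock.isEndOfStreamBlock());           // true
        // ... but is not a successful EOS, which is how downstream detects the failure
        System.out.println(errorBlock.isSuccessfulEndOfStreamBlock()); // false
      }
    }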
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/pipeline/PipelineBreakerOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/pipeline/PipelineBreakerOperator.java
index 3c338ac1a8..438ce15a37 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/pipeline/PipelineBreakerOperator.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/pipeline/PipelineBreakerOperator.java
@@ -35,7 +35,7 @@ import org.apache.pinot.query.runtime.operator.MultiStageOperator;
 import org.apache.pinot.query.runtime.plan.OpChainExecutionContext;
 
 
-public class PipelineBreakerOperator extends MultiStageOperator {
+class PipelineBreakerOperator extends MultiStageOperator {
   private static final String EXPLAIN_NAME = "PIPELINE_BREAKER";
   private final Deque<Map.Entry<Integer, Operator<TransferableBlock>>> _workerEntries;
   private final Map<Integer, List<TransferableBlock>> _resultMap;
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/pipeline/PipelineBreakerResult.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/pipeline/PipelineBreakerResult.java
index 7ebd1d0c8d..8388337ff1 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/pipeline/PipelineBreakerResult.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/pipeline/PipelineBreakerResult.java
@@ -24,6 +24,9 @@ import org.apache.pinot.query.planner.plannode.PlanNode;
 import org.apache.pinot.query.runtime.blocks.TransferableBlock;
 
 
+/**
+ * Encapsulates the execution results of the {@link PipelineBreakerExecutor}.
+ */
 public class PipelineBreakerResult {
   private final Map<PlanNode, Integer> _nodeIdMap;
   private final Map<Integer, List<TransferableBlock>> _resultMap;
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/pipeline/PipelineBreakerVisitor.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/pipeline/PipelineBreakerVisitor.java
index 49add00a11..efa97d6638 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/pipeline/PipelineBreakerVisitor.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/pipeline/PipelineBreakerVisitor.java
@@ -25,7 +25,7 @@ import org.apache.pinot.query.planner.plannode.PlanNode;
 import org.apache.pinot.query.planner.plannode.PlanNodeVisitor;
 
 
-public class PipelineBreakerVisitor extends DefaultPostOrderTraversalVisitor<Void, PipelineBreakerContext> {
+class PipelineBreakerVisitor extends DefaultPostOrderTraversalVisitor<Void, PipelineBreakerContext> {
   private static final PlanNodeVisitor<Void, PipelineBreakerContext> INSTANCE = new PipelineBreakerVisitor();
 
   public static void visitPlanRoot(PlanNode root, PipelineBreakerContext context) {
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/server/ServerPlanRequestUtils.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/server/ServerPlanRequestUtils.java
index 13659ae810..616413f3ad 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/server/ServerPlanRequestUtils.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/server/ServerPlanRequestUtils.java
@@ -25,6 +25,9 @@ import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import org.apache.helix.store.zk.ZkHelixPropertyStore;
+import org.apache.helix.zookeeper.datamodel.ZNRecord;
+import org.apache.pinot.common.metadata.ZKMetadataProvider;
 import org.apache.pinot.common.request.BrokerRequest;
 import org.apache.pinot.common.request.DataSource;
 import org.apache.pinot.common.request.Expression;
@@ -38,14 +41,17 @@ import org.apache.pinot.core.query.optimizer.QueryOptimizer;
 import org.apache.pinot.core.routing.TimeBoundaryInfo;
 import org.apache.pinot.query.planner.partitioning.FieldSelectionKeySelector;
 import org.apache.pinot.query.planner.plannode.JoinNode;
+import org.apache.pinot.query.routing.WorkerMetadata;
 import org.apache.pinot.query.runtime.plan.DistributedStagePlan;
 import org.apache.pinot.query.runtime.plan.PhysicalPlanContext;
+import org.apache.pinot.query.runtime.plan.StageMetadata;
 import org.apache.pinot.query.service.QueryConfig;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.config.table.TableType;
 import org.apache.pinot.spi.data.FieldSpec;
 import org.apache.pinot.spi.data.Schema;
 import org.apache.pinot.spi.utils.CommonConstants;
+import org.apache.pinot.spi.utils.builder.TableNameBuilder;
 import org.apache.pinot.sql.FilterKind;
 import org.apache.pinot.sql.parsers.rewriter.NonAggregationGroupByToDistinctQueryRewriter;
 import org.apache.pinot.sql.parsers.rewriter.PredicateComparisonRewriter;
@@ -69,7 +75,50 @@ public class ServerPlanRequestUtils {
     // do not instantiate.
   }
 
-  public static ServerPlanRequestContext build(PhysicalPlanContext planContext, DistributedStagePlan stagePlan,
+  /**
+   * Entry point to construct {@link ServerPlanRequestContext}s for executing the leaf-stage runner.
+   *
+   * @param planContext physical plan context of the stage.
+   * @param distributedStagePlan distributed stage plan of the stage.
+   * @param requestMetadataMap request metadata map.
+   * @param helixPropertyStore helix property store used to fetch table config and schema for leaf-stage execution.
+   * @return a list of server plan request contexts to be run.
+   */
+  public static List<ServerPlanRequestContext> constructServerQueryRequests(PhysicalPlanContext planContext,
+      DistributedStagePlan distributedStagePlan, Map<String, String> requestMetadataMap,
+      ZkHelixPropertyStore<ZNRecord> helixPropertyStore) {
+    StageMetadata stageMetadata = distributedStagePlan.getStageMetadata();
+    WorkerMetadata workerMetadata = distributedStagePlan.getCurrentWorkerMetadata();
+    String rawTableName = StageMetadata.getTableName(stageMetadata);
+    Map<String, List<String>> tableToSegmentListMap = WorkerMetadata.getTableSegmentsMap(workerMetadata);
+    List<ServerPlanRequestContext> requests = new ArrayList<>();
+    for (Map.Entry<String, List<String>> tableEntry : tableToSegmentListMap.entrySet()) {
+      String tableType = tableEntry.getKey();
+      // ZkHelixPropertyStore extends ZkCacheBaseDataAccessor, so it should not cause too much out-of-the-box
+      // network traffic. But there is a chance to improve this:
+      // TODO: use TableDataManager: it is already getting tableConfig and Schema when processing segments.
+      if (TableType.OFFLINE.name().equals(tableType)) {
+        TableConfig tableConfig = ZKMetadataProvider.getTableConfig(helixPropertyStore,
+            TableNameBuilder.forType(TableType.OFFLINE).tableNameWithType(rawTableName));
+        Schema schema = ZKMetadataProvider.getTableSchema(helixPropertyStore,
+            TableNameBuilder.forType(TableType.OFFLINE).tableNameWithType(rawTableName));
+        requests.add(ServerPlanRequestUtils.build(planContext, distributedStagePlan, requestMetadataMap, tableConfig,
+            schema, StageMetadata.getTimeBoundary(stageMetadata), TableType.OFFLINE, tableEntry.getValue()));
+      } else if (TableType.REALTIME.name().equals(tableType)) {
+        TableConfig tableConfig = ZKMetadataProvider.getTableConfig(helixPropertyStore,
+            TableNameBuilder.forType(TableType.REALTIME).tableNameWithType(rawTableName));
+        Schema schema = ZKMetadataProvider.getTableSchema(helixPropertyStore,
+            TableNameBuilder.forType(TableType.REALTIME).tableNameWithType(rawTableName));
+        requests.add(ServerPlanRequestUtils.build(planContext, distributedStagePlan, requestMetadataMap, tableConfig,
+            schema, StageMetadata.getTimeBoundary(stageMetadata), TableType.REALTIME, tableEntry.getValue()));
+      } else {
+        throw new IllegalArgumentException("Unsupported table type key: " + tableType);
+      }
+    }
+    return requests;
+  }
+
+  private static ServerPlanRequestContext build(PhysicalPlanContext planContext, DistributedStagePlan stagePlan,
       Map<String, String> requestMetadataMap, TableConfig tableConfig, Schema schema, TimeBoundaryInfo timeBoundaryInfo,
       TableType tableType, List<String> segmentList) {
     // Before-visit: construct the ServerPlanRequestContext baseline
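
The OFFLINE/REALTIME dispatch above relies on Pinot's typed table-name convention: the ZK lookups
are keyed by the raw table name plus a type suffix. A standalone snippet showing the convention
(the table name "myTable" is made up for illustration):

    import org.apache.pinot.spi.config.table.TableType;
    import org.apache.pinot.spi.utils.builder.TableNameBuilder;

    public class TableNameSketch {
      public static void main(String[] args) {
        // the raw table name resolves to the typed variants used for the config/schema lookups
        System.out.println(TableNameBuilder.forType(TableType.OFFLINE).tableNameWithType("myTable"));
        // -> myTable_OFFLINE
        System.out.println(TableNameBuilder.forType(TableType.REALTIME).tableNameWithType("myTable"));
        // -> myTable_REALTIME
      }
    }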
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTest.java
index 88081cb342..20bbf0f1ee 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTest.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTest.java
@@ -257,6 +257,12 @@ public class QueryRunnerTest extends QueryRunnerTestBase {
   @DataProvider(name = "testDataWithSqlExecutionExceptions")
   private Object[][] provideTestSqlWithExecutionException() {
     return new Object[][]{
+        // a query whose dynamic-broadcast hint triggers a pipeline breaker failure should return the error upstream
+        new Object[]{
+            "SELECT /*+ joinOptions(join_strategy='dynamic_broadcast') */ col1 FROM a "
+                + " WHERE a.col1 IN (SELECT b.col2 FROM b WHERE textMatch(col1, 'f')) AND a.col3 > 0",
+            "without text index"
+        },
         // Timeout exception should occur with this option:
         new Object[]{
             "SET timeoutMs = 1; SELECT * FROM a JOIN b ON a.col1 = b.col1 JOIN c ON a.col1 = c.col1",
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/plan/pipeline/PipelineBreakerExecutorTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/plan/pipeline/PipelineBreakerExecutorTest.java
new file mode 100644
index 0000000000..56d78effe4
--- /dev/null
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/plan/pipeline/PipelineBreakerExecutorTest.java
@@ -0,0 +1,311 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.runtime.plan.pipeline;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.Executors;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import org.apache.calcite.rel.RelDistribution;
+import org.apache.calcite.rel.core.JoinRelType;
+import org.apache.calcite.rel.logical.PinotRelExchangeType;
+import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.query.mailbox.MailboxIdUtils;
+import org.apache.pinot.query.mailbox.MailboxService;
+import org.apache.pinot.query.mailbox.ReceivingMailbox;
+import org.apache.pinot.query.planner.plannode.JoinNode;
+import org.apache.pinot.query.planner.plannode.MailboxReceiveNode;
+import org.apache.pinot.query.routing.MailboxMetadata;
+import org.apache.pinot.query.routing.VirtualServerAddress;
+import org.apache.pinot.query.routing.WorkerMetadata;
+import org.apache.pinot.query.runtime.blocks.TransferableBlock;
+import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils;
+import org.apache.pinot.query.runtime.executor.OpChainSchedulerService;
+import org.apache.pinot.query.runtime.executor.RoundRobinScheduler;
+import org.apache.pinot.query.runtime.operator.OperatorTestUtil;
+import org.apache.pinot.query.runtime.plan.DistributedStagePlan;
+import org.apache.pinot.query.runtime.plan.StageMetadata;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import static org.apache.pinot.common.utils.DataSchema.ColumnDataType.INT;
+import static org.mockito.Mockito.when;
+
+
+public class PipelineBreakerExecutorTest {
+  private static final VirtualServerAddress RECEIVER_ADDRESS = new VirtualServerAddress("localhost", 123, 0);
+  private static final DataSchema DATA_SCHEMA =
+      new DataSchema(new String[]{"col1", "col2"}, new DataSchema.ColumnDataType[]{INT, INT});
+  private static final String MAILBOX_ID_1 = MailboxIdUtils.toMailboxId(0, 1, 0, 0, 0);
+  private static final String MAILBOX_ID_2 = MailboxIdUtils.toMailboxId(0, 2, 0, 0, 0);
+
+  private AutoCloseable _mocks;
+  @Mock
+  private MailboxService _mailboxService;
+  @Mock
+  private ReceivingMailbox _mailbox1;
+  @Mock
+  private ReceivingMailbox _mailbox2;
+
+  private VirtualServerAddress _server = new VirtualServerAddress("localhost", 123, 0);
+  private OpChainSchedulerService _scheduler = new OpChainSchedulerService(new RoundRobinScheduler(10_000L),
+      Executors.newCachedThreadPool());
+  private StageMetadata _stageMetadata1 = new StageMetadata.Builder().setWorkerMetadataList(Stream.of(_server).map(
+      s -> new WorkerMetadata.Builder().setVirtualServerAddress(s)
+          .addMailBoxInfoMap(0, new MailboxMetadata(
+              ImmutableList.of(org.apache.pinot.query.planner.physical.MailboxIdUtils.toPlanMailboxId(1, 0, 0, 0),
+                  org.apache.pinot.query.planner.physical.MailboxIdUtils.toPlanMailboxId(2, 0, 0, 0)),
+              ImmutableList.of(_server), ImmutableMap.of()))
+          .addMailBoxInfoMap(1, new MailboxMetadata(
+              ImmutableList.of(org.apache.pinot.query.planner.physical.MailboxIdUtils.toPlanMailboxId(1, 0, 0, 0)),
+              ImmutableList.of(_server), ImmutableMap.of()))
+          .addMailBoxInfoMap(2, new MailboxMetadata(
+              ImmutableList.of(org.apache.pinot.query.planner.physical.MailboxIdUtils.toPlanMailboxId(2, 0, 0, 0)),
+              ImmutableList.of(_server), ImmutableMap.of()))
+          .build())
+      .collect(Collectors.toList())).build();
+
+  @BeforeClass
+  public void setUpClass() {
+    _scheduler.startAsync();
+  }
+
+  @AfterClass
+  public void tearDownClass() {
+    _scheduler.stopAsync();
+  }
+
+  @BeforeMethod
+  public void setUp() {
+    _mocks = MockitoAnnotations.openMocks(this);
+    when(_mailboxService.getHostname()).thenReturn("localhost");
+    when(_mailboxService.getPort()).thenReturn(123);
+  }
+
+  @AfterMethod
+  public void tearDown()
+      throws Exception {
+    _mocks.close();
+  }
+
+  @Test
+  public void shouldReturnBlocksUponNormalOperation() {
+    MailboxReceiveNode mailboxReceiveNode =
+        new MailboxReceiveNode(0, DATA_SCHEMA, 1, RelDistribution.Type.SINGLETON, PinotRelExchangeType.PIPELINE_BREAKER,
+            null, null, false, false, null);
+    DistributedStagePlan distributedStagePlan =
+        new DistributedStagePlan(0, RECEIVER_ADDRESS, mailboxReceiveNode, _stageMetadata1);
+
+    // when
+    when(_mailboxService.getReceivingMailbox(MAILBOX_ID_1)).thenReturn(_mailbox1);
+    Object[] row1 = new Object[]{1, 1};
+    Object[] row2 = new Object[]{2, 3};
+    when(_mailbox1.poll()).thenReturn(OperatorTestUtil.block(DATA_SCHEMA, row1),
+        OperatorTestUtil.block(DATA_SCHEMA, row2),
+        TransferableBlockUtils.getEndOfStreamTransferableBlock());
+
+    PipelineBreakerResult pipelineBreakerResult =
+        PipelineBreakerExecutor.executePipelineBreakers(_scheduler, _mailboxService, distributedStagePlan,
+            System.currentTimeMillis() + 10_000L, 0, false);
+
+    // then
+    // should have a single PB result containing 2 data blocks; the EOS block shouldn't be included
+    Assert.assertNotNull(pipelineBreakerResult);
+    Assert.assertEquals(pipelineBreakerResult.getResultMap().size(), 1);
+    Assert.assertEquals(pipelineBreakerResult.getResultMap().values().iterator().next().size(), 2);
+  }
+
+  @Test
+  public void shouldWorkWithMultiplePBNodeUponNormalOperation() {
+    MailboxReceiveNode mailboxReceiveNode1 =
+        new MailboxReceiveNode(0, DATA_SCHEMA, 1, RelDistribution.Type.SINGLETON, PinotRelExchangeType.PIPELINE_BREAKER,
+            null, null, false, false, null);
+    MailboxReceiveNode mailboxReceiveNode2 =
+        new MailboxReceiveNode(0, DATA_SCHEMA, 2, RelDistribution.Type.SINGLETON, PinotRelExchangeType.PIPELINE_BREAKER,
+            null, null, false, false, null);
+    JoinNode joinNode = new JoinNode(0, DATA_SCHEMA, DATA_SCHEMA, DATA_SCHEMA, JoinRelType.INNER, null, null, false);
+    joinNode.addInput(mailboxReceiveNode1);
+    joinNode.addInput(mailboxReceiveNode2);
+    DistributedStagePlan distributedStagePlan =
+        new DistributedStagePlan(0, RECEIVER_ADDRESS, joinNode, _stageMetadata1);
+
+    // when
+    when(_mailboxService.getReceivingMailbox(MAILBOX_ID_1)).thenReturn(_mailbox1);
+    when(_mailboxService.getReceivingMailbox(MAILBOX_ID_2)).thenReturn(_mailbox2);
+    Object[] row1 = new Object[]{1, 1};
+    Object[] row2 = new Object[]{2, 3};
+    when(_mailbox1.poll()).thenReturn(OperatorTestUtil.block(DATA_SCHEMA, row1),
+        TransferableBlockUtils.getEndOfStreamTransferableBlock());
+    when(_mailbox2.poll()).thenReturn(OperatorTestUtil.block(DATA_SCHEMA, row2),
+        TransferableBlockUtils.getEndOfStreamTransferableBlock());
+
+    PipelineBreakerResult pipelineBreakerResult =
+        PipelineBreakerExecutor.executePipelineBreakers(_scheduler, _mailboxService, distributedStagePlan,
+            System.currentTimeMillis() + 10_000L, 0, false);
+
+    // then
+    // should have two PB results, each receiving 1 data block; EOS blocks shouldn't be included
+    Assert.assertNotNull(pipelineBreakerResult);
+    Assert.assertEquals(pipelineBreakerResult.getResultMap().size(), 2);
+    Iterator<List<TransferableBlock>> it = pipelineBreakerResult.getResultMap().values().iterator();
+    Assert.assertEquals(it.next().size(), 1);
+    Assert.assertEquals(it.next().size(), 1);
+    Assert.assertFalse(it.hasNext());
+  }
+
+  @Test
+  public void shouldReturnErrorBlocksFailureWhenPBExecute() {
+    MailboxReceiveNode incorrectlyConfiguredMailboxNode =
+        new MailboxReceiveNode(0, DATA_SCHEMA, 3, RelDistribution.Type.SINGLETON, PinotRelExchangeType.PIPELINE_BREAKER,
+            null, null, false, false, null);
+    DistributedStagePlan distributedStagePlan =
+        new DistributedStagePlan(0, RECEIVER_ADDRESS, incorrectlyConfiguredMailboxNode, _stageMetadata1);
+
+    // when
+    PipelineBreakerResult pipelineBreakerResult =
+        PipelineBreakerExecutor.executePipelineBreakers(_scheduler, _mailboxService, distributedStagePlan,
+            System.currentTimeMillis() + 10_000L, 0, false);
+
+    // then
+    // should contain only failure error blocks
+    Assert.assertNotNull(pipelineBreakerResult);
+    Assert.assertEquals(pipelineBreakerResult.getResultMap().size(), 1);
+    List<TransferableBlock> resultBlocks = pipelineBreakerResult.getResultMap().values().iterator().next();
+    Assert.assertEquals(resultBlocks.size(), 1);
+    Assert.assertTrue(resultBlocks.get(0).isEndOfStreamBlock());
+    Assert.assertFalse(resultBlocks.get(0).isSuccessfulEndOfStreamBlock());
+  }
+
+  @Test
+  public void shouldReturnErrorBlocksFailureWhenPBTimeout() {
+    MailboxReceiveNode incorrectlyConfiguredMailboxNode =
+        new MailboxReceiveNode(0, DATA_SCHEMA, 1, RelDistribution.Type.SINGLETON, PinotRelExchangeType.PIPELINE_BREAKER,
+            null, null, false, false, null);
+    DistributedStagePlan distributedStagePlan =
+        new DistributedStagePlan(0, RECEIVER_ADDRESS, incorrectlyConfiguredMailboxNode, _stageMetadata1);
+
+    // when
+    when(_mailboxService.getReceivingMailbox(MAILBOX_ID_1)).thenReturn(_mailbox1);
+    Object[] row1 = new Object[]{1, 1};
+    Object[] row2 = new Object[]{2, 3};
+    when(_mailbox1.poll()).thenReturn(OperatorTestUtil.block(DATA_SCHEMA, row1),
+        OperatorTestUtil.block(DATA_SCHEMA, row2),
+        TransferableBlockUtils.getEndOfStreamTransferableBlock());
+
+    PipelineBreakerResult pipelineBreakerResult =
+        PipelineBreakerExecutor.executePipelineBreakers(_scheduler, _mailboxService, distributedStagePlan,
+            System.currentTimeMillis() - 10_000L, 0, false);
+
+    // then
+    // should contain only failure error blocks
+    Assert.assertNotNull(pipelineBreakerResult);
+    Assert.assertEquals(pipelineBreakerResult.getResultMap().size(), 1);
+    List<TransferableBlock> resultBlocks = pipelineBreakerResult.getResultMap().values().iterator().next();
+    Assert.assertEquals(resultBlocks.size(), 1);
+    Assert.assertTrue(resultBlocks.get(0).isEndOfStreamBlock());
+    Assert.assertFalse(resultBlocks.get(0).isSuccessfulEndOfStreamBlock());
+  }
+
+  @Test
+  public void shouldReturnErrorBlocksWhenAnyPBFailure() {
+    MailboxReceiveNode mailboxReceiveNode1 =
+        new MailboxReceiveNode(0, DATA_SCHEMA, 1, RelDistribution.Type.SINGLETON, PinotRelExchangeType.PIPELINE_BREAKER,
+            null, null, false, false, null);
+    MailboxReceiveNode incorrectlyConfiguredMailboxNode =
+        new MailboxReceiveNode(0, DATA_SCHEMA, 3, RelDistribution.Type.SINGLETON, PinotRelExchangeType.PIPELINE_BREAKER,
+            null, null, false, false, null);
+    JoinNode joinNode = new JoinNode(0, DATA_SCHEMA, DATA_SCHEMA, DATA_SCHEMA, JoinRelType.INNER, null, null, false);
+    joinNode.addInput(mailboxReceiveNode1);
+    joinNode.addInput(incorrectlyConfiguredMailboxNode);
+    DistributedStagePlan distributedStagePlan =
+        new DistributedStagePlan(0, RECEIVER_ADDRESS, joinNode, _stageMetadata1);
+
+    // when
+    when(_mailboxService.getReceivingMailbox(MAILBOX_ID_1)).thenReturn(_mailbox1);
+    when(_mailboxService.getReceivingMailbox(MAILBOX_ID_2)).thenReturn(_mailbox2);
+    Object[] row1 = new Object[]{1, 1};
+    Object[] row2 = new Object[]{2, 3};
+    when(_mailbox1.poll()).thenReturn(OperatorTestUtil.block(DATA_SCHEMA, row1),
+        TransferableBlockUtils.getEndOfStreamTransferableBlock());
+    when(_mailbox2.poll()).thenReturn(OperatorTestUtil.block(DATA_SCHEMA, row2),
+        TransferableBlockUtils.getEndOfStreamTransferableBlock());
+
+    PipelineBreakerResult pipelineBreakerResult =
+        PipelineBreakerExecutor.executePipelineBreakers(_scheduler, _mailboxService, distributedStagePlan,
+            System.currentTimeMillis() + 10_000L, 0, false);
+
+    // then
+    // should fail even though one of the 2 PBs returns correct results.
+    Assert.assertNotNull(pipelineBreakerResult);
+    Assert.assertEquals(pipelineBreakerResult.getResultMap().size(), 2);
+    for (List<TransferableBlock> resultBlocks : pipelineBreakerResult.getResultMap().values()) {
+      Assert.assertEquals(resultBlocks.size(), 1);
+      Assert.assertTrue(resultBlocks.get(0).isEndOfStreamBlock());
+      Assert.assertFalse(resultBlocks.get(0).isSuccessfulEndOfStreamBlock());
+    }
+  }
+
+  @Test
+  public void shouldReturnErrorBlocksWhenReceivedErrorFromSender() {
+    MailboxReceiveNode mailboxReceiveNode1 =
+        new MailboxReceiveNode(0, DATA_SCHEMA, 1, RelDistribution.Type.SINGLETON, PinotRelExchangeType.PIPELINE_BREAKER,
+            null, null, false, false, null);
+    MailboxReceiveNode incorrectlyConfiguredMailboxNode =
+        new MailboxReceiveNode(0, DATA_SCHEMA, 2, RelDistribution.Type.SINGLETON, PinotRelExchangeType.PIPELINE_BREAKER,
+            null, null, false, false, null);
+    JoinNode joinNode = new JoinNode(0, DATA_SCHEMA, DATA_SCHEMA, DATA_SCHEMA, JoinRelType.INNER, null, null, false);
+    joinNode.addInput(mailboxReceiveNode1);
+    joinNode.addInput(incorrectlyConfiguredMailboxNode);
+    DistributedStagePlan distributedStagePlan =
+        new DistributedStagePlan(0, RECEIVER_ADDRESS, joinNode, _stageMetadata1);
+
+    // when
+    when(_mailboxService.getReceivingMailbox(MAILBOX_ID_1)).thenReturn(_mailbox1);
+    when(_mailboxService.getReceivingMailbox(MAILBOX_ID_2)).thenReturn(_mailbox2);
+    Object[] row1 = new Object[]{1, 1};
+    Object[] row2 = new Object[]{2, 3};
+    when(_mailbox1.poll()).thenReturn(OperatorTestUtil.block(DATA_SCHEMA, row1),
+        TransferableBlockUtils.getErrorTransferableBlock(new RuntimeException("ERROR ON 1")));
+    when(_mailbox2.poll()).thenReturn(OperatorTestUtil.block(DATA_SCHEMA, row2),
+        TransferableBlockUtils.getEndOfStreamTransferableBlock());
+
+    PipelineBreakerResult pipelineBreakerResult =
+        PipelineBreakerExecutor.executePipelineBreakers(_scheduler, _mailboxService, distributedStagePlan,
+            System.currentTimeMillis() + 10_000L, 0, false);
+
+    // then
+    // should fail even though one of the 2 PBs doesn't receive an error block from its sender.
+    Assert.assertNotNull(pipelineBreakerResult);
+    Assert.assertEquals(pipelineBreakerResult.getResultMap().size(), 2);
+    for (List<TransferableBlock> resultBlocks : pipelineBreakerResult.getResultMap().values()) {
+      Assert.assertEquals(resultBlocks.size(), 1);
+      Assert.assertTrue(resultBlocks.get(0).isEndOfStreamBlock());
+      Assert.assertFalse(resultBlocks.get(0).isSuccessfulEndOfStreamBlock());
+    }
+  }
+}

