You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ro...@apache.org on 2023/06/09 23:38:00 UTC

[pinot] branch master updated: [multistage][bugfix] fix PB operator never returns block (#10881)

This is an automated email from the ASF dual-hosted git repository.

rongr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new b1e8655449 [multistage][bugfix] fix PB operator never returns block (#10881)
b1e8655449 is described below

commit b1e8655449abd903330993370dee2541869f8be9
Author: Rong Rong <ro...@apache.org>
AuthorDate: Fri Jun 9 16:37:53 2023 -0700

    [multistage][bugfix] fix PB operator never returns block (#10881)
    
    * fix PB operator never returns block thus hogging the opChainScheduler pool issue.
    * also fix empty block NPE, this never happened since previously we also add no-op block to the result map.
    
    ---------
    
    Co-authored-by: Rong Rong <ro...@startree.ai>
---
 .../pinot/query/runtime/plan/pipeline/PipelineBreakerOperator.java | 7 ++++---
 .../pinot/query/runtime/plan/server/ServerPlanRequestVisitor.java  | 4 +++-
 2 files changed, 7 insertions(+), 4 deletions(-)

diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/pipeline/PipelineBreakerOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/pipeline/PipelineBreakerOperator.java
index ffb01aa137..3c338ac1a8 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/pipeline/PipelineBreakerOperator.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/pipeline/PipelineBreakerOperator.java
@@ -79,14 +79,14 @@ public class PipelineBreakerOperator extends MultiStageOperator {
       Map.Entry<Integer, Operator<TransferableBlock>> worker = _workerEntries.remove();
       TransferableBlock block = worker.getValue().nextBlock();
 
-      // Release the mailbox when the block is end-of-stream
-      if (block != null && block.isSuccessfulEndOfStreamBlock()) {
+      // Release the mailbox worker when the block is end-of-stream
+      if (block != null && !block.isNoOpBlock() && block.isSuccessfulEndOfStreamBlock()) {
         continue;
       }
 
       // Add the worker back to the queue if the block is not end-of-stream
       _workerEntries.add(worker);
-      if (block != null) {
+      if (block != null && !block.isNoOpBlock()) {
         if (block.isErrorBlock()) {
           _errorBlock = block;
           constructErrorResponse(block);
@@ -97,6 +97,7 @@ public class PipelineBreakerOperator extends MultiStageOperator {
         if (!block.isEndOfStreamBlock()) {
           blockList.add(block);
         }
+        return block;
       }
     }
 
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/server/ServerPlanRequestVisitor.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/server/ServerPlanRequestVisitor.java
index 9b1f97426c..7cda76bfa2 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/server/ServerPlanRequestVisitor.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/server/ServerPlanRequestVisitor.java
@@ -19,6 +19,7 @@
 package org.apache.pinot.query.runtime.plan.server;
 
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 import java.util.stream.Collectors;
 import org.apache.pinot.common.datablock.DataBlock;
@@ -109,7 +110,8 @@ public class ServerPlanRequestVisitor implements PlanNodeVisitor<Void, ServerPla
     }
     staticSide.visit(this, context);
     int resultMapId = context.getPipelineBreakerResult().getNodeIdMap().get(dynamicSide);
-    List<TransferableBlock> transferableBlocks = context.getPipelineBreakerResult().getResultMap().get(resultMapId);
+    List<TransferableBlock> transferableBlocks = context.getPipelineBreakerResult().getResultMap().getOrDefault(
+        resultMapId, Collections.emptyList());
     List<Object[]> resultDataContainer = new ArrayList<>();
     DataSchema dataSchema = dynamicSide.getDataSchema();
     for (TransferableBlock block : transferableBlocks) {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org