You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ro...@apache.org on 2023/05/09 04:23:37 UTC

[pinot] branch master updated: [multistage][cleanup] make leaf stage also registered with SchedulerService (#10711)

This is an automated email from the ASF dual-hosted git repository.

rongr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 7ab0af00a1 [multistage][cleanup] make leaf stage also registered with SchedulerService (#10711)
7ab0af00a1 is described below

commit 7ab0af00a1179b011a8ab9211f28eeb318ea336c
Author: Rong Rong <ro...@apache.org>
AuthorDate: Mon May 8 21:23:27 2023 -0700

    [multistage][cleanup] make leaf stage also registered with SchedulerService (#10711)
    
    * Use a cached thread pool to avoid sender starvation
    * Move leaf stages to a separate scheduler
    
    ---------
    
    Co-authored-by: Rong Rong <ro...@startree.ai>
---
 .../apache/pinot/query/runtime/QueryRunner.java    | 117 +++++++--------------
 .../runtime/executor/LeafSchedulerService.java     | 102 ++++++++++++++++++
 .../LeafStageTransferableBlockOperator.java        |  58 ++++++----
 .../apache/pinot/query/service/QueryServer.java    |   6 +-
 .../apache/pinot/query/QueryServerEnclosure.java   |   3 +-
 ...Test.java => YieldingSchedulerServiceTest.java} |   2 +-
 .../LeafStageTransferableBlockOperatorTest.java    |  82 ++++++++++++---
 .../pinot/query/runtime/operator/OpChainTest.java  |   7 +-
 8 files changed, 254 insertions(+), 123 deletions(-)

diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java
index ced674d269..8876a950e3 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java
@@ -20,6 +20,7 @@ package org.apache.pinot.query.runtime;
 
 import com.google.common.annotations.VisibleForTesting;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ExecutorService;
@@ -35,7 +36,6 @@ import org.apache.pinot.common.metrics.ServerMetrics;
 import org.apache.pinot.common.utils.NamedThreadFactory;
 import org.apache.pinot.core.data.manager.InstanceDataManager;
 import org.apache.pinot.core.operator.blocks.InstanceResponseBlock;
-import org.apache.pinot.core.operator.combine.BaseCombineOperator;
 import org.apache.pinot.core.query.executor.ServerQueryExecutorV1Impl;
 import org.apache.pinot.core.query.request.ServerQueryRequest;
 import org.apache.pinot.core.query.scheduler.resources.ResourceManager;
@@ -45,7 +45,7 @@ import org.apache.pinot.query.planner.plannode.PlanNode;
 import org.apache.pinot.query.routing.PlanFragmentMetadata;
 import org.apache.pinot.query.routing.VirtualServerAddress;
 import org.apache.pinot.query.routing.WorkerMetadata;
-import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils;
+import org.apache.pinot.query.runtime.executor.LeafSchedulerService;
 import org.apache.pinot.query.runtime.executor.OpChainSchedulerService;
 import org.apache.pinot.query.runtime.executor.RoundRobinScheduler;
 import org.apache.pinot.query.runtime.operator.LeafStageTransferableBlockOperator;
@@ -85,34 +85,13 @@ public class QueryRunner {
   private String _hostname;
   private int _port;
   private VirtualServerAddress _rootServer;
-  // Query worker threads are used for (1) running intermediate stage operators (2) running segment level operators
-  /**
-   * Query worker threads are used for:
-   * <ol>
-   *   <li>
-   *     Running intermediate stage operators (v2 engine operators).
-   *   </li>
-   *   <li>
-   *     Running per-segment operators submitted in {@link BaseCombineOperator}.
-   *   </li>
-   * </ol>
-   */
+
   private ExecutorService _queryWorkerIntermExecutorService;
   private ExecutorService _queryWorkerLeafExecutorService;
-  /**
-   * Query runner threads are used for:
-   * <ol>
-   *   <li>
-   *     Merging results in BaseCombineOperator for leaf stages. Results are provided by per-segment operators run in
-   *     worker threads
-   *   </li>
-   *   <li>
-   *     Building the OperatorChain and submitting to the scheduler for non-leaf stages (intermediate stages).
-   *   </li>
-   * </ol>
-   */
   private ExecutorService _queryRunnerExecutorService;
-  private OpChainSchedulerService _scheduler;
+
+  private OpChainSchedulerService _intermScheduler;
+  private LeafSchedulerService _leafScheduler;
 
   /**
    * Initializes the query executor.
@@ -130,15 +109,16 @@ public class QueryRunner {
     try {
       long releaseMs = config.getProperty(QueryConfig.KEY_OF_SCHEDULER_RELEASE_TIMEOUT_MS,
           QueryConfig.DEFAULT_SCHEDULER_RELEASE_TIMEOUT_MS);
-      _queryWorkerIntermExecutorService = Executors.newFixedThreadPool(ResourceManager.DEFAULT_QUERY_WORKER_THREADS,
+      _queryWorkerIntermExecutorService = Executors.newCachedThreadPool(
           new NamedThreadFactory("query_intermediate_worker_on_" + _port + "_port"));
       _queryWorkerLeafExecutorService = Executors.newFixedThreadPool(ResourceManager.DEFAULT_QUERY_WORKER_THREADS,
           new NamedThreadFactory("query_leaf_worker_on_" + _port + "_port"));
-      _queryRunnerExecutorService = Executors.newFixedThreadPool(ResourceManager.DEFAULT_QUERY_RUNNER_THREADS,
+      _queryRunnerExecutorService = Executors.newCachedThreadPool(
           new NamedThreadFactory("query_runner_on_" + _port + "_port"));
-      _scheduler = new OpChainSchedulerService(new RoundRobinScheduler(releaseMs),
+      _intermScheduler = new OpChainSchedulerService(new RoundRobinScheduler(releaseMs),
           getQueryWorkerIntermExecutorService());
-      _mailboxService = new MailboxService(_hostname, _port, config, _scheduler::onDataAvailable);
+      _leafScheduler = new LeafSchedulerService(getQueryRunnerExecutorService());
+      _mailboxService = new MailboxService(_hostname, _port, config, _intermScheduler::onDataAvailable);
       _serverExecutor = new ServerQueryExecutorV1Impl();
       _serverExecutor.init(config.subset(PINOT_V1_SERVER_QUERY_CONFIG_PREFIX), instanceDataManager, serverMetrics);
     } catch (Exception e) {
@@ -151,14 +131,14 @@ public class QueryRunner {
     _helixPropertyStore = _helixManager.getHelixPropertyStore();
     _mailboxService.start();
     _serverExecutor.start();
-    _scheduler.startAsync().awaitRunning(30, TimeUnit.SECONDS);
+    _intermScheduler.startAsync().awaitRunning(30, TimeUnit.SECONDS);
   }
 
   public void shutDown()
       throws TimeoutException {
     _serverExecutor.shutDown();
     _mailboxService.shutdown();
-    _scheduler.stopAsync().awaitTerminated(30, TimeUnit.SECONDS);
+    _intermScheduler.stopAsync().awaitTerminated(30, TimeUnit.SECONDS);
   }
 
   public void processQuery(DistributedStagePlan distributedStagePlan, Map<String, String> requestMetadataMap) {
@@ -168,18 +148,20 @@ public class QueryRunner {
         Boolean.parseBoolean(requestMetadataMap.getOrDefault(CommonConstants.Broker.Request.TRACE, "false"));
     long deadlineMs = System.currentTimeMillis() + timeoutMs;
     if (isLeafStage(distributedStagePlan)) {
-      runLeafStage(distributedStagePlan, requestMetadataMap, timeoutMs, deadlineMs, requestId);
+      OpChain rootOperator = compileLeafStage(distributedStagePlan, requestMetadataMap, timeoutMs, deadlineMs,
+          requestId);
+      _leafScheduler.register(rootOperator);
     } else {
       PlanNode stageRoot = distributedStagePlan.getStageRoot();
       OpChain rootOperator = PhysicalPlanVisitor.build(stageRoot,
           new PlanRequestContext(_mailboxService, requestId, stageRoot.getPlanFragmentId(), timeoutMs, deadlineMs,
               distributedStagePlan.getServer(), distributedStagePlan.getStageMetadata(), isTraceEnabled));
-      _scheduler.register(rootOperator);
+      _intermScheduler.register(rootOperator);
     }
   }
 
   public void cancel(long requestId) {
-    _scheduler.cancel(requestId);
+    _intermScheduler.cancel(requestId);
   }
 
   @VisibleForTesting
@@ -196,7 +178,7 @@ public class QueryRunner {
     return _queryRunnerExecutorService;
   }
 
-  private void runLeafStage(DistributedStagePlan distributedStagePlan, Map<String, String> requestMetadataMap,
+  private OpChain compileLeafStage(DistributedStagePlan distributedStagePlan, Map<String, String> requestMetadataMap,
       long timeoutMs, long deadlineMs, long requestId) {
     boolean isTraceEnabled =
         Boolean.parseBoolean(requestMetadataMap.getOrDefault(CommonConstants.Broker.Request.TRACE, "false"));
@@ -208,9 +190,19 @@ public class QueryRunner {
       serverQueryRequests.add(new ServerQueryRequest(requestContext.getInstanceRequest(),
           new ServerMetrics(PinotMetricUtils.getPinotMetricsRegistry()), System.currentTimeMillis()));
     }
-    getQueryRunnerExecutorService().submit(() -> {
-      processServerQuery(requestId, timeoutMs, deadlineMs, isTraceEnabled, distributedStagePlan, serverQueryRequests);
-    });
+    MailboxSendNode sendNode = (MailboxSendNode) distributedStagePlan.getStageRoot();
+    OpChainExecutionContext opChainExecutionContext =
+        new OpChainExecutionContext(_mailboxService, requestId, sendNode.getPlanFragmentId(),
+            distributedStagePlan.getServer(), timeoutMs, deadlineMs, distributedStagePlan.getStageMetadata(),
+            isTraceEnabled);
+    MultiStageOperator leafStageOperator =
+        new LeafStageTransferableBlockOperator(opChainExecutionContext, this::processServerQueryRequest,
+            serverQueryRequests, sendNode.getDataSchema());
+    MailboxSendOperator mailboxSendOperator =
+        new MailboxSendOperator(opChainExecutionContext, leafStageOperator, sendNode.getExchangeType(),
+            sendNode.getPartitionKeySelector(), sendNode.getCollationKeys(), sendNode.getCollationDirections(),
+            sendNode.isSortOnSender(), sendNode.getReceiverStageId());
+    return new OpChain(opChainExecutionContext, mailboxSendOperator, Collections.emptyList());
   }
 
   private static List<ServerPlanRequestContext> constructServerQueryRequests(DistributedStagePlan distributedStagePlan,
@@ -249,46 +241,17 @@ public class QueryRunner {
     return requests;
   }
 
-  private void processServerQuery(long requestId, long timeoutMs, long deadlineMs, boolean isTraceEnabled,
-      DistributedStagePlan distributedStagePlan, List<ServerQueryRequest> serverQueryRequests) {
-    MailboxSendOperator mailboxSendOperator = null;
+  private InstanceResponseBlock processServerQueryRequest(ServerQueryRequest request) {
+    InstanceResponseBlock result;
     try {
-      // send the data table via mailbox in one-off fashion (e.g. no block-level split, one data table/partition key)
-      List<InstanceResponseBlock> serverQueryResults = new ArrayList<>(serverQueryRequests.size());
-      for (ServerQueryRequest request : serverQueryRequests) {
-        InstanceResponseBlock result;
-        try {
-          result = _serverExecutor.execute(request, getQueryWorkerLeafExecutorService());
-        } catch (Exception e) {
-          InstanceResponseBlock errorResponse = new InstanceResponseBlock();
-          errorResponse.getExceptions().put(QueryException.QUERY_EXECUTION_ERROR_CODE,
-              e.getMessage() + QueryException.getTruncatedStackTrace(e));
-          result = errorResponse;
-        }
-        serverQueryResults.add(result);
-      }
-      MailboxSendNode sendNode = (MailboxSendNode) distributedStagePlan.getStageRoot();
-      OpChainExecutionContext opChainExecutionContext =
-          new OpChainExecutionContext(_mailboxService, requestId, sendNode.getPlanFragmentId(),
-              distributedStagePlan.getServer(), timeoutMs, deadlineMs, distributedStagePlan.getStageMetadata(),
-              isTraceEnabled);
-      MultiStageOperator leafStageOperator =
-          new LeafStageTransferableBlockOperator(opChainExecutionContext, serverQueryResults, sendNode.getDataSchema());
-      mailboxSendOperator =
-          new MailboxSendOperator(opChainExecutionContext, leafStageOperator, sendNode.getExchangeType(),
-              sendNode.getPartitionKeySelector(), sendNode.getCollationKeys(), sendNode.getCollationDirections(),
-              sendNode.isSortOnSender(), sendNode.getReceiverStageId());
-      int blockCounter = 0;
-      while (!TransferableBlockUtils.isEndOfStream(mailboxSendOperator.nextBlock())) {
-        LOGGER.debug("Acquired transferable block: {}", blockCounter++);
-      }
-      mailboxSendOperator.close();
+      result = _serverExecutor.execute(request, getQueryWorkerLeafExecutorService());
     } catch (Exception e) {
-      LOGGER.error(String.format("Error running leafStage for requestId=%s", requestId), e);
-      if (mailboxSendOperator != null) {
-        mailboxSendOperator.cancel(e);
-      }
+      InstanceResponseBlock errorResponse = new InstanceResponseBlock();
+      errorResponse.getExceptions().put(QueryException.QUERY_EXECUTION_ERROR_CODE,
+          e.getMessage() + QueryException.getTruncatedStackTrace(e));
+      result = errorResponse;
     }
+    return result;
   }
 
   private boolean isLeafStage(DistributedStagePlan distributedStagePlan) {
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/LeafSchedulerService.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/LeafSchedulerService.java
new file mode 100644
index 0000000000..faa4bf4945
--- /dev/null
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/LeafSchedulerService.java
@@ -0,0 +1,102 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.runtime.executor;
+
+import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.stream.Collectors;
+import org.apache.pinot.core.util.trace.TraceRunnable;
+import org.apache.pinot.query.runtime.blocks.TransferableBlock;
+import org.apache.pinot.query.runtime.operator.OpChain;
+import org.apache.pinot.query.runtime.operator.OpChainId;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class LeafSchedulerService {
+  private static final Logger LOGGER = LoggerFactory.getLogger(LeafSchedulerService.class);
+
+  private final ExecutorService _executorService;
+  private final ConcurrentHashMap<OpChainId, Future<?>> _submittedOpChainMap;
+
+  public LeafSchedulerService(ExecutorService executorService) {
+    _executorService = executorService;
+    _submittedOpChainMap = new ConcurrentHashMap<>();
+  }
+
+  public void register(OpChain operatorChain) {
+    Future<?> scheduledFuture = _executorService.submit(new TraceRunnable() {
+      @Override
+      public void runJob() {
+        boolean isFinished = false;
+        boolean returnedErrorBlock = false;
+        Throwable thrown = null;
+        try {
+          LOGGER.trace("({}): Executing", operatorChain);
+          operatorChain.getStats().executing();
+          TransferableBlock result = operatorChain.getRoot().nextBlock();
+          while (!result.isEndOfStreamBlock()) {
+            result = operatorChain.getRoot().nextBlock();
+          }
+          isFinished = true;
+          if (result.isErrorBlock()) {
+            returnedErrorBlock = true;
+            LOGGER.error("({}): Completed erroneously {} {}", operatorChain, operatorChain.getStats(),
+                result.getDataBlock().getExceptions());
+          } else {
+            LOGGER.debug("({}): Completed {}", operatorChain, operatorChain.getStats());
+          }
+        } catch (Exception e) {
+          LOGGER.error("({}): Failed to execute operator chain! {}", operatorChain, operatorChain.getStats(), e);
+          thrown = e;
+        } finally {
+          if (returnedErrorBlock || thrown != null) {
+            cancelOpChain(operatorChain, thrown);
+          } else if (isFinished) {
+            closeOpChain(operatorChain);
+          }
+        }
+      }
+    });
+    _submittedOpChainMap.put(operatorChain.getId(), scheduledFuture);
+  }
+
+  public void cancel(long requestId) {
+    // simple cancellation. for leaf stage this cannot be a dangling opchain b/c they will eventually be cleared up
+    // via query timeout.
+    List<OpChainId> opChainIdsToCancel = _submittedOpChainMap.keySet()
+        .stream().filter(opChainId -> opChainId.getRequestId() == requestId).collect(Collectors.toList());
+    for (OpChainId opChainId : opChainIdsToCancel) {
+      Future<?> future = _submittedOpChainMap.get(opChainId);
+      if (future != null) {
+        future.cancel(true);
+      }
+    }
+  }
+
+  private void closeOpChain(OpChain opChain) {
+    opChain.close();
+  }
+
+  private void cancelOpChain(OpChain opChain, Throwable t) {
+    opChain.cancel(t);
+  }
+}
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/LeafStageTransferableBlockOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/LeafStageTransferableBlockOperator.java
index 5656baebe8..64e9117a92 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/LeafStageTransferableBlockOperator.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/LeafStageTransferableBlockOperator.java
@@ -24,8 +24,10 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.PriorityQueue;
+import java.util.function.Function;
 import javax.annotation.Nullable;
 import org.apache.pinot.common.datablock.DataBlock;
 import org.apache.pinot.common.datablock.DataBlockUtils;
@@ -38,6 +40,7 @@ import org.apache.pinot.core.operator.blocks.results.BaseResultsBlock;
 import org.apache.pinot.core.operator.blocks.results.DistinctResultsBlock;
 import org.apache.pinot.core.operator.blocks.results.GroupByResultsBlock;
 import org.apache.pinot.core.operator.blocks.results.SelectionResultsBlock;
+import org.apache.pinot.core.query.request.ServerQueryRequest;
 import org.apache.pinot.core.query.selection.SelectionOperatorUtils;
 import org.apache.pinot.query.runtime.blocks.TransferableBlock;
 import org.apache.pinot.query.runtime.plan.OpChainExecutionContext;
@@ -62,22 +65,19 @@ public class LeafStageTransferableBlockOperator extends MultiStageOperator {
   private static final String EXPLAIN_NAME = "LEAF_STAGE_TRANSFER_OPERATOR";
   private static final Logger LOGGER = LoggerFactory.getLogger(LeafStageTransferableBlockOperator.class);
 
-  private final InstanceResponseBlock _errorBlock;
-  private final List<InstanceResponseBlock> _baseResultBlock;
+  private final LinkedList<ServerQueryRequest> _serverQueryRequestQueue;
   private final DataSchema _desiredDataSchema;
-  private int _currentIndex;
+  private final Function<ServerQueryRequest, InstanceResponseBlock> _processCall;
+
+  private InstanceResponseBlock _errorBlock;
 
   public LeafStageTransferableBlockOperator(OpChainExecutionContext context,
-      List<InstanceResponseBlock> baseResultBlock, DataSchema dataSchema) {
+      Function<ServerQueryRequest, InstanceResponseBlock> processCall,
+      List<ServerQueryRequest> serverQueryRequestList, DataSchema dataSchema) {
     super(context);
-    _baseResultBlock = baseResultBlock;
+    _processCall = processCall;
+    _serverQueryRequestQueue = new LinkedList<>(serverQueryRequestList);
     _desiredDataSchema = dataSchema;
-    _errorBlock = baseResultBlock.stream().filter(e -> !e.getExceptions().isEmpty()).findFirst().orElse(null);
-    _currentIndex = 0;
-    for (InstanceResponseBlock instanceResponseBlock : baseResultBlock) {
-      OperatorStats operatorStats = _opChainStats.getOperatorStats(context, getOperatorId());
-      operatorStats.recordExecutionStats(instanceResponseBlock.getResponseMetadata());
-    }
   }
 
   @Override
@@ -93,27 +93,39 @@ public class LeafStageTransferableBlockOperator extends MultiStageOperator {
 
   @Override
   protected TransferableBlock getNextBlock() {
-    if (_currentIndex < 0) {
+    if (_errorBlock != null) {
       throw new RuntimeException("Leaf transfer terminated. next block should no longer be called.");
     }
-    if (_errorBlock != null) {
-      _currentIndex = -1;
+    // runLeafStage
+    InstanceResponseBlock responseBlock = getNextBlockFromLeafStage();
+    if (responseBlock == null) {
+      // finished getting next block from leaf stage. returning EOS
+      return new TransferableBlock(DataBlockUtils.getEndOfStreamDataBlock());
+    } else if (!responseBlock.getExceptions().isEmpty()) {
+      // get error from leaf stage, return ERROR
+      _errorBlock = responseBlock;
       return new TransferableBlock(DataBlockUtils.getErrorDataBlock(_errorBlock.getExceptions()));
     } else {
-      if (_currentIndex < _baseResultBlock.size()) {
-        InstanceResponseBlock responseBlock = _baseResultBlock.get(_currentIndex++);
-        if (responseBlock.getResultsBlock() != null && responseBlock.getResultsBlock().getNumRows() > 0) {
-          return composeTransferableBlock(responseBlock, _desiredDataSchema);
-        } else {
-          return new TransferableBlock(Collections.emptyList(), _desiredDataSchema, DataBlock.Type.ROW);
-        }
+      // return normal block.
+      OperatorStats operatorStats = _opChainStats.getOperatorStats(_context, getOperatorId());
+      operatorStats.recordExecutionStats(responseBlock.getResponseMetadata());
+      if (responseBlock.getResultsBlock() != null && responseBlock.getResultsBlock().getNumRows() > 0) {
+        return composeTransferableBlock(responseBlock, _desiredDataSchema);
       } else {
-        _currentIndex = -1;
-        return new TransferableBlock(DataBlockUtils.getEndOfStreamDataBlock());
+        return new TransferableBlock(Collections.emptyList(), _desiredDataSchema, DataBlock.Type.ROW);
       }
     }
   }
 
+  private @Nullable InstanceResponseBlock getNextBlockFromLeafStage() {
+    if (!_serverQueryRequestQueue.isEmpty()) {
+      ServerQueryRequest request = _serverQueryRequestQueue.pop();
+      return _processCall.apply(request);
+    } else {
+      return null;
+    }
+  }
+
   /**
    * Leaf stage operators should always collect stats for the tables used in queries
    * Otherwise the Broker response will just contain zeros for every stat value
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/QueryServer.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/QueryServer.java
index 8e7061bff8..4cd92ab32e 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/QueryServer.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/QueryServer.java
@@ -22,9 +22,7 @@ import io.grpc.Server;
 import io.grpc.ServerBuilder;
 import io.grpc.Status;
 import io.grpc.stub.StreamObserver;
-import java.io.IOException;
 import java.util.Map;
-import java.util.concurrent.TimeoutException;
 import org.apache.pinot.common.exception.QueryException;
 import org.apache.pinot.common.proto.PinotQueryWorkerGrpc;
 import org.apache.pinot.common.proto.Worker;
@@ -64,7 +62,7 @@ public class QueryServer extends PinotQueryWorkerGrpc.PinotQueryWorkerImplBase {
       }
       _queryRunner.start();
       _server.start();
-    } catch (IOException | TimeoutException e) {
+    } catch (Exception e) {
       throw new RuntimeException(e);
     }
   }
@@ -77,7 +75,7 @@ public class QueryServer extends PinotQueryWorkerGrpc.PinotQueryWorkerImplBase {
         _server.shutdown();
         _server.awaitTermination();
       }
-    } catch (InterruptedException | TimeoutException e) {
+    } catch (Exception e) {
       throw new RuntimeException(e);
     }
   }
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/QueryServerEnclosure.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/QueryServerEnclosure.java
index 36df3de6ff..75401ca07f 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/QueryServerEnclosure.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/QueryServerEnclosure.java
@@ -20,7 +20,6 @@ package org.apache.pinot.query;
 
 import java.util.HashMap;
 import java.util.Map;
-import java.util.concurrent.TimeoutException;
 import org.apache.helix.HelixManager;
 import org.apache.helix.store.zk.ZkHelixPropertyStore;
 import org.apache.helix.zookeeper.datamodel.ZNRecord;
@@ -119,7 +118,7 @@ public class QueryServerEnclosure {
   public void shutDown() {
     try {
       _queryRunner.shutDown();
-    } catch (TimeoutException e) {
+    } catch (Exception e) {
       throw new RuntimeException(e);
     }
   }
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/executor/OpChainSchedulerServiceTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/executor/YieldingSchedulerServiceTest.java
similarity index 99%
rename from pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/executor/OpChainSchedulerServiceTest.java
rename to pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/executor/YieldingSchedulerServiceTest.java
index fbbab9e4d9..4f7f8e5565 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/executor/OpChainSchedulerServiceTest.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/executor/YieldingSchedulerServiceTest.java
@@ -41,7 +41,7 @@ import org.testng.annotations.Test;
 import static org.mockito.Mockito.clearInvocations;
 
 
-public class OpChainSchedulerServiceTest {
+public class YieldingSchedulerServiceTest {
 
   private ExecutorService _executor;
   private AutoCloseable _mocks;
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/LeafStageTransferableBlockOperatorTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/LeafStageTransferableBlockOperatorTest.java
index 9d04d07df9..dc2d2dcb75 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/LeafStageTransferableBlockOperatorTest.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/LeafStageTransferableBlockOperatorTest.java
@@ -18,10 +18,13 @@
  */
 package org.apache.pinot.query.runtime.operator;
 
+import com.google.common.annotations.VisibleForTesting;
 import java.sql.Timestamp;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
+import java.util.function.Function;
 import org.apache.pinot.common.exception.QueryException;
 import org.apache.pinot.common.utils.DataSchema;
 import org.apache.pinot.core.data.table.Record;
@@ -32,6 +35,7 @@ import org.apache.pinot.core.operator.blocks.results.GroupByResultsBlock;
 import org.apache.pinot.core.operator.blocks.results.SelectionResultsBlock;
 import org.apache.pinot.core.query.aggregation.function.DistinctAggregationFunction;
 import org.apache.pinot.core.query.distinct.DistinctTable;
+import org.apache.pinot.core.query.request.ServerQueryRequest;
 import org.apache.pinot.core.query.request.context.QueryContext;
 import org.apache.pinot.core.query.request.context.utils.QueryContextConverterUtils;
 import org.apache.pinot.query.routing.VirtualServerAddress;
@@ -75,7 +79,8 @@ public class LeafStageTransferableBlockOperatorTest {
     List<InstanceResponseBlock> resultsBlockList = Collections.singletonList(new InstanceResponseBlock(
         new SelectionResultsBlock(schema, Arrays.asList(new Object[]{"foo", 1}, new Object[]{"", 2})), queryContext));
     LeafStageTransferableBlockOperator operator =
-        new LeafStageTransferableBlockOperator(OperatorTestUtil.getDefaultContext(), resultsBlockList, schema);
+        new LeafStageTransferableBlockOperator(OperatorTestUtil.getDefaultContext(),
+            getStaticBlockProcessor(resultsBlockList), getStaticServerQueryRequests(resultsBlockList.size()), schema);
 
     // When:
     TransferableBlock resultBlock = operator.nextBlock();
@@ -101,7 +106,9 @@ public class LeafStageTransferableBlockOperatorTest {
         new SelectionResultsBlock(resultSchema,
             Arrays.asList(new Object[]{1, 1660000000000L}, new Object[]{0, 1600000000000L})), queryContext));
     LeafStageTransferableBlockOperator operator =
-        new LeafStageTransferableBlockOperator(OperatorTestUtil.getDefaultContext(), resultsBlockList, desiredSchema);
+        new LeafStageTransferableBlockOperator(OperatorTestUtil.getDefaultContext(),
+            getStaticBlockProcessor(resultsBlockList), getStaticServerQueryRequests(resultsBlockList.size()),
+            desiredSchema);
 
     // When:
     TransferableBlock resultBlock = operator.nextBlock();
@@ -123,7 +130,8 @@ public class LeafStageTransferableBlockOperatorTest {
         new SelectionResultsBlock(schema,
             Arrays.asList(new Object[]{1, 1660000000000L}, new Object[]{0, 1600000000000L})), queryContext));
     LeafStageTransferableBlockOperator operator =
-        new LeafStageTransferableBlockOperator(OperatorTestUtil.getDefaultContext(), resultsBlockList, schema);
+        new LeafStageTransferableBlockOperator(OperatorTestUtil.getDefaultContext(),
+            getStaticBlockProcessor(resultsBlockList), getStaticServerQueryRequests(resultsBlockList.size()), schema);
 
     // When:
     TransferableBlock resultBlock = operator.nextBlock();
@@ -148,7 +156,8 @@ public class LeafStageTransferableBlockOperatorTest {
             queryContext),
         new InstanceResponseBlock(new SelectionResultsBlock(schema, Collections.emptyList()), queryContext));
     LeafStageTransferableBlockOperator operator =
-        new LeafStageTransferableBlockOperator(OperatorTestUtil.getDefaultContext(), resultsBlockList, schema);
+        new LeafStageTransferableBlockOperator(OperatorTestUtil.getDefaultContext(),
+            getStaticBlockProcessor(resultsBlockList), getStaticServerQueryRequests(resultsBlockList.size()), schema);
 
     // When:
     TransferableBlock resultBlock1 = operator.nextBlock();
@@ -178,12 +187,17 @@ public class LeafStageTransferableBlockOperatorTest {
         errorBlock,
         new InstanceResponseBlock(new SelectionResultsBlock(schema, Collections.emptyList()), queryContext));
     LeafStageTransferableBlockOperator operator =
-        new LeafStageTransferableBlockOperator(OperatorTestUtil.getDefaultContext(), resultsBlockList, schema);
+        new LeafStageTransferableBlockOperator(OperatorTestUtil.getDefaultContext(),
+            getStaticBlockProcessor(resultsBlockList), getStaticServerQueryRequests(resultsBlockList.size()), schema);
 
     // When:
     TransferableBlock resultBlock = operator.nextBlock();
-
     // Then:
+    Assert.assertEquals(resultBlock.getContainer().get(0), new Object[]{"foo", 1});
+    Assert.assertEquals(resultBlock.getContainer().get(1), new Object[]{"", 2});
+
+    // When:
+    resultBlock = operator.nextBlock();
     Assert.assertTrue(resultBlock.isErrorBlock());
   }
 
@@ -199,7 +213,8 @@ public class LeafStageTransferableBlockOperatorTest {
         new DistinctResultsBlock(mock(DistinctAggregationFunction.class), new DistinctTable(schema,
             Arrays.asList(new Record(new Object[]{1, "foo"}), new Record(new Object[]{2, "bar"})))), queryContext));
     LeafStageTransferableBlockOperator operator =
-        new LeafStageTransferableBlockOperator(OperatorTestUtil.getDefaultContext(), resultsBlockList, schema);
+        new LeafStageTransferableBlockOperator(OperatorTestUtil.getDefaultContext(),
+            getStaticBlockProcessor(resultsBlockList), getStaticServerQueryRequests(resultsBlockList.size()), schema);
 
     // When:
     TransferableBlock resultBlock = operator.nextBlock();
@@ -220,7 +235,8 @@ public class LeafStageTransferableBlockOperatorTest {
         new DistinctResultsBlock(mock(DistinctAggregationFunction.class), new DistinctTable(schema,
             Arrays.asList(new Record(new Object[]{"foo", 1}), new Record(new Object[]{"bar", 2})))), queryContext));
     LeafStageTransferableBlockOperator operator =
-        new LeafStageTransferableBlockOperator(OperatorTestUtil.getDefaultContext(), resultsBlockList, schema);
+        new LeafStageTransferableBlockOperator(OperatorTestUtil.getDefaultContext(),
+            getStaticBlockProcessor(resultsBlockList), getStaticServerQueryRequests(resultsBlockList.size()), schema);
 
     // When:
     TransferableBlock resultBlock = operator.nextBlock();
@@ -244,7 +260,8 @@ public class LeafStageTransferableBlockOperatorTest {
     List<InstanceResponseBlock> resultsBlockList = Collections.singletonList(
         new InstanceResponseBlock(new GroupByResultsBlock(schema, Collections.emptyList()), queryContext));
     LeafStageTransferableBlockOperator operator =
-        new LeafStageTransferableBlockOperator(OperatorTestUtil.getDefaultContext(), resultsBlockList, schema);
+        new LeafStageTransferableBlockOperator(OperatorTestUtil.getDefaultContext(),
+            getStaticBlockProcessor(resultsBlockList), getStaticServerQueryRequests(resultsBlockList.size()), schema);
 
     // When:
     TransferableBlock resultBlock = operator.nextBlock();
@@ -267,7 +284,8 @@ public class LeafStageTransferableBlockOperatorTest {
     List<InstanceResponseBlock> resultsBlockList = Collections.singletonList(
         new InstanceResponseBlock(new GroupByResultsBlock(schema, Collections.emptyList()), queryContext));
     LeafStageTransferableBlockOperator operator =
-        new LeafStageTransferableBlockOperator(OperatorTestUtil.getDefaultContext(), resultsBlockList, schema);
+        new LeafStageTransferableBlockOperator(OperatorTestUtil.getDefaultContext(),
+            getStaticBlockProcessor(resultsBlockList), getStaticServerQueryRequests(resultsBlockList.size()), schema);
 
     // When:
     TransferableBlock resultBlock = operator.nextBlock();
@@ -286,7 +304,8 @@ public class LeafStageTransferableBlockOperatorTest {
     List<InstanceResponseBlock> resultsBlockList = Collections.singletonList(new InstanceResponseBlock(
         new AggregationResultsBlock(queryContext.getAggregationFunctions(), Collections.emptyList()), queryContext));
     LeafStageTransferableBlockOperator operator =
-        new LeafStageTransferableBlockOperator(OperatorTestUtil.getDefaultContext(), resultsBlockList, schema);
+        new LeafStageTransferableBlockOperator(OperatorTestUtil.getDefaultContext(),
+            getStaticBlockProcessor(resultsBlockList), getStaticServerQueryRequests(resultsBlockList.size()), schema);
 
     // When:
     TransferableBlock resultBlock = operator.nextBlock();
@@ -308,7 +327,9 @@ public class LeafStageTransferableBlockOperatorTest {
     List<InstanceResponseBlock> responseBlockList = Collections.singletonList(
         new InstanceResponseBlock(new SelectionResultsBlock(resultSchema, Collections.emptyList()), queryContext));
     LeafStageTransferableBlockOperator operator =
-        new LeafStageTransferableBlockOperator(OperatorTestUtil.getDefaultContext(), responseBlockList, desiredSchema);
+        new LeafStageTransferableBlockOperator(OperatorTestUtil.getDefaultContext(),
+            getStaticBlockProcessor(responseBlockList), getStaticServerQueryRequests(responseBlockList.size()),
+            desiredSchema);
     TransferableBlock resultBlock = operator.nextBlock();
 
     // Then:
@@ -331,7 +352,9 @@ public class LeafStageTransferableBlockOperatorTest {
         new DistinctResultsBlock(mock(DistinctAggregationFunction.class),
             new DistinctTable(resultSchema, Collections.emptyList())), queryContext));
     LeafStageTransferableBlockOperator operator =
-        new LeafStageTransferableBlockOperator(OperatorTestUtil.getDefaultContext(), responseBlockList, desiredSchema);
+        new LeafStageTransferableBlockOperator(OperatorTestUtil.getDefaultContext(),
+            getStaticBlockProcessor(responseBlockList), getStaticServerQueryRequests(responseBlockList.size()),
+            desiredSchema);
     TransferableBlock resultBlock = operator.nextBlock();
 
     // Then:
@@ -353,11 +376,42 @@ public class LeafStageTransferableBlockOperatorTest {
     List<InstanceResponseBlock> responseBlockList = Collections.singletonList(
         new InstanceResponseBlock(new GroupByResultsBlock(resultSchema, Collections.emptyList()), queryContext));
     LeafStageTransferableBlockOperator operator =
-        new LeafStageTransferableBlockOperator(OperatorTestUtil.getDefaultContext(), responseBlockList, desiredSchema);
+        new LeafStageTransferableBlockOperator(OperatorTestUtil.getDefaultContext(),
+            getStaticBlockProcessor(responseBlockList), getStaticServerQueryRequests(responseBlockList.size()),
+            desiredSchema);
     TransferableBlock resultBlock = operator.nextBlock();
 
     // Then:
     Assert.assertEquals(resultBlock.getContainer().size(), 0);
     Assert.assertEquals(resultBlock.getDataSchema(), desiredSchema);
   }
+
+  /**
+   * Builds a request processor that disregards the request it is given and instead hands back the blocks of
+   * {@code resultBlockList} one per call, in list order. Also used by {@code OpChainTest}.
+   */
+  @VisibleForTesting
+  static Function<ServerQueryRequest, InstanceResponseBlock> getStaticBlockProcessor(
+      List<InstanceResponseBlock> resultBlockList) {
+    StaticBlockProcessor replayer = new StaticBlockProcessor(resultBlockList);
+    return request -> replayer.process(request);
+  }
+
+  /**
+   * Creates {@code count} independent Mockito mock requests, one per static result block.
+   * Returns an empty list when {@code count <= 0}.
+   */
+  static List<ServerQueryRequest> getStaticServerQueryRequests(int count) {
+    // Presize the list; clamp at 0 so a negative count still yields an empty list instead of throwing.
+    List<ServerQueryRequest> staticMockRequests = new ArrayList<>(Math.max(count, 0));
+    for (int i = 0; i < count; i++) {
+      staticMockRequests.add(mock(ServerQueryRequest.class));
+    }
+    return staticMockRequests;
+  }
+
+  private static class StaticBlockProcessor {
+    private final List<InstanceResponseBlock> _resultBlockList;
+    private int _currentIdx;
+
+    StaticBlockProcessor(List<InstanceResponseBlock> resultBlockList) {
+      _resultBlockList = resultBlockList;
+      _currentIdx = 0;
+    }
+
+    public InstanceResponseBlock process(ServerQueryRequest request) {
+      return _resultBlockList.get(_currentIdx++);
+    }
+  }
 }
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/OpChainTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/OpChainTest.java
index 3ec9fc71de..585e884e65 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/OpChainTest.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/OpChainTest.java
@@ -34,6 +34,7 @@ import org.apache.pinot.common.datatable.DataTable;
 import org.apache.pinot.common.utils.DataSchema;
 import org.apache.pinot.core.operator.blocks.InstanceResponseBlock;
 import org.apache.pinot.core.operator.blocks.results.SelectionResultsBlock;
+import org.apache.pinot.core.query.request.ServerQueryRequest;
 import org.apache.pinot.core.query.request.context.QueryContext;
 import org.apache.pinot.core.query.request.context.utils.QueryContextConverterUtils;
 import org.apache.pinot.query.mailbox.MailboxService;
@@ -57,6 +58,7 @@ import org.testng.annotations.Test;
 
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertTrue;
@@ -286,8 +288,9 @@ public class OpChainTest {
     QueryContext queryContext = QueryContextConverterUtils.getQueryContext("SELECT intCol FROM tbl");
     List<InstanceResponseBlock> resultsBlockList = Collections.singletonList(new InstanceResponseBlock(
         new SelectionResultsBlock(upStreamSchema, Arrays.asList(new Object[]{1}, new Object[]{2})), queryContext));
-    LeafStageTransferableBlockOperator leafOp =
-        new LeafStageTransferableBlockOperator(context, resultsBlockList, upStreamSchema);
+    LeafStageTransferableBlockOperator leafOp = new LeafStageTransferableBlockOperator(context,
+            LeafStageTransferableBlockOperatorTest.getStaticBlockProcessor(resultsBlockList),
+            Collections.singletonList(mock(ServerQueryRequest.class)), upStreamSchema);
 
     //Transform operator
     RexExpression.InputRef ref0 = new RexExpression.InputRef(0);


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org