You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ro...@apache.org on 2022/11/15 19:22:38 UTC

[pinot] branch master updated: [multistage] implement naive round robin operator chain scheduling (#9753)

This is an automated email from the ASF dual-hosted git repository.

rongr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 342b6a57b5 [multistage] implement naive round robin operator chain scheduling (#9753)
342b6a57b5 is described below

commit 342b6a57b510afb7181704c772179b0bb5083eac
Author: Almog Gavra <al...@gmail.com>
AuthorDate: Tue Nov 15 11:22:32 2022 -0800

    [multistage] implement naive round robin operator chain scheduling (#9753)
    
    This is a follow-up to #9711 and follows the design outlined in [this design doc](https://docs.google.com/document/d/1XAMHAlhFbINvX-kK1ANlzbRz4_RkS0map4qhqs1yDtE/edit#heading=h.de4smgkh3bzk).
    
    This PR implements a round robin operator chain scheduling algorithm and sets up the interface for future PRs that will implement more advanced scheduling. As of this PR, we can be guaranteed that all queries will make progress (see the change in `pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SSBQueryIntegrationTest.java`, you can now run it under situations with only 2 cores available) but the algorithm is still very hungry for CPU (queries with nothing in  [...]
---
 .../integration/tests/SSBQueryIntegrationTest.java |   6 -
 .../apache/pinot/query/runtime/QueryRunner.java    |  21 +-
 .../query/runtime/executor/OpChainScheduler.java   |  57 +++++
 .../runtime/executor/OpChainSchedulerService.java  | 147 +++++++++++++
 .../runtime/executor/RoundRobinScheduler.java      |  55 +++++
 .../runtime/executor/WorkerQueryExecutor.java      |  94 --------
 .../runtime/operator/MailboxReceiveOperator.java   |   9 +
 .../pinot/query/runtime/operator/OpChain.java      |  53 +++++
 .../query/runtime/plan/PhysicalPlanVisitor.java    |  11 +-
 .../apache/pinot/query/service/QueryServer.java    |  24 ++-
 .../apache/pinot/query/QueryServerEnclosure.java   |  15 +-
 .../executor/OpChainSchedulerServiceTest.java      | 240 +++++++++++++++++++++
 .../runtime/executor/RoundRobinSchedulerTest.java  |  43 ++++
 .../pinot/query/service/QueryServerTest.java       |   4 +-
 14 files changed, 652 insertions(+), 127 deletions(-)

diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SSBQueryIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SSBQueryIntegrationTest.java
index eace5b0629..5776f2c82b 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SSBQueryIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SSBQueryIntegrationTest.java
@@ -44,7 +44,6 @@ import org.apache.pinot.util.TestUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.testng.Assert;
-import org.testng.SkipException;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.DataProvider;
@@ -54,7 +53,6 @@ import org.yaml.snakeyaml.Yaml;
 
 public class SSBQueryIntegrationTest extends BaseClusterIntegrationTest {
   private static final Logger LOGGER = LoggerFactory.getLogger(SSBQueryIntegrationTest.class);
-  private static final int MIN_AVAILABLE_CORE_REQUIREMENT = 4;
   private static final Map<String, String> SSB_QUICKSTART_TABLE_RESOURCES = ImmutableMap.of(
       "customer", "examples/batch/ssb/customer",
       "dates", "examples/batch/ssb/dates",
@@ -66,10 +64,6 @@ public class SSBQueryIntegrationTest extends BaseClusterIntegrationTest {
   @BeforeClass
   public void setUp()
       throws Exception {
-    if (Runtime.getRuntime().availableProcessors() < MIN_AVAILABLE_CORE_REQUIREMENT) {
-      throw new SkipException("Skip SSB query testing. Insufficient core count: "
-          + Runtime.getRuntime().availableProcessors());
-    }
     TestUtils.ensureDirectoriesExistAndEmpty(_tempDir, _segmentDir, _tarDir);
 
     // Start the Pinot cluster
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java
index 8c0a959c7d..cd689be880 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java
@@ -50,11 +50,15 @@ import org.apache.pinot.query.mailbox.MailboxService;
 import org.apache.pinot.query.mailbox.MultiplexingMailboxService;
 import org.apache.pinot.query.planner.StageMetadata;
 import org.apache.pinot.query.planner.stage.MailboxSendNode;
+import org.apache.pinot.query.planner.stage.StageNode;
 import org.apache.pinot.query.runtime.blocks.TransferableBlock;
 import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils;
-import org.apache.pinot.query.runtime.executor.WorkerQueryExecutor;
+import org.apache.pinot.query.runtime.executor.OpChainSchedulerService;
 import org.apache.pinot.query.runtime.operator.MailboxSendOperator;
+import org.apache.pinot.query.runtime.operator.OpChain;
 import org.apache.pinot.query.runtime.plan.DistributedStagePlan;
+import org.apache.pinot.query.runtime.plan.PhysicalPlanVisitor;
+import org.apache.pinot.query.runtime.plan.PlanRequestContext;
 import org.apache.pinot.query.runtime.plan.ServerRequestPlanVisitor;
 import org.apache.pinot.query.runtime.plan.server.ServerPlanRequestContext;
 import org.apache.pinot.query.service.QueryConfig;
@@ -76,7 +80,6 @@ public class QueryRunner {
   private static final Logger LOGGER = LoggerFactory.getLogger(QueryRunner.class);
   // This is a temporary before merging the 2 type of executor.
   private ServerQueryExecutorV1Impl _serverExecutor;
-  private WorkerQueryExecutor _workerExecutor;
   private HelixManager _helixManager;
   private ZkHelixPropertyStore<ZNRecord> _helixPropertyStore;
   private MailboxService<TransferableBlock> _mailboxService;
@@ -98,8 +101,6 @@ public class QueryRunner {
       _mailboxService = MultiplexingMailboxService.newInstance(_hostname, _port, config);
       _serverExecutor = new ServerQueryExecutorV1Impl();
       _serverExecutor.init(config, instanceDataManager, serverMetrics);
-      _workerExecutor = new WorkerQueryExecutor();
-      _workerExecutor.init(config, serverMetrics, _mailboxService, _hostname, _port);
     } catch (Exception e) {
       throw new RuntimeException(e);
     }
@@ -109,16 +110,14 @@ public class QueryRunner {
     _helixPropertyStore = _helixManager.getHelixPropertyStore();
     _mailboxService.start();
     _serverExecutor.start();
-    _workerExecutor.start();
   }
 
   public void shutDown() {
-    _workerExecutor.shutDown();
     _serverExecutor.shutDown();
     _mailboxService.shutdown();
   }
 
-  public void processQuery(DistributedStagePlan distributedStagePlan, ExecutorService executorService,
+  public void processQuery(DistributedStagePlan distributedStagePlan, OpChainSchedulerService scheduler,
       Map<String, String> requestMetadataMap) {
     if (isLeafStage(distributedStagePlan)) {
       // TODO: make server query request return via mailbox, this is a hack to gather the non-streaming data table
@@ -132,7 +131,7 @@ public class QueryRunner {
       for (ServerPlanRequestContext requestContext : serverQueryRequests) {
         ServerQueryRequest request = new ServerQueryRequest(requestContext.getInstanceRequest(),
             new ServerMetrics(PinotMetricUtils.getPinotMetricsRegistry()), System.currentTimeMillis());
-        serverQueryResults.add(processServerQuery(request, executorService));
+        serverQueryResults.add(processServerQuery(request, scheduler.getWorkerPool()));
       }
 
       MailboxSendNode sendNode = (MailboxSendNode) distributedStagePlan.getStageRoot();
@@ -148,7 +147,11 @@ public class QueryRunner {
         LOGGER.debug("Acquired transferable block: {}", blockCounter++);
       }
     } else {
-      _workerExecutor.processQuery(distributedStagePlan, requestMetadataMap, executorService);
+      long requestId = Long.parseLong(requestMetadataMap.get("REQUEST_ID"));
+      StageNode stageRoot = distributedStagePlan.getStageRoot();
+      OpChain rootOperator = PhysicalPlanVisitor.build(stageRoot, new PlanRequestContext(
+          _mailboxService, requestId, stageRoot.getStageId(), _hostname, _port, distributedStagePlan.getMetadataMap()));
+      scheduler.register(rootOperator);
     }
   }
 
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/OpChainScheduler.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/OpChainScheduler.java
new file mode 100644
index 0000000000..4a34e49346
--- /dev/null
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/OpChainScheduler.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.runtime.executor;
+
+import org.apache.pinot.query.mailbox.MailboxIdentifier;
+import org.apache.pinot.query.runtime.operator.OpChain;
+
+
+/**
+ * An interface that defines different scheduling strategies to work with the
+ * {@link OpChainSchedulerService}. Implementations need not be thread safe: the service
+ * only invokes these methods while holding its monitor, so they are never called
+ * concurrently - therefore implementations may use data structures that are not concurrent.
+ */
+public interface OpChainScheduler {
+
+  /**
+   * @param operatorChain the operator chain to register
+   */
+  void register(OpChain operatorChain);
+
+  /**
+   * This method is called whenever {@code mailbox} has new data available to consume;
+   * this can be useful for advanced scheduling algorithms
+   *
+   * @param mailbox the mailbox ID
+   */
+  void onDataAvailable(MailboxIdentifier mailbox);
+
+  /**
+   * @return whether or not there is any work for the scheduler to do
+   */
+  boolean hasNext();
+
+  /**
+   * @return the next operator chain to process
+   * @throws java.util.NoSuchElementException if {@link #hasNext()} returns false
+   *         prior to this call
+   */
+  OpChain next();
+}
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/OpChainSchedulerService.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/OpChainSchedulerService.java
new file mode 100644
index 0000000000..87c44774d3
--- /dev/null
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/OpChainSchedulerService.java
@@ -0,0 +1,147 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.runtime.executor;
+
+import com.google.common.util.concurrent.AbstractExecutionThreadService;
+import com.google.common.util.concurrent.Monitor;
+import java.util.concurrent.ExecutorService;
+import org.apache.pinot.common.request.context.ThreadTimer;
+import org.apache.pinot.core.util.trace.TraceRunnable;
+import org.apache.pinot.query.mailbox.MailboxIdentifier;
+import org.apache.pinot.query.runtime.blocks.TransferableBlock;
+import org.apache.pinot.query.runtime.operator.OpChain;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * This class provides the implementation for scheduling multistage queries on a single node based
+ * on the {@link OpChainScheduler} logic that is passed in. Multistage queries support partial execution
+ * and will return a NOOP metadata block as a "yield" signal, indicating that the next operator
+ * chain ({@link OpChainScheduler#next()}) will be requested.
+ *
+ * <p>Note that a yielded operator chain will be re-registered with the underlying scheduler.
+ */
+@SuppressWarnings("UnstableApiUsage")
+public class OpChainSchedulerService extends AbstractExecutionThreadService {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(OpChainSchedulerService.class);
+
+  private final OpChainScheduler _scheduler;
+  private final ExecutorService _workerPool;
+
+  // anything that is guarded by this monitor should be non-blocking
+  private final Monitor _monitor = new Monitor();
+  private final Monitor.Guard _hasNextOrClosing = new Monitor.Guard(_monitor) {
+    @Override
+    public boolean isSatisfied() {
+      return _scheduler.hasNext() || !isRunning();
+    }
+  };
+
+  public OpChainSchedulerService(OpChainScheduler scheduler, ExecutorService workerPool) {
+    _scheduler = scheduler;
+    _workerPool = workerPool;
+  }
+
+  @Override
+  protected void triggerShutdown() {
+    // this will just notify all waiters that the scheduler is shutting down
+    _monitor.enter();
+    _monitor.leave();
+  }
+
+  @Override
+  protected void run()
+      throws Exception {
+    while (isRunning()) {
+      _monitor.enterWhen(_hasNextOrClosing);
+      try {
+        if (!isRunning()) {
+          return;
+        }
+
+        OpChain operatorChain = _scheduler.next();
+        _workerPool.submit(new TraceRunnable() {
+          @Override
+          public void runJob() {
+            try {
+              ThreadTimer timer = operatorChain.getAndStartTimer();
+
+              // so long as there's work to be done, keep getting the next block
+              // when the operator chain returns a NOOP block, then yield the execution
+              // of this to another worker
+              TransferableBlock result = operatorChain.getRoot().nextBlock();
+              while (!result.isNoOpBlock() && !result.isEndOfStreamBlock()) {
+                LOGGER.debug("Got block with {} rows.", result.getNumRows());
+                result = operatorChain.getRoot().nextBlock();
+              }
+
+              if (!result.isEndOfStreamBlock()) {
+                // not complete, needs to re-register for scheduling
+                register(operatorChain);
+              } else {
+                LOGGER.info("Execution time: " + timer.getThreadTimeNs());
+              }
+            } catch (Exception e) {
+              LOGGER.error("Failed to execute query!", e); // the failed chain is dropped, not re-registered
+            }
+          }
+        });
+      } finally {
+        _monitor.leave();
+      }
+    }
+  }
+
+  /**
+   * Register a new operator chain with the scheduler.
+   *
+   * @param operatorChain the chain to register
+   */
+  public final void register(OpChain operatorChain) {
+    _monitor.enter();
+    try {
+      _scheduler.register(operatorChain);
+    } finally {
+      _monitor.leave();
+    }
+  }
+
+  /**
+   * This method should be called whenever data is available in a given mailbox.
+   * Implementations of this method should be idempotent, it may be called in the
+   * scenario that no mail is available.
+   *
+   * @param mailbox the identifier of the mailbox that now has data
+   */
+  public final void onDataAvailable(MailboxIdentifier mailbox) {
+    _monitor.enter();
+    try {
+      _scheduler.onDataAvailable(mailbox);
+    } finally {
+      _monitor.leave();
+    }
+  }
+
+  // TODO: remove this method after we pipe down the proper executor pool to the v1 engine
+  public ExecutorService getWorkerPool() {
+    return _workerPool;
+  }
+}
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/RoundRobinScheduler.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/RoundRobinScheduler.java
new file mode 100644
index 0000000000..ff9c762536
--- /dev/null
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/RoundRobinScheduler.java
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.runtime.executor;
+
+import java.util.LinkedList;
+import java.util.Queue;
+import org.apache.pinot.query.mailbox.MailboxIdentifier;
+import org.apache.pinot.query.runtime.operator.OpChain;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class RoundRobinScheduler implements OpChainScheduler {
+  private static final Logger LOGGER = LoggerFactory.getLogger(RoundRobinScheduler.class);
+
+  private final Queue<OpChain> _opChainQueue = new LinkedList<>();
+
+  @Override
+  public void register(OpChain operatorChain) {
+    _opChainQueue.add(operatorChain);
+  }
+
+  @Override
+  public void onDataAvailable(MailboxIdentifier mailbox) {
+    // do nothing - this doesn't change order of execution
+  }
+
+  @Override
+  public boolean hasNext() {
+    // the FIFO queue is the only record of pending chains, so there is
+    // work to schedule exactly when the queue is non-empty
+    return !_opChainQueue.isEmpty();
+  }
+
+  @Override
+  public OpChain next() {
+    return _opChainQueue.poll(); // NOTE(review): yields null, not NoSuchElementException, when empty
+  }
+}
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/WorkerQueryExecutor.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/WorkerQueryExecutor.java
deleted file mode 100644
index 18bb2defa5..0000000000
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/WorkerQueryExecutor.java
+++ /dev/null
@@ -1,94 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.query.runtime.executor;
-
-import java.util.Map;
-import java.util.concurrent.ExecutorService;
-import org.apache.pinot.common.metrics.ServerMetrics;
-import org.apache.pinot.common.request.context.ThreadTimer;
-import org.apache.pinot.core.common.Operator;
-import org.apache.pinot.core.util.trace.TraceRunnable;
-import org.apache.pinot.query.mailbox.MailboxService;
-import org.apache.pinot.query.planner.stage.StageNode;
-import org.apache.pinot.query.runtime.blocks.TransferableBlock;
-import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils;
-import org.apache.pinot.query.runtime.plan.DistributedStagePlan;
-import org.apache.pinot.query.runtime.plan.PhysicalPlanVisitor;
-import org.apache.pinot.query.runtime.plan.PlanRequestContext;
-import org.apache.pinot.spi.env.PinotConfiguration;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-
-/**
- * WorkerQueryExecutor is the v2 of the {@link org.apache.pinot.core.query.executor.QueryExecutor} API.
- *
- * It provides not only execution interface for {@link org.apache.pinot.core.query.request.ServerQueryRequest} but
- * also a more general {@link DistributedStagePlan}.
- */
-public class WorkerQueryExecutor {
-  private static final Logger LOGGER = LoggerFactory.getLogger(WorkerQueryExecutor.class);
-  private PinotConfiguration _config;
-  private ServerMetrics _serverMetrics;
-  private MailboxService<TransferableBlock> _mailboxService;
-  private String _hostName;
-  private int _port;
-
-  public void init(PinotConfiguration config, ServerMetrics serverMetrics,
-      MailboxService<TransferableBlock> mailboxService, String hostName, int port) {
-    _config = config;
-    _serverMetrics = serverMetrics;
-    _mailboxService = mailboxService;
-    _hostName = hostName;
-    _port = port;
-  }
-
-
-  public synchronized void start() {
-    LOGGER.info("Worker query executor started");
-  }
-
-  public synchronized void shutDown() {
-    LOGGER.info("Worker query executor shut down");
-  }
-
-  public void processQuery(DistributedStagePlan queryRequest, Map<String, String> requestMetadataMap,
-      ExecutorService executorService) {
-    long requestId = Long.parseLong(requestMetadataMap.get("REQUEST_ID"));
-    StageNode stageRoot = queryRequest.getStageRoot();
-
-    Operator<TransferableBlock> rootOperator = PhysicalPlanVisitor.build(stageRoot, new PlanRequestContext(
-        _mailboxService, requestId, stageRoot.getStageId(), _hostName, _port, queryRequest.getMetadataMap()));
-
-    executorService.submit(new TraceRunnable() {
-      @Override
-      public void runJob() {
-        try {
-          ThreadTimer executionThreadTimer = new ThreadTimer();
-          while (!TransferableBlockUtils.isEndOfStream(rootOperator.nextBlock())) {
-            LOGGER.debug("Result Block acquired");
-          }
-          LOGGER.info("Execution time:" + executionThreadTimer.getThreadTimeNs());
-        } catch (Exception e) {
-          LOGGER.error("Failed to execute query!", e);
-        }
-      }
-    });
-  }
-}
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperator.java
index 18fb47a08a..35fd5ba343 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperator.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperator.java
@@ -19,8 +19,10 @@
 package org.apache.pinot.query.runtime.operator;
 
 import com.google.common.base.Preconditions;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
+import java.util.stream.Collectors;
 import javax.annotation.Nullable;
 import org.apache.calcite.rel.RelDistribution;
 import org.apache.pinot.common.exception.QueryException;
@@ -52,6 +54,7 @@ public class MailboxReceiveOperator extends BaseOperator<TransferableBlock> {
   private final RelDistribution.Type _exchangeType;
   private final KeySelector<Object[], Object[]> _keySelector;
   private final List<ServerInstance> _sendingStageInstances;
+  private final List<MailboxIdentifier> _sendingMailboxes;
   private final DataSchema _dataSchema;
   private final String _hostName;
   private final int _port;
@@ -96,6 +99,8 @@ public class MailboxReceiveOperator extends BaseOperator<TransferableBlock> {
     _upstreamErrorBlock = null;
     _keySelector = keySelector;
     _serverIdx = 0;
+
+    _sendingMailboxes = _sendingStageInstances.stream().map(this::toMailboxId).collect(Collectors.toList());
   }
 
   @Override
@@ -170,4 +175,8 @@ public class MailboxReceiveOperator extends BaseOperator<TransferableBlock> {
     return new StringMailboxIdentifier(String.format("%s_%s", _jobId, _stageId), serverInstance.getHostname(),
         serverInstance.getQueryMailboxPort(), _hostName, _port);
   }
+
+  public Collection<MailboxIdentifier> getSendingMailboxes() {
+    return _sendingMailboxes;
+  }
 }
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/OpChain.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/OpChain.java
new file mode 100644
index 0000000000..1fa9277b96
--- /dev/null
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/OpChain.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.runtime.operator;
+
+import com.google.common.base.Suppliers;
+import java.util.function.Supplier;
+import org.apache.pinot.common.request.context.ThreadTimer;
+import org.apache.pinot.core.common.Operator;
+import org.apache.pinot.query.runtime.blocks.TransferableBlock;
+
+
+/**
+ * An {@code OpChain} represents a chain of operators that are separated
+ * by send/receive stages.
+ */
+public class OpChain {
+
+  private final Operator<TransferableBlock> _root;
+  // TODO: build timers that are partial-execution aware
+  private final Supplier<ThreadTimer> _timer;
+
+  public OpChain(Operator<TransferableBlock> root) {
+    _root = root;
+
+    // use memoized supplier so that the timing doesn't start until the
+    // first time we get the timer
+    _timer = Suppliers.memoize(ThreadTimer::new)::get;
+  }
+
+  public Operator<TransferableBlock> getRoot() {
+    return _root;
+  }
+
+  public ThreadTimer getAndStartTimer() {
+    return _timer.get(); // first call creates the ThreadTimer; later calls return that same instance
+  }
+}
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/PhysicalPlanVisitor.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/PhysicalPlanVisitor.java
index 32e2d7ddf8..f3536af6f6 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/PhysicalPlanVisitor.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/PhysicalPlanVisitor.java
@@ -20,7 +20,6 @@ package org.apache.pinot.query.runtime.plan;
 
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.ExecutorService;
 import org.apache.pinot.core.common.Operator;
 import org.apache.pinot.core.transport.ServerInstance;
 import org.apache.pinot.query.planner.StageMetadata;
@@ -36,12 +35,14 @@ import org.apache.pinot.query.planner.stage.StageNodeVisitor;
 import org.apache.pinot.query.planner.stage.TableScanNode;
 import org.apache.pinot.query.planner.stage.ValueNode;
 import org.apache.pinot.query.runtime.blocks.TransferableBlock;
+import org.apache.pinot.query.runtime.executor.OpChainSchedulerService;
 import org.apache.pinot.query.runtime.operator.AggregateOperator;
 import org.apache.pinot.query.runtime.operator.FilterOperator;
 import org.apache.pinot.query.runtime.operator.HashJoinOperator;
 import org.apache.pinot.query.runtime.operator.LiteralValueOperator;
 import org.apache.pinot.query.runtime.operator.MailboxReceiveOperator;
 import org.apache.pinot.query.runtime.operator.MailboxSendOperator;
+import org.apache.pinot.query.runtime.operator.OpChain;
 import org.apache.pinot.query.runtime.operator.SortOperator;
 import org.apache.pinot.query.runtime.operator.TransformOperator;
 
@@ -53,13 +54,15 @@ import org.apache.pinot.query.runtime.operator.TransformOperator;
  *
  * <p>This class should be used statically via {@link #build(StageNode, PlanRequestContext)}
  *
- * @see org.apache.pinot.query.runtime.QueryRunner#processQuery(DistributedStagePlan, ExecutorService, Map)
+ * @see org.apache.pinot.query.runtime.QueryRunner#processQuery(DistributedStagePlan, OpChainSchedulerService, Map)
  */
 public class PhysicalPlanVisitor implements StageNodeVisitor<Operator<TransferableBlock>, PlanRequestContext> {
+
   private static final PhysicalPlanVisitor INSTANCE = new PhysicalPlanVisitor();
 
-  public static Operator<TransferableBlock> build(StageNode node, PlanRequestContext context) {
-    return node.visit(INSTANCE, context);
+  public static OpChain build(StageNode node, PlanRequestContext context) {
+    Operator<TransferableBlock> root = node.visit(INSTANCE, context);
+    return new OpChain(root);
   }
 
   @Override
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/QueryServer.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/QueryServer.java
index 0a74ee820b..197338190c 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/QueryServer.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/QueryServer.java
@@ -25,7 +25,6 @@ import io.grpc.Status;
 import io.grpc.stub.StreamObserver;
 import java.io.IOException;
 import java.util.Map;
-import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import org.apache.pinot.common.proto.PinotQueryWorkerGrpc;
 import org.apache.pinot.common.proto.Worker;
@@ -33,6 +32,8 @@ import org.apache.pinot.common.utils.NamedThreadFactory;
 import org.apache.pinot.core.query.scheduler.resources.ResourceManager;
 import org.apache.pinot.core.transport.grpc.GrpcQueryServer;
 import org.apache.pinot.query.runtime.QueryRunner;
+import org.apache.pinot.query.runtime.executor.OpChainSchedulerService;
+import org.apache.pinot.query.runtime.executor.RoundRobinScheduler;
 import org.apache.pinot.query.runtime.plan.DistributedStagePlan;
 import org.apache.pinot.query.runtime.plan.serde.QueryPlanSerDeUtils;
 import org.slf4j.Logger;
@@ -42,18 +43,22 @@ import org.slf4j.LoggerFactory;
 /**
  * {@link QueryServer} is the GRPC server that accepts query plan requests sent from {@link QueryDispatcher}.
  */
+@SuppressWarnings("UnstableApiUsage")
 public class QueryServer extends PinotQueryWorkerGrpc.PinotQueryWorkerImplBase {
   private static final Logger LOGGER = LoggerFactory.getLogger(GrpcQueryServer.class);
 
   private final Server _server;
   private final QueryRunner _queryRunner;
-  private final ExecutorService _executorService;
+  private final OpChainSchedulerService _scheduler;
 
   public QueryServer(int port, QueryRunner queryRunner) {
     _server = ServerBuilder.forPort(port).addService(this).build();
-    _executorService = Executors.newFixedThreadPool(ResourceManager.DEFAULT_QUERY_WORKER_THREADS,
-        new NamedThreadFactory("query_worker_on_" + port + "_port"));
+    _scheduler = new OpChainSchedulerService(new RoundRobinScheduler(),
+        Executors.newFixedThreadPool(
+            ResourceManager.DEFAULT_QUERY_WORKER_THREADS,
+            new NamedThreadFactory("query_worker_on_" + port + "_port")));
     _queryRunner = queryRunner;
+
     LOGGER.info("Initialized QueryWorker on port: {} with numWorkerThreads: {}", port,
         ResourceManager.DEFAULT_QUERY_WORKER_THREADS);
   }
@@ -61,6 +66,7 @@ public class QueryServer extends PinotQueryWorkerGrpc.PinotQueryWorkerImplBase {
   public void start() {
     LOGGER.info("Starting QueryWorker");
     try {
+      _scheduler.startAsync().awaitRunning();
       _queryRunner.start();
       _server.start();
     } catch (IOException e) {
@@ -70,9 +76,13 @@ public class QueryServer extends PinotQueryWorkerGrpc.PinotQueryWorkerImplBase {
 
   public void shutdown() {
     LOGGER.info("Shutting down QueryWorker");
+    _queryRunner.shutDown();
+    _scheduler.stopAsync();
+    _server.shutdown();
+
     try {
-      _queryRunner.shutDown();
-      _server.shutdown().awaitTermination();
+      _scheduler.awaitTerminated();
+      _server.awaitTermination();
     } catch (InterruptedException e) {
       throw new RuntimeException(e);
     }
@@ -105,7 +115,7 @@ public class QueryServer extends PinotQueryWorkerGrpc.PinotQueryWorkerImplBase {
       // Process the query
       try {
         // TODO: break this into parsing and execution, so that responseObserver can return upon parsing complete.
-        _queryRunner.processQuery(distributedStagePlan, _executorService, requestMetadataMap);
+        _queryRunner.processQuery(distributedStagePlan, _scheduler, requestMetadataMap);
       } catch (Exception e) {
         LOGGER.error("Caught exception while processing request", e);
         throw new RuntimeException(e);
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/QueryServerEnclosure.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/QueryServerEnclosure.java
index 9481c4db61..542727911d 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/QueryServerEnclosure.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/QueryServerEnclosure.java
@@ -20,7 +20,6 @@ package org.apache.pinot.query;
 
 import java.util.HashMap;
 import java.util.Map;
-import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import org.apache.helix.HelixManager;
 import org.apache.helix.store.zk.ZkHelixPropertyStore;
@@ -30,6 +29,8 @@ import org.apache.pinot.common.utils.NamedThreadFactory;
 import org.apache.pinot.common.utils.SchemaUtils;
 import org.apache.pinot.core.data.manager.InstanceDataManager;
 import org.apache.pinot.query.runtime.QueryRunner;
+import org.apache.pinot.query.runtime.executor.OpChainSchedulerService;
+import org.apache.pinot.query.runtime.executor.RoundRobinScheduler;
 import org.apache.pinot.query.runtime.plan.DistributedStagePlan;
 import org.apache.pinot.query.service.QueryConfig;
 import org.apache.pinot.query.testutils.MockInstanceDataManagerFactory;
@@ -63,7 +64,7 @@ public class QueryServerEnclosure {
   private static final String TABLE_CONFIGS_PREFIX = "/CONFIGS/TABLE/";
   private static final String SCHEMAS_PREFIX = "/SCHEMAS/";
 
-  private final ExecutorService _testExecutor;
+  private final OpChainSchedulerService _scheduler;
   private final int _queryRunnerPort;
   private final Map<String, Object> _runnerConfig = new HashMap<>();
   private final InstanceDataManager _instanceDataManager;
@@ -80,8 +81,10 @@ public class QueryServerEnclosure {
       _runnerConfig.put(QueryConfig.KEY_OF_QUERY_RUNNER_HOSTNAME,
           String.format("Server_%s", QueryConfig.DEFAULT_QUERY_RUNNER_HOSTNAME));
       _queryRunner = new QueryRunner();
-      _testExecutor = Executors.newFixedThreadPool(DEFAULT_EXECUTOR_THREAD_NUM,
-          new NamedThreadFactory("test_query_server_enclosure_on_" + _queryRunnerPort + "_port"));
+      _scheduler = new OpChainSchedulerService(new RoundRobinScheduler(),
+          Executors.newFixedThreadPool(
+              DEFAULT_EXECUTOR_THREAD_NUM,
+              new NamedThreadFactory("test_query_server_enclosure_on_" + _queryRunnerPort + "_port")));
     } catch (Exception e) {
       throw new RuntimeException("Test Failed!", e);
     }
@@ -120,13 +123,15 @@ public class QueryServerEnclosure {
     _queryRunner = new QueryRunner();
     _queryRunner.init(configuration, _instanceDataManager, _helixManager, mockServiceMetrics());
     _queryRunner.start();
+    _scheduler.startAsync().awaitRunning();
   }
 
   public void shutDown() {
     _queryRunner.shutDown();
+    _scheduler.stopAsync().awaitTerminated();
   }
 
   public void processQuery(DistributedStagePlan distributedStagePlan, Map<String, String> requestMetadataMap) {
-    _queryRunner.processQuery(distributedStagePlan, _testExecutor, requestMetadataMap);
+    _queryRunner.processQuery(distributedStagePlan, _scheduler, requestMetadataMap);
   }
 }
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/executor/OpChainSchedulerServiceTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/executor/OpChainSchedulerServiceTest.java
new file mode 100644
index 0000000000..7cd1f48e10
--- /dev/null
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/executor/OpChainSchedulerServiceTest.java
@@ -0,0 +1,240 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.runtime.executor;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.pinot.core.common.Operator;
+import org.apache.pinot.query.runtime.blocks.TransferableBlock;
+import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils;
+import org.apache.pinot.query.runtime.operator.OpChain;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.MockitoAnnotations;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+
+public class OpChainSchedulerServiceTest {
+
+  private ExecutorService _executor;
+  private AutoCloseable _mocks;
+
+  @Mock
+  private Operator<TransferableBlock> _operatorA;
+  @Mock
+  private Operator<TransferableBlock> _operatorB;
+  @Mock
+  private OpChainScheduler _scheduler;
+
+  @BeforeClass
+  public void beforeClass() {
+    _mocks = MockitoAnnotations.openMocks(this);
+  }
+
+  @AfterClass
+  public void afterClass()
+      throws Exception {
+    _mocks.close();
+  }
+
+  @AfterMethod
+  public void afterMethod() {
+    _executor.shutdownNow();
+  }
+
+  private void initExecutor(int numThreads) {
+    _executor = Executors.newFixedThreadPool(numThreads);
+  }
+
+  private OpChain getChain(Operator<TransferableBlock> operator) {
+    return new OpChain(operator);
+  }
+
+  @Test
+  public void shouldScheduleSingleOpChainRegisteredAfterStart()
+      throws InterruptedException {
+    // Given:
+    initExecutor(1);
+    Mockito.when(_scheduler.hasNext()).thenReturn(true);
+    Mockito.when(_scheduler.next()).thenReturn(getChain(_operatorA));
+    OpChainSchedulerService scheduler = new OpChainSchedulerService(_scheduler, _executor);
+
+    CountDownLatch latch = new CountDownLatch(1);
+    Mockito.when(_operatorA.nextBlock()).thenAnswer(inv -> {
+      latch.countDown();
+      return TransferableBlockUtils.getEndOfStreamTransferableBlock();
+    });
+
+    // When:
+    scheduler.startAsync().awaitRunning();
+    scheduler.register(new OpChain(_operatorA));
+
+    // Then:
+    Assert.assertTrue(latch.await(10, TimeUnit.SECONDS), "expected await to be called in less than 10 seconds");
+    scheduler.stopAsync().awaitTerminated();
+  }
+
+  @Test
+  public void shouldScheduleSingleOpChainRegisteredBeforeStart()
+      throws InterruptedException {
+    // Given:
+    initExecutor(1);
+    Mockito.when(_scheduler.hasNext()).thenReturn(true);
+    Mockito.when(_scheduler.next()).thenReturn(getChain(_operatorA));
+    OpChainSchedulerService scheduler = new OpChainSchedulerService(_scheduler, _executor);
+
+    CountDownLatch latch = new CountDownLatch(1);
+    Mockito.when(_operatorA.nextBlock()).thenAnswer(inv -> {
+      latch.countDown();
+      return TransferableBlockUtils.getEndOfStreamTransferableBlock();
+    });
+
+    // When:
+    scheduler.register(new OpChain(_operatorA));
+    scheduler.startAsync().awaitRunning();
+
+    // Then:
+    Assert.assertTrue(latch.await(10, TimeUnit.SECONDS), "expected await to be called in less than 10 seconds");
+    scheduler.stopAsync().awaitTerminated();
+  }
+
+  @Test
+  public void shouldReRegisterOpChainOnNoOpBlock()
+      throws InterruptedException {
+    // Given:
+    initExecutor(1);
+    Mockito.when(_scheduler.hasNext()).thenReturn(true);
+    Mockito.when(_scheduler.next()).thenReturn(getChain(_operatorA));
+    OpChainSchedulerService scheduler = new OpChainSchedulerService(_scheduler, _executor);
+
+    CountDownLatch latch = new CountDownLatch(1);
+    Mockito.when(_operatorA.nextBlock())
+        .thenReturn(TransferableBlockUtils.getNoOpTransferableBlock())
+        .thenAnswer(inv -> {
+          latch.countDown();
+          return TransferableBlockUtils.getEndOfStreamTransferableBlock();
+        });
+
+    // When:
+    scheduler.startAsync().awaitRunning();
+    scheduler.register(new OpChain(_operatorA));
+
+    // Then:
+    Assert.assertTrue(latch.await(10, TimeUnit.SECONDS), "expected await to be called in less than 10 seconds");
+    Mockito.verify(_scheduler, Mockito.times(2)).register(Mockito.any(OpChain.class));
+    scheduler.stopAsync().awaitTerminated();
+  }
+
+  @Test
+  public void shouldYieldOpChainsWhenNoWorkCanBeDone()
+      throws InterruptedException {
+    // Given:
+    initExecutor(1);
+    Mockito.when(_scheduler.hasNext()).thenReturn(true);
+    Mockito.when(_scheduler.next())
+        .thenReturn(getChain(_operatorA))
+        .thenReturn(getChain(_operatorB))
+        .thenReturn(getChain(_operatorA));
+    OpChainSchedulerService scheduler = new OpChainSchedulerService(_scheduler, _executor);
+
+    AtomicBoolean opAReturnedNoOp = new AtomicBoolean(false);
+    AtomicBoolean hasOpBRan = new AtomicBoolean(false);
+
+    CountDownLatch latch = new CountDownLatch(1);
+    Mockito.when(_operatorA.nextBlock()).thenAnswer(inv -> {
+      if (hasOpBRan.get()) {
+        latch.countDown();
+        return TransferableBlockUtils.getEndOfStreamTransferableBlock();
+      } else {
+        opAReturnedNoOp.set(true);
+        return TransferableBlockUtils.getNoOpTransferableBlock();
+      }
+    });
+
+    Mockito.when(_operatorB.nextBlock()).thenAnswer(inv -> {
+      hasOpBRan.set(true);
+      return TransferableBlockUtils.getEndOfStreamTransferableBlock();
+    });
+
+    // When:
+    scheduler.startAsync().awaitRunning();
+    scheduler.register(new OpChain(_operatorA));
+    scheduler.register(new OpChain(_operatorB));
+
+    // Then:
+    Assert.assertTrue(latch.await(10, TimeUnit.SECONDS), "expected await to be called in less than 10 seconds");
+    Assert.assertTrue(opAReturnedNoOp.get(), "expected opA to be scheduled first");
+    scheduler.stopAsync().awaitTerminated();
+  }
+
+  @Test
+  public void shouldNotCallSchedulerNextWhenHasNextReturnsFalse()
+      throws InterruptedException {
+    // Given:
+    initExecutor(1);
+    CountDownLatch latch = new CountDownLatch(1);
+    Mockito.when(_scheduler.hasNext()).thenAnswer(inv -> {
+      latch.countDown();
+      return false;
+    });
+    OpChainSchedulerService scheduler = new OpChainSchedulerService(_scheduler, _executor);
+
+    // When:
+    scheduler.startAsync().awaitRunning();
+
+    // Then:
+    Assert.assertTrue(latch.await(10, TimeUnit.SECONDS), "expected hasNext to be called");
+    scheduler.stopAsync().awaitTerminated();
+    Mockito.verify(_scheduler, Mockito.never()).next();
+  }
+
+  @Test
+  public void shouldReevaluateHasNextWhenOnDataAvailableIsCalled()
+      throws InterruptedException {
+    // Given:
+    initExecutor(1);
+    CountDownLatch firstHasNext = new CountDownLatch(1);
+    CountDownLatch secondHasNext = new CountDownLatch(1);
+    Mockito.when(_scheduler.hasNext()).thenAnswer(inv -> {
+      firstHasNext.countDown();
+      return false;
+    }).then(inv -> {
+      secondHasNext.countDown();
+      return false;
+    });
+
+    OpChainSchedulerService scheduler = new OpChainSchedulerService(_scheduler, _executor);
+
+    // When:
+    scheduler.startAsync().awaitRunning();
+    Assert.assertTrue(firstHasNext.await(10, TimeUnit.SECONDS), "expected hasNext to be called");
+    scheduler.onDataAvailable(null);
+
+    // Then:
+    Assert.assertTrue(secondHasNext.await(10, TimeUnit.SECONDS), "expected hasNext to be called again");
+    scheduler.stopAsync().awaitTerminated();
+  }
+}
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/executor/RoundRobinSchedulerTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/executor/RoundRobinSchedulerTest.java
new file mode 100644
index 0000000000..0c92403656
--- /dev/null
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/executor/RoundRobinSchedulerTest.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.runtime.executor;
+
+import org.apache.pinot.query.runtime.operator.OpChain;
+import org.mockito.Mockito;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+
+public class RoundRobinSchedulerTest {
+
+  @Test
+  public void shouldPollOperators() {
+    // Given:
+    OpChain opChain = Mockito.mock(OpChain.class);
+    RoundRobinScheduler scheduler = new RoundRobinScheduler();
+
+    // When:
+    scheduler.register(opChain);
+
+    // Then:
+    Assert.assertTrue(scheduler.hasNext(), "expected next");
+    Assert.assertEquals(scheduler.next(), opChain);
+    Assert.assertFalse(scheduler.hasNext(), "should no longer have next after polling");
+  }
+}
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/service/QueryServerTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/service/QueryServerTest.java
index 0ca5cc656a..b1d8edecec 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/service/QueryServerTest.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/service/QueryServerTest.java
@@ -26,7 +26,6 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Random;
-import java.util.concurrent.ExecutorService;
 import org.apache.pinot.common.proto.PinotQueryWorkerGrpc;
 import org.apache.pinot.common.proto.Worker;
 import org.apache.pinot.core.transport.ServerInstance;
@@ -38,6 +37,7 @@ import org.apache.pinot.query.planner.StageMetadata;
 import org.apache.pinot.query.planner.stage.StageNode;
 import org.apache.pinot.query.routing.WorkerInstance;
 import org.apache.pinot.query.runtime.QueryRunner;
+import org.apache.pinot.query.runtime.executor.OpChainSchedulerService;
 import org.apache.pinot.query.runtime.plan.serde.QueryPlanSerDeUtils;
 import org.apache.pinot.query.testutils.QueryTestUtils;
 import org.apache.pinot.util.TestUtils;
@@ -119,7 +119,7 @@ public class QueryServerTest extends QueryTestSet {
               StageNode stageNode = queryPlan.getQueryStageMap().get(stageId);
               return isStageNodesEqual(stageNode, distributedStagePlan.getStageRoot())
                   && isMetadataMapsEqual(stageMetadata, distributedStagePlan.getMetadataMap().get(stageId));
-            }), any(ExecutorService.class), Mockito.argThat(requestMetadataMap ->
+            }), any(OpChainSchedulerService.class), Mockito.argThat(requestMetadataMap ->
                 requestIdStr.equals(requestMetadataMap.get("REQUEST_ID"))));
             return true;
           } catch (Throwable t) {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org