You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ro...@apache.org on 2023/02/22 03:50:54 UTC

[pinot] branch master updated: [multistage] Fix Issues with OpChainScheduler (#10289)

This is an automated email from the ASF dual-hosted git repository.

rongr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 1aa1368a1a [multistage] Fix Issues with OpChainScheduler (#10289)
1aa1368a1a is described below

commit 1aa1368a1a4f6467a2e49866f3de51f4d519942d
Author: Ankit Sultana <an...@uber.com>
AuthorDate: Wed Feb 22 09:20:47 2023 +0530

    [multistage] Fix Issues with OpChainScheduler (#10289)
    
    * [multistage] Fix Issues with OpChainScheduler
    
    * Remove _running
    
    * Add UTs
    
    * Add support for parallelism > 1 + fix tests + address comments
    
    * Add Nullable annotation to the interface
    
    * Address feedback
    
    * Add a state transition diagram
    
    * Prevent race-condition in Stopwatch
    
    * Fix OpChainStats unintended changes + typo in previous commit
---
 .../apache/pinot/query/runtime/QueryRunner.java    |   2 +-
 .../query/runtime/executor/OpChainScheduler.java   |  61 ++--
 .../runtime/executor/OpChainSchedulerService.java  | 177 +++++-------
 .../runtime/executor/RoundRobinScheduler.java      | 315 ++++++++++++---------
 .../pinot/query/runtime/operator/OpChain.java      |  13 +-
 .../pinot/query/runtime/operator/OpChainId.java    |  57 ++++
 .../pinot/query/runtime/operator/OpChainStats.java |  18 +-
 .../query/runtime/plan/PhysicalPlanVisitor.java    |   3 +-
 .../apache/pinot/query/service/QueryConfig.java    |   2 +-
 .../apache/pinot/query/QueryServerEnclosure.java   |   3 +-
 .../executor/OpChainSchedulerServiceTest.java      | 160 ++++-------
 .../runtime/executor/RoundRobinSchedulerTest.java  | 212 +++++++-------
 12 files changed, 538 insertions(+), 485 deletions(-)

diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java
index 0a18ac555a..d171336a52 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java
@@ -106,7 +106,7 @@ public class QueryRunner {
       _executorService = Executors.newFixedThreadPool(
           ResourceManager.DEFAULT_QUERY_WORKER_THREADS,
           new NamedThreadFactory("query_worker_on_" + _port + "_port"));
-      _scheduler = new OpChainSchedulerService(new RoundRobinScheduler(releaseMs), _executorService, releaseMs);
+      _scheduler = new OpChainSchedulerService(new RoundRobinScheduler(releaseMs), _executorService);
       _mailboxService = MultiplexingMailboxService.newInstance(_hostname, _port, config, _scheduler::onDataAvailable);
       _serverExecutor = new ServerQueryExecutorV1Impl();
       _serverExecutor.init(config.subset(PINOT_V1_SERVER_QUERY_CONFIG_PREFIX), instanceDataManager, serverMetrics);
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/OpChainScheduler.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/OpChainScheduler.java
index 635e5aacfb..0f522076e2 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/OpChainScheduler.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/OpChainScheduler.java
@@ -18,52 +18,69 @@
  */
 package org.apache.pinot.query.runtime.executor;
 
+import java.util.concurrent.TimeUnit;
+import javax.annotation.Nullable;
+import javax.annotation.concurrent.ThreadSafe;
 import org.apache.pinot.query.mailbox.MailboxIdentifier;
 import org.apache.pinot.query.runtime.operator.OpChain;
 
 
 /**
- * An interface that defines different scheduling strategies to work with the
- * {@link OpChainSchedulerService}. All methods are thread safe and can be guaranteed
- * to never be called concurrently - therefore all implementations may use data
- * structures that are not concurrent.
+ * An interface that defines different scheduling strategies to work with the {@link OpChainSchedulerService}.
  */
+@ThreadSafe
 public interface OpChainScheduler {
-
   /**
+   * Registers a new OpChain with the scheduler.
    * @param operatorChain the operator chain to register
-   * @param isNew         whether or not this is the first time the operator is scheduled
    */
-  void register(OpChain operatorChain, boolean isNew);
+  void register(OpChain operatorChain);
 
   /**
-   * This method is called whenever {@code mailbox} has new data available to consume,
-   * this can be useful for advanced scheduling algorithms
-   *
-   * @param mailbox the mailbox ID
+   * When the OpChain is finished, error or otherwise, deregister is called for it so the scheduler can do any required
+   * cleanup. After an OpChain is de-registered, the scheduler service will never call any other method for it.
+   * However, the {@link #onDataAvailable} callback may be called even after an OpChain is de-registered, and the
+   * scheduler should handle that scenario.
+   * @param operatorChain an operator chain that is finished (error or otherwise).
    */
-  void onDataAvailable(MailboxIdentifier mailbox);
+  void deregister(OpChain operatorChain);
 
   /**
-   * This method is called when scheduler is terminating. It should clean up all of the resources if there are any.
-   * register() and onDataAvailable() shouldn't be called anymore after shutDown is called.
+   * Used by {@link OpChainSchedulerService} to indicate that a given OpChain can be suspended until it receives some
+   * data. Note that this method is only used by the scheduler service to "indicate" that an OpChain can be suspended.
+   * The decision on whether to actually suspend or not can be taken by the scheduler.
    */
-  void shutDown();
+  void yield(OpChain opChain);
 
   /**
-   * @return whether or not there is any work for the scheduler to do
+   * A callback called whenever data is received for the given mailbox. This can be used by the scheduler
+   * implementations to re-schedule suspended OpChains. This method may be called for an OpChain that has not yet
+   * been scheduled, or an OpChain that has already been de-registered.
+   * @param mailbox the mailbox ID
    */
-  boolean hasNext();
+  void onDataAvailable(MailboxIdentifier mailbox);
 
   /**
-   * @return the next operator chain to process
-   * @throws java.util.NoSuchElementException if {@link #hasNext()} returns false
-   *         prior to this call
+   * Returns an OpChain that is ready to be run by {@link OpChainSchedulerService}, waiting for the given time if
+   * there are no such OpChains ready yet. Returns null if no OpChain is ready even after the specified time.
+   *
+   * @param time non-negative value that determines the time the scheduler will wait for new OpChains to be ready.
+   * @param timeUnit TimeUnit for the await time.
+   * @return a non-null OpChain that's ready to be run, or null if there's no OpChain ready even after waiting for the
+   *         given time.
+   * @throws InterruptedException if the wait for a ready OpChain was interrupted.
    */
-  OpChain next();
+  @Nullable
+  OpChain next(long time, TimeUnit timeUnit)
+      throws InterruptedException;
 
   /**
-   * @return the number of operator chains that are awaiting execution
+   * @return the number of operator chains registered with the scheduler
    */
   int size();
+
+  /**
+   * TODO: Figure out shutdown flow in context of graceful shutdown.
+   */
+  void shutdownNow();
 }
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/OpChainSchedulerService.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/OpChainSchedulerService.java
index cfc3959c93..5c3e9fa29f 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/OpChainSchedulerService.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/OpChainSchedulerService.java
@@ -19,8 +19,6 @@
 package org.apache.pinot.query.runtime.executor;
 
 import com.google.common.util.concurrent.AbstractExecutionThreadService;
-import com.google.common.util.concurrent.Monitor;
-import com.google.common.util.concurrent.MoreExecutors;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
 import org.apache.pinot.core.util.trace.TraceRunnable;
@@ -35,116 +33,78 @@ import org.slf4j.LoggerFactory;
  * This class provides the implementation for scheduling multistage queries on a single node based
  * on the {@link OpChainScheduler} logic that is passed in. Multistage queries support partial execution
  * and will return a NOOP metadata block as a "yield" signal, indicating that the next operator
- * chain ({@link OpChainScheduler#next()} will be requested.
- *
- * <p>Note that a yielded operator chain will be re-registered with the underlying scheduler.
+ * chain ({@link OpChainScheduler#next}) will be requested.
  */
 @SuppressWarnings("UnstableApiUsage")
 public class OpChainSchedulerService extends AbstractExecutionThreadService {
-
   private static final Logger LOGGER = LoggerFactory.getLogger(OpChainSchedulerService.class);
-
-  private static final int TERMINATION_TIMEOUT_SEC = 60;
+  // Default time scheduler is allowed to wait for a runnable OpChain to be available
+  private static final long DEFAULT_SCHEDULER_NEXT_WAIT_MS = 100;
 
   private final OpChainScheduler _scheduler;
   private final ExecutorService _workerPool;
-  private final long _pollIntervalMs;
-
-  // anything that is guarded by this monitor should be non-blocking
-  private final Monitor _monitor = new Monitor();
-  private final Monitor.Guard _hasNextOrClosing = new Monitor.Guard(_monitor) {
-    @Override
-    public boolean isSatisfied() {
-      return _scheduler.hasNext() || !isRunning();
-    }
-  };
 
-  // Note that workerPool is shut down in this class.
   public OpChainSchedulerService(OpChainScheduler scheduler, ExecutorService workerPool) {
-    this(scheduler, workerPool, -1);
-  }
-
-  public OpChainSchedulerService(OpChainScheduler scheduler, ExecutorService workerPool, long pollIntervalMs) {
     _scheduler = scheduler;
     _workerPool = workerPool;
-    _pollIntervalMs = pollIntervalMs;
   }
 
   @Override
   protected void triggerShutdown() {
+    // TODO: Figure out shutdown lifecycle with graceful shutdown in mind.
     LOGGER.info("Triggered shutdown on OpChainScheduler...");
-    // this will just notify all waiters that the scheduler is shutting down
-    _monitor.enter();
-    _monitor.leave();
-    if (!MoreExecutors.shutdownAndAwaitTermination(_workerPool, TERMINATION_TIMEOUT_SEC, TimeUnit.SECONDS)) {
-      LOGGER.error("Failed to shut down and terminate OpChainScheduler.");
-    }
-    _scheduler.shutDown();
   }
 
   @Override
   protected void run()
       throws Exception {
     while (isRunning()) {
-      if (enterMonitor()) {
-        try {
-          if (!isRunning()) {
-            return;
-          }
-
-          OpChain operatorChain = _scheduler.next();
-          LOGGER.trace("({}): Scheduling", operatorChain);
-          _workerPool.submit(new TraceRunnable() {
-            @Override
-            public void runJob() {
-              try {
-                LOGGER.trace("({}): Executing", operatorChain);
-                operatorChain.getStats().executing();
-
-                // so long as there's work to be done, keep getting the next block
-                // when the operator chain returns a NOOP block, then yield the execution
-                // of this to another worker
-                TransferableBlock result = operatorChain.getRoot().nextBlock();
-                while (!result.isNoOpBlock() && !result.isEndOfStreamBlock()) {
-                  result = operatorChain.getRoot().nextBlock();
-                }
+      OpChain operatorChain = _scheduler.next(DEFAULT_SCHEDULER_NEXT_WAIT_MS, TimeUnit.MILLISECONDS);
+      if (operatorChain == null) {
+        continue;
+      }
+      LOGGER.trace("({}): Scheduling", operatorChain);
+      _workerPool.submit(new TraceRunnable() {
+        /**
+         * Runs the operator chain until it either yields (NOOP block) or reaches end-of-stream.
+         * A yielded chain is handed back to the scheduler; a finished chain (successful or erroneous)
+         * is closed and de-registered; a chain that threw is cancelled and de-registered.
+         */
+        @Override
+        public void runJob() {
+          boolean isFinished = false;
+          Throwable thrown = null;
+          try {
+            LOGGER.trace("({}): Executing", operatorChain);
+            operatorChain.getStats().executing();
+
+            // so long as there's work to be done, keep getting the next block
+            // when the operator chain returns a NOOP block, then yield the execution
+            // of this to another worker
+            TransferableBlock result = operatorChain.getRoot().nextBlock();
+            while (!result.isNoOpBlock() && !result.isEndOfStreamBlock()) {
+              result = operatorChain.getRoot().nextBlock();
+            }
 
-                if (!result.isEndOfStreamBlock()) {
-                  // not complete, needs to re-register for scheduling
-                  register(operatorChain, false);
-                } else {
-                  if (result.isErrorBlock()) {
-                    operatorChain.getRoot().toExplainString();
-                    LOGGER.error("({}): Completed erroneously {} {}", operatorChain, operatorChain.getStats(),
-                        result.getDataBlock().getExceptions());
-                  } else {
-                    operatorChain.getRoot().toExplainString();
-                    operatorChain.getStats().setOperatorStatsMap(result.getResultMetadata());
-                    LOGGER.debug("({}): Completed {}", operatorChain, operatorChain.getStats());
-                  }
-                  operatorChain.close();
-                }
-              } catch (Exception e) {
-                operatorChain.close();
-                operatorChain.getRoot().toExplainString();
-                LOGGER.error("({}): Failed to execute operator chain! {}", operatorChain, operatorChain.getStats(), e);
-              }
+            if (!result.isEndOfStreamBlock()) {
+              // TODO: There should be a waiting-for-data state in OpChainStats.
+              operatorChain.getStats().queued();
+              _scheduler.yield(operatorChain);
+            } else {
+              // End-of-stream always finishes the chain, whether it carries an error or not, so that the
+              // finally block closes and de-registers it. Only the logging/stats handling differs.
+              isFinished = true;
+              if (result.isErrorBlock()) {
+                LOGGER.error("({}): Completed erroneously {} {}", operatorChain, operatorChain.getStats(),
+                    result.getDataBlock().getExceptions());
+              } else {
+                operatorChain.getStats().setOperatorStatsMap(result.getResultMetadata());
+                LOGGER.debug("({}): Completed {}", operatorChain, operatorChain.getStats());
+              }
             }
-          });
-        } finally {
-          _monitor.leave();
+          } catch (Exception e) {
+            LOGGER.error("({}): Failed to execute operator chain! {}", operatorChain, operatorChain.getStats(), e);
+            thrown = e;
+          } finally {
+            // Cleanup is deferred to here so that close/cancel and de-registration happen exactly once.
+            if (isFinished) {
+              closeOpChain(operatorChain);
+            } else if (thrown != null) {
+              cancelOpChain(operatorChain, thrown);
+            }
+          }
         }
-      }
-    }
-  }
-
-  private boolean enterMonitor()
-      throws InterruptedException {
-    if (_pollIntervalMs >= 0) {
-      return _monitor.enterWhen(_hasNextOrClosing, _pollIntervalMs, TimeUnit.MILLISECONDS);
-    } else {
-      _monitor.enterWhen(_hasNextOrClosing);
-      return true;
+      });
     }
   }
 
@@ -154,24 +114,15 @@ public class OpChainSchedulerService extends AbstractExecutionThreadService {
    * @param operatorChain the chain to register
    */
   public final void register(OpChain operatorChain) {
-    register(operatorChain, true);
+    operatorChain.getStats().queued();
+    _scheduler.register(operatorChain);
     LOGGER.debug("({}): Scheduler is now handling operator chain listening to mailboxes {}. "
-            + "There are a total of {} chains awaiting execution.", operatorChain, operatorChain.getReceivingMailbox(),
+            + "There are a total of {} chains awaiting execution.",
+        operatorChain,
+        operatorChain.getReceivingMailbox(),
         _scheduler.size());
   }
 
-  public final void register(OpChain operatorChain, boolean isNew) {
-    _monitor.enter();
-    try {
-      LOGGER.trace("({}): Registered operator chain (new: {}). Total: {}", operatorChain, isNew, _scheduler.size());
-
-      _scheduler.register(operatorChain, isNew);
-    } finally {
-      operatorChain.getStats().queued();
-      _monitor.leave();
-    }
-  }
-
   /**
    * This method should be called whenever data is available in a given mailbox.
    * Implementations of this method should be idempotent, it may be called in the
@@ -180,17 +131,27 @@ public class OpChainSchedulerService extends AbstractExecutionThreadService {
    * @param mailbox the identifier of the mailbox that now has data
    */
   public final void onDataAvailable(MailboxIdentifier mailbox) {
-    _monitor.enter();
-    try {
-      LOGGER.trace("Notified onDataAvailable for mailbox {}", mailbox);
-      _scheduler.onDataAvailable(mailbox);
-    } finally {
-      _monitor.leave();
-    }
+    _scheduler.onDataAvailable(mailbox);
   }
 
   // TODO: remove this method after we pipe down the proper executor pool to the v1 engine
   public ExecutorService getWorkerPool() {
     return _workerPool;
   }
+
+  private void closeOpChain(OpChain opChain) {
+    try {
+      opChain.close();
+    } finally {
+      _scheduler.deregister(opChain);
+    }
+  }
+
+  private void cancelOpChain(OpChain opChain, Throwable e) {
+    try {
+      opChain.cancel(e);
+    } finally {
+      _scheduler.deregister(opChain);
+    }
+  }
 }
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/RoundRobinScheduler.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/RoundRobinScheduler.java
index c7b77f85e7..863e403c04 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/RoundRobinScheduler.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/RoundRobinScheduler.java
@@ -19,17 +19,25 @@
 package org.apache.pinot.query.runtime.executor;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
 import com.google.common.collect.Sets;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.Queue;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
 import java.util.function.Supplier;
-import javax.annotation.concurrent.NotThreadSafe;
+import javax.annotation.concurrent.ThreadSafe;
 import org.apache.pinot.query.mailbox.MailboxIdentifier;
 import org.apache.pinot.query.runtime.operator.OpChain;
+import org.apache.pinot.query.runtime.operator.OpChainId;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -39,168 +47,221 @@ import org.slf4j.LoggerFactory;
  * but will only schedule them when there is work to be done. The availability
  * of work is signaled using the {@link #onDataAvailable(MailboxIdentifier)}
  * callback.
+ * <p>
+ *   Design: There are 3 states for an OpChain:
+ *
+ *   1. Ready: This state means that an OpChain is ready to be run. If an OpChain is in this state, then it would be in
+ *             the _ready queue. The _ready queue is polled on every call to {@link #next}.
+ *   2. Available/Yielded: This is when the OpChain was run and it returned a no-op block, indicating that it has no
+ *                         new data to process. The {@link OpChainId} for these OpChains are stored in the _available
+ *                         map, where the value of the map is the release-timeout.
+ *   3. Running: When the OpChain is returned by the {@link #next} method, it is considered to be in the Running state.
+ *
+ *   The following state transitions are possible:
+ *   1. Ready ==> Running: This happens when the OpChain is returned by the {@link #next} method.
+ *   2. Running ==> Available/Yielded: This happens when a running OpChain returns a No-op block, following which a
+ *                                     yield is called and there's no entry for the OpChain in _seenMail.
+ *   3. Available/Yielded ==> Ready: This can happen in two cases: (1) When yield is called but _seenMail has an
+ *                                   entry for the corresponding OpChainId, which means there was some data received
+ *                                   by MailboxReceiveOperator after the last poll. (2) When a sender has died or hasn't
+ *                                   sent any data in the last _releaseTimeoutMs milliseconds.
+ *
+ *                      |--------------( #yield() )---------|
+ *                      |                                   |
+ *                     \/                                   |
+ *   [ START ] --> [ READY ] -----------( #next() )--> [ RUNNING ]
+ *                     /\                                   |
+ *                     |                                    |
+ *         ( #seenMail() or #periodic())                    |
+ *                     |                                    |
+ *                [ AVAILABLE ] <------( #yield() )---------|
+ *                                                          |
+ *                                                          |
+ *   [ EXIT ] <-----------------( #deregistered() )---------|
+ *
+ *   The OpChain is considered "alive" from the time it is registered until it is de-registered. Any reference to the
+ *   OpChain or its related metadata is kept only while the OpChain is alive. The {@link #onDataAvailable} callback
+ *   can be called before an OpChain was ever registered. In that case, this scheduler will simply ignore the callback,
+ *   since once the OpChain gets registered it will anyways be put into the _ready queue immediately. In case the
+ *   OpChain never gets registered (e.g. if the broker couldn't dispatch it), as long as the sender cleans up all
+ *   resources that it has acquired, there will be no leak, since the scheduler doesn't hold any references for
+ *   non-alive OpChains.
+ * </p>
  */
-@NotThreadSafe
+@ThreadSafe
 public class RoundRobinScheduler implements OpChainScheduler {
   private static final Logger LOGGER = LoggerFactory.getLogger(RoundRobinScheduler.class);
-  private static final long DEFAULT_RELEASE_TIMEOUT = TimeUnit.MINUTES.toMillis(1);
+  private static final String AVAILABLE_RELEASE_THREAD_NAME = "round-robin-scheduler-release-thread";
 
-  private final long _releaseTimeout;
+  private final long _releaseTimeoutMs;
   private final Supplier<Long> _ticker;
 
-  // the _available queue contains operator chains that are available
-  // to this scheduler but do not have any data available to schedule
-  // while the _ready queue contains the operator chains that are ready
-  // to be scheduled (have data, or are first-time scheduled)
-  private final Queue<AvailableEntry> _available = new LinkedList<>();
-  private final Queue<OpChain> _ready = new LinkedList<>();
-
-  private boolean _isShutDown = false;
-
-  // using a Set here is acceptable because calling hasNext() and
-  // onDataAvailable() cannot occur concurrently - that means that
-  // anytime we schedule a new operator based on the presence of
-  // mail we can be certain that it will consume all of the mail
-  // form that mailbox, even if there are multiple items in it. If,
-  // during execution of that operator, more mail appears, then the
-  // operator will be rescheduled immediately potentially resulting
-  // in a false-positive schedule
-  @VisibleForTesting
-  final Set<MailboxIdentifier> _seenMail = new HashSet<>();
+  private final Map<OpChainId, OpChain> _aliveChains = new ConcurrentHashMap<>();
+  final Set<OpChainId> _seenMail = Sets.newConcurrentHashSet();
+  private final Map<OpChainId, Long> _available = new ConcurrentHashMap<>();
 
-  public RoundRobinScheduler() {
-    this(DEFAULT_RELEASE_TIMEOUT);
-  }
+  private final BlockingQueue<OpChain> _ready = new LinkedBlockingQueue<>();
 
-  public RoundRobinScheduler(long releaseTimeout) {
-    this(releaseTimeout, System::currentTimeMillis);
+  private final Lock _lock = new ReentrantLock();
+  private final ScheduledExecutorService _availableOpChainReleaseService;
+
+  public RoundRobinScheduler(long releaseTimeoutMs) {
+    this(releaseTimeoutMs, System::currentTimeMillis);
   }
 
-  public RoundRobinScheduler(long releaseTimeoutMs, Supplier<Long> ticker) {
-    _releaseTimeout = releaseTimeoutMs;
+  RoundRobinScheduler(long releaseTimeoutMs, Supplier<Long> ticker) {
+    Preconditions.checkArgument(releaseTimeoutMs > 0, "Release timeout for round-robin scheduler should be > 0ms");
+    _releaseTimeoutMs = releaseTimeoutMs;
     _ticker = ticker;
+    _availableOpChainReleaseService = Executors.newSingleThreadScheduledExecutor(r -> {
+      Thread t = new Thread(r);
+      t.setName(AVAILABLE_RELEASE_THREAD_NAME);
+      t.setDaemon(true);
+      return t;
+    });
+    _availableOpChainReleaseService.scheduleAtFixedRate(() -> {
+      List<OpChainId> timedOutWaiting = new ArrayList<>();
+      for (Map.Entry<OpChainId, Long> entry : _available.entrySet()) {
+        if (Thread.interrupted()) {
+          LOGGER.warn("Thread={} interrupted. Scheduler may be shutting down.", AVAILABLE_RELEASE_THREAD_NAME);
+          break;
+        }
+        if (_ticker.get() > entry.getValue()) {
+          timedOutWaiting.add(entry.getKey());
+        }
+      }
+      for (OpChainId opChainId : timedOutWaiting) {
+        _lock.lock();
+        try {
+          if (_available.containsKey(opChainId)) {
+            _available.remove(opChainId);
+            _ready.offer(_aliveChains.get(opChainId));
+          }
+        } finally {
+          _lock.unlock();
+        }
+      }
+    }, _releaseTimeoutMs, _releaseTimeoutMs, TimeUnit.MILLISECONDS);
   }
 
   @Override
-  public void register(OpChain operatorChain, boolean isNew) {
-    if (_isShutDown) {
-      return;
-    }
-    // the first time an operator chain is scheduled, it should
-    // immediately be considered ready in case it does not need
-    // read from any mailbox (e.g. with a LiteralValueOperator)
-    if (isNew) {
-      _ready.add(operatorChain);
-    } else {
-      long releaseTs = _releaseTimeout < 0 ? Long.MAX_VALUE : _ticker.get() + _releaseTimeout;
-      _available.add(new AvailableEntry(operatorChain, releaseTs));
+  public void register(OpChain operatorChain) {
+    Preconditions.checkState(!_aliveChains.containsKey(operatorChain.getId()),
+        String.format("Tried to re-register op-chain: %s", operatorChain.getId()));
+    _lock.lock();
+    try {
+      _aliveChains.put(operatorChain.getId(), operatorChain);
+      _ready.offer(operatorChain);
+    } finally {
+      _lock.unlock();
     }
     trace("registered " + operatorChain);
   }
 
   @Override
-  public void onDataAvailable(MailboxIdentifier mailbox) {
-    // it may be possible to receive this callback when there's no corresponding
-    // operator chain registered to the mailbox - this can happen when either
-    // (1) we get the callback before the first register is called or (2) we get
-    // the callback while the operator chain is executing. to account for this,
-    // we just store it in a set of seen mail and only check for it when hasNext
-    // is called.
-    //
-    // note that scenario (2) may cause a false-positive schedule where an operator
-    // chain gets scheduled for mail that it had already processed, in which case
-    // the operator chain will simply do nothing and get put back onto the queue.
-    // scenario (2) may additionally cause a memory leak - if onDataAvailable is
-    // called with an EOS block _while_ the operator chain is executing, the chain
-    // will consume the EOS block and computeReady() will never remove the mailbox
-    // from the _seenMail set.
-    //
-    // TODO: fix the memory leak by adding a close(opChain) callback
-    _seenMail.add(mailbox);
-    trace("got mail for " + mailbox);
+  public void deregister(OpChain operatorChain) {
+    Preconditions.checkState(_aliveChains.containsKey(operatorChain.getId()),
+        "Tried to de-register an un-registered op-chain");
+    _lock.lock();
+    try {
+      OpChainId chainId = operatorChain.getId();
+      _aliveChains.remove(chainId);
+      // it could be that the onDataAvailable callback was called when the OpChain was executing, in which case there
+      // could be a dangling entry in _seenMail.
+      _seenMail.remove(chainId);
+    } finally {
+      _lock.unlock();
+    }
+  }
+
+  @Override
+  public void yield(OpChain operatorChain) {
+    long releaseTs = _ticker.get() + _releaseTimeoutMs;
+    _lock.lock();
+    try {
+      // It could be that this OpChain received data before it could be yielded completely. In that case, mark it ready
+      // to get it scheduled asap.
+      if (_seenMail.contains(operatorChain.getId())) {
+        _seenMail.remove(operatorChain.getId());
+        _ready.add(operatorChain);
+        return;
+      }
+      _available.put(operatorChain.getId(), releaseTs);
+    } finally {
+      _lock.unlock();
+    }
   }
 
   @Override
-  public boolean hasNext() {
-    if (!_ready.isEmpty()) {
-      return true;
+  public void onDataAvailable(MailboxIdentifier mailbox) {
+    // TODO: Should we add an API in MailboxIdentifier to get the requestId?
+    OpChainId opChainId = new OpChainId(Long.parseLong(mailbox.getJobId().split("_")[0]),
+        mailbox.getToHost().virtualId(), mailbox.getReceiverStageId());
+    // If this chain isn't alive as per the scheduler, don't do anything. If the OpChain is registered after this, it
+    // will anyways be scheduled to run since new OpChains are run immediately.
+    if (!_aliveChains.containsKey(opChainId)) {
+      trace("got mail, but the OpChain is not registered so ignoring the event " + mailbox);
+      return;
     }
-    computeReady();
-    return !_ready.isEmpty();
+    _lock.lock();
+    try {
+      if (!_aliveChains.containsKey(opChainId)) {
+        return;
+      }
+      if (_available.containsKey(opChainId)) {
+        _available.remove(opChainId);
+        _ready.offer(_aliveChains.get(opChainId));
+      } else {
+        // There are two cases here:
+        // 1. OpChain is in the _ready queue: the next time it gets polled, we'll remove the _seenMail entry.
+        // 2. OpChain is running: the next time yield is called for it, we'll check against _seenMail and put it back
+        //    in the _ready queue again.
+        _seenMail.add(opChainId);
+      }
+    } finally {
+      _lock.unlock();
+    }
+    trace("got mail for " + mailbox);
   }
 
   @Override
-  public OpChain next() {
-    OpChain op = _ready.poll();
-    trace("Polled " + op);
-    return op;
+  public OpChain next(long time, TimeUnit timeUnit) throws InterruptedException {
+    return _ready.poll(time, timeUnit);
   }
 
   @Override
   public int size() {
-    return _ready.size() + _available.size();
+    return _aliveChains.size();
   }
 
   @Override
-  public void shutDown() {
-    if (_isShutDown) {
-      return;
-    }
-    while (!_ready.isEmpty()) {
-      _ready.poll().close();
-    }
-    while (!_available.isEmpty()) {
-      _available.poll()._opChain.close();
-    }
-    _isShutDown = true;
+  public void shutdownNow() {
+    // TODO: Figure out shutdown flow in context of graceful shutdown.
+    _availableOpChainReleaseService.shutdownNow();
   }
 
-  private void computeReady() {
-    Iterator<AvailableEntry> availableChains = _available.iterator();
-
-    // the algorithm here iterates through all available chains and checks
-    // to see whether or not any of the available chains have seen mail for
-    // at least one of the mailboxes they receive from - if they do, then
-    // we should make that chain available and remove any mail from the
-    // mailboxes that it would consume from (after it is scheduled, all
-    // mail available to it will have been consumed).
-    while (availableChains.hasNext()) {
-      AvailableEntry chain = availableChains.next();
-      Sets.SetView<MailboxIdentifier> intersect = Sets.intersection(chain._opChain.getReceivingMailbox(), _seenMail);
-
-      if (!intersect.isEmpty()) {
-        // use an immutable copy because set views use the underlying sets
-        // directly, which would cause a concurrent modification exception
-        // when removing data from _seenMail
-        _seenMail.removeAll(intersect.immutableCopy());
-        _ready.add(chain._opChain);
-        availableChains.remove();
-      } else if (_ticker.get() > chain._releaseTs) {
-        LOGGER.warn("({}) Scheduling operator chain reading from {} after timeout. Ready: {}, Available: {}, Mail: {}.",
-            chain._opChain, chain._opChain.getReceivingMailbox(), _ready, _available, _seenMail);
-        _ready.add(chain._opChain);
-        availableChains.remove();
-      }
-    }
+  @VisibleForTesting
+  int readySize() {
+    return _ready.size();
   }
 
-  private void trace(String operation) {
-    LOGGER.trace("({}) Ready: {}, Available: {}, Mail: {}", operation, _ready, _available, _seenMail);
+  @VisibleForTesting
+  int availableSize() {
+    return _available.size();
   }
 
-  private static class AvailableEntry {
-
-    final OpChain _opChain;
-    final long _releaseTs;
+  @VisibleForTesting
+  int seenMailSize() {
+    return _seenMail.size();
+  }
 
-    private AvailableEntry(OpChain opChain, long releaseTs) {
-      _opChain = opChain;
-      _releaseTs = releaseTs;
-    }
+  @VisibleForTesting
+  int aliveChainsSize() {
+    return _aliveChains.size();
+  }
 
-    @Override
-    public String toString() {
-      return _opChain.toString();
-    }
+  private void trace(String operation) {
+    LOGGER.trace("({}) Ready: {}, Available: {}, Mail: {}",
+        operation, _ready, _available, _seenMail);
   }
 }
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/OpChain.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/OpChain.java
index ddbe1939ff..6de4e92040 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/OpChain.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/OpChain.java
@@ -34,13 +34,14 @@ public class OpChain implements AutoCloseable {
   private final MultiStageOperator _root;
   private final Set<MailboxIdentifier> _receivingMailbox;
   private final OpChainStats _stats;
-  private final String _id;
+  private final OpChainId _id;
 
-  public OpChain(MultiStageOperator root, List<MailboxIdentifier> receivingMailboxes, long requestId, int stageId) {
+  public OpChain(MultiStageOperator root, List<MailboxIdentifier> receivingMailboxes, int virtualServerId,
+      long requestId, int stageId) {
     _root = root;
     _receivingMailbox = new HashSet<>(receivingMailboxes);
-    _id = String.format("%s_%s", requestId, stageId);
-    _stats = new OpChainStats(_id);
+    _id = new OpChainId(requestId, virtualServerId, stageId);
+    _stats = new OpChainStats(_id.toString());
   }
 
   public Operator<TransferableBlock> getRoot() {
@@ -51,6 +52,10 @@ public class OpChain implements AutoCloseable {
     return _receivingMailbox;
   }
 
+  public OpChainId getId() { // the id assembled in the constructor from requestId, virtualServerId and stageId
+    return _id;
+  }
+
   // TODO: Move OperatorStats here.
   public OpChainStats getStats() {
     return _stats;
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/OpChainId.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/OpChainId.java
new file mode 100644
index 0000000000..355021e58f
--- /dev/null
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/OpChainId.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.runtime.operator;
+
+import java.util.Objects;
+
+
+public class OpChainId { // Immutable (all-final) key identifying an OpChain by request, virtual server and stage.
+  final long _requestId; // no access modifier on these three fields, so they are package-visible
+  final int _virtualServerId;
+  final int _stageId;
+
+  public OpChainId(long requestId, int virtualServerId, int stageId) {
+    _requestId = requestId;
+    _virtualServerId = virtualServerId;
+    _stageId = stageId;
+  }
+
+  @Override
+  public String toString() { // renders as "<requestId>_<virtualServerId>_<stageId>"
+    return String.format("%s_%s_%s", _requestId, _virtualServerId, _stageId);
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) { // exact class match: a subclass instance never compares equal
+      return false;
+    }
+    OpChainId opChainId = (OpChainId) o;
+    return _requestId == opChainId._requestId && _virtualServerId == opChainId._virtualServerId
+        && _stageId == opChainId._stageId;
+  }
+
+  @Override
+  public int hashCode() { // hashes exactly the fields equals() compares, keeping the two consistent
+    return Objects.hash(_requestId, _virtualServerId, _stageId);
+  }
+}
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/OpChainStats.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/OpChainStats.java
index a80233bee1..60dba3a0de 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/OpChainStats.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/OpChainStats.java
@@ -26,12 +26,14 @@ import java.util.Map;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.Supplier;
+import javax.annotation.concurrent.NotThreadSafe;
 import org.apache.pinot.spi.accounting.ThreadResourceUsageProvider;
 
 
 /**
  * {@code OpChainStats} tracks execution statistics for {@link OpChain}s.
  */
+@NotThreadSafe
 public class OpChainStats {
 
   // use memoized supplier so that the timing doesn't start until the
@@ -71,14 +73,6 @@ public class OpChainStats {
     }
   }
 
-  public void startExecutionTimer() {
-    _exTimerStarted = true;
-    _exTimer.get();
-    if (!_executeStopwatch.isRunning()) {
-      _executeStopwatch.start();
-    }
-  }
-
   public Map<String, OperatorStats> getOperatorStatsMap() {
     return _operatorStatsMap;
   }
@@ -87,6 +81,14 @@ public class OpChainStats {
     _operatorStatsMap = operatorStatsMap;
   }
 
+  private void startExecutionTimer() { // private: callable only from inside OpChainStats
+    _exTimerStarted = true;
+    _exTimer.get(); // return value ignored; presumably forces the memoized supplier - TODO confirm
+    if (!_executeStopwatch.isRunning()) { // leave an already-running stopwatch untouched
+      _executeStopwatch.start();
+    }
+  }
+
   @Override
   public String toString() {
     return String.format("(%s) Queued Count: %s, Executing Time: %sms, Queued Time: %sms", _id, _queuedCount.get(),
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/PhysicalPlanVisitor.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/PhysicalPlanVisitor.java
index 77de6197ca..fb2de69d89 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/PhysicalPlanVisitor.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/PhysicalPlanVisitor.java
@@ -58,7 +58,8 @@ public class PhysicalPlanVisitor implements StageNodeVisitor<MultiStageOperator,
 
   public static OpChain build(StageNode node, PlanRequestContext context) {
     MultiStageOperator root = node.visit(INSTANCE, context);
-    return new OpChain(root, context.getReceivingMailboxes(), context.getRequestId(), context.getStageId());
+    return new OpChain(root, context.getReceivingMailboxes(), context._server.virtualId(), context.getRequestId(),
+        context.getStageId());
   }
 
   @Override
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/QueryConfig.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/QueryConfig.java
index d521f0f926..bc80c96df6 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/QueryConfig.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/QueryConfig.java
@@ -70,7 +70,7 @@ public class QueryConfig {
    * an operator chain despite any amount of time elapsed.
    */
   public static final String KEY_OF_SCHEDULER_RELEASE_TIMEOUT_MS = "pinot.query.scheduler.release.timeout.ms";
-  public static final long DEFAULT_SCHEDULER_RELEASE_TIMEOUT_MS = -1;
+  public static final long DEFAULT_SCHEDULER_RELEASE_TIMEOUT_MS = 10_000;
 
   private QueryConfig() {
     // do not instantiate.
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/QueryServerEnclosure.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/QueryServerEnclosure.java
index fa6ba5e7c4..36df3de6ff 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/QueryServerEnclosure.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/QueryServerEnclosure.java
@@ -65,7 +65,7 @@ public class QueryServerEnclosure {
   private final InstanceDataManager _instanceDataManager;
   private final HelixManager _helixManager;
 
-  private QueryRunner _queryRunner;
+  private final QueryRunner _queryRunner;
 
   public QueryServerEnclosure(MockInstanceDataManagerFactory factory) {
     try {
@@ -112,7 +112,6 @@ public class QueryServerEnclosure {
   public void start()
       throws Exception {
     PinotConfiguration configuration = new PinotConfiguration(_runnerConfig);
-    _queryRunner = new QueryRunner();
     _queryRunner.init(configuration, _instanceDataManager, _helixManager, mockServiceMetrics());
     _queryRunner.start();
   }
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/executor/OpChainSchedulerServiceTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/executor/OpChainSchedulerServiceTest.java
index c38a210e95..9f0d9e8963 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/executor/OpChainSchedulerServiceTest.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/executor/OpChainSchedulerServiceTest.java
@@ -27,7 +27,6 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils;
 import org.apache.pinot.query.runtime.operator.MultiStageOperator;
 import org.apache.pinot.query.runtime.operator.OpChain;
-import org.mockito.Mock;
 import org.mockito.Mockito;
 import org.mockito.MockitoAnnotations;
 import org.testng.Assert;
@@ -44,11 +43,7 @@ public class OpChainSchedulerServiceTest {
   private ExecutorService _executor;
   private AutoCloseable _mocks;
 
-  @Mock
   private MultiStageOperator _operatorA;
-  @Mock
-  private MultiStageOperator _operatorB;
-  @Mock
   private OpChainScheduler _scheduler;
 
   @BeforeClass
@@ -64,9 +59,10 @@ public class OpChainSchedulerServiceTest {
 
   @BeforeMethod
   public void beforeMethod() {
+    _operatorA = Mockito.mock(MultiStageOperator.class);
+    _scheduler = Mockito.mock(OpChainScheduler.class);
     clearInvocations(_scheduler);
     clearInvocations(_operatorA);
-    clearInvocations(_operatorB);
   }
 
   private void initExecutor(int numThreads) {
@@ -74,16 +70,15 @@ public class OpChainSchedulerServiceTest {
   }
 
   private OpChain getChain(MultiStageOperator operator) {
-    return new OpChain(operator, ImmutableList.of(), 123, 1);
+    return new OpChain(operator, ImmutableList.of(), 1, 123, 1);
   }
 
   @Test
   public void shouldScheduleSingleOpChainRegisteredAfterStart()
       throws InterruptedException {
-    // Given:
     initExecutor(1);
-    Mockito.when(_scheduler.hasNext()).thenReturn(true);
-    Mockito.when(_scheduler.next()).thenReturn(getChain(_operatorA));
+    OpChain opChain = getChain(_operatorA);
+    Mockito.when(_scheduler.next(Mockito.anyLong(), Mockito.any())).thenReturn(opChain).thenReturn(null);
     OpChainSchedulerService scheduler = new OpChainSchedulerService(_scheduler, _executor);
 
     CountDownLatch latch = new CountDownLatch(1);
@@ -92,11 +87,9 @@ public class OpChainSchedulerServiceTest {
       return TransferableBlockUtils.getEndOfStreamTransferableBlock();
     });
 
-    // When:
     scheduler.startAsync().awaitRunning();
-    scheduler.register(new OpChain(_operatorA, ImmutableList.of(), 123, 1));
+    scheduler.register(opChain);
 
-    // Then:
     Assert.assertTrue(latch.await(10, TimeUnit.SECONDS), "expected await to be called in less than 10 seconds");
     scheduler.stopAsync().awaitTerminated();
   }
@@ -104,10 +97,9 @@ public class OpChainSchedulerServiceTest {
   @Test
   public void shouldScheduleSingleOpChainRegisteredBeforeStart()
       throws InterruptedException {
-    // Given:
     initExecutor(1);
-    Mockito.when(_scheduler.hasNext()).thenReturn(true);
-    Mockito.when(_scheduler.next()).thenReturn(getChain(_operatorA));
+    OpChain opChain = getChain(_operatorA);
+    Mockito.when(_scheduler.next(Mockito.anyLong(), Mockito.any())).thenReturn(opChain).thenReturn(null);
     OpChainSchedulerService scheduler = new OpChainSchedulerService(_scheduler, _executor);
 
     CountDownLatch latch = new CountDownLatch(1);
@@ -116,152 +108,100 @@ public class OpChainSchedulerServiceTest {
       return TransferableBlockUtils.getEndOfStreamTransferableBlock();
     });
 
-    // When:
-    scheduler.register(new OpChain(_operatorA, ImmutableList.of(), 123, 1));
+    scheduler.register(opChain);
     scheduler.startAsync().awaitRunning();
 
-    // Then:
     Assert.assertTrue(latch.await(10, TimeUnit.SECONDS), "expected await to be called in less than 10 seconds");
     scheduler.stopAsync().awaitTerminated();
   }
 
   @Test
-  public void shouldReRegisterOpChainOnNoOpBlock()
+  public void shouldYieldOpChainOnNoOpBlock()
       throws InterruptedException {
-    // Given:
     initExecutor(1);
-    Mockito.when(_scheduler.hasNext()).thenReturn(true);
-    Mockito.when(_scheduler.next()).thenReturn(getChain(_operatorA));
+    OpChain opChain = getChain(_operatorA);
+    Mockito.when(_scheduler.next(Mockito.anyLong(), Mockito.any())).thenReturn(opChain).thenReturn(null);
     OpChainSchedulerService scheduler = new OpChainSchedulerService(_scheduler, _executor);
 
     CountDownLatch latch = new CountDownLatch(1);
-    Mockito.when(_operatorA.nextBlock()).thenReturn(TransferableBlockUtils.getNoOpTransferableBlock())
-        .thenAnswer(inv -> {
-          latch.countDown();
-          return TransferableBlockUtils.getEndOfStreamTransferableBlock();
-        });
-
-    // When:
-    scheduler.startAsync().awaitRunning();
-    scheduler.register(new OpChain(_operatorA, ImmutableList.of(), 123, 1));
-
-    // Then:
-    Assert.assertTrue(latch.await(10, TimeUnit.SECONDS), "expected await to be called in less than 10 seconds");
-    Mockito.verify(_scheduler, Mockito.times(2)).register(Mockito.any(OpChain.class), Mockito.any(boolean.class));
-    scheduler.stopAsync().awaitTerminated();
-  }
-
-  @Test
-  public void shouldYieldOpChainsWhenNoWorkCanBeDone()
-      throws InterruptedException {
-    // Given:
-    initExecutor(1);
-    Mockito.when(_scheduler.hasNext()).thenReturn(true);
-    Mockito.when(_scheduler.next()).thenReturn(getChain(_operatorA)).thenReturn(getChain(_operatorB))
-        .thenReturn(getChain(_operatorA));
-    OpChainSchedulerService scheduler = new OpChainSchedulerService(_scheduler, _executor);
-
-    AtomicBoolean opAReturnedNoOp = new AtomicBoolean(false);
-    AtomicBoolean hasOpBRan = new AtomicBoolean(false);
-
-    CountDownLatch latch = new CountDownLatch(1);
-    Mockito.when(_operatorA.nextBlock()).thenAnswer(inv -> {
-      if (hasOpBRan.get()) {
-        latch.countDown();
-        return TransferableBlockUtils.getEndOfStreamTransferableBlock();
-      } else {
-        opAReturnedNoOp.set(true);
-        return TransferableBlockUtils.getNoOpTransferableBlock();
-      }
-    });
-
-    Mockito.when(_operatorB.nextBlock()).thenAnswer(inv -> {
-      hasOpBRan.set(true);
-      return TransferableBlockUtils.getEndOfStreamTransferableBlock();
-    });
+    Mockito.when(_operatorA.nextBlock()).thenReturn(TransferableBlockUtils.getNoOpTransferableBlock());
+    Mockito.doAnswer(inv -> {
+      latch.countDown();
+      return null;
+    }).when(_scheduler).yield(Mockito.any());
 
-    // When:
     scheduler.startAsync().awaitRunning();
-    scheduler.register(new OpChain(_operatorA, ImmutableList.of(), 123, 1));
-    scheduler.register(new OpChain(_operatorB, ImmutableList.of(), 123, 1));
+    scheduler.register(opChain);
 
-    // Then:
     Assert.assertTrue(latch.await(10, TimeUnit.SECONDS), "expected await to be called in less than 10 seconds");
-    Assert.assertTrue(opAReturnedNoOp.get(), "expected opA to be scheduled first");
     scheduler.stopAsync().awaitTerminated();
   }
 
   @Test
-  public void shouldNotCallSchedulerNextWhenHasNextReturnsFalse()
+  public void shouldScheduleOpChainEvenIfNoOpChainsInAWhile()
       throws InterruptedException {
-    // Given:
     initExecutor(1);
-    CountDownLatch latch = new CountDownLatch(1);
-    Mockito.when(_scheduler.hasNext()).thenAnswer(inv -> {
+    OpChain opChain = getChain(_operatorA);
+    CountDownLatch latch = new CountDownLatch(3);
+    AtomicBoolean returnedOpChain = new AtomicBoolean(false);
+    Mockito.when(_scheduler.next(Mockito.anyLong(), Mockito.any())).thenAnswer(inv -> {
       latch.countDown();
-      return false;
+      if (latch.getCount() == 0 && returnedOpChain.compareAndSet(false, true)) {
+        return opChain;
+      }
+      return null;
     });
     OpChainSchedulerService scheduler = new OpChainSchedulerService(_scheduler, _executor);
 
-    // When:
     scheduler.startAsync().awaitRunning();
 
-    // Then:
-    Assert.assertTrue(latch.await(10, TimeUnit.SECONDS), "expected hasNext to be called");
+    Assert.assertTrue(latch.await(10, TimeUnit.SECONDS), "expected opChain to be scheduled");
     scheduler.stopAsync().awaitTerminated();
-
-    Mockito.verify(_scheduler, Mockito.never()).next();
   }
 
   @Test
-  public void shouldReevaluateHasNextWhenOnDataAvailableIsCalled()
+  public void shouldCallCloseOnOperators()
       throws InterruptedException {
-    // Given:
     initExecutor(1);
-    CountDownLatch firstHasNext = new CountDownLatch(1);
-    CountDownLatch secondHasNext = new CountDownLatch(1);
-    Mockito.when(_scheduler.hasNext()).thenAnswer(inv -> {
-      firstHasNext.countDown();
-      return false;
-    }).then(inv -> {
-      secondHasNext.countDown();
-      return false;
-    });
-
+    OpChain opChain = getChain(_operatorA);
+    Mockito.when(_scheduler.next(Mockito.anyLong(), Mockito.any())).thenReturn(opChain).thenReturn(null);
     OpChainSchedulerService scheduler = new OpChainSchedulerService(_scheduler, _executor);
 
-    // When:
+    CountDownLatch latch = new CountDownLatch(1);
+    Mockito.when(_operatorA.nextBlock()).thenReturn(TransferableBlockUtils.getEndOfStreamTransferableBlock());
+    Mockito.doAnswer(inv -> {
+      latch.countDown();
+      return null;
+    }).when(_operatorA).close();
+
     scheduler.startAsync().awaitRunning();
-    Assert.assertTrue(firstHasNext.await(10, TimeUnit.SECONDS), "expected hasNext to be called");
-    scheduler.onDataAvailable(null);
+    scheduler.register(opChain);
 
-    // Then:
-    Assert.assertTrue(secondHasNext.await(10, TimeUnit.SECONDS), "expected hasNext to be called again");
+    Assert.assertTrue(latch.await(10, TimeUnit.SECONDS), "expected await to be called in less than 10 seconds");
     scheduler.stopAsync().awaitTerminated();
   }
 
   @Test
-  public void shouldCallCloseOnOperators()
+  public void shouldCallCancelOnOpChainsThatThrow()
       throws InterruptedException {
-    // Given:
     initExecutor(1);
-    Mockito.when(_scheduler.hasNext()).thenReturn(true).thenReturn(false);
-    Mockito.when(_scheduler.next()).thenReturn(getChain(_operatorA));
+    OpChain opChain = getChain(_operatorA);
+    Mockito.when(_scheduler.next(Mockito.anyLong(), Mockito.any())).thenReturn(opChain).thenReturn(null);
     OpChainSchedulerService scheduler = new OpChainSchedulerService(_scheduler, _executor);
 
     CountDownLatch latch = new CountDownLatch(1);
-    Mockito.when(_operatorA.nextBlock()).thenAnswer(inv -> {
+    Mockito.when(_operatorA.nextBlock()).thenThrow(new RuntimeException("foo"));
+    Mockito.doAnswer(inv -> {
       latch.countDown();
-      return TransferableBlockUtils.getEndOfStreamTransferableBlock();
-    });
+      return null;
+    }).when(_operatorA).cancel(Mockito.any());
 
-    // When:
     scheduler.startAsync().awaitRunning();
-    scheduler.register(new OpChain(_operatorA, ImmutableList.of(), 123, 1));
+    scheduler.register(opChain);
 
-    // Then:
     Assert.assertTrue(latch.await(10, TimeUnit.SECONDS), "expected await to be called in less than 10 seconds");
     scheduler.stopAsync().awaitTerminated();
-    Mockito.verify(_operatorA, Mockito.times(1)).close();
+    Mockito.verify(_operatorA, Mockito.times(1)).cancel(Mockito.any());
+    Mockito.verify(_scheduler, Mockito.times(1)).deregister(Mockito.any());
   }
 }
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/executor/RoundRobinSchedulerTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/executor/RoundRobinSchedulerTest.java
index 6bf18a73b3..88974b1cb3 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/executor/RoundRobinSchedulerTest.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/executor/RoundRobinSchedulerTest.java
@@ -19,7 +19,7 @@
 package org.apache.pinot.query.runtime.executor;
 
 import com.google.common.collect.ImmutableList;
-import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.TimeUnit;
 import org.apache.pinot.query.mailbox.JsonMailboxIdentifier;
 import org.apache.pinot.query.mailbox.MailboxIdentifier;
 import org.apache.pinot.query.runtime.operator.MultiStageOperator;
@@ -28,6 +28,7 @@ import org.mockito.Mock;
 import org.mockito.MockitoAnnotations;
 import org.testng.Assert;
 import org.testng.annotations.AfterClass;
+import org.testng.annotations.AfterTest;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
@@ -35,17 +36,27 @@ import org.testng.annotations.Test;
 public class RoundRobinSchedulerTest {
   private static final int DEFAULT_SENDER_STAGE_ID = 0;
   private static final int DEFAULT_RECEIVER_STAGE_ID = 1;
-
-  private static final MailboxIdentifier MAILBOX_1 = new JsonMailboxIdentifier("1_1", "0@foo:2", "0@bar:3",
-      DEFAULT_SENDER_STAGE_ID, DEFAULT_RECEIVER_STAGE_ID);
-  private static final MailboxIdentifier MAILBOX_2 = new JsonMailboxIdentifier("1_2", "0@foo:2", "0@bar:3",
-      DEFAULT_SENDER_STAGE_ID, DEFAULT_RECEIVER_STAGE_ID);
+  private static final int DEFAULT_VIRTUAL_SERVER_ID = 1;
+  private static final int DEFAULT_POLL_TIMEOUT_MS = 0;
+  private static final int DEFAULT_RELEASE_TIMEOUT_MS = 10;
+  private static final long DEFAULT_REQUEST_ID = 123;
+  private static final String DEFAULT_SENDER_SERIALIZED = "0@foo:2";
+  private static final String DEFAULT_JOB_ID = String.format("%s_%s", DEFAULT_REQUEST_ID, DEFAULT_RECEIVER_STAGE_ID);
+
+  private static final MailboxIdentifier MAILBOX_1 = new JsonMailboxIdentifier(DEFAULT_JOB_ID,
+      DEFAULT_SENDER_SERIALIZED, "1@bar:1", DEFAULT_SENDER_STAGE_ID, DEFAULT_RECEIVER_STAGE_ID);
+  private static final MailboxIdentifier MAILBOX_2 = new JsonMailboxIdentifier(DEFAULT_JOB_ID,
+      DEFAULT_SENDER_SERIALIZED, "2@bar:1", DEFAULT_SENDER_STAGE_ID, DEFAULT_RECEIVER_STAGE_ID);
+  private static final MailboxIdentifier MAILBOX_3 = new JsonMailboxIdentifier(DEFAULT_JOB_ID,
+      DEFAULT_SENDER_SERIALIZED, "3@bar:1", DEFAULT_SENDER_STAGE_ID, DEFAULT_RECEIVER_STAGE_ID);
 
   @Mock
   private MultiStageOperator _operator;
 
   private AutoCloseable _mocks;
 
+  private RoundRobinScheduler _scheduler;
+
   @BeforeClass
   public void beforeClass() {
     _mocks = MockitoAnnotations.openMocks(this);
@@ -57,109 +68,108 @@ public class RoundRobinSchedulerTest {
     _mocks.close();
   }
 
-  @Test
-  public void shouldScheduleNewOpChainsImmediately() {
-    // Given:
-    OpChain chain = new OpChain(_operator, ImmutableList.of(MAILBOX_1), 123, DEFAULT_RECEIVER_STAGE_ID);
-    RoundRobinScheduler scheduler = new RoundRobinScheduler();
-
-    // When:
-    scheduler.register(chain, true);
-
-    // Then:
-    Assert.assertTrue(scheduler.hasNext());
-    Assert.assertEquals(scheduler.next(), chain);
-  }
-
-  @Test
-  public void shouldNotScheduleRescheduledOpChainsImmediately() {
-    // Given:
-    OpChain chain = new OpChain(_operator, ImmutableList.of(MAILBOX_1), 123, DEFAULT_RECEIVER_STAGE_ID);
-    RoundRobinScheduler scheduler = new RoundRobinScheduler();
-
-    // When:
-    scheduler.register(chain, false);
-
-    // Then:
-    Assert.assertFalse(scheduler.hasNext());
-  }
-
-  @Test
-  public void shouldScheduleRescheduledOpChainOnDataAvailable() {
-    // Given:
-    OpChain chain1 = new OpChain(_operator, ImmutableList.of(MAILBOX_1), 123, DEFAULT_RECEIVER_STAGE_ID);
-    OpChain chain2 = new OpChain(_operator, ImmutableList.of(MAILBOX_2), 123, DEFAULT_RECEIVER_STAGE_ID);
-    RoundRobinScheduler scheduler = new RoundRobinScheduler();
-
-    // When:
-    scheduler.register(chain1, false);
-    scheduler.register(chain2, false);
-    scheduler.onDataAvailable(MAILBOX_1);
-
-    // Then:
-    Assert.assertTrue(scheduler.hasNext());
-    Assert.assertEquals(scheduler.next(), chain1);
-    Assert.assertFalse(scheduler.hasNext());
+  @org.testng.annotations.AfterMethod // per test method: each test builds its own scheduler (@AfterTest = once per <test>)
+  public void afterTest() {
+    _scheduler.shutdownNow(); // stops the scheduler created by the test method that just finished
   }
 
   @Test
-  public void shouldScheduleRescheduledOpChainAfterTimeout() {
-    // Given:
-    OpChain chain1 = new OpChain(_operator, ImmutableList.of(MAILBOX_1), 123, DEFAULT_RECEIVER_STAGE_ID);
-    AtomicLong ticker = new AtomicLong(0);
-    RoundRobinScheduler scheduler = new RoundRobinScheduler(100, ticker::get);
-
-    // When:
-    scheduler.register(chain1, false);
-    ticker.set(101);
-
-    // Then:
-    Assert.assertTrue(scheduler.hasNext());
-    Assert.assertEquals(scheduler.next(), chain1);
+  public void testSchedulerHappyPath()
+      throws InterruptedException {
+    OpChain chain = new OpChain(_operator, ImmutableList.of(MAILBOX_1), DEFAULT_VIRTUAL_SERVER_ID,
+        DEFAULT_REQUEST_ID, DEFAULT_RECEIVER_STAGE_ID);
+    _scheduler = new RoundRobinScheduler(DEFAULT_RELEASE_TIMEOUT_MS);
+    _scheduler.register(chain);
+
+    // OpChain is scheduled immediately
+    Assert.assertEquals(_scheduler.next(DEFAULT_POLL_TIMEOUT_MS, TimeUnit.MILLISECONDS), chain);
+    // No op-chains ready, so scheduler returns null
+    Assert.assertNull(_scheduler.next(DEFAULT_POLL_TIMEOUT_MS, TimeUnit.MILLISECONDS));
+    // When Op-Chain is done executing, yield is called
+    _scheduler.yield(chain);
+    // When data is received, callback is called
+    _scheduler.onDataAvailable(MAILBOX_1);
+    // next should return the OpChain immediately after the callback
+    Assert.assertEquals(_scheduler.next(DEFAULT_POLL_TIMEOUT_MS, TimeUnit.MILLISECONDS), chain);
+    // Say the OpChain is done, then a de-register will be called
+    _scheduler.deregister(chain);
+
+    // There should be no entries left in the scheduler (TestNG argument order: actual first, expected second)
+    Assert.assertEquals(_scheduler.aliveChainsSize() + _scheduler.readySize() + _scheduler.seenMailSize()
+        + _scheduler.availableSize(), 0);
   }
 
   @Test
-  public void shouldScheduleRescheduledOpChainOnDataAvailableBeforeRegister() {
-    // Given:
-    OpChain chain = new OpChain(_operator, ImmutableList.of(MAILBOX_1), 123, DEFAULT_RECEIVER_STAGE_ID);
-    RoundRobinScheduler scheduler = new RoundRobinScheduler();
-
-    // When:
-    scheduler.onDataAvailable(MAILBOX_1);
-    scheduler.register(chain, false);
-
-    // Then:
-    Assert.assertTrue(scheduler.hasNext());
-    Assert.assertEquals(scheduler.next(), chain);
-  }
-
-  @Test
-  public void shouldNotScheduleRescheduledOpChainOnDataAvailableForDifferentMailbox() {
-    // Given:
-    OpChain chain = new OpChain(_operator, ImmutableList.of(MAILBOX_1), 123, DEFAULT_RECEIVER_STAGE_ID);
-    RoundRobinScheduler scheduler = new RoundRobinScheduler();
-
-    // When:
-    scheduler.register(chain, false);
-    scheduler.onDataAvailable(MAILBOX_2);
-
-    // Then:
-    Assert.assertFalse(scheduler.hasNext());
+  public void testSchedulerWhenSenderDies()
+      throws InterruptedException {
+    OpChain chain = new OpChain(_operator, ImmutableList.of(MAILBOX_1), DEFAULT_VIRTUAL_SERVER_ID,
+        DEFAULT_REQUEST_ID, DEFAULT_RECEIVER_STAGE_ID);
+    _scheduler = new RoundRobinScheduler(DEFAULT_RELEASE_TIMEOUT_MS);
+    _scheduler.register(chain);
+
+    // OpChain runs immediately after registration
+    Assert.assertEquals(_scheduler.next(DEFAULT_POLL_TIMEOUT_MS, TimeUnit.MILLISECONDS), chain);
+    // No more OpChains to run
+    Assert.assertNull(_scheduler.next(DEFAULT_POLL_TIMEOUT_MS, TimeUnit.MILLISECONDS));
+    // When op-chain returns a no-op block, suspend it
+    _scheduler.yield(chain);
+
+    // Unless a callback is called, the chain would remain suspended. However, the scheduler will automatically
+    // promote available OpChains to ready every releaseMs.
+    Assert.assertEquals(_scheduler.next(DEFAULT_RELEASE_TIMEOUT_MS + 100, TimeUnit.MILLISECONDS), chain);
+
+    // Assuming the OpChain has timed out, the OpChain will be de-registered
+    _scheduler.deregister(chain);
+
+    // There should be no entries left in the scheduler (TestNG argument order: actual first, expected second)
+    Assert.assertEquals(_scheduler.aliveChainsSize() + _scheduler.readySize() + _scheduler.seenMailSize()
+        + _scheduler.availableSize(), 0);
   }
 
   @Test
-  public void shouldScheduleRescheduledOpChainOnDataAvailableForAnyMailbox() {
-    // Given:
-    OpChain chain = new OpChain(_operator, ImmutableList.of(MAILBOX_1, MAILBOX_2), 123, DEFAULT_RECEIVER_STAGE_ID);
-    RoundRobinScheduler scheduler = new RoundRobinScheduler();
-
-    // When:
-    scheduler.register(chain, false);
-    scheduler.onDataAvailable(MAILBOX_2);
-
-    // Then:
-    Assert.assertTrue(scheduler.hasNext());
-    Assert.assertEquals(scheduler.next(), chain);
-    Assert.assertEquals(scheduler._seenMail.size(), 0);
+  public void testSchedulerWhenParallelismGtOne()
+      throws InterruptedException {
+    // When parallelism is > 1, multiple OpChains with the same requestId and stageId would be registered in the same
+    // scheduler. Data received on a given mailbox should wake up exactly 1 OpChain corresponding to the virtual
+    // server-id determined by the Mailbox.
+    OpChain chain1 = new OpChain(_operator, ImmutableList.of(MAILBOX_1), MAILBOX_1.getToHost().virtualId(),
+        DEFAULT_REQUEST_ID, DEFAULT_RECEIVER_STAGE_ID);
+    OpChain chain2 = new OpChain(_operator, ImmutableList.of(MAILBOX_2), MAILBOX_2.getToHost().virtualId(),
+        DEFAULT_REQUEST_ID, DEFAULT_RECEIVER_STAGE_ID);
+    OpChain chain3 = new OpChain(_operator, ImmutableList.of(MAILBOX_3), MAILBOX_3.getToHost().virtualId(),
+        DEFAULT_REQUEST_ID, DEFAULT_RECEIVER_STAGE_ID);
+    // Register 3 OpChains. Keep release timeout high to avoid unintended OpChain wake-ups.
+    _scheduler = new RoundRobinScheduler(10_000);
+    _scheduler.register(chain1);
+    _scheduler.register(chain2);
+    _scheduler.register(chain3);
+
+    // OpChains are returned in the order in which they were registered
+    Assert.assertEquals(_scheduler.next(DEFAULT_POLL_TIMEOUT_MS, TimeUnit.MILLISECONDS), chain1);
+    Assert.assertEquals(_scheduler.next(DEFAULT_POLL_TIMEOUT_MS, TimeUnit.MILLISECONDS), chain2);
+    Assert.assertEquals(_scheduler.next(DEFAULT_POLL_TIMEOUT_MS, TimeUnit.MILLISECONDS), chain3);
+    // No op-chains ready, so scheduler returns null
+    Assert.assertNull(_scheduler.next(DEFAULT_POLL_TIMEOUT_MS, TimeUnit.MILLISECONDS));
+    // When Op-Chains are done executing, yield is called in any order
+    _scheduler.yield(chain1);
+    _scheduler.yield(chain3);
+    _scheduler.yield(chain2);
+    // Data may be received in arbitrary order
+    _scheduler.onDataAvailable(MAILBOX_2);
+    _scheduler.onDataAvailable(MAILBOX_3);
+    _scheduler.onDataAvailable(MAILBOX_1);
+    // Subsequent polls would be in the order the callback was processed. A callback here is said to be "processed"
+    // if it has successfully returned.
+    Assert.assertEquals(_scheduler.next(DEFAULT_POLL_TIMEOUT_MS, TimeUnit.MILLISECONDS), chain2);
+    Assert.assertEquals(_scheduler.next(DEFAULT_POLL_TIMEOUT_MS, TimeUnit.MILLISECONDS), chain3);
+    Assert.assertEquals(_scheduler.next(DEFAULT_POLL_TIMEOUT_MS, TimeUnit.MILLISECONDS), chain1);
+    // De-register may be called in any order again
+    _scheduler.deregister(chain3);
+    _scheduler.deregister(chain2);
+    _scheduler.deregister(chain1);
+
+    // No entries should remain once everything is done (TestNG argument order: actual first, expected second)
+    Assert.assertEquals(_scheduler.aliveChainsSize() + _scheduler.readySize() + _scheduler.seenMailSize()
+        + _scheduler.availableSize(), 0);
   }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org