Posted to commits@pinot.apache.org by GitBox <gi...@apache.org> on 2022/12/01 16:14:48 UTC

[GitHub] [pinot] agavra commented on a diff in pull request #9887: [multistage] improved task scheduling

agavra commented on code in PR #9887:
URL: https://github.com/apache/pinot/pull/9887#discussion_r1037310985


##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/RoundRobinScheduler.java:
##########
@@ -18,38 +18,107 @@
  */
 package org.apache.pinot.query.runtime.executor;
 
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Sets;
+import java.util.HashSet;
+import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.Queue;
+import java.util.Set;
 import org.apache.pinot.query.mailbox.MailboxIdentifier;
 import org.apache.pinot.query.runtime.operator.OpChain;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 
+/**
+ * This is a scheduler that schedules operator chains in round robin fashion,
+ * but will only schedule them when there is work to be done. The availability
+ * of work is signaled using the {@link #onDataAvailable(MailboxIdentifier)}
+ * callback.
+ */
 public class RoundRobinScheduler implements OpChainScheduler {
   private static final Logger LOGGER = LoggerFactory.getLogger(RoundRobinScheduler.class);
 
-  private final Queue<OpChain> _opChainQueue = new LinkedList<>();
+  // the _available queue holds operator chains that are registered
+  // with this scheduler but have no data to process yet, while the
+  // _ready queue holds operator chains that are ready to be scheduled
+  // (they have data, or are being scheduled for the first time)
+  private final Queue<OpChain> _available = new LinkedList<>();
+  private final Queue<OpChain> _ready = new LinkedList<>();
+
+  // using a Set here is acceptable because calls to hasNext() and
+  // onDataAvailable() cannot occur concurrently - that means that
+  // any time we schedule an operator based on the presence of
+  // mail, we can be certain that it will consume all of the mail
+  // from that mailbox, even if there are multiple items in it. If
+  // more mail arrives while that operator is executing, the
+  // operator will be rescheduled immediately, potentially resulting
+  // in a false-positive schedule.
+  @VisibleForTesting
+  final Set<MailboxIdentifier> _seenMail = new HashSet<>();
 
   @Override
-  public void register(OpChain operatorChain) {
-    _opChainQueue.add(operatorChain);
+  public void register(OpChain operatorChain, boolean isNew) {
+    // the first time an operator chain is scheduled, it should
+    // immediately be considered ready in case it does not need to
+    // read from any mailbox (e.g. with a LiteralValueOperator)
+    (isNew ? _ready : _available).add(operatorChain);
   }
 
   @Override
   public void onDataAvailable(MailboxIdentifier mailbox) {
-    // do nothing - this doesn't change order of execution
+    // it may be possible to receive this callback when there's no corresponding
+    // operator chain registered to the mailbox - this can happen when either
+    // (1) we get the callback before the first register is called or (2) we get
+    // the callback while the operator chain is executing. to account for this,
+    // we just store it in a set of seen mail and only check for it when hasNext
+    // is called.
+    //
+    // note that scenario (2) may cause a false-positive schedule where an operator
+    // chain gets scheduled for mail that it had already processed, in which case
+    // the operator chain will simply do nothing and get put back onto the queue.
+    // scenario (2) may additionally cause a memory leak - if onDataAvailable is
+    // called with an EOS block _while_ the operator chain is executing, the chain
+    // will consume the EOS block and computeReady() will never remove the mailbox
+    // from the _seenMail set.
+    //
+    // TODO: fix the memory leak by adding a close(opChain) callback

Review Comment:
   cc @61yao - the cancellation mechanism should integrate with the scheduler to clean up any operator chains that have expired, as well as the seen-mail notifications stored here. I'll take a look at your PR (#9836) and see how the two can integrate.
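For illustration only, the scheduling idea in this diff can be sketched as a self-contained Java class. This is a minimal sketch under stated assumptions, not Pinot's implementation: mailboxes are plain `String`s, an operator chain is a hypothetical `Chain` with an id and the set of mailboxes it reads, and the bodies of `hasNext()`, `next()` and `computeReady()` are guesses because the quoted hunk does not show them. The `close(...)` method is a hypothetical hook in the spirit of the TODO above; it does not exist in the PR.

```java
import java.util.ArrayDeque;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Queue;
import java.util.Set;

// Hypothetical sketch of "round robin, but only when there is work".
// Uses simplified stand-in types, NOT Pinot's OpChain/MailboxIdentifier.
public class SeenMailSchedulerSketch {
  // Hypothetical stand-in for an operator chain: an id plus the mailboxes it reads.
  static final class Chain {
    final String id;
    final Set<String> mailboxes;

    Chain(String id, Set<String> mailboxes) {
      this.id = id;
      this.mailboxes = mailboxes;
    }
  }

  private final Queue<Chain> available = new ArrayDeque<>(); // registered, no known data
  private final Queue<Chain> ready = new ArrayDeque<>();     // can be scheduled now
  final Set<String> seenMail = new HashSet<>();              // pending data notifications

  void register(Chain chain, boolean isNew) {
    // a new chain is ready immediately, since it may not read from any mailbox
    (isNew ? ready : available).add(chain);
  }

  void onDataAvailable(String mailbox) {
    // only record the signal; it is matched against chains lazily in hasNext()
    seenMail.add(mailbox);
  }

  boolean hasNext() {
    computeReady();
    return !ready.isEmpty();
  }

  Chain next() {
    return ready.poll(); // round robin: FIFO order over ready chains
  }

  // Hypothetical cleanup hook: drop the chain and any pending mail for its
  // mailboxes so that notifications for a finished chain cannot leak.
  void close(Chain chain) {
    available.remove(chain);
    ready.remove(chain);
    seenMail.removeAll(chain.mailboxes);
  }

  // Move every available chain with seen mail to the ready queue, consuming
  // the notifications (the chain is assumed to drain all mail when it runs).
  private void computeReady() {
    Iterator<Chain> it = available.iterator();
    while (it.hasNext()) {
      Chain chain = it.next();
      boolean hasMail = false;
      for (String mailbox : chain.mailboxes) {
        if (seenMail.remove(mailbox)) {
          hasMail = true;
        }
      }
      if (hasMail) {
        it.remove();
        ready.add(chain);
      }
    }
  }
}
```

In this sketch a chain re-registered with `isNew == false` stays in `available` until `onDataAvailable` reports one of its mailboxes. If mail arrives for a chain that then finishes and is never re-registered, its mailbox would stay in `seenMail` forever; calling the hypothetical `close(chain)` removes it, which is the kind of cleanup the review comment suggests the cancellation mechanism could drive.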



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org

