Posted to commits@pinot.apache.org by GitBox <gi...@apache.org> on 2022/11/07 23:11:56 UTC

[GitHub] [pinot] agavra opened a new pull request, #9753: [multistage] implement naive round robin operator chain scheduling

agavra opened a new pull request, #9753:
URL: https://github.com/apache/pinot/pull/9753

   This is a follow-up to #9711 and follows the design outlined in [this design doc](https://docs.google.com/document/d/1XAMHAlhFbINvX-kK1ANlzbRz4_RkS0map4qhqs1yDtE/edit#heading=h.de4smgkh3bzk).
   
   This PR implements a round robin operator chain scheduling algorithm and sets up the interface for future PRs that will implement more advanced scheduling. As of this PR, all queries are guaranteed to make progress (see the change in `pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SSBQueryIntegrationTest.java`, which can now run with only 2 cores available), but the algorithm is still CPU-hungry: queries with nothing in their mailbox are still scheduled.
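   A minimal, single-threaded sketch of the round robin idea described above. It uses hypothetical stand-ins (`ToyChain`, `BlockType`, `runAll`) rather than Pinot's real `OpChain` and `TransferableBlock` classes: a chain runs until it yields a NOOP block or finishes with an end-of-stream block, and a yielded chain goes to the back of the queue. It also shows why the approach is CPU-hungry: a chain that only ever yields keeps getting rescheduled.
   
   ```java
   import java.util.ArrayDeque;
   import java.util.ArrayList;
   import java.util.Iterator;
   import java.util.List;
   import java.util.Queue;
   
   /** Naive round robin scheduling of toy operator chains (hypothetical stand-in types). */
   public class RoundRobinSketch {
   
     enum BlockType { DATA, NOOP, EOS }
   
     /** A toy chain that replays a scripted sequence of blocks, then reports end-of-stream. */
     static final class ToyChain {
       final String _name;
       private final Iterator<BlockType> _blocks;
   
       ToyChain(String name, List<BlockType> blocks) {
         _name = name;
         _blocks = blocks.iterator();
       }
   
       BlockType nextBlock() {
         return _blocks.hasNext() ? _blocks.next() : BlockType.EOS;
       }
     }
   
     /** Every registered chain is runnable, whether or not its mailboxes have data. */
     static final class RoundRobinScheduler {
       private final Queue<ToyChain> _ready = new ArrayDeque<>();
   
       void register(ToyChain chain) {
         _ready.offer(chain);
       }
   
       boolean hasNext() {
         return !_ready.isEmpty();
       }
   
       ToyChain next() {
         return _ready.poll();
       }
     }
   
     /** Drives all chains inline and returns the order in which they were scheduled. */
     static List<String> runAll(RoundRobinScheduler scheduler) {
       List<String> trace = new ArrayList<>();
       while (scheduler.hasNext()) {
         ToyChain chain = scheduler.next();
         trace.add(chain._name);
         // Keep pulling blocks until the chain yields (NOOP) or finishes (EOS).
         BlockType block = chain.nextBlock();
         while (block == BlockType.DATA) {
           block = chain.nextBlock();
         }
         if (block == BlockType.NOOP) {
           // Not finished: re-register at the back of the queue so other chains get a turn.
           scheduler.register(chain);
         }
       }
       return trace;
     }
   
     public static void main(String[] args) {
       RoundRobinScheduler scheduler = new RoundRobinScheduler();
       scheduler.register(new ToyChain("a", List.of(BlockType.DATA, BlockType.NOOP, BlockType.DATA)));
       scheduler.register(new ToyChain("b", List.of(BlockType.NOOP, BlockType.NOOP)));
       System.out.println(runAll(scheduler)); // [a, b, a, b, b]
     }
   }
   ```
   
   Chain `b` is scheduled three times even though it never produces a data block. The sketch runs chains inline on one thread; the `OpChainSchedulerService` in this PR instead submits each chain to a worker pool.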
   
   Review Guide:
   
   - look at `OpChainSchedulerService` and `RoundRobinScheduler`, which contain the logic for yielding threads when there is no more work to be done for a given operator chain
   - look at `WorkerQueryExecutor` to see where this new scheduler is now wired in as opposed to running the work directly on the old worker pool
   - I added some logic to `PhysicalPlanVisitor` to collect information on mailboxes so that later we can hook up the mailboxes with the scheduling logic. This probably should have been a follow-up PR, but 🤷 I was already in there. Let me know if you want me to split it up
   - Look at the corresponding tests
   
   cc @walterddr @61yao 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[GitHub] [pinot] walterddr commented on a diff in pull request #9753: [multistage] implement naive round robin operator chain scheduling

Posted by GitBox <gi...@apache.org>.
walterddr commented on code in PR #9753:
URL: https://github.com/apache/pinot/pull/9753#discussion_r1023096179


##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/OpChain.java:
##########
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.runtime.operator;
+
+import com.google.common.base.Suppliers;
+import java.util.List;
+import java.util.function.Supplier;
+import org.apache.pinot.common.request.context.ThreadTimer;
+import org.apache.pinot.core.common.Operator;
+import org.apache.pinot.query.mailbox.MailboxIdentifier;
+import org.apache.pinot.query.runtime.blocks.TransferableBlock;
+
+
+/**
+ * An {@code OpChain} represents a chain of operators that are separated
+ * by send/receive stages.
+ */
+public class OpChain {
+
+  private final Operator<TransferableBlock> _root;
+  private final List<MailboxIdentifier> _inputMailboxes;
+  // TODO: build timers that are partial-execution aware
+  private final Supplier<ThreadTimer> _timer;
+
+  public OpChain(Operator<TransferableBlock> root, List<MailboxIdentifier> inputMailboxes) {
+    _root = root;
+    _inputMailboxes = inputMailboxes;
+
+    // use memoized supplier so that the timing doesn't start until the first time we get
+    // the timer
+    _timer = Suppliers.memoize(ThreadTimer::new)::get;
+  }
+
+  public Operator<TransferableBlock> getRoot() {

Review Comment:
   Question for follow-up PRs:
   assuming the OpChain is going to be invoked via this root operator API, I was wondering how the `scheduler.onDataAvailable(mailboxId)` API could tell the scheduled OpChain which mailboxId has new data, so that we can do better than round-robin checking each mailbox in the list.
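   One possible direction, sketched under assumptions: instead of re-queueing a chain that yields, park it on each of its input mailboxes (the `_inputMailboxes` list in `OpChain`) and wake only the chains waiting on the mailbox passed to `onDataAvailable`. The names `MailboxAwareScheduler`, `Chain` and `park` are hypothetical, and mailbox IDs are plain strings standing in for `MailboxIdentifier`.
   
   ```java
   import java.util.ArrayDeque;
   import java.util.HashMap;
   import java.util.LinkedHashSet;
   import java.util.List;
   import java.util.Map;
   import java.util.Queue;
   import java.util.Set;
   
   /** Hypothetical mailbox-aware scheduler: yielded chains wait until an input mailbox has data. */
   public class MailboxAwareScheduler {
   
     /** Simplified stand-in for OpChain: a name plus the IDs of the mailboxes it reads from. */
     public static final class Chain {
       final String _name;
       final List<String> _inputMailboxes;
   
       public Chain(String name, List<String> inputMailboxes) {
         _name = name;
         _inputMailboxes = inputMailboxes;
       }
   
       @Override
       public String toString() {
         return _name;
       }
     }
   
     private final Queue<Chain> _ready = new ArrayDeque<>();
     private final Map<String, Set<Chain>> _waitingByMailbox = new HashMap<>();
   
     /** New chains start out runnable so they can make initial progress. */
     public void register(Chain chain) {
       _ready.offer(chain);
     }
   
     /** Called when a chain yields a NOOP block: park it on each of its input mailboxes. */
     public void park(Chain chain) {
       for (String mailbox : chain._inputMailboxes) {
         _waitingByMailbox.computeIfAbsent(mailbox, k -> new LinkedHashSet<>()).add(chain);
       }
     }
   
     /** Wakes only the chains waiting on this mailbox; repeated or spurious calls are no-ops. */
     public void onDataAvailable(String mailbox) {
       Set<Chain> waiting = _waitingByMailbox.remove(mailbox);
       if (waiting == null) {
         return;
       }
       for (Chain chain : waiting) {
         // Un-park the chain from its other mailboxes so it is not queued twice.
         for (String other : chain._inputMailboxes) {
           Set<Chain> others = _waitingByMailbox.get(other);
           if (others != null) {
             others.remove(chain);
             if (others.isEmpty()) {
               _waitingByMailbox.remove(other);
             }
           }
         }
         _ready.offer(chain);
       }
     }
   
     public boolean hasNext() {
       return !_ready.isEmpty();
     }
   
     public Chain next() {
       return _ready.poll();
     }
   }
   ```
   
   The sketch is single-threaded; in the PR the equivalent calls are serialized by `_monitor`. It also ignores the race where data arrives while a chain is still running (before it is parked), which a real implementation would need to handle, for example by remembering pending notifications.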




[GitHub] [pinot] walterddr commented on a diff in pull request #9753: [multistage] implement naive round robin operator chain scheduling

Posted by GitBox <gi...@apache.org>.
walterddr commented on code in PR #9753:
URL: https://github.com/apache/pinot/pull/9753#discussion_r1023178252


##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/OpChain.java:
##########
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.runtime.operator;
+
+import com.google.common.base.Suppliers;
+import java.util.function.Supplier;
+import org.apache.pinot.common.request.context.ThreadTimer;
+import org.apache.pinot.core.common.Operator;
+import org.apache.pinot.query.runtime.blocks.TransferableBlock;
+
+
+/**
+ * An {@code OpChain} represents a chain of operators that are separated
+ * by send/receive stages.
+ */
+public class OpChain {

Review Comment:
   I am not sure how to interpret this comment. Eventually the timeout should be returned via an error block. Maybe we can clarify with a concrete example.




[GitHub] [pinot] walterddr commented on a diff in pull request #9753: [multistage] implement naive round robin operator chain scheduling

Posted by GitBox <gi...@apache.org>.
walterddr commented on code in PR #9753:
URL: https://github.com/apache/pinot/pull/9753#discussion_r1019537532


##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/OpChainSchedulerService.java:
##########
@@ -0,0 +1,147 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.runtime.executor;
+
+import com.google.common.util.concurrent.AbstractExecutionThreadService;
+import com.google.common.util.concurrent.Monitor;
+import java.util.concurrent.ExecutorService;
+import org.apache.pinot.common.request.context.ThreadTimer;
+import org.apache.pinot.core.util.trace.TraceRunnable;
+import org.apache.pinot.query.mailbox.MailboxIdentifier;
+import org.apache.pinot.query.runtime.blocks.TransferableBlock;
+import org.apache.pinot.query.runtime.operator.OpChain;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * This class provides the implementation for scheduling multistage queries on a single node based
+ * on the {@link OpChainScheduler} logic that is passed in. Multistage queries support partial execution
+ * and will return a NOOP metadata block as a "yield" signal, indicating that the next operator
+ * chain ({@link OpChainScheduler#next()}) will be requested.
+ *
+ * <p>Note that a yielded operator chain will be re-registered with the underlying scheduler.
+ */
+@SuppressWarnings("UnstableApiUsage")
+public class OpChainSchedulerService extends AbstractExecutionThreadService {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(OpChainSchedulerService.class);
+
+  private final OpChainScheduler _scheduler;
+  private final ExecutorService _workerPool;
+
+  // anything that is guarded by this monitor should be non-blocking
+  private final Monitor _monitor = new Monitor();
+  protected final Monitor.Guard _hasNextOrClosing = new Monitor.Guard(_monitor) {
+    @Override
+    public boolean isSatisfied() {
+      return _scheduler.hasNext() || !isRunning();
+    }
+  };
+
+  public OpChainSchedulerService(OpChainScheduler scheduler, ExecutorService workerPool) {
+    _scheduler = scheduler;
+    _workerPool = workerPool;
+  }
+
+  @Override
+  protected void triggerShutdown() {
+    // this will just notify all waiters that the scheduler is shutting down
+    _monitor.enter();
+    _monitor.leave();
+  }
+
+  @Override
+  protected void run()
+      throws Exception {
+    while (isRunning()) {
+      _monitor.enterWhen(_hasNextOrClosing);
+      try {
+        if (!isRunning()) {
+          return;
+        }
+
+        OpChain operatorChain = _scheduler.next();
+        _workerPool.submit(new TraceRunnable() {
+          @Override
+          public void runJob() {
+            try {
+              ThreadTimer timer = operatorChain.getAndStartTimer();
+
+              // so long as there's work to be done, keep getting the next block
+              // when the operator chain returns a NOOP block, then yield the execution
+              // of this to another worker
+              TransferableBlock result = operatorChain.getRoot().nextBlock();

Review Comment:
   IIUC, with the current mechanism this will return a NO-OP block every time the call reaches all the way down to the mailbox receive and the buffer is empty, yes?
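   Assuming the behaviour described in the question, a non-blocking receive could look roughly like the sketch below: when the mailbox buffer is empty, `nextBlock()` returns a NOOP block instead of blocking the worker thread. Every type here (`NonBlockingReceiveSketch`, `Block`, `Mailbox`, `ReceiveOperator`) is a simplified, hypothetical stand-in, not Pinot's `MailboxReceiveOperator` or `TransferableBlock`.
   
   ```java
   import java.util.concurrent.ConcurrentLinkedQueue;
   
   /** Hypothetical non-blocking receive: an empty mailbox yields a NOOP block instead of blocking. */
   public class NonBlockingReceiveSketch {
   
     enum Kind { DATA, NOOP, EOS }
   
     /** Simplified stand-in for TransferableBlock. */
     static final class Block {
       static final Block NOOP = new Block(Kind.NOOP, null);
       static final Block EOS = new Block(Kind.EOS, null);
   
       final Kind _kind;
       final String _payload;
   
       Block(Kind kind, String payload) {
         _kind = kind;
         _payload = payload;
       }
   
       static Block data(String payload) {
         return new Block(Kind.DATA, payload);
       }
     }
   
     /** Simplified mailbox buffer: a sender thread offers blocks, the receiver polls them. */
     static final class Mailbox {
       private final ConcurrentLinkedQueue<Block> _buffer = new ConcurrentLinkedQueue<>();
   
       void offer(Block block) {
         _buffer.offer(block);
       }
   
       Block poll() {
         return _buffer.poll(); // returns null when empty, never blocks
       }
     }
   
     /** Receive operator that never blocks the worker thread. */
     static final class ReceiveOperator {
       private final Mailbox _mailbox;
   
       ReceiveOperator(Mailbox mailbox) {
         _mailbox = mailbox;
       }
   
       Block nextBlock() {
         Block block = _mailbox.poll();
         // Empty buffer: signal "yield" so the scheduler can run another chain.
         return block == null ? Block.NOOP : block;
       }
     }
   
     public static void main(String[] args) {
       Mailbox mailbox = new Mailbox();
       ReceiveOperator receive = new ReceiveOperator(mailbox);
       System.out.println(receive.nextBlock()._kind); // NOOP: nothing has arrived yet
       mailbox.offer(Block.data("row-1"));
       mailbox.offer(Block.EOS);
       System.out.println(receive.nextBlock()._kind); // DATA
       System.out.println(receive.nextBlock()._kind); // EOS
     }
   }
   ```
   
   Operators above the receive would presumably pass a NOOP straight up, so that the root's `nextBlock()` returns it to the `while (!result.isNoOpBlock() && !result.isEndOfStreamBlock())` loop in `OpChainSchedulerService`.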




[GitHub] [pinot] walterddr commented on a diff in pull request #9753: [multistage] implement naive round robin operator chain scheduling

Posted by GitBox <gi...@apache.org>.
walterddr commented on code in PR #9753:
URL: https://github.com/apache/pinot/pull/9753#discussion_r1022134450


##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/OpChainSchedulerService.java:
##########
@@ -0,0 +1,147 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.runtime.executor;
+
+import com.google.common.util.concurrent.AbstractExecutionThreadService;
+import com.google.common.util.concurrent.Monitor;
+import java.util.concurrent.ExecutorService;
+import org.apache.pinot.common.request.context.ThreadTimer;
+import org.apache.pinot.core.util.trace.TraceRunnable;
+import org.apache.pinot.query.mailbox.MailboxIdentifier;
+import org.apache.pinot.query.runtime.blocks.TransferableBlock;
+import org.apache.pinot.query.runtime.operator.OpChain;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * This class provides the implementation for scheduling multistage queries on a single node based
+ * on the {@link OpChainScheduler} logic that is passed in. Multistage queries support partial execution
+ * and will return a NOOP metadata block as a "yield" signal, indicating that the next operator
+ * chain ({@link OpChainScheduler#next()}) will be requested.
+ *
+ * <p>Note that a yielded operator chain will be re-registered with the underlying scheduler.
+ */
+@SuppressWarnings("UnstableApiUsage")
+public class OpChainSchedulerService extends AbstractExecutionThreadService {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(OpChainSchedulerService.class);
+
+  private final OpChainScheduler _scheduler;
+  private final ExecutorService _workerPool;
+
+  // anything that is guarded by this monitor should be non-blocking
+  private final Monitor _monitor = new Monitor();
+  protected final Monitor.Guard _hasNextOrClosing = new Monitor.Guard(_monitor) {
+    @Override
+    public boolean isSatisfied() {
+      return _scheduler.hasNext() || !isRunning();
+    }
+  };
+
+  public OpChainSchedulerService(OpChainScheduler scheduler, ExecutorService workerPool) {
+    _scheduler = scheduler;
+    _workerPool = workerPool;
+  }
+
+  @Override
+  protected void triggerShutdown() {
+    // this will just notify all waiters that the scheduler is shutting down
+    _monitor.enter();
+    _monitor.leave();
+  }
+
+  @Override
+  protected void run()
+      throws Exception {
+    while (isRunning()) {
+      _monitor.enterWhen(_hasNextOrClosing);
+      try {
+        if (!isRunning()) {
+          return;
+        }
+
+        OpChain operatorChain = _scheduler.next();
+        _workerPool.submit(new TraceRunnable() {
+          @Override
+          public void runJob() {
+            try {
+              ThreadTimer timer = operatorChain.getAndStartTimer();
+
+              // so long as there's work to be done, keep getting the next block
+              // when the operator chain returns a NOOP block, then yield the execution
+              // of this to another worker
+              TransferableBlock result = operatorChain.getRoot().nextBlock();
+              while (!result.isNoOpBlock() && !result.isEndOfStreamBlock()) {
+                LOGGER.debug("Got block with {} rows.", result.getNumRows());
+                result = operatorChain.getRoot().nextBlock();
+              }
+
+              if (!result.isEndOfStreamBlock()) {
+                // not complete, needs to re-register for scheduling
+                register(operatorChain);
+              } else {
+                LOGGER.info("Execution time: " + timer.getThreadTimeNs());
+              }
+            } catch (Exception e) {
+              LOGGER.error("Failed to execute query!", e);
+            }
+          }
+        });
+      } finally {
+        _monitor.leave();
+      }
+    }
+  }
+
+  /**
+   * Register a new operator chain with the scheduler.
+   *
+   * @param operatorChain the chain to register
+   */
+  public final void register(OpChain operatorChain) {
+    _monitor.enter();
+    try {
+      _scheduler.register(operatorChain);
+    } finally {
+      _monitor.leave();
+    }
+  }
+
+  /**
+   * This method should be called whenever data is available in a given mailbox.
+   * Implementations of this method should be idempotent, it may be called in the
+   * scenario that no mail is available.
+   *
+   * @param mailbox the identifier of the mailbox that now has data
+   */
+  public final void onDataAvailable(MailboxIdentifier mailbox) {
+    _monitor.enter();
+    try {
+      _scheduler.onDataAvailable(mailbox);
+    } finally {
+      _monitor.leave();
+    }
+  }
+
+  // TODO: remove this method after we pipe down the proper executor pool to the v1 engine

Review Comment:
   +1, but for another day




[GitHub] [pinot] agavra commented on a diff in pull request #9753: [multistage] implement naive round robin operator chain scheduling

Posted by GitBox <gi...@apache.org>.
agavra commented on code in PR #9753:
URL: https://github.com/apache/pinot/pull/9753#discussion_r1022181908


##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/WorkerQueryExecutor.java:
##########
@@ -69,26 +65,12 @@ public synchronized void shutDown() {
   }
 
   public void processQuery(DistributedStagePlan queryRequest, Map<String, String> requestMetadataMap,
-      ExecutorService executorService) {
+      OpChainSchedulerService scheduler) {

Review Comment:
   I just deleted `WorkerQueryExecutor` altogether; it really doesn't make sense to have both it and the scheduler.
   
   Constructing the OpChain in the request thread should be a different PR.




[GitHub] [pinot] walterddr commented on a diff in pull request #9753: [multistage] implement naive round robin operator chain scheduling

Posted by GitBox <gi...@apache.org>.
walterddr commented on code in PR #9753:
URL: https://github.com/apache/pinot/pull/9753#discussion_r1023177805


##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/OpChain.java:
##########
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.runtime.operator;
+
+import com.google.common.base.Suppliers;
+import java.util.function.Supplier;
+import org.apache.pinot.common.request.context.ThreadTimer;
+import org.apache.pinot.core.common.Operator;
+import org.apache.pinot.query.runtime.blocks.TransferableBlock;
+
+
+/**
+ * An {@code OpChain} represents a chain of operators that are separated
+ * by send/receive stages.
+ */
+public class OpChain {

Review Comment:
   Currently an OpChain is equivalent to a stage. We can add this later once we have the split logic.




[GitHub] [pinot] walterddr commented on a diff in pull request #9753: [multistage] implement naive round robin operator chain scheduling

Posted by GitBox <gi...@apache.org>.
walterddr commented on code in PR #9753:
URL: https://github.com/apache/pinot/pull/9753#discussion_r1019538445


##########
pinot-query-runtime/src/test/java/org/apache/pinot/query/QueryServerEnclosure.java:
##########
@@ -218,6 +221,7 @@ public void start()
     _queryRunner = new QueryRunner();
     _queryRunner.init(configuration, _instanceDataManager, _helixManager, mockServiceMetrics());
     _queryRunner.start();
+    _scheduler.startAsync().awaitRunning();

Review Comment:
   shutdown?




[GitHub] [pinot] codecov-commenter commented on pull request #9753: [multistage] implement naive round robin operator chain scheduling

Posted by GitBox <gi...@apache.org>.
codecov-commenter commented on PR #9753:
URL: https://github.com/apache/pinot/pull/9753#issuecomment-1306404279

   # [Codecov](https://codecov.io/gh/apache/pinot/pull/9753?src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) Report
   > Merging [#9753](https://codecov.io/gh/apache/pinot/pull/9753?src=pr&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (2f952ef) into [master](https://codecov.io/gh/apache/pinot/commit/2f640ff8683728b844be9e045d5ce0f026d09e71?el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (2f640ff) will **decrease** coverage by `45.53%`.
   > The diff coverage is `0.00%`.
   
   ```diff
   @@              Coverage Diff              @@
   ##             master    #9753       +/-   ##
   =============================================
   - Coverage     70.08%   24.54%   -45.54%     
   + Complexity     4980       53     -4927     
   =============================================
     Files          1951     1944        -7     
     Lines        104561   104502       -59     
     Branches      15836    15836               
   =============================================
   - Hits          73279    25653    -47626     
   - Misses        26155    76234    +50079     
   + Partials       5127     2615     -2512     
   ```
   
   | Flag | Coverage Δ | |
   |---|---|---|
   | integration1 | `?` | |
   | integration2 | `24.54% <0.00%> (-0.09%)` | :arrow_down: |
   | unittests1 | `?` | |
   | unittests2 | `?` | |
   
   Flags with carried forward coverage won't be shown. [Click here](https://docs.codecov.io/docs/carryforward-flags?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#carryforward-flags-in-the-pull-request-comment) to find out more.
   
   | [Impacted Files](https://codecov.io/gh/apache/pinot/pull/9753?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | Coverage Δ | |
   |---|---|---|
   | [...va/org/apache/pinot/query/runtime/QueryRunner.java](https://codecov.io/gh/apache/pinot/pull/9753/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtcXVlcnktcnVudGltZS9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvcXVlcnkvcnVudGltZS9RdWVyeVJ1bm5lci5qYXZh) | `0.00% <0.00%> (-81.56%)` | :arrow_down: |
   | [...uery/runtime/executor/OpChainSchedulerService.java](https://codecov.io/gh/apache/pinot/pull/9753/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtcXVlcnktcnVudGltZS9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvcXVlcnkvcnVudGltZS9leGVjdXRvci9PcENoYWluU2NoZWR1bGVyU2VydmljZS5qYXZh) | `0.00% <0.00%> (ø)` | |
   | [...ot/query/runtime/executor/RoundRobinScheduler.java](https://codecov.io/gh/apache/pinot/pull/9753/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtcXVlcnktcnVudGltZS9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvcXVlcnkvcnVudGltZS9leGVjdXRvci9Sb3VuZFJvYmluU2NoZWR1bGVyLmphdmE=) | `0.00% <0.00%> (ø)` | |
   | [...ot/query/runtime/executor/WorkerQueryExecutor.java](https://codecov.io/gh/apache/pinot/pull/9753/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtcXVlcnktcnVudGltZS9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvcXVlcnkvcnVudGltZS9leGVjdXRvci9Xb3JrZXJRdWVyeUV4ZWN1dG9yLmphdmE=) | `0.00% <0.00%> (-92.31%)` | :arrow_down: |
   | [...query/runtime/operator/MailboxReceiveOperator.java](https://codecov.io/gh/apache/pinot/pull/9753/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtcXVlcnktcnVudGltZS9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvcXVlcnkvcnVudGltZS9vcGVyYXRvci9NYWlsYm94UmVjZWl2ZU9wZXJhdG9yLmphdmE=) | `0.00% <0.00%> (-78.95%)` | :arrow_down: |
   | [...g/apache/pinot/query/runtime/operator/OpChain.java](https://codecov.io/gh/apache/pinot/pull/9753/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtcXVlcnktcnVudGltZS9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvcXVlcnkvcnVudGltZS9vcGVyYXRvci9PcENoYWluLmphdmE=) | `0.00% <0.00%> (ø)` | |
   | [.../pinot/query/runtime/plan/PhysicalPlanVisitor.java](https://codecov.io/gh/apache/pinot/pull/9753/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtcXVlcnktcnVudGltZS9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvcXVlcnkvcnVudGltZS9wbGFuL1BoeXNpY2FsUGxhblZpc2l0b3IuamF2YQ==) | `0.00% <0.00%> (-96.78%)` | :arrow_down: |
   | [...va/org/apache/pinot/query/service/QueryServer.java](https://codecov.io/gh/apache/pinot/pull/9753/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtcXVlcnktcnVudGltZS9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvcXVlcnkvc2VydmljZS9RdWVyeVNlcnZlci5qYXZh) | `0.00% <0.00%> (-72.50%)` | :arrow_down: |
   | [...in/java/org/apache/pinot/spi/utils/BytesUtils.java](https://codecov.io/gh/apache/pinot/pull/9753/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3Qtc3BpL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9zcGkvdXRpbHMvQnl0ZXNVdGlscy5qYXZh) | `0.00% <0.00%> (-100.00%)` | :arrow_down: |
   | [...java/org/apache/pinot/spi/trace/BaseRecording.java](https://codecov.io/gh/apache/pinot/pull/9753/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3Qtc3BpL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9zcGkvdHJhY2UvQmFzZVJlY29yZGluZy5qYXZh) | `0.00% <0.00%> (-100.00%)` | :arrow_down: |
   | ... and [1427 more](https://codecov.io/gh/apache/pinot/pull/9753/diff?src=pr&el=tree-more&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | |
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[GitHub] [pinot] walterddr commented on a diff in pull request #9753: [multistage] implement naive round robin operator chain scheduling

Posted by GitBox <gi...@apache.org>.
walterddr commented on code in PR #9753:
URL: https://github.com/apache/pinot/pull/9753#discussion_r1023119137


##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/OpChain.java:
##########
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.runtime.operator;
+
+import com.google.common.base.Suppliers;
+import java.util.List;
+import java.util.function.Supplier;
+import org.apache.pinot.common.request.context.ThreadTimer;
+import org.apache.pinot.core.common.Operator;
+import org.apache.pinot.query.mailbox.MailboxIdentifier;
+import org.apache.pinot.query.runtime.blocks.TransferableBlock;
+
+
+/**
+ * An {@code OpChain} represents a chain of operators that are separated
+ * by send/receive stages.
+ */
+public class OpChain {
+
+  private final Operator<TransferableBlock> _root;
+  private final List<MailboxIdentifier> _inputMailboxes;
+  // TODO: build timers that are partial-execution aware
+  private final Supplier<ThreadTimer> _timer;
+
+  public OpChain(Operator<TransferableBlock> root, List<MailboxIdentifier> inputMailboxes) {
+    _root = root;
+    _inputMailboxes = inputMailboxes;
+
+    // use memoized supplier so that the timing doesn't start until the first time we get
+    // the timer
+    _timer = Suppliers.memoize(ThreadTimer::new)::get;
+  }
+
+  public Operator<TransferableBlock> getRoot() {
+    return _root;
+  }
+
+  public List<MailboxIdentifier> getInputMailboxes() {
+    return _inputMailboxes;
+  }

Review Comment:
   Thank you. Alternatively, adding a unit test that explains it would also be good. I am assuming it is related to https://github.com/apache/pinot/pull/9753#discussion_r1023096179, but I am not sure exactly, so I raised the question.





[GitHub] [pinot] 61yao commented on a diff in pull request #9753: [multistage] implement naive round robin operator chain scheduling

Posted by GitBox <gi...@apache.org>.
61yao commented on code in PR #9753:
URL: https://github.com/apache/pinot/pull/9753#discussion_r1023184369


##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/OpChain.java:
##########
@@ -0,0 +1,53 @@
+package org.apache.pinot.query.runtime.operator;
+
+import com.google.common.base.Suppliers;
+import java.util.function.Supplier;
+import org.apache.pinot.common.request.context.ThreadTimer;
+import org.apache.pinot.core.common.Operator;
+import org.apache.pinot.query.runtime.blocks.TransferableBlock;
+
+
+/**
+ * An {@code OpChain} represents a chain of operators that are separated
+ * by send/receive stages.
+ */
+public class OpChain {

Review Comment:
   I meant even for stage one, we should have an ID for easier debugging. 
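A minimal sketch of the kind of identifier described here, assuming an operator chain can be keyed by request ID, stage ID and server. `OpChainId` and its fields are hypothetical and not part of this PR. Value semantics (`equals`/`hashCode`) let the same ID key scheduler maps, and `toString` gives a compact form for log lines:

```java
import java.util.Objects;

/**
 * Hypothetical operator chain identifier. The fields (requestId, stageId, serverId)
 * are illustrative assumptions, not Pinot's actual API.
 */
final class OpChainId {
  private final long _requestId;
  private final int _stageId;
  private final String _serverId;

  OpChainId(long requestId, int stageId, String serverId) {
    _requestId = requestId;
    _stageId = stageId;
    _serverId = Objects.requireNonNull(serverId, "serverId");
  }

  @Override
  public boolean equals(Object o) {
    if (this == o) {
      return true;
    }
    if (!(o instanceof OpChainId)) {
      return false;
    }
    OpChainId that = (OpChainId) o;
    return _requestId == that._requestId && _stageId == that._stageId && _serverId.equals(that._serverId);
  }

  @Override
  public int hashCode() {
    return Objects.hash(_requestId, _stageId, _serverId);
  }

  // Compact, log-friendly form, e.g. "42_1@server-0"
  @Override
  public String toString() {
    return _requestId + "_" + _stageId + "@" + _serverId;
  }
}
```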





[GitHub] [pinot] agavra commented on a diff in pull request #9753: [multistage] implement naive round robin operator chain scheduling

Posted by GitBox <gi...@apache.org>.
agavra commented on code in PR #9753:
URL: https://github.com/apache/pinot/pull/9753#discussion_r1015990164


##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/OpChainSchedulerService.java:
##########
@@ -0,0 +1,147 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.runtime.executor;
+
+import com.google.common.util.concurrent.AbstractExecutionThreadService;
+import com.google.common.util.concurrent.Monitor;
+import java.util.concurrent.ExecutorService;
+import org.apache.pinot.common.request.context.ThreadTimer;
+import org.apache.pinot.core.util.trace.TraceRunnable;
+import org.apache.pinot.query.mailbox.MailboxIdentifier;
+import org.apache.pinot.query.runtime.blocks.TransferableBlock;
+import org.apache.pinot.query.runtime.operator.OpChain;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * This class provides the implementation for scheduling multistage queries on a single node based
+ * on the {@link OpChainScheduler} logic that is passed in. Multistage queries support partial execution
+ * and will return a NOOP metadata block as a "yield" signal, indicating that the next operator
+ * chain ({@link OpChainScheduler#next()}) will be requested.
+ *
+ * <p>Note that a yielded operator chain will be re-registered with the underlying scheduler.
+ */
+@SuppressWarnings("UnstableApiUsage")
+public class OpChainSchedulerService extends AbstractExecutionThreadService {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(OpChainSchedulerService.class);
+
+  private final OpChainScheduler _scheduler;
+  private final ExecutorService _workerPool;
+
+  // anything that is guarded by this monitor should be non-blocking
+  private final Monitor _monitor = new Monitor();
+  protected final Monitor.Guard _hasNextOrClosing = new Monitor.Guard(_monitor) {
+    @Override
+    public boolean isSatisfied() {
+      return _scheduler.hasNext() || !isRunning();
+    }
+  };
+
+  public OpChainSchedulerService(OpChainScheduler scheduler, ExecutorService workerPool) {
+    _scheduler = scheduler;
+    _workerPool = workerPool;
+  }
+
+  @Override
+  protected void triggerShutdown() {
+    // this will just notify all waiters that the scheduler is shutting down
+    _monitor.enter();
+    _monitor.leave();
+  }
+
+  @Override
+  protected void run()
+      throws Exception {
+    while (isRunning()) {
+      _monitor.enterWhen(_hasNextOrClosing);
+      try {
+        if (!isRunning()) {
+          return;
+        }
+
+        OpChain operatorChain = _scheduler.next();
+        _workerPool.submit(new TraceRunnable() {
+          @Override
+          public void runJob() {
+            try {
+              ThreadTimer timer = operatorChain.getAndStartTimer();
+
+              // so long as there's work to be done, keep getting the next block
+              // when the operator chain returns a NOOP block, then yield the execution
+              // of this to another worker
+              TransferableBlock result = operatorChain.getRoot().nextBlock();
+              while (!result.isNoOpBlock() && !result.isEndOfStreamBlock()) {
+                LOGGER.debug("Got block with {} rows.", result.getNumRows());
+                result = operatorChain.getRoot().nextBlock();
+              }
+
+              if (!result.isEndOfStreamBlock()) {
+                // not complete, needs to re-register for scheduling
+                register(operatorChain);
+              } else {
+                LOGGER.info("Execution time: " + timer.getThreadTimeNs());

Review Comment:
   For more complex scheduling algorithms, we will add a callback here to `complete` or `unregister` an operator chain. That requires a unique way to identify operator chains, which adds a bit more code, so I avoided it for this PR.
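To illustrate why such a callback needs an ID: a minimal, hypothetical scheduler (not the PR's `OpChainScheduler` interface) that tracks live chains by a string ID, so the worker can unregister a chain once it returns an end-of-stream block. It is not thread-safe on its own; in this PR, scheduler calls are serialized through the `Monitor` in `OpChainSchedulerService`.

```java
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;

/** Hypothetical scheduler that keys chains by ID so finished chains can be unregistered. */
final class TrackingScheduler<C> {
  private final Map<String, C> _live = new HashMap<>();
  private final Queue<String> _ready = new ArrayDeque<>();

  // Called for new chains and again for chains that yielded (NOOP block)
  void register(String id, C chain) {
    _live.put(id, chain);
    _ready.add(id);
  }

  boolean hasNext() {
    return !_ready.isEmpty();
  }

  C next() {
    return _live.get(_ready.remove());
  }

  // Callback the worker would invoke after the chain returns an end-of-stream block
  void complete(String id) {
    _live.remove(id);
    _ready.remove(id); // drop any stale ready entry for this chain
  }

  int liveCount() {
    return _live.size();
  }
}
```

Without `complete`, every finished chain would stay in `_live` for the lifetime of the scheduler.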



##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/OpChainSchedulerService.java:
##########
@@ -0,0 +1,147 @@
+package org.apache.pinot.query.runtime.executor;
+
+import com.google.common.util.concurrent.AbstractExecutionThreadService;
+import com.google.common.util.concurrent.Monitor;
+import java.util.concurrent.ExecutorService;
+import org.apache.pinot.common.request.context.ThreadTimer;
+import org.apache.pinot.core.util.trace.TraceRunnable;
+import org.apache.pinot.query.mailbox.MailboxIdentifier;
+import org.apache.pinot.query.runtime.blocks.TransferableBlock;
+import org.apache.pinot.query.runtime.operator.OpChain;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * This class provides the implementation for scheduling multistage queries on a single node based
+ * on the {@link OpChainScheduler} logic that is passed in. Multistage queries support partial execution
+ * and will return a NOOP metadata block as a "yield" signal, indicating that the next operator
+ * chain ({@link OpChainScheduler#next()}) will be requested.
+ *
+ * <p>Note that a yielded operator chain will be re-registered with the underlying scheduler.
+ */
+@SuppressWarnings("UnstableApiUsage")
+public class OpChainSchedulerService extends AbstractExecutionThreadService {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(OpChainSchedulerService.class);
+
+  private final OpChainScheduler _scheduler;
+  private final ExecutorService _workerPool;
+
+  // anything that is guarded by this monitor should be non-blocking
+  private final Monitor _monitor = new Monitor();
+  protected final Monitor.Guard _hasNextOrClosing = new Monitor.Guard(_monitor) {
+    @Override
+    public boolean isSatisfied() {
+      return _scheduler.hasNext() || !isRunning();
+    }
+  };
+
+  public OpChainSchedulerService(OpChainScheduler scheduler, ExecutorService workerPool) {
+    _scheduler = scheduler;
+    _workerPool = workerPool;
+  }
+
+  @Override
+  protected void triggerShutdown() {
+    // this will just notify all waiters that the scheduler is shutting down
+    _monitor.enter();
+    _monitor.leave();
+  }
+
+  @Override
+  protected void run()
+      throws Exception {
+    while (isRunning()) {
+      _monitor.enterWhen(_hasNextOrClosing);
+      try {
+        if (!isRunning()) {
+          return;
+        }
+
+        OpChain operatorChain = _scheduler.next();
+        _workerPool.submit(new TraceRunnable() {
+          @Override
+          public void runJob() {
+            try {
+              ThreadTimer timer = operatorChain.getAndStartTimer();
+
+              // so long as there's work to be done, keep getting the next block
+              // when the operator chain returns a NOOP block, then yield the execution
+              // of this to another worker
+              TransferableBlock result = operatorChain.getRoot().nextBlock();
+              while (!result.isNoOpBlock() && !result.isEndOfStreamBlock()) {
+                LOGGER.debug("Got block with {} rows.", result.getNumRows());
+                result = operatorChain.getRoot().nextBlock();
+              }
+
+              if (!result.isEndOfStreamBlock()) {
+                // not complete, needs to re-register for scheduling
+                register(operatorChain);
+              } else {
+                LOGGER.info("Execution time: " + timer.getThreadTimeNs());
+              }
+            } catch (Exception e) {
+              LOGGER.error("Failed to execute query!", e);
+            }
+          }
+        });
+      } finally {
+        _monitor.leave();
+      }
+    }
+  }
+
+  /**
+   * Register a new operator chain with the scheduler.
+   *
+   * @param operatorChain the chain to register
+   */
+  public final void register(OpChain operatorChain) {
+    _monitor.enter();
+    try {
+      _scheduler.register(operatorChain);
+    } finally {
+      _monitor.leave();
+    }
+  }
+
+  /**
+   * This method should be called whenever data is available in a given mailbox.
+   * Implementations of this method should be idempotent; it may be called even
+   * when no mail is available.
+   *
+   * @param mailbox the identifier of the mailbox that now has data
+   */
+  public final void onDataAvailable(MailboxIdentifier mailbox) {
+    _monitor.enter();
+    try {
+      _scheduler.onDataAvailable(mailbox);
+    } finally {
+      _monitor.leave();
+    }
+  }
+
+  // TODO: remove this method after we pipe down the proper executor pool to the v1 engine

Review Comment:
   This is a bit unfortunate, but it's how the existing code works, and refactoring it would be out of scope for this PR. While it's not particularly efficient, it also isn't dangerous: V1 queries are non-blocking, so using the same worker pool for executing V1 queries (issued as part of V2) and V2 intermediate queries does not threaten liveness.
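The liveness argument rests on the fact that work which yields instead of blocking cannot starve a bounded pool. A minimal sketch, independent of Pinot's classes (`BlockType` and `CooperativeRunner` are hypothetical), that mirrors the worker loop above: pull blocks until the chain yields or ends, and resubmit it on a yield. With a single-thread pool, a consumer that waits on a producer still finishes, because it yields the thread rather than spinning on it:

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

// Hypothetical stand-in for TransferableBlock's data / no-op (yield) / end-of-stream states
enum BlockType { DATA, NOOP, EOS }

final class CooperativeRunner {
  private final ExecutorService _pool;
  private final AtomicInteger _finished = new AtomicInteger();

  CooperativeRunner(ExecutorService pool) {
    _pool = pool;
  }

  // Pull blocks until the chain yields (NOOP) or ends (EOS)
  void submit(Supplier<BlockType> chain) {
    _pool.submit(() -> {
      BlockType block = chain.get();
      while (block == BlockType.DATA) {
        block = chain.get();
      }
      if (block == BlockType.NOOP) {
        submit(chain); // yield: go to the back of the queue so other chains get the thread
      } else {
        _finished.incrementAndGet();
      }
    });
  }

  int finished() {
    return _finished.get();
  }
}
```

If the consumer instead looped on `produced.get()` inside a single task, the producer could never be scheduled on a one-thread pool.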





[GitHub] [pinot] walterddr commented on a diff in pull request #9753: [multistage] implement naive round robin operator chain scheduling

Posted by GitBox <gi...@apache.org>.
walterddr commented on code in PR #9753:
URL: https://github.com/apache/pinot/pull/9753#discussion_r1019535877


##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/OpChainSchedulerService.java:
##########
@@ -0,0 +1,147 @@
+package org.apache.pinot.query.runtime.executor;
+
+import com.google.common.util.concurrent.AbstractExecutionThreadService;
+import com.google.common.util.concurrent.Monitor;
+import java.util.concurrent.ExecutorService;
+import org.apache.pinot.common.request.context.ThreadTimer;
+import org.apache.pinot.core.util.trace.TraceRunnable;
+import org.apache.pinot.query.mailbox.MailboxIdentifier;
+import org.apache.pinot.query.runtime.blocks.TransferableBlock;
+import org.apache.pinot.query.runtime.operator.OpChain;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * This class provides the implementation for scheduling multistage queries on a single node based
+ * on the {@link OpChainScheduler} logic that is passed in. Multistage queries support partial execution
+ * and will return a NOOP metadata block as a "yield" signal, indicating that the next operator
+ * chain ({@link OpChainScheduler#next()}) will be requested.
+ *
+ * <p>Note that a yielded operator chain will be re-registered with the underlying scheduler.
+ */
+@SuppressWarnings("UnstableApiUsage")
+public class OpChainSchedulerService extends AbstractExecutionThreadService {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(OpChainSchedulerService.class);
+
+  private final OpChainScheduler _scheduler;
+  private final ExecutorService _workerPool;
+
+  // anything that is guarded by this monitor should be non-blocking
+  private final Monitor _monitor = new Monitor();
+  protected final Monitor.Guard _hasNextOrClosing = new Monitor.Guard(_monitor) {
+    @Override
+    public boolean isSatisfied() {
+      return _scheduler.hasNext() || !isRunning();
+    }
+  };
+
+  public OpChainSchedulerService(OpChainScheduler scheduler, ExecutorService workerPool) {
+    _scheduler = scheduler;
+    _workerPool = workerPool;
+  }
+
+  @Override
+  protected void triggerShutdown() {
+    // this will just notify all waiters that the scheduler is shutting down
+    _monitor.enter();
+    _monitor.leave();
+  }
+
+  @Override
+  protected void run()
+      throws Exception {
+    while (isRunning()) {
+      _monitor.enterWhen(_hasNextOrClosing);
+      try {
+        if (!isRunning()) {
+          return;
+        }
+
+        OpChain operatorChain = _scheduler.next();
+        _workerPool.submit(new TraceRunnable() {
+          @Override
+          public void runJob() {
+            try {
+              ThreadTimer timer = operatorChain.getAndStartTimer();
+
+              // so long as there's work to be done, keep getting the next block
+              // when the operator chain returns a NOOP block, then yield the execution
+              // of this to another worker
+              TransferableBlock result = operatorChain.getRoot().nextBlock();
+              while (!result.isNoOpBlock() && !result.isEndOfStreamBlock()) {
+                LOGGER.debug("Got block with {} rows.", result.getNumRows());
+                result = operatorChain.getRoot().nextBlock();
+              }
+
+              if (!result.isEndOfStreamBlock()) {
+                // not complete, needs to re-register for scheduling
+                register(operatorChain);
+              } else {
+                LOGGER.info("Execution time: " + timer.getThreadTimeNs());

Review Comment:
   +1. good splitting point





[GitHub] [pinot] walterddr merged pull request #9753: [multistage] implement naive round robin operator chain scheduling

Posted by GitBox <gi...@apache.org>.
walterddr merged PR #9753:
URL: https://github.com/apache/pinot/pull/9753




[GitHub] [pinot] walterddr commented on a diff in pull request #9753: [multistage] implement naive round robin operator chain scheduling

Posted by GitBox <gi...@apache.org>.
walterddr commented on code in PR #9753:
URL: https://github.com/apache/pinot/pull/9753#discussion_r1019535612


##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/OpChainSchedulerService.java:
##########
@@ -0,0 +1,147 @@
+package org.apache.pinot.query.runtime.executor;
+
+import com.google.common.util.concurrent.AbstractExecutionThreadService;
+import com.google.common.util.concurrent.Monitor;
+import java.util.concurrent.ExecutorService;
+import org.apache.pinot.common.request.context.ThreadTimer;
+import org.apache.pinot.core.util.trace.TraceRunnable;
+import org.apache.pinot.query.mailbox.MailboxIdentifier;
+import org.apache.pinot.query.runtime.blocks.TransferableBlock;
+import org.apache.pinot.query.runtime.operator.OpChain;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * This class provides the implementation for scheduling multistage queries on a single node based
+ * on the {@link OpChainScheduler} logic that is passed in. Multistage queries support partial execution
+ * and will return a NOOP metadata block as a "yield" signal, indicating that the next operator
+ * chain ({@link OpChainScheduler#next()}) will be requested.
+ *
+ * <p>Note that a yielded operator chain will be re-registered with the underlying scheduler.
+ */
+@SuppressWarnings("UnstableApiUsage")
+public class OpChainSchedulerService extends AbstractExecutionThreadService {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(OpChainSchedulerService.class);
+
+  private final OpChainScheduler _scheduler;
+  private final ExecutorService _workerPool;
+
+  // anything that is guarded by this monitor should be non-blocking
+  private final Monitor _monitor = new Monitor();
+  protected final Monitor.Guard _hasNextOrClosing = new Monitor.Guard(_monitor) {
+    @Override
+    public boolean isSatisfied() {
+      return _scheduler.hasNext() || !isRunning();
+    }
+  };
+
+  public OpChainSchedulerService(OpChainScheduler scheduler, ExecutorService workerPool) {
+    _scheduler = scheduler;
+    _workerPool = workerPool;
+  }
+
+  @Override
+  protected void triggerShutdown() {
+    // this will just notify all waiters that the scheduler is shutting down
+    _monitor.enter();
+    _monitor.leave();
+  }
+
+  @Override
+  protected void run()
+      throws Exception {
+    while (isRunning()) {
+      _monitor.enterWhen(_hasNextOrClosing);
+      try {
+        if (!isRunning()) {
+          return;
+        }
+
+        OpChain operatorChain = _scheduler.next();
+        _workerPool.submit(new TraceRunnable() {
+          @Override
+          public void runJob() {
+            try {
+              ThreadTimer timer = operatorChain.getAndStartTimer();
+
+              // so long as there's work to be done, keep getting the next block
+              // when the operator chain returns a NOOP block, then yield the execution
+              // of this to another worker
+              TransferableBlock result = operatorChain.getRoot().nextBlock();
+              while (!result.isNoOpBlock() && !result.isEndOfStreamBlock()) {
+                LOGGER.debug("Got block with {} rows.", result.getNumRows());
+                result = operatorChain.getRoot().nextBlock();
+              }
+
+              if (!result.isEndOfStreamBlock()) {
+                // not complete, needs to re-register for scheduling
+                register(operatorChain);
+              } else {
+                LOGGER.info("Execution time: " + timer.getThreadTimeNs());
+              }
+            } catch (Exception e) {
+              LOGGER.error("Failed to execute query!", e);
+            }
+          }
+        });
+      } finally {
+        _monitor.leave();
+      }
+    }
+  }
+
+  /**
+   * Register a new operator chain with the scheduler.
+   *
+   * @param operatorChain the chain to register
+   */
+  public final void register(OpChain operatorChain) {
+    _monitor.enter();
+    try {
+      _scheduler.register(operatorChain);
+    } finally {
+      _monitor.leave();
+    }
+  }
+
+  /**
+   * This method should be called whenever data is available in a given mailbox.
+   * Implementations of this method should be idempotent; it may be called even
+   * when no mail is available.
+   *
+   * @param mailbox the identifier of the mailbox that now has data
+   */
+  public final void onDataAvailable(MailboxIdentifier mailbox) {
+    _monitor.enter();
+    try {
+      _scheduler.onDataAvailable(mailbox);
+    } finally {
+      _monitor.leave();
+    }
+  }
+
+  // TODO: remove this method after we pipe down the proper executor pool to the v1 engine

Review Comment:
   lol, should've read this first. Good call-out.
   
   Another way is to simply decouple the executor service used by v1 from the one used by v2. I am not sure which is better.
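A minimal sketch of the decoupling alternative, assuming v1 (leaf) work and v2 (intermediate) work can each be handed their own executor. `EnginePools`, the thread counts and the thread names are hypothetical. Named threads make it easy to see in a thread dump which engine a task ran on:

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical holder giving v1 and v2 work separate, independently sized pools. */
final class EnginePools implements AutoCloseable {
  private final ExecutorService _v1Pool;
  private final ExecutorService _v2Pool;

  EnginePools(int v1Threads, int v2Threads) {
    _v1Pool = Executors.newFixedThreadPool(v1Threads, named("v1-worker"));
    _v2Pool = Executors.newFixedThreadPool(v2Threads, named("v2-worker"));
  }

  ExecutorService v1Pool() {
    return _v1Pool;
  }

  ExecutorService v2Pool() {
    return _v2Pool;
  }

  // Names threads "<prefix>-<n>" so thread dumps show which engine owns a task
  private static ThreadFactory named(String prefix) {
    AtomicInteger counter = new AtomicInteger();
    return runnable -> {
      Thread thread = new Thread(runnable, prefix + "-" + counter.getAndIncrement());
      thread.setDaemon(true);
      return thread;
    };
  }

  @Override
  public void close() {
    _v1Pool.shutdownNow();
    _v2Pool.shutdownNow();
  }
}
```

The trade-off versus a shared pool: neither engine can take threads from the other, but idle capacity in one pool cannot be used by the other either.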





[GitHub] [pinot] walterddr commented on a diff in pull request #9753: [multistage] implement naive round robin operator chain scheduling

Posted by GitBox <gi...@apache.org>.
walterddr commented on code in PR #9753:
URL: https://github.com/apache/pinot/pull/9753#discussion_r1023086131


##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/OpChain.java:
##########
@@ -0,0 +1,61 @@
+package org.apache.pinot.query.runtime.operator;
+
+import com.google.common.base.Suppliers;
+import java.util.List;
+import java.util.function.Supplier;
+import org.apache.pinot.common.request.context.ThreadTimer;
+import org.apache.pinot.core.common.Operator;
+import org.apache.pinot.query.mailbox.MailboxIdentifier;
+import org.apache.pinot.query.runtime.blocks.TransferableBlock;
+
+
+/**
+ * An {@code OpChain} represents a chain of operators that are separated
+ * by send/receive stages.
+ */
+public class OpChain {
+
+  private final Operator<TransferableBlock> _root;
+  private final List<MailboxIdentifier> _inputMailboxes;
+  // TODO: build timers that are partial-execution aware
+  private final Supplier<ThreadTimer> _timer;
+
+  public OpChain(Operator<TransferableBlock> root, List<MailboxIdentifier> inputMailboxes) {
+    _root = root;
+    _inputMailboxes = inputMailboxes;
+
+    // use memoized supplier so that the timing doesn't start until the first time we get
+    // the timer
+    _timer = Suppliers.memoize(ThreadTimer::new)::get;
+  }
+
+  public Operator<TransferableBlock> getRoot() {
+    return _root;
+  }
+
+  public List<MailboxIdentifier> getInputMailboxes() {
+    return _inputMailboxes;
+  }

Review Comment:
   This doesn't seem to be used. From what I understand of the triggering mechanism:
   - register
   - onDataAvailable
   - next().getRoot().nextBlock() completes
   
   All of these should be able to identify which OpChain to call based on the job ID alone. Do we need the input mailbox ID list?
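The `OpChain` constructor quoted above wraps the timer in a memoized supplier. This means the clock does not start when the chain is built; it starts the first time a worker asks for the timer.

The sketch below shows the same idea without dependencies. It rests on two assumptions:
- A hypothetical `LazyTimer` that records `System.nanoTime()` when it is created. Pinot's `ThreadTimer` measures thread CPU time instead.
- A hand-rolled `Memo` in place of Guava's `Suppliers.memoize`.

```java
import java.util.function.Supplier;

// Hypothetical stand-in for Pinot's ThreadTimer: the clock starts when the object is created.
// (ThreadTimer measures thread CPU time; wall-clock nanoTime is used here for simplicity.)
final class LazyTimer {
  private final long _startNs = System.nanoTime();

  long elapsedNs() {
    return System.nanoTime() - _startNs;
  }
}

// Minimal memoizing supplier: invokes the delegate at most once and caches the result.
// Unlike Guava's Suppliers.memoize, this version is not thread safe.
final class Memo<T> implements Supplier<T> {
  private final Supplier<T> _delegate;
  private boolean _computed;
  private T _value;

  Memo(Supplier<T> delegate) {
    _delegate = delegate;
  }

  @Override
  public T get() {
    if (!_computed) {
      _value = _delegate.get();
      _computed = true;
    }
    return _value;
  }
}
```

Constructing `new Memo<>(LazyTimer::new)` starts no clock. The first `get()` creates the timer, and every later call returns that same instance.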



##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/PhysicalPlanVisitor.java:
##########
@@ -56,18 +59,24 @@
  * @see org.apache.pinot.query.runtime.QueryRunner#processQuery(DistributedStagePlan, ExecutorService, Map)
  */
 public class PhysicalPlanVisitor implements StageNodeVisitor<Operator<TransferableBlock>, PlanRequestContext> {
-  private static final PhysicalPlanVisitor INSTANCE = new PhysicalPlanVisitor();
 
-  public static Operator<TransferableBlock> build(StageNode node, PlanRequestContext context) {
-    return node.visit(INSTANCE, context);
+  private List<MailboxIdentifier> _inputMailboxIds = new ArrayList<>();

Review Comment:
   Can we move this into the PlanRequestContext? Otherwise, we need to adjust the comment about the static usage of this visitor class.
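The concern here is that one shared visitor instance holding a mutable `_inputMailboxIds` field would mix mailbox IDs from plans built concurrently. Below is a minimal sketch of the reviewer's alternative. The visitor stays stateless, so a static `INSTANCE` remains safe, and mailbox IDs accumulate in the per-request context.

All names here are hypothetical stand-ins:
- `Node`, `ReceiveNode` and `JoinNode` play the role of Pinot's `StageNode` hierarchy.
- `PlanContext` plays the role of `PlanRequestContext`.
- Mailbox IDs are plain strings.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical per-request context, playing the role of PlanRequestContext.
final class PlanContext {
  final List<String> _receivingMailboxIds = new ArrayList<>();
}

// Hypothetical plan nodes standing in for the StageNode hierarchy.
interface Node {
  void visit(MailboxCollector visitor, PlanContext context);
}

final class ReceiveNode implements Node {
  final String _mailboxId;

  ReceiveNode(String mailboxId) {
    _mailboxId = mailboxId;
  }

  @Override
  public void visit(MailboxCollector visitor, PlanContext context) {
    visitor.visitReceive(this, context);
  }
}

final class JoinNode implements Node {
  final Node _left;
  final Node _right;

  JoinNode(Node left, Node right) {
    _left = left;
    _right = right;
  }

  @Override
  public void visit(MailboxCollector visitor, PlanContext context) {
    visitor.visitJoin(this, context);
  }
}

// Stateless visitor: all per-query state lives in the context argument,
// so one shared static instance can serve concurrent plans.
final class MailboxCollector {
  private static final MailboxCollector INSTANCE = new MailboxCollector();

  private MailboxCollector() {
  }

  static void build(Node root, PlanContext context) {
    root.visit(INSTANCE, context);
  }

  void visitReceive(ReceiveNode node, PlanContext context) {
    context._receivingMailboxIds.add(node._mailboxId);
  }

  void visitJoin(JoinNode node, PlanContext context) {
    node._left.visit(this, context);
    node._right.visit(this, context);
  }
}
```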



##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/OpChainSchedulerService.java:
##########
@@ -0,0 +1,147 @@
+package org.apache.pinot.query.runtime.executor;
+
+import com.google.common.util.concurrent.AbstractExecutionThreadService;
+import com.google.common.util.concurrent.Monitor;
+import java.util.concurrent.ExecutorService;
+import org.apache.pinot.common.request.context.ThreadTimer;
+import org.apache.pinot.core.util.trace.TraceRunnable;
+import org.apache.pinot.query.mailbox.MailboxIdentifier;
+import org.apache.pinot.query.runtime.blocks.TransferableBlock;
+import org.apache.pinot.query.runtime.operator.OpChain;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * This class provides the implementation for scheduling multistage queries on a single node based
+ * on the {@link OpChainScheduler} logic that is passed in. Multistage queries support partial execution
+ * and will return a NOOP metadata block as a "yield" signal, indicating that the next operator
+ * chain ({@link OpChainScheduler#next()}) will be requested.
+ *
+ * <p>Note that a yielded operator chain will be re-registered with the underlying scheduler.
+ */
+@SuppressWarnings("UnstableApiUsage")
+public class OpChainSchedulerService extends AbstractExecutionThreadService {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(OpChainSchedulerService.class);
+
+  private final OpChainScheduler _scheduler;
+  private final ExecutorService _workerPool;
+
+  // anything that is guarded by this monitor should be non-blocking
+  private final Monitor _monitor = new Monitor();
+  protected final Monitor.Guard _hasNextOrClosing = new Monitor.Guard(_monitor) {

Review Comment:
   This is more of a question: why is one of these private and the other protected?
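For readers less familiar with Guava's `Monitor`, the `_hasNextOrClosing` guard behaves like a lock paired with a condition that is re-checked on every wake-up. The `enter()`/`leave()` in `triggerShutdown()` is the wake-up step.

Below is a rough sketch that swaps in a different technique: `ReentrantLock` plus `Condition` instead of `Monitor`. `WaitForWork` and its methods are hypothetical names.

One difference matters. A `Condition` must be signalled explicitly. A `Monitor` re-checks waiting guards whenever a thread leaves it, which is why the original `triggerShutdown()` only needs to enter and leave.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

final class WaitForWork<T> {
  private final ReentrantLock _lock = new ReentrantLock();
  // Signalled whenever "has next or closing" may have become true.
  private final Condition _hasNextOrClosing = _lock.newCondition();
  // Guarded by _lock.
  private final Queue<T> _ready = new ArrayDeque<>();
  // Guarded by _lock.
  private boolean _closing = false;

  void register(T item) {
    _lock.lock();
    try {
      _ready.add(item);
      _hasNextOrClosing.signal();
    } finally {
      _lock.unlock();
    }
  }

  void shutdown() {
    _lock.lock();
    try {
      _closing = true;
      _hasNextOrClosing.signalAll();
    } finally {
      _lock.unlock();
    }
  }

  // Blocks until an item is ready or shutdown was requested. Returns null once shutdown
  // has been requested, even if items remain (mirroring the isRunning() check in run()).
  T awaitNext() throws InterruptedException {
    _lock.lock();
    try {
      while (_ready.isEmpty() && !_closing) {
        _hasNextOrClosing.await();
      }
      return _closing ? null : _ready.poll();
    } finally {
      _lock.unlock();
    }
  }
}
```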



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[GitHub] [pinot] 61yao commented on a diff in pull request #9753: [multistage] implement naive round robin operator chain scheduling

Posted by GitBox <gi...@apache.org>.
61yao commented on code in PR #9753:
URL: https://github.com/apache/pinot/pull/9753#discussion_r1016131586


##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/OpChainSchedulerService.java:
##########
@@ -0,0 +1,147 @@
+package org.apache.pinot.query.runtime.executor;
+
+import com.google.common.util.concurrent.AbstractExecutionThreadService;
+import com.google.common.util.concurrent.Monitor;
+import java.util.concurrent.ExecutorService;
+import org.apache.pinot.common.request.context.ThreadTimer;
+import org.apache.pinot.core.util.trace.TraceRunnable;
+import org.apache.pinot.query.mailbox.MailboxIdentifier;
+import org.apache.pinot.query.runtime.blocks.TransferableBlock;
+import org.apache.pinot.query.runtime.operator.OpChain;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * This class provides the implementation for scheduling multistage queries on a single node based
+ * on the {@link OpChainScheduler} logic that is passed in. Multistage queries support partial execution
+ * and will return a NOOP metadata block as a "yield" signal, indicating that the next operator
+ * chain ({@link OpChainScheduler#next()}) will be requested.
+ *
+ * <p>Note that a yielded operator chain will be re-registered with the underlying scheduler.
+ */
+@SuppressWarnings("UnstableApiUsage")
+public class OpChainSchedulerService extends AbstractExecutionThreadService {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(OpChainSchedulerService.class);
+
+  private final OpChainScheduler _scheduler;

Review Comment:
   Add a comment saying this is guarded by the monitor below?



##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/OpChainSchedulerService.java:
##########
@@ -0,0 +1,147 @@
+package org.apache.pinot.query.runtime.executor;
+
+import com.google.common.util.concurrent.AbstractExecutionThreadService;
+import com.google.common.util.concurrent.Monitor;
+import java.util.concurrent.ExecutorService;
+import org.apache.pinot.common.request.context.ThreadTimer;
+import org.apache.pinot.core.util.trace.TraceRunnable;
+import org.apache.pinot.query.mailbox.MailboxIdentifier;
+import org.apache.pinot.query.runtime.blocks.TransferableBlock;
+import org.apache.pinot.query.runtime.operator.OpChain;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * This class provides the implementation for scheduling multistage queries on a single node based
+ * on the {@link OpChainScheduler} logic that is passed in. Multistage queries support partial execution
+ * and will return a NOOP metadata block as a "yield" signal, indicating that the next operator
+ * chain ({@link OpChainScheduler#next()}) will be requested.
+ *
+ * <p>Note that a yielded operator chain will be re-registered with the underlying scheduler.
+ */
+@SuppressWarnings("UnstableApiUsage")
+public class OpChainSchedulerService extends AbstractExecutionThreadService {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(OpChainSchedulerService.class);
+
+  private final OpChainScheduler _scheduler;
+  private final ExecutorService _workerPool;
+
+  // anything that is guarded by this monitor should be non-blocking
+  private final Monitor _monitor = new Monitor();
+  private final Monitor.Guard _hasNextOrClosing = new Monitor.Guard(_monitor) {
+    @Override
+    public boolean isSatisfied() {
+      return _scheduler.hasNext() || !isRunning();
+    }
+  };
+
+  public OpChainSchedulerService(OpChainScheduler scheduler, ExecutorService workerPool) {
+    _scheduler = scheduler;
+    _workerPool = workerPool;
+  }
+
+  @Override
+  protected void triggerShutdown() {
+    // this will just notify all waiters that the scheduler is shutting down
+    _monitor.enter();
+    _monitor.leave();
+  }
+
+  @Override
+  protected void run()
+      throws Exception {
+    while (isRunning()) {
+      _monitor.enterWhen(_hasNextOrClosing);
+      try {
+        if (!isRunning()) {
+          return;
+        }
+
+        OpChain operatorChain = _scheduler.next();
+        _workerPool.submit(new TraceRunnable() {
+          @Override
+          public void runJob() {
+            try {
+              ThreadTimer timer = operatorChain.getAndStartTimer();
+
+              // so long as there's work to be done, keep getting the next block
+              // when the operator chain returns a NOOP block, then yield the execution
+              // of this to another worker
+              TransferableBlock result = operatorChain.getRoot().nextBlock();
+              while (!result.isNoOpBlock() && !result.isEndOfStreamBlock()) {
+                LOGGER.debug("Got block with {} rows.", result.getNumRows());
+                result = operatorChain.getRoot().nextBlock();
+              }
+
+              if (!result.isEndOfStreamBlock()) {
+                // not complete, needs to re-register for scheduling
+                register(operatorChain);
+              } else {
+                LOGGER.info("Execution time: " + timer.getThreadTimeNs());

Review Comment:
   Is this logging expensive? I feel there would be a lot of logs if we log every pause. Can we have some class, such as OpChainStats, to hold the data, and decide later where to report it?
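One way to act on this suggestion is sketched below. Each chain owns a hypothetical `OpChainStats` object that accumulates execution time, yields and rows across partial runs. The stats are formatted once, when the chain reaches end-of-stream, instead of being logged on every pass. The class, field and method names are illustrative and are not Pinot's API.

```java
import java.util.concurrent.TimeUnit;

// Hypothetical per-OpChain statistics, accumulated across partial executions.
// Not synchronized: assumes at most one worker runs a given chain at a time.
final class OpChainStats {
  private long _executionTimeNs;
  private int _numYields;
  private long _numRows;

  // Called by the worker after each partial run of the chain.
  void recordRun(long elapsedNs, long rowsProduced, boolean yielded) {
    _executionTimeNs += elapsedNs;
    _numRows += rowsProduced;
    if (yielded) {
      _numYields++;
    }
  }

  long getExecutionTimeNs() {
    return _executionTimeNs;
  }

  int getNumYields() {
    return _numYields;
  }

  long getNumRows() {
    return _numRows;
  }

  // Formatted once when the chain finishes; the caller decides where (or whether) to report it.
  @Override
  public String toString() {
    return "OpChainStats{timeMs=" + TimeUnit.NANOSECONDS.toMillis(_executionTimeNs)
        + ", yields=" + _numYields + ", rows=" + _numRows + "}";
  }
}
```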



##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/OpChain.java:
##########
@@ -0,0 +1,53 @@
+package org.apache.pinot.query.runtime.operator;
+
+import com.google.common.base.Suppliers;
+import java.util.function.Supplier;
+import org.apache.pinot.common.request.context.ThreadTimer;
+import org.apache.pinot.core.common.Operator;
+import org.apache.pinot.query.runtime.blocks.TransferableBlock;
+
+
+/**
+ * An {@code OpChain} represents a chain of operators that are separated
+ * by send/receive stages.
+ */
+public class OpChain {

Review Comment:
   Ideally, this OpChain should capture the timeout info rather than relying on the MailboxSendOperator timeout. That makes it clear when we should time out, rather than relying on the root operator timing out correctly for this to work.
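Below is a minimal sketch of an operator chain that owns its deadline instead of depending on the root operator to time out. It assumes a hypothetical `DeadlineChain` wrapper and an injectable millisecond clock (`LongSupplier`) so the check is testable. Where the timeout value would come from in Pinot is left open here.

```java
import java.util.function.LongSupplier;

// Hypothetical chain wrapper that carries its own deadline instead of relying on
// the root operator (e.g. the mailbox send operator) to time out.
final class DeadlineChain {
  private final String _id;
  private final long _deadlineMs;
  private final LongSupplier _clockMs;

  DeadlineChain(String id, long timeoutMs, LongSupplier clockMs) {
    _id = id;
    _clockMs = clockMs;
    // The deadline is fixed once, at construction time.
    _deadlineMs = clockMs.getAsLong() + timeoutMs;
  }

  String getId() {
    return _id;
  }

  // True only once the clock is strictly past the deadline.
  boolean isTimedOut() {
    return _clockMs.getAsLong() > _deadlineMs;
  }
}
```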



##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/OpChain.java:
##########
@@ -0,0 +1,53 @@
+package org.apache.pinot.query.runtime.operator;
+
+import com.google.common.base.Suppliers;
+import java.util.function.Supplier;
+import org.apache.pinot.common.request.context.ThreadTimer;
+import org.apache.pinot.core.common.Operator;
+import org.apache.pinot.query.runtime.blocks.TransferableBlock;
+
+
+/**
+ * An {@code OpChain} represents a chain of operators that are separated
+ * by send/receive stages.
+ */
+public class OpChain {

Review Comment:
   I feel we should also have an OpChain ID somewhere. Maybe in a future PR :)





[GitHub] [pinot] walterddr commented on a diff in pull request #9753: [multistage] implement naive round robin operator chain scheduling

Posted by GitBox <gi...@apache.org>.
walterddr commented on code in PR #9753:
URL: https://github.com/apache/pinot/pull/9753#discussion_r1023179366


##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/OpChainSchedulerService.java:
##########
@@ -0,0 +1,147 @@
+package org.apache.pinot.query.runtime.executor;
+
+import com.google.common.util.concurrent.AbstractExecutionThreadService;
+import com.google.common.util.concurrent.Monitor;
+import java.util.concurrent.ExecutorService;
+import org.apache.pinot.common.request.context.ThreadTimer;
+import org.apache.pinot.core.util.trace.TraceRunnable;
+import org.apache.pinot.query.mailbox.MailboxIdentifier;
+import org.apache.pinot.query.runtime.blocks.TransferableBlock;
+import org.apache.pinot.query.runtime.operator.OpChain;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * This class provides the implementation for scheduling multistage queries on a single node based
+ * on the {@link OpChainScheduler} logic that is passed in. Multistage queries support partial execution
+ * and will return a NOOP metadata block as a "yield" signal, indicating that the next operator
+ * chain ({@link OpChainScheduler#next()}) will be requested.
+ *
+ * <p>Note that a yielded operator chain will be re-registered with the underlying scheduler.
+ */
+@SuppressWarnings("UnstableApiUsage")
+public class OpChainSchedulerService extends AbstractExecutionThreadService {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(OpChainSchedulerService.class);
+
+  private final OpChainScheduler _scheduler;
+  private final ExecutorService _workerPool;
+
+  // anything that is guarded by this monitor should be non-blocking
+  private final Monitor _monitor = new Monitor();
+  private final Monitor.Guard _hasNextOrClosing = new Monitor.Guard(_monitor) {
+    @Override
+    public boolean isSatisfied() {
+      return _scheduler.hasNext() || !isRunning();
+    }
+  };
+
+  public OpChainSchedulerService(OpChainScheduler scheduler, ExecutorService workerPool) {
+    _scheduler = scheduler;
+    _workerPool = workerPool;
+  }
+
+  @Override
+  protected void triggerShutdown() {
+    // this will just notify all waiters that the scheduler is shutting down
+    _monitor.enter();
+    _monitor.leave();
+  }
+
+  @Override
+  protected void run()
+      throws Exception {
+    while (isRunning()) {
+      _monitor.enterWhen(_hasNextOrClosing);
+      try {
+        if (!isRunning()) {
+          return;
+        }
+
+        OpChain operatorChain = _scheduler.next();
+        _workerPool.submit(new TraceRunnable() {
+          @Override
+          public void runJob() {
+            try {
+              ThreadTimer timer = operatorChain.getAndStartTimer();
+
+              // so long as there's work to be done, keep getting the next block
+              // when the operator chain returns a NOOP block, then yield the execution
+              // of this to another worker
+              TransferableBlock result = operatorChain.getRoot().nextBlock();
+              while (!result.isNoOpBlock() && !result.isEndOfStreamBlock()) {
+                LOGGER.debug("Got block with {} rows.", result.getNumRows());
+                result = operatorChain.getRoot().nextBlock();
+              }
+
+              if (!result.isEndOfStreamBlock()) {
+                // not complete, needs to re-register for scheduling
+                register(operatorChain);
+              } else {
+                LOGGER.info("Execution time: " + timer.getThreadTimeNs());

Review Comment:
   Oops, good callout. This could be a debug log.
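The worker loop quoted above has three outcomes:
- It keeps pulling blocks while there is data.
- It stops on a NOOP block and re-registers the chain so others get a turn.
- It finishes on end-of-stream.

That logic reduces to the small single-threaded sketch below. It assumes two hypothetical types:
- A `Block` enum (`DATA`, `NOOP`, `EOS`) in place of `TransferableBlock`.
- A `Chain` that replays a scripted sequence of blocks in place of the root operator.

A real deployment runs each pass on a worker pool rather than in a loop.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.Iterator;
import java.util.List;

enum Block { DATA, NOOP, EOS }

// Hypothetical chain: replays a scripted block sequence (which must end with EOS).
final class Chain {
  final String _name;
  private final Iterator<Block> _blocks;

  Chain(String name, Block... blocks) {
    _name = name;
    _blocks = Arrays.asList(blocks).iterator();
  }

  Block nextBlock() {
    return _blocks.next();
  }
}

final class YieldingRunner {
  private final Deque<Chain> _ready = new ArrayDeque<>();
  final List<String> _finished = new ArrayList<>();

  void register(Chain chain) {
    _ready.addLast(chain);
  }

  boolean hasNext() {
    return !_ready.isEmpty();
  }

  // Runs one chain until it yields (NOOP) or completes (EOS), mirroring runJob() above.
  void runOnce() {
    Chain chain = _ready.pollFirst();
    Block result = chain.nextBlock();
    while (result != Block.NOOP && result != Block.EOS) {
      result = chain.nextBlock(); // DATA: keep pulling while there is work
    }
    if (result != Block.EOS) {
      register(chain); // yielded: go to the back of the queue
    } else {
      _finished.add(chain._name);
    }
  }
}
```

Chain `b` below yields before producing anything, yet both chains finish. This is the "every query makes progress" property the PR aims for, even with few threads.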





[GitHub] [pinot] walterddr commented on a diff in pull request #9753: [multistage] implement naive round robin operator chain scheduling

Posted by GitBox <gi...@apache.org>.
walterddr commented on code in PR #9753:
URL: https://github.com/apache/pinot/pull/9753#discussion_r1019534344


##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/OpChainScheduler.java:
##########
@@ -0,0 +1,57 @@
+package org.apache.pinot.query.runtime.executor;
+
+import org.apache.pinot.query.mailbox.MailboxIdentifier;
+import org.apache.pinot.query.runtime.operator.OpChain;
+
+
+/**
+ * An interface that defines different scheduling strategies to work with the
+ * {@link OpChainSchedulerService}. All methods are thread safe and can be guaranteed
+ * to never be called concurrently - therefore all implementations may use data
+ * structures that are not concurrent.
+ */
+public interface OpChainScheduler {
+
+  /**
+   * @param operatorChain the operator chain to register
+   */
+  void register(OpChain operatorChain);
+
+  /**
+   * This method is called whenever {@code mailbox} has new data available to consume,
+   * this can be useful for advanced scheduling algorithms
+   *
+   * @param mailbox the mailbox ID
+   */
+  void onDataAvailable(MailboxIdentifier mailbox);

Review Comment:
   Is this the only trigger (other than register)?
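For context on how a naive round-robin strategy can satisfy an interface of this shape, here is a minimal sketch. It assumes a few simplifications:
- A generic element type `C` in place of `OpChain`.
- A `String` mailbox ID in place of `MailboxIdentifier`.
- Hypothetical names `ChainScheduler` and `NaiveRoundRobin`.
- `hasNext()`/`next()` methods, as used by the scheduler service quoted earlier.

Per the interface contract, calls are never concurrent, so a plain `ArrayDeque` is enough. A naive strategy can ignore `onDataAvailable` entirely, which is why chains with nothing in their mailboxes still get scheduled.

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Hypothetical interface mirroring the shape of OpChainScheduler with simplified types.
interface ChainScheduler<C> {
  void register(C chain);

  void onDataAvailable(String mailboxId);

  boolean hasNext();

  C next();
}

// Naive round robin: a FIFO of registered chains. Data-availability signals are ignored,
// so a chain whose mailboxes are empty is still handed out in turn.
// Not thread safe: relies on the contract that calls are never concurrent.
final class NaiveRoundRobin<C> implements ChainScheduler<C> {
  private final Queue<C> _ready = new ArrayDeque<>();

  @Override
  public void register(C chain) {
    _ready.add(chain);
  }

  @Override
  public void onDataAvailable(String mailboxId) {
    // No-op in the naive strategy; a smarter scheduler could prioritize chains here.
  }

  @Override
  public boolean hasNext() {
    return !_ready.isEmpty();
  }

  @Override
  public C next() {
    return _ready.poll();
  }
}
```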





[GitHub] [pinot] walterddr commented on a diff in pull request #9753: [multistage] implement naive round robin operator chain scheduling

Posted by GitBox <gi...@apache.org>.
walterddr commented on code in PR #9753:
URL: https://github.com/apache/pinot/pull/9753#discussion_r1019530377


##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/WorkerQueryExecutor.java:
##########
@@ -69,26 +65,12 @@ public synchronized void shutDown() {
   }
 
   public void processQuery(DistributedStagePlan queryRequest, Map<String, String> requestMetadataMap,
-      ExecutorService executorService) {
+      OpChainSchedulerService scheduler) {

Review Comment:
   Not necessarily needed in this PR: let's change this API to take the operator chain directly as input
   ```suggestion
     public void processQuery(OpChain opChain, OpChainSchedulerService scheduler) {
   ```
   and construct the OpChain outside; this way we can return early if there's any error during OpChain construction.
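A rough sketch of the suggested split follows. The chain is built before anything reaches the scheduler, so a construction failure returns early without a half-built chain ever being registered. Everything here is hypothetical:
- A `Function` stands in for the physical-plan build step.
- A `Consumer` stands in for registering with `OpChainSchedulerService`.
- `QuerySubmitter` and `SubmitResult` are illustrative names.

```java
import java.util.function.Consumer;
import java.util.function.Function;

// Hypothetical outcome of a submission attempt.
final class SubmitResult {
  final boolean _submitted;
  final String _error;

  SubmitResult(boolean submitted, String error) {
    _submitted = submitted;
    _error = error;
  }
}

final class QuerySubmitter<P, C> {
  private final Function<P, C> _buildChain; // e.g. plan -> OpChain (hypothetical)
  private final Consumer<C> _register;      // e.g. a scheduler's register method (hypothetical)

  QuerySubmitter(Function<P, C> buildChain, Consumer<C> register) {
    _buildChain = buildChain;
    _register = register;
  }

  SubmitResult submit(P plan) {
    C chain;
    try {
      chain = _buildChain.apply(plan);
    } catch (RuntimeException e) {
      // Early return: nothing has been registered with the scheduler.
      return new SubmitResult(false, e.getMessage());
    }
    _register.accept(chain);
    return new SubmitResult(true, null);
  }
}
```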



##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java:
##########
@@ -128,7 +129,7 @@ public void processQuery(DistributedStagePlan distributedStagePlan, ExecutorServ
       for (ServerPlanRequestContext requestContext : serverQueryRequests) {
         ServerQueryRequest request = new ServerQueryRequest(requestContext.getInstanceRequest(),
             new ServerMetrics(PinotMetricUtils.getPinotMetricsRegistry()), System.currentTimeMillis());
-        serverQueryResults.add(processServerQuery(request, executorService));
+        serverQueryResults.add(processServerQuery(request, scheduler.getWorkerPool()));

Review Comment:
   Does this mean leaf stages are scheduled directly on top of the worker pool?





[GitHub] [pinot] agavra commented on a diff in pull request #9753: [multistage] implement naive round robin operator chain scheduling

Posted by GitBox <gi...@apache.org>.
agavra commented on code in PR #9753:
URL: https://github.com/apache/pinot/pull/9753#discussion_r1023190250


##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/OpChain.java:
##########
@@ -0,0 +1,53 @@
+package org.apache.pinot.query.runtime.operator;
+
+import com.google.common.base.Suppliers;
+import java.util.function.Supplier;
+import org.apache.pinot.common.request.context.ThreadTimer;
+import org.apache.pinot.core.common.Operator;
+import org.apache.pinot.query.runtime.blocks.TransferableBlock;
+
+
+/**
+ * An {@code OpChain} represents a chain of operators that are separated
+ * by send/receive stages.
+ */
+public class OpChain {

Review Comment:
   I like that suggestion a lot; it's definitely a good improvement.





[GitHub] [pinot] 61yao commented on a diff in pull request #9753: [multistage] implement naive round robin operator chain scheduling

Posted by GitBox <gi...@apache.org>.
61yao commented on code in PR #9753:
URL: https://github.com/apache/pinot/pull/9753#discussion_r1023182998


##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/OpChain.java:
##########
@@ -0,0 +1,53 @@
+package org.apache.pinot.query.runtime.operator;
+
+import com.google.common.base.Suppliers;
+import java.util.function.Supplier;
+import org.apache.pinot.common.request.context.ThreadTimer;
+import org.apache.pinot.core.common.Operator;
+import org.apache.pinot.query.runtime.blocks.TransferableBlock;
+
+
+/**
+ * An {@code OpChain} represents a chain of operators that are separated
+ * by send/receive stages.
+ */
+public class OpChain {

Review Comment:
   This means we no longer schedule the chain once we detect that the OpChain has timed out.
   Say we have a deadline in the OpChain. While scheduling, we can do:
   
   if (now > deadline) {
     // discard the scheduling
   }
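The deadline check described here could live in the scheduler's `next()`. Below is a minimal sketch, assuming a hypothetical `TimedChain` that records an absolute deadline in milliseconds and an injectable clock. Expired chains are never handed to a worker; they are collected so the caller can report a timeout. `DeadlineAwareScheduler` is likewise an illustrative name.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.function.LongSupplier;

// Hypothetical chain carrying an absolute deadline (epoch millis).
final class TimedChain {
  final String _id;
  final long _deadlineMs;

  TimedChain(String id, long deadlineMs) {
    _id = id;
    _deadlineMs = deadlineMs;
  }
}

final class DeadlineAwareScheduler {
  private final Queue<TimedChain> _ready = new ArrayDeque<>();
  private final List<TimedChain> _expired = new ArrayList<>();
  private final LongSupplier _clockMs;

  DeadlineAwareScheduler(LongSupplier clockMs) {
    _clockMs = clockMs;
  }

  void register(TimedChain chain) {
    _ready.add(chain);
  }

  // Returns the next chain still within its deadline, or null if none remain.
  TimedChain next() {
    TimedChain chain;
    while ((chain = _ready.poll()) != null) {
      if (_clockMs.getAsLong() > chain._deadlineMs) {
        _expired.add(chain); // discard the scheduling; the caller can report the timeout
      } else {
        return chain;
      }
    }
    return null;
  }

  List<TimedChain> getExpired() {
    return _expired;
  }
}
```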





[GitHub] [pinot] 61yao commented on a diff in pull request #9753: [multistage] implement naive round robin operator chain scheduling

Posted by GitBox <gi...@apache.org>.
61yao commented on code in PR #9753:
URL: https://github.com/apache/pinot/pull/9753#discussion_r1023170742


##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/OpChain.java:
##########
@@ -0,0 +1,53 @@
+package org.apache.pinot.query.runtime.operator;
+
+import com.google.common.base.Suppliers;
+import java.util.function.Supplier;
+import org.apache.pinot.common.request.context.ThreadTimer;
+import org.apache.pinot.core.common.Operator;
+import org.apache.pinot.query.runtime.blocks.TransferableBlock;
+
+
+/**
+ * An {@code OpChain} represents a chain of operators that are separated
+ * by send/receive stages.
+ */
+public class OpChain {

Review Comment:
   Ideally, this OpChain should capture the timeout info rather than relying on the MailboxSendOperator timeout. It is then clear when we should time out, instead of depending on the root operator timing out correctly for this to work.





[GitHub] [pinot] walterddr commented on a diff in pull request #9753: [multistage] implement naive round robin operator chain scheduling

Posted by GitBox <gi...@apache.org>.
walterddr commented on code in PR #9753:
URL: https://github.com/apache/pinot/pull/9753#discussion_r1016084144


##########
pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SSBQueryIntegrationTest.java:
##########
@@ -66,10 +64,6 @@ public class SSBQueryIntegrationTest extends BaseClusterIntegrationTest {
   @BeforeClass
   public void setUp()
       throws Exception {
-    if (Runtime.getRuntime().availableProcessors() < MIN_AVAILABLE_CORE_REQUIREMENT) {
-      throw new SkipException("Skip SSB query testing. Insufficient core count: "
-          + Runtime.getRuntime().availableProcessors());
-    }

Review Comment:
   freaking awesome!!!



[GitHub] [pinot] walterddr commented on a diff in pull request #9753: [multistage] implement naive round robin operator chain scheduling

Posted by GitBox <gi...@apache.org>.
walterddr commented on code in PR #9753:
URL: https://github.com/apache/pinot/pull/9753#discussion_r1023180329


##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/OpChainSchedulerService.java:
##########
@@ -0,0 +1,147 @@
+package org.apache.pinot.query.runtime.executor;
+
+import com.google.common.util.concurrent.AbstractExecutionThreadService;
+import com.google.common.util.concurrent.Monitor;
+import java.util.concurrent.ExecutorService;
+import org.apache.pinot.common.request.context.ThreadTimer;
+import org.apache.pinot.core.util.trace.TraceRunnable;
+import org.apache.pinot.query.mailbox.MailboxIdentifier;
+import org.apache.pinot.query.runtime.blocks.TransferableBlock;
+import org.apache.pinot.query.runtime.operator.OpChain;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * This class provides the implementation for scheduling multistage queries on a single node based
+ * on the {@link OpChainScheduler} logic that is passed in. Multistage queries support partial execution
+ * and will return a NOOP metadata block as a "yield" signal, indicating that the next operator
+ * chain ({@link OpChainScheduler#next()}) will be requested.
+ *
+ * <p>Note that a yielded operator chain will be re-registered with the underlying scheduler.
+ */
+@SuppressWarnings("UnstableApiUsage")
+public class OpChainSchedulerService extends AbstractExecutionThreadService {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(OpChainSchedulerService.class);
+
+  private final OpChainScheduler _scheduler;
+  private final ExecutorService _workerPool;
+
+  // anything that is guarded by this monitor should be non-blocking
+  private final Monitor _monitor = new Monitor();
+  private final Monitor.Guard _hasNextOrClosing = new Monitor.Guard(_monitor) {
+    @Override
+    public boolean isSatisfied() {
+      return _scheduler.hasNext() || !isRunning();
+    }
+  };
+
+  public OpChainSchedulerService(OpChainScheduler scheduler, ExecutorService workerPool) {
+    _scheduler = scheduler;
+    _workerPool = workerPool;
+  }
+
+  @Override
+  protected void triggerShutdown() {
+    // this will just notify all waiters that the scheduler is shutting down
+    _monitor.enter();
+    _monitor.leave();
+  }
+
+  @Override
+  protected void run()
+      throws Exception {
+    while (isRunning()) {
+      _monitor.enterWhen(_hasNextOrClosing);
+      try {
+        if (!isRunning()) {
+          return;
+        }
+
+        OpChain operatorChain = _scheduler.next();
+        _workerPool.submit(new TraceRunnable() {
+          @Override
+          public void runJob() {
+            try {
+              ThreadTimer timer = operatorChain.getAndStartTimer();
+
+              // so long as there's work to be done, keep getting the next block;
+              // when the operator chain returns a NOOP block, yield the execution
+              // of this chain so it can be rescheduled on another worker
+              TransferableBlock result = operatorChain.getRoot().nextBlock();
+              while (!result.isNoOpBlock() && !result.isEndOfStreamBlock()) {
+                LOGGER.debug("Got block with {} rows.", result.getNumRows());
+                result = operatorChain.getRoot().nextBlock();
+              }
+
+              if (!result.isEndOfStreamBlock()) {
+                // not complete, needs to re-register for scheduling
+                register(operatorChain);
+              } else {
+                LOGGER.info("Execution time: " + timer.getThreadTimeNs());
+              }
+            } catch (Exception e) {
+              LOGGER.error("Failed to execute query!", e);
+            }

Review Comment:
   I don't think this is returnable via data blocks. Echoing @61yao's comment on timeout: maybe we need other ways to indicate out-of-norm failures. Let's follow up in another PR (I don't think we handle these correctly right now either).
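Here is one hypothetical shape for an out-of-band failure path. The worker reports exceptions to a per-request listener instead of only logging them. `ReportingRunner` and its `(requestId, cause)` callback are illustrative assumptions, not Pinot APIs. What the listener does with a failure, such as notifying the broker, is left open:

```java
import java.util.function.BiConsumer;

// Hypothetical: rather than only logging "Failed to execute query!", the worker hands the
// failure to a listener, so failures that cannot be expressed as data blocks still reach
// something that can act on them.
final class ReportingRunner {
  private final BiConsumer<Long, Throwable> _onFailure; // (requestId, cause)

  ReportingRunner(BiConsumer<Long, Throwable> onFailure) {
    _onFailure = onFailure;
  }

  // Runs one scheduling quantum of a chain; any runtime exception is routed to the listener.
  void run(long requestId, Runnable chainStep) {
    try {
      chainStep.run();
    } catch (RuntimeException e) {
      _onFailure.accept(requestId, e);
    }
  }
}
```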



[GitHub] [pinot] agavra commented on a diff in pull request #9753: [multistage] implement naive round robin operator chain scheduling

Posted by GitBox <gi...@apache.org>.
agavra commented on code in PR #9753:
URL: https://github.com/apache/pinot/pull/9753#discussion_r1023120921


##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/OpChain.java:
##########
@@ -0,0 +1,61 @@
+package org.apache.pinot.query.runtime.operator;
+
+import com.google.common.base.Suppliers;
+import java.util.List;
+import java.util.function.Supplier;
+import org.apache.pinot.common.request.context.ThreadTimer;
+import org.apache.pinot.core.common.Operator;
+import org.apache.pinot.query.mailbox.MailboxIdentifier;
+import org.apache.pinot.query.runtime.blocks.TransferableBlock;
+
+
+/**
+ * An {@code OpChain} represents a chain of operators that are separated
+ * by send/receive stages.
+ */
+public class OpChain {
+
+  private final Operator<TransferableBlock> _root;
+  private final List<MailboxIdentifier> _inputMailboxes;
+  // TODO: build timers that are partial-execution aware
+  private final Supplier<ThreadTimer> _timer;
+
+  public OpChain(Operator<TransferableBlock> root, List<MailboxIdentifier> inputMailboxes) {
+    _root = root;
+    _inputMailboxes = inputMailboxes;
+
+    // use memoized supplier so that the timing doesn't start until the first time we get
+    // the timer
+    _timer = Suppliers.memoize(ThreadTimer::new)::get;
+  }
+
+  public Operator<TransferableBlock> getRoot() {

Review Comment:
   That's exactly the plan for the next PR. In fact, it'll be even better than that: it won't schedule anything unless there's data to be scheduled at all (and it'll sleep until it's notified of available data).
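The following sketches the policy described here. It assumes each mailbox feeds exactly one chain. `DataAwareScheduler` and `MailboxChain` are hypothetical names that mirror the `OpChainScheduler` methods in this PR (`register`, `onDataAvailable`, `hasNext`, `next`). This is not the follow-up PR's actual code, and `String` stands in for `MailboxIdentifier`:

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Queue;
import java.util.Set;

// Hypothetical chain: a name plus the mailboxes it reads from
// (an empty list means a leaf chain with no mailbox input).
final class MailboxChain {
  final String _name;
  final List<String> _inputMailboxes;

  MailboxChain(String name, List<String> inputMailboxes) {
    _name = name;
    _inputMailboxes = inputMailboxes;
  }
}

// Sketch of a data-aware policy: next() only hands out chains that are leaves or that
// have a signalled input mailbox. Assumes calls are never concurrent (as OpChainScheduler
// promises) and that each mailbox feeds exactly one chain.
final class DataAwareScheduler {
  private final Set<String> _mailboxesWithData = new HashSet<>();
  private final List<MailboxChain> _waiting = new ArrayList<>();
  private final Queue<MailboxChain> _ready = new ArrayDeque<>();

  void register(MailboxChain chain) {
    if (isRunnable(chain)) {
      _ready.add(chain);
    } else {
      _waiting.add(chain); // sleeps until onDataAvailable names one of its mailboxes
    }
  }

  // Idempotent: a repeated or spurious signal at worst causes one extra run.
  void onDataAvailable(String mailbox) {
    _mailboxesWithData.add(mailbox);
    Iterator<MailboxChain> it = _waiting.iterator();
    while (it.hasNext()) {
      MailboxChain chain = it.next();
      if (chain._inputMailboxes.contains(mailbox)) {
        it.remove();
        _ready.add(chain);
      }
    }
  }

  boolean hasNext() {
    return !_ready.isEmpty();
  }

  MailboxChain next() {
    MailboxChain chain = _ready.poll();
    if (chain == null) {
      throw new NoSuchElementException("no runnable chain");
    }
    // the chain is about to drain its mailboxes; new mail will signal them again
    _mailboxesWithData.removeAll(chain._inputMailboxes);
    return chain;
  }

  private boolean isRunnable(MailboxChain chain) {
    if (chain._inputMailboxes.isEmpty()) {
      return true;
    }
    for (String mailbox : chain._inputMailboxes) {
      if (_mailboxesWithData.contains(mailbox)) {
        return true;
      }
    }
    return false;
  }
}
```

With the `_hasNextOrClosing` guard in `OpChainSchedulerService`, `hasNext()` staying false while every chain waits on an empty mailbox is what would let the dispatcher thread sleep instead of handing out idle chains.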



[GitHub] [pinot] agavra commented on a diff in pull request #9753: [multistage] implement naive round robin operator chain scheduling

Posted by GitBox <gi...@apache.org>.
agavra commented on code in PR #9753:
URL: https://github.com/apache/pinot/pull/9753#discussion_r1021959842


##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/OpChainSchedulerService.java:
##########
@@ -0,0 +1,147 @@
+package org.apache.pinot.query.runtime.executor;
+
+import com.google.common.util.concurrent.AbstractExecutionThreadService;
+import com.google.common.util.concurrent.Monitor;
+import java.util.concurrent.ExecutorService;
+import org.apache.pinot.common.request.context.ThreadTimer;
+import org.apache.pinot.core.util.trace.TraceRunnable;
+import org.apache.pinot.query.mailbox.MailboxIdentifier;
+import org.apache.pinot.query.runtime.blocks.TransferableBlock;
+import org.apache.pinot.query.runtime.operator.OpChain;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * This class provides the implementation for scheduling multistage queries on a single node based
+ * on the {@link OpChainScheduler} logic that is passed in. Multistage queries support partial execution
+ * and will return a NOOP metadata block as a "yield" signal, indicating that the next operator
+ * chain ({@link OpChainScheduler#next()}) will be requested.
+ *
+ * <p>Note that a yielded operator chain will be re-registered with the underlying scheduler.
+ */
+@SuppressWarnings("UnstableApiUsage")
+public class OpChainSchedulerService extends AbstractExecutionThreadService {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(OpChainSchedulerService.class);
+
+  private final OpChainScheduler _scheduler;
+  private final ExecutorService _workerPool;
+
+  // anything that is guarded by this monitor should be non-blocking
+  private final Monitor _monitor = new Monitor();
+  protected final Monitor.Guard _hasNextOrClosing = new Monitor.Guard(_monitor) {
+    @Override
+    public boolean isSatisfied() {
+      return _scheduler.hasNext() || !isRunning();
+    }
+  };
+
+  public OpChainSchedulerService(OpChainScheduler scheduler, ExecutorService workerPool) {
+    _scheduler = scheduler;
+    _workerPool = workerPool;
+  }
+
+  @Override
+  protected void triggerShutdown() {
+    // this will just notify all waiters that the scheduler is shutting down
+    _monitor.enter();
+    _monitor.leave();
+  }
+
+  @Override
+  protected void run()
+      throws Exception {
+    while (isRunning()) {
+      _monitor.enterWhen(_hasNextOrClosing);
+      try {
+        if (!isRunning()) {
+          return;
+        }
+
+        OpChain operatorChain = _scheduler.next();
+        _workerPool.submit(new TraceRunnable() {
+          @Override
+          public void runJob() {
+            try {
+              ThreadTimer timer = operatorChain.getAndStartTimer();
+
+              // so long as there's work to be done, keep getting the next block;
+              // when the operator chain returns a NOOP block, yield the execution
+              // of this chain so it can be rescheduled on another worker
+              TransferableBlock result = operatorChain.getRoot().nextBlock();
+              while (!result.isNoOpBlock() && !result.isEndOfStreamBlock()) {
+                LOGGER.debug("Got block with {} rows.", result.getNumRows());
+                result = operatorChain.getRoot().nextBlock();
+              }
+
+              if (!result.isEndOfStreamBlock()) {
+                // not complete, needs to re-register for scheduling
+                register(operatorChain);
+              } else {
+                LOGGER.info("Execution time: " + timer.getThreadTimeNs());
+              }
+            } catch (Exception e) {
+              LOGGER.error("Failed to execute query!", e);
+            }
+          }
+        });
+      } finally {
+        _monitor.leave();
+      }
+    }
+  }
+
+  /**
+   * Register a new operator chain with the scheduler.
+   *
+   * @param operatorChain the chain to register
+   */
+  public final void register(OpChain operatorChain) {
+    _monitor.enter();
+    try {
+      _scheduler.register(operatorChain);
+    } finally {
+      _monitor.leave();
+    }
+  }
+
+  /**
+   * This method should be called whenever data is available in a given mailbox.
+   * Implementations of this method should be idempotent; it may be called even
+   * when no mail is available.
+   *
+   * @param mailbox the identifier of the mailbox that now has data
+   */
+  public final void onDataAvailable(MailboxIdentifier mailbox) {
+    _monitor.enter();
+    try {
+      _scheduler.onDataAvailable(mailbox);
+    } finally {
+      _monitor.leave();
+    }
+  }
+
+  // TODO: remove this method after we pipe down the proper executor pool to the v1 engine

Review Comment:
   Yeah, I think we should split the v1/v2 executor pools; that's probably the safest option. Alternatively, we may want three pools: a v2-intermediate pool, a v1-via-v2 pool, and a v1-vanilla pool. That would ensure that clusters running an existing vanilla v1 workload have their exposure to the v2 engine entirely limited.
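The following sketches the three-pool split using plain JDK fixed pools. `EnginePools`, the pool names and the sizes are hypothetical. The real code would presumably use Pinot's `NamedThreadFactory`, as `QueryServer` already does:

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical layout of the three pools suggested above; names and sizes are
// illustrative, not Pinot configuration.
final class EnginePools implements AutoCloseable {
  final ExecutorService _v2Intermediate; // intermediate stages of the multistage engine
  final ExecutorService _v1ViaV2;        // leaf (v1) work issued on behalf of v2 queries
  final ExecutorService _v1Vanilla;      // existing single-stage workload, isolated from v2

  EnginePools(int v2Threads, int v1ViaV2Threads, int v1Threads) {
    _v2Intermediate = Executors.newFixedThreadPool(v2Threads, named("v2_intermediate"));
    _v1ViaV2 = Executors.newFixedThreadPool(v1ViaV2Threads, named("v1_via_v2"));
    _v1Vanilla = Executors.newFixedThreadPool(v1Threads, named("v1_vanilla"));
  }

  // Names threads "<prefix>_<n>" so thread dumps show which pool is busy.
  private static ThreadFactory named(String prefix) {
    AtomicInteger counter = new AtomicInteger();
    return runnable -> {
      Thread thread = new Thread(runnable, prefix + "_" + counter.getAndIncrement());
      thread.setDaemon(true);
      return thread;
    };
  }

  // Stop accepting new work; already-queued tasks still run to completion.
  @Override
  public void close() {
    _v2Intermediate.shutdown();
    _v1ViaV2.shutdown();
    _v1Vanilla.shutdown();
  }
}
```

Because each pool has a fixed size, a burst of v2 intermediate-stage work can only occupy the v2 threads, leaving the vanilla v1 pool untouched.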



##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/service/QueryServer.java:
##########
@@ -42,25 +43,30 @@
 /**
  * {@link QueryServer} is the GRPC server that accepts query plan requests sent from {@link QueryDispatcher}.
  */
+@SuppressWarnings("UnstableApiUsage")
 public class QueryServer extends PinotQueryWorkerGrpc.PinotQueryWorkerImplBase {
   private static final Logger LOGGER = LoggerFactory.getLogger(GrpcQueryServer.class);
 
   private final Server _server;
   private final QueryRunner _queryRunner;
-  private final ExecutorService _executorService;
+  private final OpChainSchedulerService _scheduler;
 
   public QueryServer(int port, QueryRunner queryRunner) {
     _server = ServerBuilder.forPort(port).addService(this).build();
-    _executorService = Executors.newFixedThreadPool(ResourceManager.DEFAULT_QUERY_WORKER_THREADS,
-        new NamedThreadFactory("query_worker_on_" + port + "_port"));
+    _scheduler = new OpChainSchedulerService(new RoundRobinScheduler(),
+        Executors.newFixedThreadPool(
+            ResourceManager.DEFAULT_QUERY_WORKER_THREADS,
+            new NamedThreadFactory("query_worker_on_" + port + "_port")));
     _queryRunner = queryRunner;
+
     LOGGER.info("Initialized QueryWorker on port: {} with numWorkerThreads: {}", port,
         ResourceManager.DEFAULT_QUERY_WORKER_THREADS);
   }
 
   public void start() {
     LOGGER.info("Starting QueryWorker");
     try {
+      _scheduler.startAsync().awaitRunning();

Review Comment:
   oops! good catch



##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/OpChainSchedulerService.java:
##########
@@ -0,0 +1,147 @@
+package org.apache.pinot.query.runtime.executor;
+
+import com.google.common.util.concurrent.AbstractExecutionThreadService;
+import com.google.common.util.concurrent.Monitor;
+import java.util.concurrent.ExecutorService;
+import org.apache.pinot.common.request.context.ThreadTimer;
+import org.apache.pinot.core.util.trace.TraceRunnable;
+import org.apache.pinot.query.mailbox.MailboxIdentifier;
+import org.apache.pinot.query.runtime.blocks.TransferableBlock;
+import org.apache.pinot.query.runtime.operator.OpChain;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * This class provides the implementation for scheduling multistage queries on a single node based
+ * on the {@link OpChainScheduler} logic that is passed in. Multistage queries support partial execution
+ * and will return a NOOP metadata block as a "yield" signal, indicating that the next operator
+ * chain ({@link OpChainScheduler#next()}) will be requested.
+ *
+ * <p>Note that a yielded operator chain will be re-registered with the underlying scheduler.
+ */
+@SuppressWarnings("UnstableApiUsage")
+public class OpChainSchedulerService extends AbstractExecutionThreadService {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(OpChainSchedulerService.class);
+
+  private final OpChainScheduler _scheduler;
+  private final ExecutorService _workerPool;
+
+  // anything that is guarded by this monitor should be non-blocking
+  private final Monitor _monitor = new Monitor();
+  protected final Monitor.Guard _hasNextOrClosing = new Monitor.Guard(_monitor) {
+    @Override
+    public boolean isSatisfied() {
+      return _scheduler.hasNext() || !isRunning();
+    }
+  };
+
+  public OpChainSchedulerService(OpChainScheduler scheduler, ExecutorService workerPool) {
+    _scheduler = scheduler;
+    _workerPool = workerPool;
+  }
+
+  @Override
+  protected void triggerShutdown() {
+    // this will just notify all waiters that the scheduler is shutting down
+    _monitor.enter();
+    _monitor.leave();
+  }
+
+  @Override
+  protected void run()
+      throws Exception {
+    while (isRunning()) {
+      _monitor.enterWhen(_hasNextOrClosing);
+      try {
+        if (!isRunning()) {
+          return;
+        }
+
+        OpChain operatorChain = _scheduler.next();
+        _workerPool.submit(new TraceRunnable() {
+          @Override
+          public void runJob() {
+            try {
+              ThreadTimer timer = operatorChain.getAndStartTimer();
+
+              // so long as there's work to be done, keep getting the next block;
+              // when the operator chain returns a NOOP block, yield the execution
+              // of this chain so it can be rescheduled on another worker
+              TransferableBlock result = operatorChain.getRoot().nextBlock();

Review Comment:
   👍 yup that's correct



[GitHub] [pinot] agavra commented on a diff in pull request #9753: [multistage] implement naive round robin operator chain scheduling

Posted by GitBox <gi...@apache.org>.
agavra commented on code in PR #9753:
URL: https://github.com/apache/pinot/pull/9753#discussion_r1021957089


##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/OpChainScheduler.java:
##########
@@ -0,0 +1,57 @@
+package org.apache.pinot.query.runtime.executor;
+
+import org.apache.pinot.query.mailbox.MailboxIdentifier;
+import org.apache.pinot.query.runtime.operator.OpChain;
+
+
+/**
+ * An interface that defines different scheduling strategies to work with the
+ * {@link OpChainSchedulerService}. All methods are thread safe because the service
+ * guarantees they are never called concurrently; therefore, implementations may use
+ * data structures that are not concurrent.
+ */
+public interface OpChainScheduler {
+
+  /**
+   * @param operatorChain the operator chain to register
+   */
+  void register(OpChain operatorChain);
+
+  /**
+   * This method is called whenever {@code mailbox} has new data available to consume.
+   * This can be useful for advanced scheduling algorithms.
+   *
+   * @param mailbox the mailbox ID
+   */
+  void onDataAvailable(MailboxIdentifier mailbox);

Review Comment:
   There are three possible triggers:
   - register
   - onDataAvailable
   - next().getRoot().nextBlock() completes
   
   Triggers are defined in the implementation instead of the interface.
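To make the three triggers concrete, here is a minimal single-threaded sketch of a naive round-robin policy. It is not the PR's `RoundRobinScheduler`. `NaiveRoundRobin`, `ScriptedChain` and `RrBlock` are hypothetical, and `drain()` collapses the worker-pool loop from `OpChainSchedulerService#run` onto one thread:

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Queue;

// Hypothetical stand-in for TransferableBlock: data, a NOOP "yield" signal, or end of stream.
enum RrBlock { DATA, NOOP, EOS }

// Hypothetical operator chain that replays a fixed script of blocks from its root.
final class ScriptedChain {
  final String _name;
  private final Iterator<RrBlock> _blocks;

  ScriptedChain(String name, List<RrBlock> blocks) {
    _name = name;
    _blocks = blocks.iterator();
  }

  RrBlock nextBlock() {
    return _blocks.next();
  }
}

// Naive round robin: a FIFO queue that ignores onDataAvailable, so a chain with an
// empty mailbox is still scheduled (the CPU cost called out in the PR description).
final class NaiveRoundRobin {
  private final Queue<ScriptedChain> _queue = new ArrayDeque<>();

  // trigger 1: a new or yielded chain is registered
  void register(ScriptedChain chain) {
    _queue.add(chain);
  }

  // trigger 2: unused by this naive policy
  void onDataAvailable(String mailbox) {
  }

  boolean hasNext() {
    return !_queue.isEmpty();
  }

  ScriptedChain next() {
    return _queue.poll();
  }

  // Pull blocks until NOOP (yield) or EOS (done); a yielded chain is re-registered,
  // which is trigger 3. Returns the order in which chains yielded or finished.
  List<String> drain() {
    List<String> trace = new ArrayList<>();
    while (hasNext()) {
      ScriptedChain chain = next();
      RrBlock block = chain.nextBlock();
      while (block == RrBlock.DATA) {
        block = chain.nextBlock();
      }
      trace.add(chain._name + ":" + block);
      if (block != RrBlock.EOS) {
        register(chain);
      }
    }
    return trace;
  }
}
```

Registering chain `a` with blocks `DATA, NOOP, DATA, EOS` and chain `b` with `NOOP, NOOP, EOS`, then calling `drain()`, interleaves the two as `a:NOOP, b:NOOP, a:EOS, b:NOOP, b:EOS`. Each yield sends a chain to the back of the queue.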



[GitHub] [pinot] agavra commented on a diff in pull request #9753: [multistage] implement naive round robin operator chain scheduling

Posted by GitBox <gi...@apache.org>.
agavra commented on code in PR #9753:
URL: https://github.com/apache/pinot/pull/9753#discussion_r1023190845


##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/OpChain.java:
##########
@@ -0,0 +1,53 @@
+package org.apache.pinot.query.runtime.operator;
+
+import com.google.common.base.Suppliers;
+import java.util.function.Supplier;
+import org.apache.pinot.common.request.context.ThreadTimer;
+import org.apache.pinot.core.common.Operator;
+import org.apache.pinot.query.runtime.blocks.TransferableBlock;
+
+
+/**
+ * An {@code OpChain} represents a chain of operators that are separated
+ * by send/receive stages.
+ */
+public class OpChain {

Review Comment:
   big +1! I actually started adding that but decided that should be done in future PRs (see https://github.com/apache/pinot/pull/9753#discussion_r1015990164)



[GitHub] [pinot] walterddr commented on a diff in pull request #9753: [multistage] implement naive round robin operator chain scheduling

Posted by GitBox <gi...@apache.org>.
walterddr commented on code in PR #9753:
URL: https://github.com/apache/pinot/pull/9753#discussion_r1019538196


##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/service/QueryServer.java:
##########
@@ -42,25 +43,30 @@
 /**
  * {@link QueryServer} is the GRPC server that accepts query plan requests sent from {@link QueryDispatcher}.
  */
+@SuppressWarnings("UnstableApiUsage")
 public class QueryServer extends PinotQueryWorkerGrpc.PinotQueryWorkerImplBase {
   private static final Logger LOGGER = LoggerFactory.getLogger(GrpcQueryServer.class);
 
   private final Server _server;
   private final QueryRunner _queryRunner;
-  private final ExecutorService _executorService;
+  private final OpChainSchedulerService _scheduler;
 
   public QueryServer(int port, QueryRunner queryRunner) {
     _server = ServerBuilder.forPort(port).addService(this).build();
-    _executorService = Executors.newFixedThreadPool(ResourceManager.DEFAULT_QUERY_WORKER_THREADS,
-        new NamedThreadFactory("query_worker_on_" + port + "_port"));
+    _scheduler = new OpChainSchedulerService(new RoundRobinScheduler(),
+        Executors.newFixedThreadPool(
+            ResourceManager.DEFAULT_QUERY_WORKER_THREADS,
+            new NamedThreadFactory("query_worker_on_" + port + "_port")));
     _queryRunner = queryRunner;
+
     LOGGER.info("Initialized QueryWorker on port: {} with numWorkerThreads: {}", port,
         ResourceManager.DEFAULT_QUERY_WORKER_THREADS);
   }
 
   public void start() {
     LOGGER.info("Starting QueryWorker");
     try {
+      _scheduler.startAsync().awaitRunning();

Review Comment:
   shutdown?
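For the lifecycle question: Guava's `Service` pairs `startAsync().awaitRunning()` with `stopAsync().awaitTerminated()`. For an `AbstractExecutionThreadService`, the stop request invokes `triggerShutdown()`. The plain-JDK sketch below shows why the loop needs that wake-up. The dispatcher sleeps until "has work or not running", so stopping must both flip the flag and signal the waiter before joining the thread. The `Dispatcher` class is hypothetical, with `ReentrantLock`/`Condition` standing in for Guava's `Monitor`/`Guard`:

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical dispatcher guarded by "hasNext || !running". With Guava's Monitor, leaving
// the monitor re-evaluates guards, which is why triggerShutdown() only needs enter()/leave();
// with a plain Condition the wake-up must be an explicit signalAll().
final class Dispatcher {
  private final ReentrantLock _lock = new ReentrantLock();
  private final Condition _hasNextOrClosing = _lock.newCondition();
  private final Queue<Runnable> _queue = new ArrayDeque<>();
  private final Thread _thread = new Thread(this::run, "dispatcher");
  private boolean _running = true; // guarded by _lock

  void start() {
    _thread.setDaemon(true);
    _thread.start();
  }

  void register(Runnable work) {
    _lock.lock();
    try {
      _queue.add(work);
      _hasNextOrClosing.signalAll();
    } finally {
      _lock.unlock();
    }
  }

  // Counterpart of stopAsync().awaitTerminated(): flip the flag, wake the waiter, join.
  void stop() throws InterruptedException {
    _lock.lock();
    try {
      _running = false;
      _hasNextOrClosing.signalAll();
    } finally {
      _lock.unlock();
    }
    _thread.join();
  }

  boolean isAlive() {
    return _thread.isAlive();
  }

  private void run() {
    while (true) {
      Runnable work;
      _lock.lock();
      try {
        while (_queue.isEmpty() && _running) {
          _hasNextOrClosing.awaitUninterruptibly();
        }
        if (!_running) {
          return; // like the original loop, exit as soon as shutdown is requested
        }
        work = _queue.poll();
      } finally {
        _lock.unlock();
      }
      work.run(); // the real service submits to a worker pool instead of running inline
    }
  }
}
```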



[GitHub] [pinot] walterddr commented on a diff in pull request #9753: [multistage] implement naive round robin operator chain scheduling

Posted by GitBox <gi...@apache.org>.
walterddr commented on code in PR #9753:
URL: https://github.com/apache/pinot/pull/9753#discussion_r1023178252


##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/OpChain.java:
##########
@@ -0,0 +1,53 @@
+package org.apache.pinot.query.runtime.operator;
+
+import com.google.common.base.Suppliers;
+import java.util.function.Supplier;
+import org.apache.pinot.common.request.context.ThreadTimer;
+import org.apache.pinot.core.common.Operator;
+import org.apache.pinot.query.runtime.blocks.TransferableBlock;
+
+
+/**
+ * An {@code OpChain} represents a chain of operators that are separated
+ * by send/receive stages.
+ */
+public class OpChain {

Review Comment:
   I am not sure how to interpret this comment. In the current architecture, the timeout eventually needs to be returned as an error block. Maybe we can clarify with a concrete example of what other routes the OpChain could use to bubble up the timeout?



[GitHub] [pinot] walterddr commented on a diff in pull request #9753: [multistage] implement naive round robin operator chain scheduling

Posted by GitBox <gi...@apache.org>.
walterddr commented on code in PR #9753:
URL: https://github.com/apache/pinot/pull/9753#discussion_r1023187621


##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/OpChain.java:
##########
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.runtime.operator;
+
+import com.google.common.base.Suppliers;
+import java.util.function.Supplier;
+import org.apache.pinot.common.request.context.ThreadTimer;
+import org.apache.pinot.core.common.Operator;
+import org.apache.pinot.query.runtime.blocks.TransferableBlock;
+
+
+/**
+ * An {@code OpChain} represents a chain of operators that are separated
+ * by send/receive stages.
+ */
+public class OpChain {

Review Comment:
   Ah! I see.




[GitHub] [pinot] agavra commented on a diff in pull request #9753: [multistage] implement naive round robin operator chain scheduling

Posted by GitBox <gi...@apache.org>.
agavra commented on code in PR #9753:
URL: https://github.com/apache/pinot/pull/9753#discussion_r1023119856


##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/OpChainSchedulerService.java:
##########
@@ -0,0 +1,147 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.runtime.executor;
+
+import com.google.common.util.concurrent.AbstractExecutionThreadService;
+import com.google.common.util.concurrent.Monitor;
+import java.util.concurrent.ExecutorService;
+import org.apache.pinot.common.request.context.ThreadTimer;
+import org.apache.pinot.core.util.trace.TraceRunnable;
+import org.apache.pinot.query.mailbox.MailboxIdentifier;
+import org.apache.pinot.query.runtime.blocks.TransferableBlock;
+import org.apache.pinot.query.runtime.operator.OpChain;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * This class provides the implementation for scheduling multistage queries on a single node based
+ * on the {@link OpChainScheduler} logic that is passed in. Multistage queries support partial execution
+ * and will return a NOOP metadata block as a "yield" signal, indicating that the next operator
+ * chain ({@link OpChainScheduler#next()}) will be requested.
+ *
+ * <p>Note that a yielded operator chain will be re-registered with the underlying scheduler.
+ */
+@SuppressWarnings("UnstableApiUsage")
+public class OpChainSchedulerService extends AbstractExecutionThreadService {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(OpChainSchedulerService.class);
+
+  private final OpChainScheduler _scheduler;
+  private final ExecutorService _workerPool;
+
+  // anything that is guarded by this monitor should be non-blocking
+  private final Monitor _monitor = new Monitor();
+  protected final Monitor.Guard _hasNextOrClosing = new Monitor.Guard(_monitor) {

Review Comment:
   That is a very good question... I think it was an autocomplete typo.
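The class Javadoc above describes the core loop: run an operator chain until it returns a NOOP "yield" block, then re-register it so the scheduler can hand out the next chain. A minimal single-threaded sketch of that round-robin loop follows, assuming a FIFO ready queue. `FakeOpChain`, `Signal`, `Scheduler` and `RoundRobin` are hypothetical stand-ins, not the PR's `OpChainScheduler`/`RoundRobinScheduler` classes; the real service dispatches work to a worker pool guarded by a Guava `Monitor` rather than running inline.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.StringJoiner;

public class RoundRobinSchedulingDemo {

  /** Outcome of running one quantum of work. */
  enum Signal { NOOP, EOS }

  /** Hypothetical op chain: yields NOOP until its quanta are used up, then EOS. */
  static final class FakeOpChain {
    final String _name;
    private int _quantaLeft;

    FakeOpChain(String name, int quantaLeft) {
      _name = name;
      _quantaLeft = quantaLeft;
    }

    Signal runOneQuantum() {
      _quantaLeft--;
      return _quantaLeft > 0 ? Signal.NOOP : Signal.EOS;
    }
  }

  /** Hypothetical scheduler contract: register, hasNext, next. */
  interface Scheduler {
    void register(FakeOpChain chain);
    boolean hasNext();
    FakeOpChain next();
  }

  /** FIFO ready queue: a yielded chain goes to the back of the line. */
  static final class RoundRobin implements Scheduler {
    private final Deque<FakeOpChain> _ready = new ArrayDeque<>();

    public void register(FakeOpChain chain) { _ready.addLast(chain); }
    public boolean hasNext() { return !_ready.isEmpty(); }
    public FakeOpChain next() { return _ready.pollFirst(); }
  }

  /** Runs one quantum per turn and re-registers any chain that yielded. */
  static String drive(Scheduler scheduler) {
    StringJoiner trace = new StringJoiner(" ");
    while (scheduler.hasNext()) {
      FakeOpChain chain = scheduler.next();
      trace.add(chain._name);
      if (chain.runOneQuantum() == Signal.NOOP) {
        scheduler.register(chain);  // yielded: put it back in line
      }
    }
    return trace.toString();
  }

  static String demo() {
    Scheduler scheduler = new RoundRobin();
    scheduler.register(new FakeOpChain("A", 3));
    scheduler.register(new FakeOpChain("B", 1));
    scheduler.register(new FakeOpChain("C", 2));
    return drive(scheduler);
  }

  public static void main(String[] args) {
    System.out.println(demo());  // A B C A C A
  }
}
```

Because a yielded chain goes to the back of the queue, every registered chain gets a turn before any chain gets another one, which is what lets all queries make progress. It also shows why the naive scheme is CPU-hungry: a chain with nothing to do is still rescheduled on every pass.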




[GitHub] [pinot] agavra commented on a diff in pull request #9753: [multistage] implement naive round robin operator chain scheduling

Posted by GitBox <gi...@apache.org>.
agavra commented on code in PR #9753:
URL: https://github.com/apache/pinot/pull/9753#discussion_r1023117128


##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/OpChain.java:
##########
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.runtime.operator;
+
+import com.google.common.base.Suppliers;
+import java.util.List;
+import java.util.function.Supplier;
+import org.apache.pinot.common.request.context.ThreadTimer;
+import org.apache.pinot.core.common.Operator;
+import org.apache.pinot.query.mailbox.MailboxIdentifier;
+import org.apache.pinot.query.runtime.blocks.TransferableBlock;
+
+
+/**
+ * An {@code OpChain} represents a chain of operators that are separated
+ * by send/receive stages.
+ */
+public class OpChain {
+
+  private final Operator<TransferableBlock> _root;
+  private final List<MailboxIdentifier> _inputMailboxes;
+  // TODO: build timers that are partial-execution aware
+  private final Supplier<ThreadTimer> _timer;
+
+  public OpChain(Operator<TransferableBlock> root, List<MailboxIdentifier> inputMailboxes) {
+    _root = root;
+    _inputMailboxes = inputMailboxes;
+
+    // use memoized supplier so that the timing doesn't start until the first time we get
+    // the timer
+    _timer = Suppliers.memoize(ThreadTimer::new)::get;
+  }
+
+  public Operator<TransferableBlock> getRoot() {
+    return _root;
+  }
+
+  public List<MailboxIdentifier> getInputMailboxes() {
+    return _inputMailboxes;
+  }

Review Comment:
   I think I see what you're saying. Since this isn't used in this PR, I'll just clean it up for now and pipe it back in when I add the PR that triggers the scheduler when mailbox data is available.
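The `OpChain` constructor above wraps `ThreadTimer::new` in Guava's `Suppliers.memoize` so that the timer is created, and therefore starts timing, only on the first `get()`. Below is a dependency-free sketch of the same lazy-start idea, assuming a fake clock in place of `ThreadTimer`. `Memoizing` and `StartedTimer` are hypothetical names, and the double-checked locking is similar in spirit to a memoizing supplier rather than a copy of Guava's source.

```java
import java.util.function.Supplier;

public class LazyTimerDemo {

  /** Caches the first value produced by the delegate; safe for concurrent callers. */
  static final class Memoizing<T> implements Supplier<T> {
    private final Supplier<T> _delegate;
    private volatile boolean _initialized;
    private T _value;

    Memoizing(Supplier<T> delegate) {
      _delegate = delegate;
    }

    @Override
    public T get() {
      if (!_initialized) {
        synchronized (this) {
          if (!_initialized) {
            _value = _delegate.get();
            _initialized = true;  // volatile write publishes _value
          }
        }
      }
      return _value;
    }
  }

  /** Hypothetical timer that just records when it was created. */
  static final class StartedTimer {
    final long _startNanos;

    StartedTimer(long startNanos) {
      _startNanos = startNanos;
    }
  }

  static String demo() {
    long[] clock = {0L};      // fake clock, advanced manually
    int[] creations = {0};    // counts how often the delegate runs
    Supplier<StartedTimer> timer = new Memoizing<>(() -> {
      creations[0]++;
      return new StartedTimer(clock[0]);
    });
    clock[0] = 100L;          // time passes before the first use
    StartedTimer first = timer.get();
    clock[0] = 250L;
    StartedTimer second = timer.get();
    return first._startNanos + " " + (first == second) + " " + creations[0];
  }

  public static void main(String[] args) {
    System.out.println(demo());  // 100 true 1
  }
}
```

The timer's start time reflects the first access (100), not construction (0), and later calls return the same instance without re-running the delegate.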




[GitHub] [pinot] agavra commented on a diff in pull request #9753: [multistage] implement naive round robin operator chain scheduling

Posted by GitBox <gi...@apache.org>.
agavra commented on code in PR #9753:
URL: https://github.com/apache/pinot/pull/9753#discussion_r1021955866


##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java:
##########
@@ -128,7 +129,7 @@ public void processQuery(DistributedStagePlan distributedStagePlan, ExecutorServ
       for (ServerPlanRequestContext requestContext : serverQueryRequests) {
         ServerQueryRequest request = new ServerQueryRequest(requestContext.getInstanceRequest(),
             new ServerMetrics(PinotMetricUtils.getPinotMetricsRegistry()), System.currentTimeMillis());
-        serverQueryResults.add(processServerQuery(request, executorService));
+        serverQueryResults.add(processServerQuery(request, scheduler.getWorkerPool()));

Review Comment:
   Yes, IIRC this is the same as the existing behavior if you follow where `executorService` is created and passed down.


