You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ro...@apache.org on 2023/05/13 16:53:29 UTC

[pinot] branch master updated: [multistage] follow up leaf scheduler (#10760)

This is an automated email from the ASF dual-hosted git repository.

rongr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 0912544cce [multistage] follow up leaf scheduler (#10760)
0912544cce is described below

commit 0912544cce9307462c5931abdc61131ec2d50bfa
Author: Rong Rong <ro...@apache.org>
AuthorDate: Sat May 13 09:53:21 2023 -0700

    [multistage] follow up leaf scheduler (#10760)
    
    this is a follow-up on comments on #10711
    
    * with the execution model change, mailbox send no longer needs to do a while loop
    * with the changes to the leaf-stage operator, a separate leaf scheduler is no longer needed.
    * also fix the outdated yielding-scheduler naming in the test
    
    ---------
    
    Co-authored-by: Rong Rong <ro...@startree.ai>
---
 .../apache/pinot/query/runtime/QueryRunner.java    |   7 +-
 .../runtime/executor/LeafSchedulerService.java     | 102 ---------------------
 .../runtime/executor/OpChainSchedulerService.java  |  10 +-
 .../runtime/executor/RoundRobinScheduler.java      |   2 +-
 .../runtime/operator/MailboxSendOperator.java      |  29 +++---
 ...eTest.java => OpChainSchedulerServiceTest.java} |   2 +-
 .../runtime/operator/MailboxSendOperatorTest.java  |   8 +-
 7 files changed, 31 insertions(+), 129 deletions(-)

diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java
index 0ff209b904..92bf32638d 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java
@@ -45,7 +45,6 @@ import org.apache.pinot.query.planner.plannode.PlanNode;
 import org.apache.pinot.query.routing.StageMetadata;
 import org.apache.pinot.query.routing.VirtualServerAddress;
 import org.apache.pinot.query.routing.WorkerMetadata;
-import org.apache.pinot.query.runtime.executor.LeafSchedulerService;
 import org.apache.pinot.query.runtime.executor.OpChainSchedulerService;
 import org.apache.pinot.query.runtime.executor.RoundRobinScheduler;
 import org.apache.pinot.query.runtime.operator.LeafStageTransferableBlockOperator;
@@ -91,7 +90,7 @@ public class QueryRunner {
   private ExecutorService _queryRunnerExecutorService;
 
   private OpChainSchedulerService _intermScheduler;
-  private LeafSchedulerService _leafScheduler;
+  private OpChainSchedulerService _leafScheduler;
 
   /**
    * Initializes the query executor.
@@ -117,7 +116,7 @@ public class QueryRunner {
           new NamedThreadFactory("query_runner_on_" + _port + "_port"));
       _intermScheduler = new OpChainSchedulerService(new RoundRobinScheduler(releaseMs),
           getQueryWorkerIntermExecutorService());
-      _leafScheduler = new LeafSchedulerService(getQueryRunnerExecutorService());
+      _leafScheduler = new OpChainSchedulerService(new RoundRobinScheduler(releaseMs), getQueryRunnerExecutorService());
       _mailboxService = new MailboxService(_hostname, _port, config, _intermScheduler::onDataAvailable);
       _serverExecutor = new ServerQueryExecutorV1Impl();
       _serverExecutor.init(config.subset(PINOT_V1_SERVER_QUERY_CONFIG_PREFIX), instanceDataManager, serverMetrics);
@@ -131,6 +130,7 @@ public class QueryRunner {
     _helixPropertyStore = _helixManager.getHelixPropertyStore();
     _mailboxService.start();
     _serverExecutor.start();
+    _leafScheduler.startAsync().awaitRunning(30, TimeUnit.SECONDS);
     _intermScheduler.startAsync().awaitRunning(30, TimeUnit.SECONDS);
   }
 
@@ -138,6 +138,7 @@ public class QueryRunner {
       throws TimeoutException {
     _serverExecutor.shutDown();
     _mailboxService.shutdown();
+    _leafScheduler.stopAsync().awaitTerminated(30, TimeUnit.SECONDS);
     _intermScheduler.stopAsync().awaitTerminated(30, TimeUnit.SECONDS);
   }
 
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/LeafSchedulerService.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/LeafSchedulerService.java
deleted file mode 100644
index faa4bf4945..0000000000
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/LeafSchedulerService.java
+++ /dev/null
@@ -1,102 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.query.runtime.executor;
-
-import java.util.List;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Future;
-import java.util.stream.Collectors;
-import org.apache.pinot.core.util.trace.TraceRunnable;
-import org.apache.pinot.query.runtime.blocks.TransferableBlock;
-import org.apache.pinot.query.runtime.operator.OpChain;
-import org.apache.pinot.query.runtime.operator.OpChainId;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-
-public class LeafSchedulerService {
-  private static final Logger LOGGER = LoggerFactory.getLogger(LeafSchedulerService.class);
-
-  private final ExecutorService _executorService;
-  private final ConcurrentHashMap<OpChainId, Future<?>> _submittedOpChainMap;
-
-  public LeafSchedulerService(ExecutorService executorService) {
-    _executorService = executorService;
-    _submittedOpChainMap = new ConcurrentHashMap<>();
-  }
-
-  public void register(OpChain operatorChain) {
-    Future<?> scheduledFuture = _executorService.submit(new TraceRunnable() {
-      @Override
-      public void runJob() {
-        boolean isFinished = false;
-        boolean returnedErrorBlock = false;
-        Throwable thrown = null;
-        try {
-          LOGGER.trace("({}): Executing", operatorChain);
-          operatorChain.getStats().executing();
-          TransferableBlock result = operatorChain.getRoot().nextBlock();
-          while (!result.isEndOfStreamBlock()) {
-            result = operatorChain.getRoot().nextBlock();
-          }
-          isFinished = true;
-          if (result.isErrorBlock()) {
-            returnedErrorBlock = true;
-            LOGGER.error("({}): Completed erroneously {} {}", operatorChain, operatorChain.getStats(),
-                result.getDataBlock().getExceptions());
-          } else {
-            LOGGER.debug("({}): Completed {}", operatorChain, operatorChain.getStats());
-          }
-        } catch (Exception e) {
-          LOGGER.error("({}): Failed to execute operator chain! {}", operatorChain, operatorChain.getStats(), e);
-          thrown = e;
-        } finally {
-          if (returnedErrorBlock || thrown != null) {
-            cancelOpChain(operatorChain, thrown);
-          } else if (isFinished) {
-            closeOpChain(operatorChain);
-          }
-        }
-      }
-    });
-    _submittedOpChainMap.put(operatorChain.getId(), scheduledFuture);
-  }
-
-  public void cancel(long requestId) {
-    // simple cancellation. for leaf stage this cannot be a dangling opchain b/c they will eventually be cleared up
-    // via query timeout.
-    List<OpChainId> opChainIdsToCancel = _submittedOpChainMap.keySet()
-        .stream().filter(opChainId -> opChainId.getRequestId() == requestId).collect(Collectors.toList());
-    for (OpChainId opChainId : opChainIdsToCancel) {
-      Future<?> future = _submittedOpChainMap.get(opChainId);
-      if (future != null) {
-        future.cancel(true);
-      }
-    }
-  }
-
-  private void closeOpChain(OpChain opChain) {
-    opChain.close();
-  }
-
-  private void cancelOpChain(OpChain opChain, Throwable t) {
-    opChain.cancel(t);
-  }
-}
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/OpChainSchedulerService.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/OpChainSchedulerService.java
index a886269ab9..b4e55796b7 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/OpChainSchedulerService.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/OpChainSchedulerService.java
@@ -48,20 +48,16 @@ public class OpChainSchedulerService extends AbstractExecutionThreadService {
    * Default cancel signal retention, this should be set to several times larger than
    * {@link org.apache.pinot.query.service.QueryConfig#DEFAULT_SCHEDULER_RELEASE_TIMEOUT_MS}.
    */
-  private static final long DEFAULT_SCHEDULER_CANCELLATION_SIGNAL_RETENTION_MS = 60_000L;
+  private static final long SCHEDULER_CANCELLATION_SIGNAL_RETENTION_MS = 60_000L;
 
   private final OpChainScheduler _scheduler;
   private final ExecutorService _workerPool;
-  private final Cache<Long, Long> _cancelledRequests;
+  private final Cache<Long, Long> _cancelledRequests = CacheBuilder.newBuilder()
+      .expireAfterWrite(SCHEDULER_CANCELLATION_SIGNAL_RETENTION_MS, TimeUnit.MILLISECONDS).build();
 
   public OpChainSchedulerService(OpChainScheduler scheduler, ExecutorService workerPool) {
-    this(scheduler, workerPool, DEFAULT_SCHEDULER_CANCELLATION_SIGNAL_RETENTION_MS);
-  }
-
-  public OpChainSchedulerService(OpChainScheduler scheduler, ExecutorService workerPool, long cancelRetentionMs) {
     _scheduler = scheduler;
     _workerPool = workerPool;
-    _cancelledRequests = CacheBuilder.newBuilder().expireAfterWrite(cancelRetentionMs, TimeUnit.MILLISECONDS).build();
   }
 
   @Override
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/RoundRobinScheduler.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/RoundRobinScheduler.java
index a1934d884a..b143cee77e 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/RoundRobinScheduler.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/RoundRobinScheduler.java
@@ -94,7 +94,7 @@ public class RoundRobinScheduler implements OpChainScheduler {
   private final Supplier<Long> _ticker;
 
   private final Map<OpChainId, OpChain> _aliveChains = new ConcurrentHashMap<>();
-  final Set<OpChainId> _seenMail = Sets.newConcurrentHashSet();
+  private final Set<OpChainId> _seenMail = Sets.newConcurrentHashSet();
   private final Map<OpChainId, Long> _available = new ConcurrentHashMap<>();
 
   private final BlockingQueue<OpChain> _ready = new LinkedBlockingQueue<>();
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java
index 5fba67f735..4741d0a151 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java
@@ -109,7 +109,8 @@ public class MailboxSendOperator extends MultiStageOperator {
       sendingMailboxes.add(mailboxService.getSendingMailbox(receiverMailboxMetadatas.getVirtualAddress(i).hostname(),
           receiverMailboxMetadatas.getVirtualAddress(i).port(), sendingMailboxIds.get(i), deadlineMs));
     }
-    return BlockExchange.getExchange(sendingMailboxes, exchangeType, keySelector, TransferableBlockUtils::splitBlock);
+    return BlockExchange.getExchange(sendingMailboxes, exchangeType, keySelector,
+        TransferableBlockUtils::splitBlock);
   }
 
   @Override
@@ -128,21 +129,21 @@ public class MailboxSendOperator extends MultiStageOperator {
     TransferableBlock transferableBlock;
     try {
       transferableBlock = _sourceOperator.nextBlock();
-      while (!transferableBlock.isNoOpBlock()) {
-        if (transferableBlock.isEndOfStreamBlock()) {
-          if (transferableBlock.isSuccessfulEndOfStreamBlock()) {
-            //Stats need to be populated here because the block is being sent to the mailbox
-            // and the receiving opChain will not be able to access the stats from the previous opChain
-            TransferableBlock eosBlockWithStats = TransferableBlockUtils.getEndOfStreamTransferableBlock(
-                OperatorUtils.getMetadataFromOperatorStats(_opChainStats.getOperatorStatsMap()));
-            _exchange.send(eosBlockWithStats);
-          } else {
-            _exchange.send(transferableBlock);
-          }
-          return transferableBlock;
+      if (transferableBlock.isNoOpBlock()) {
+        return transferableBlock;
+      } else if (transferableBlock.isEndOfStreamBlock()) {
+        if (transferableBlock.isSuccessfulEndOfStreamBlock()) {
+          // Stats need to be populated here because the block is being sent to the mailbox
+          // and the receiving opChain will not be able to access the stats from the previous opChain
+          TransferableBlock eosBlockWithStats = TransferableBlockUtils.getEndOfStreamTransferableBlock(
+              OperatorUtils.getMetadataFromOperatorStats(_opChainStats.getOperatorStatsMap()));
+          _exchange.send(eosBlockWithStats);
+        } else {
+          _exchange.send(transferableBlock);
         }
+      } else { // normal blocks
+        // check whether we should continue depending on exchange queue condition.
         _exchange.send(transferableBlock);
-        transferableBlock = _sourceOperator.nextBlock();
       }
     } catch (Exception e) {
       transferableBlock = TransferableBlockUtils.getErrorTransferableBlock(e);
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/executor/YieldingSchedulerServiceTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/executor/OpChainSchedulerServiceTest.java
similarity index 99%
rename from pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/executor/YieldingSchedulerServiceTest.java
rename to pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/executor/OpChainSchedulerServiceTest.java
index 4f7f8e5565..fbbab9e4d9 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/executor/YieldingSchedulerServiceTest.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/executor/OpChainSchedulerServiceTest.java
@@ -41,7 +41,7 @@ import org.testng.annotations.Test;
 import static org.mockito.Mockito.clearInvocations;
 
 
-public class YieldingSchedulerServiceTest {
+public class OpChainSchedulerServiceTest {
 
   private ExecutorService _executor;
   private AutoCloseable _mocks;
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MailboxSendOperatorTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MailboxSendOperatorTest.java
index 5080ad8454..1d13e5f097 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MailboxSendOperatorTest.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MailboxSendOperatorTest.java
@@ -151,7 +151,13 @@ public class MailboxSendOperatorTest {
     TransferableBlock block = mailboxSendOperator.nextBlock();
 
     // Then:
-    assertSame(block, eosBlock, "expected EOS block to propagate");
+    assertSame(block, dataBlock, "expected data block to propagate first");
+
+    // When:
+    block = mailboxSendOperator.nextBlock();
+
+    // Then:
+    assertSame(block, eosBlock, "expected EOS block to propagate next");
     ArgumentCaptor<TransferableBlock> captor = ArgumentCaptor.forClass(TransferableBlock.class);
     verify(_exchange, times(2)).send(captor.capture());
     List<TransferableBlock> blocks = captor.getAllValues();


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org