Posted to commits@pinot.apache.org by GitBox <gi...@apache.org> on 2022/11/20 03:24:44 UTC

[GitHub] [pinot] 61yao opened a new pull request, #9836: [multistage][bugfix] Move sending mailbox instance from global map to request context

61yao opened a new pull request, #9836:
URL: https://github.com/apache/pinot/pull/9836

   This PR moves the sending mailbox map from the global map (MailboxService) to a per-request context (PlanRequestContext).
   
   The old design has two issues:
   1) The sending mailbox instance leaks: once the request finishes, the mailbox no longer needs to be alive, yet it is kept (this is also true for the receiving mailbox).
   2) High contention on the concurrent hash map.
   
   The new implementation moves the mailbox instance and the exchange to the per-request context.
   1) This fixes the memory leak and the contention issue of the old design.
   2) This opens the door to better resource cleanup and error handling in general.
   
   Follow-up PRs would:
   1) Properly close the mailbox channel and propagate the error when one occurs.
   2) Clean up the leak for the receiving mailbox (same issue).
   3) Introduce a better abstraction for the request context (it should be a runtime context or an OpChain context).
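
The difference between the two ownership models can be sketched as follows. This is a minimal illustration, not the PR's code: `SendingMailboxSketch`, `GlobalMailboxMapSketch`, and `RequestContextSketch` are hypothetical names, and the real `MailboxService` and `PlanRequestContext` are not modeled.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical mailbox: it only records whether close() was called.
class SendingMailboxSketch {
  private boolean _closed;

  void close() {
    _closed = true;
  }

  boolean isClosed() {
    return _closed;
  }
}

// Old shape: a server-wide map that gains an entry per mailbox and never drops one.
class GlobalMailboxMapSketch {
  private final Map<String, SendingMailboxSketch> _mailboxes = new ConcurrentHashMap<>();

  SendingMailboxSketch getOrCreate(String mailboxId) {
    return _mailboxes.computeIfAbsent(mailboxId, id -> new SendingMailboxSketch());
  }

  int size() {
    return _mailboxes.size();
  }
}

// New shape: the request owns its mailboxes and releases them in one place.
// Assumes a single thread drives the request; a real context would need to be thread-safe.
class RequestContextSketch implements AutoCloseable {
  private final List<SendingMailboxSketch> _mailboxes = new ArrayList<>();

  SendingMailboxSketch openMailbox() {
    SendingMailboxSketch mailbox = new SendingMailboxSketch();
    _mailboxes.add(mailbox);
    return mailbox;
  }

  int size() {
    return _mailboxes.size();
  }

  @Override
  public void close() {
    for (SendingMailboxSketch mailbox : _mailboxes) {
      mailbox.close();
    }
    _mailboxes.clear();
  }
}
```

With the global map, entries outlive the request unless something removes them explicitly; with the per-request context, closing the context is the only cleanup step.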


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[GitHub] [pinot] agavra commented on a diff in pull request #9836: [multistage][POC] Move mailbox instance from global map to request context

Posted by GitBox <gi...@apache.org>.
agavra commented on code in PR #9836:
URL: https://github.com/apache/pinot/pull/9836#discussion_r1037342781


##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/InMemoryReceivingMailbox.java:
##########
@@ -63,4 +63,9 @@ public boolean isInitialized() {
   public boolean isClosed() {
     return _closed && _queue.size() == 0;
   }
+
+  @Override
+  public void close() {

Review Comment:
   should we clear the queue here?
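
A small sketch of why the question matters, assuming `isClosed()` keeps the `_closed && queue is empty` shape shown in the hunk above. `InMemoryMailboxSketch` is a hypothetical stand-in, not the real `InMemoryReceivingMailbox`; it uses a synchronized `ArrayDeque` purely to keep the example short.

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Hypothetical in-memory mailbox; only the close/queue interaction is modeled.
class InMemoryMailboxSketch<T> {
  private final Queue<T> _queue = new ArrayDeque<>();
  private boolean _closed;

  // Rejects new blocks once the mailbox has been closed.
  synchronized boolean offer(T block) {
    if (_closed) {
      return false;
    }
    return _queue.offer(block);
  }

  // Same shape as the diff: closed only counts once the queue is drained.
  synchronized boolean isClosed() {
    return _closed && _queue.isEmpty();
  }

  // Clearing here means isClosed() is true right after close(),
  // and unread blocks do not stay reachable through the mailbox.
  synchronized void close() {
    _closed = true;
    _queue.clear();
  }
}
```

Without the `clear()` call, a mailbox closed with unread blocks would keep reporting `isClosed() == false` until something drained it.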



##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/channel/MailboxContentStreamObserver.java:
##########
@@ -130,7 +130,9 @@ public void onError(Throwable e) {
 
   @Override
   public void onCompleted() {
-    _isCompleted.set(true);
-    _responseObserver.onCompleted();
+    if(!_isCompleted.get()){
+      _isCompleted.set(true);

Review Comment:
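   there's a race condition here: the `get()` check and the `set(true)` are separate steps, so two threads can both pass the check. It should be a single atomic call: `if (_isCompleted.compareAndSet(false, true))`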
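
A minimal sketch of the atomic guard, assuming `_isCompleted` is a `java.util.concurrent.atomic.AtomicBoolean`. `CompletionGuard` and its counter are hypothetical stand-ins for the observer and for the `_responseObserver.onCompleted()` call; the condition is written so that the guarded body runs only for the caller that flips the flag.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for the observer in the diff; only the completion guard is modeled.
class CompletionGuard {
  private final AtomicBoolean _isCompleted = new AtomicBoolean(false);
  // Counts how often the downstream completion would have been invoked.
  private final AtomicInteger _downstreamCompletions = new AtomicInteger();

  // Racy shape from the diff: get() and set(true) are two steps, so two threads
  // can both observe false before either one writes true.
  //   if (!_isCompleted.get()) { _isCompleted.set(true); ... }

  /** Returns true only for the single caller that wins the false -> true transition. */
  boolean onCompleted() {
    if (_isCompleted.compareAndSet(false, true)) {
      _downstreamCompletions.incrementAndGet(); // stands in for _responseObserver.onCompleted()
      return true;
    }
    return false; // another caller already completed the stream
  }

  int downstreamCompletions() {
    return _downstreamCompletions.get();
  }
}
```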



##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/channel/MailboxStatusStreamObserver.java:
##########
@@ -83,6 +87,7 @@ private void shutdown() {
 
   @Override
   public void onCompleted() {
+    finishLatch.countDown();

Review Comment:
   nit: maybe we move this and `_isCompleted.set(true)` to `shutdown` so both `onError` and `onCompleted` have the same behavior.
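
One way the suggestion could look, as a sketch only. `StatusObserverSketch` is hypothetical and does not implement the gRPC `StreamObserver` interface; the field names mirror the ones mentioned in the comment.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical observer: both terminal callbacks funnel into shutdown().
class StatusObserverSketch {
  private final AtomicBoolean _isCompleted = new AtomicBoolean(false);
  private final CountDownLatch _finishLatch = new CountDownLatch(1);

  void onError(Throwable t) {
    shutdown();
  }

  void onCompleted() {
    shutdown();
  }

  // Single place that marks the stream finished and releases waiters.
  private void shutdown() {
    _isCompleted.set(true);
    _finishLatch.countDown(); // counting down a latch that is already at zero is a no-op
  }

  boolean isCompleted() {
    return _isCompleted.get();
  }

  long pendingCount() {
    return _finishLatch.getCount();
  }
}
```

Because both callbacks share one code path, a thread waiting on the latch is released whether the stream ends normally or with an error.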



##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/OpChainSchedulerService.java:
##########
@@ -97,10 +97,12 @@ public void runJob() {
                 // not complete, needs to re-register for scheduling
                 register(operatorChain);
               } else {
-                LOGGER.info("Execution time: " + timer.getThreadTimeNs());
+                operatorChain.getRoot().close();

Review Comment:
   nit: let's keep both log statements (though looks like I forgot to change this one to `debug`!)



##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/PlanRequestContext.java:
##########
@@ -18,36 +18,58 @@
  */
 package org.apache.pinot.query.runtime.plan;
 
+import java.util.HashMap;
 import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import org.apache.pinot.query.mailbox.MailboxIdentifier;
 import org.apache.pinot.query.mailbox.MailboxService;
+import org.apache.pinot.query.mailbox.ReceivingMailbox;
+import org.apache.pinot.query.mailbox.SendingMailbox;
 import org.apache.pinot.query.planner.StageMetadata;
 import org.apache.pinot.query.runtime.blocks.TransferableBlock;
+import org.apache.pinot.query.runtime.operator.exchange.BlockExchange;
 
 
 public class PlanRequestContext {
   protected final MailboxService<TransferableBlock> _mailboxService;
   protected final long _requestId;
-  protected final int _stageId;
   protected final String _hostName;
   protected final int _port;
   protected final Map<Integer, StageMetadata> _metadataMap;
+  // TODO: Add exchange map if multiple exchanges are needed.
+  BlockExchange _exchange;

Review Comment:
   I think it breaks some abstraction barriers to allow any piece of code that has access to the `PlanRequestContext` to exchange blocks via a `BlockExchange`. Only the `MailboxSendOperator` should be able to send blocks IMO - otherwise it can be difficult to debug the ordering of events that are sent.



##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/OpChain.java:
##########
@@ -35,9 +36,12 @@ public class OpChain {
   // TODO: build timers that are partial-execution aware
   private final Supplier<ThreadResourceUsageProvider> _timer;
 
-  public OpChain(Operator<TransferableBlock> root) {
-    _root = root;
+  // TODO: refactor this into OpChainContext
+  public PlanRequestContext _context;

Review Comment:
   nit: `private final`



##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/OpChainSchedulerService.java:
##########
@@ -97,10 +97,12 @@ public void runJob() {
                 // not complete, needs to re-register for scheduling
                 register(operatorChain);
               } else {
-                LOGGER.info("Execution time: " + timer.getThreadTimeNs());
+                operatorChain.getRoot().close();
               }
             } catch (Exception e) {
-              LOGGER.error("Failed to execute query!", e);
+              operatorChain._context.getExchange().send(TransferableBlockUtils.getErrorTransferableBlock(e));

Review Comment:
   this breaks some abstraction boundaries - this scheduler service should know nothing about the exchange or sending blocks; instead we should consider adding this to `MailboxSendOperator` (which is always the root operator for these chains). FWIW, I think that's already the case.
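
A sketch of keeping error reporting inside the root send operator, so the scheduler never touches the exchange. Everything here is hypothetical: `SendOperatorSketch` is not the real `MailboxSendOperator`, blocks are plain strings, and the `_sent` list stands in for the exchange.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

// Hypothetical root operator: it is the only component that sends blocks downstream.
class SendOperatorSketch {
  private final Supplier<String> _upstream;
  // Stands in for the exchange; records what would have been sent.
  private final List<String> _sent = new ArrayList<>();

  SendOperatorSketch(Supplier<String> upstream) {
    _upstream = upstream;
  }

  // Pulls one block from upstream; a failure is converted into an error block
  // and sent through the same path as regular data.
  String nextBlock() {
    String block;
    try {
      block = _upstream.get();
    } catch (RuntimeException e) {
      block = "ERROR: " + e.getMessage();
    }
    _sent.add(block);
    return block;
  }

  List<String> sent() {
    return _sent;
  }
}
```

Since every block, including the error block, leaves through one operator, the order of sent events stays easy to trace.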



##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/OpChainSchedulerService.java:
##########
@@ -97,10 +97,12 @@ public void runJob() {
                 // not complete, needs to re-register for scheduling
                 register(operatorChain);
               } else {
-                LOGGER.info("Execution time: " + timer.getThreadTimeNs());
+                operatorChain.getRoot().close();
               }
             } catch (Exception e) {
-              LOGGER.error("Failed to execute query!", e);
+              operatorChain._context.getExchange().send(TransferableBlockUtils.getErrorTransferableBlock(e));
+              // TODO: pass this error through context.
+              operatorChain.getRoot().close();

Review Comment:
   consider pushing this into an `OpChain#close` method, we can also call that method from the scheduler in situations where the scheduler leaks operator chains (should never happen if errors are propagated properly, but it's good to be defensive in this situation)
   
   even better would be to have a `scheduler.unregister(OpChain)` method that we call here, that way we can also use that to clean up things like tracked mailboxes (see my comment on #9887)
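
Both ideas can be sketched together. The names `OpChainSketch` and `SchedulerSketch` are hypothetical, and `unregister` is the method proposed in the comment, not an existing API.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical operator chain with a single, idempotent close().
class OpChainSketch implements AutoCloseable {
  private boolean _closed;

  @Override
  public synchronized void close() {
    _closed = true; // a real chain would close its root operator here
  }

  synchronized boolean isClosed() {
    return _closed;
  }
}

// Hypothetical scheduler that tracks chains and closes them when they are unregistered.
class SchedulerSketch {
  private final Set<OpChainSketch> _registered = ConcurrentHashMap.newKeySet();

  void register(OpChainSketch chain) {
    _registered.add(chain);
  }

  // Only the caller that actually removes the chain closes it, so repeated calls are harmless.
  void unregister(OpChainSketch chain) {
    if (_registered.remove(chain)) {
      chain.close();
    }
  }

  int registeredCount() {
    return _registered.size();
  }
}
```

The same `unregister` hook is a natural place to release other per-chain state, such as tracked mailboxes.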



##########
pinot-query-runtime/src/test/java/org/apache/pinot/query/QueryTestSet.java:
##########
@@ -33,204 +33,204 @@ public class QueryTestSet {
   public Object[][] provideTestSql() {
     return new Object[][]{
         // Order BY LIMIT
-        new Object[]{"SELECT * FROM b ORDER BY col1, col2 DESC LIMIT 3"},
-        new Object[]{"SELECT * FROM a ORDER BY col1, ts LIMIT 10"},
-        new Object[]{"SELECT * FROM a ORDER BY col1 LIMIT 20"},
-        new Object[]{"SELECT * FROM a ORDER BY col1, ts LIMIT 1, 2"},
-        new Object[]{"SELECT * FROM a ORDER BY col1, ts LIMIT 2 OFFSET 1"},
-
-        // No match filter
-        new Object[]{"SELECT * FROM b WHERE col3 < 0.5"},
-
-        // Hybrid table
+//        new Object[]{"SELECT * FROM b ORDER BY col1, col2 DESC LIMIT 3"},

Review Comment:
   (reminder) I know this is a draft, but let's make sure these pass and uncomment them (or delete them if we don't want them anymore)



##########
pinot-core/src/main/java/org/apache/pinot/core/operator/AcquireReleaseColumnsSegmentOperator.java:
##########
@@ -63,6 +63,12 @@ protected BaseResultsBlock getNextBlock() {
     return _childOperator.nextBlock();
   }
 
+  @Override
+  public void close()

Review Comment:
   (suggestion) maybe this is the default implementation in `BaseOperator`? (will make the review a bit easier)



##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/OpChainSchedulerService.java:
##########
@@ -76,14 +77,13 @@ protected void run()
         if (!isRunning()) {
           return;
         }
-
         OpChain operatorChain = _scheduler.next();
         _workerPool.submit(new TraceRunnable() {
           @Override
-          public void runJob() {
+          public void runJob()
+              throws InterruptedException {

Review Comment:
   note: this should never throw as the worker pool threads will just die and we'll be left with a dangling worker pool (these issues are really tough to debug). Instead let's catch any exceptions and handle them
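
A sketch of a job wrapper that never lets an exception escape into the worker pool. `SafeJob` and `ThrowingJob` are hypothetical; the real `TraceRunnable` is not modeled, and recording the failure stands in for whatever error handling the scheduler would do.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical wrapper: run() handles every exception instead of declaring it.
class SafeJob implements Runnable {
  // Job body that is allowed to throw checked exceptions.
  interface ThrowingJob {
    void run() throws Exception;
  }

  private final ThrowingJob _job;
  private final AtomicReference<Throwable> _failure = new AtomicReference<>();

  SafeJob(ThrowingJob job) {
    _job = job;
  }

  @Override
  public void run() {
    try {
      _job.run();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt(); // restore the flag so the pool can observe it
      _failure.set(e);
    } catch (Exception e) {
      _failure.set(e); // stands in for logging or propagating an error block
    }
  }

  Throwable failure() {
    return _failure.get();
  }
}
```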





[GitHub] [pinot] 61yao closed pull request #9836: [multistage][POC] Move mailbox instance from global map to request context

Posted by GitBox <gi...@apache.org>.
61yao closed pull request #9836: [multistage][POC] Move mailbox instance from global map to request context  
URL: https://github.com/apache/pinot/pull/9836




[GitHub] [pinot] 61yao commented on pull request #9836: [multistage][POC] Move mailbox instance from global map to request context

Posted by GitBox <gi...@apache.org>.
61yao commented on PR #9836:
URL: https://github.com/apache/pinot/pull/9836#issuecomment-1323298327

   I took another look at the fairness scheduling. This PR has nothing to do with it, because it only deals with the sending mailbox, not the receiving one. I agree that the receiving side needs more thought.
   
   We want a separate connection for each request, for isolation. Having a single connection between servers doesn't seem like a good idea: if one request crashes or hits an error on the channel, all subsequent requests will fail.




[GitHub] [pinot] 61yao commented on pull request #9836: [multistage][bugfix] Move sending mailbox instance from global map to request context

Posted by GitBox <gi...@apache.org>.
61yao commented on PR #9836:
URL: https://github.com/apache/pinot/pull/9836#issuecomment-1322647075

   > Thanks @61yao! I think you've identified the right problem to solve, but I think the abstractions in this PR aren't what we need.
   > 
   > From a code ownership perspective, I don't think it makes sense for an operator chain to "own" the underlying physical resources it uses to send/receive data. That'll make it much more difficult in the future to share those resources, and it makes management of those resources happen via various callbacks/asynchronous actions - that in turn makes it harder to debug and harder to reason about.
   > 
   > Instead, I think the current abstraction is pretty good (there's a centralized MailboxService that maintains the mailboxes and there's only one of them for the lifetime of a Pinot server). The problem you've identified is that we need some way to let that centralized service clean up resources that are no longer needed. I think there might be several different triggers for that:
   > 
   > 1. (as you've identified) an operator finished (either successfully or in error)
   > 2. there might be some kind of timeouts
   > 3. there might be some kind of admin operation that forces it to clean up those mailboxes
   > 
   > The design here (decentralizing the mailbox ownership) would make 2/3 much more difficult - and it leaks information into places that don't need that information.
   > 
   > As an aside, I don't think contention for the concurrent hashmap is a problem - so long as there isn't key contention (which there should almost never be) it will perform extremely fast (and access to it is almost certainly not a bottleneck compared to all the other things a query needs to do).
   
   I agree that centralized resources should live in a central place, but resources created per request should be cleaned up per request instead of being left to centralized request management.
   
   For example, a channel shared between servers should be managed globally.
   However, a streaming channel, mailbox instance, or request opened per request should be cleaned up per request rather than left in a central map, especially the sending mailbox.
   
   I agree it should not live inside the OpChain, but I haven't found a good place to hold the request context yet.
   
   It doesn't make sense to put a per-request resource in a central place and clean it up later; that makes it much easier to leak.
   
   Ideally, timeouts and errors should reach the same exit point, where we clean up per-request resources.
   
   The actual physical resource, say the data block, can live globally, but the mailbox instance and the streaming channel should still be managed per request.
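
The "same exit point" idea can be sketched with a `finally` block. `PerRequestResources` and `RequestRunnerSketch` are hypothetical names, and resources are tracked as plain strings for brevity.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;

// Hypothetical holder for everything opened on behalf of one request.
class PerRequestResources implements AutoCloseable {
  private final List<String> _open = new ArrayList<>();

  void open(String name) {
    _open.add(name);
  }

  int openCount() {
    return _open.size();
  }

  @Override
  public void close() {
    _open.clear(); // a real implementation would close each channel or mailbox
  }
}

class RequestRunnerSketch {
  // Runs the request body and returns "OK" or the simple class name of the failure.
  // Success, error, and timeout all leave through the same finally block.
  static String run(PerRequestResources resources, Callable<Void> body) {
    try {
      body.call();
      return "OK";
    } catch (Exception e) {
      return e.getClass().getSimpleName();
    } finally {
      resources.close();
    }
  }
}
```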




[GitHub] [pinot] 61yao commented on pull request #9836: [multistage][bugfix] Move sending mailbox instance from global map to request context

Posted by GitBox <gi...@apache.org>.
61yao commented on PR #9836:
URL: https://github.com/apache/pinot/pull/9836#issuecomment-1321024136

   I haven't figured out a good way to test this. Will follow up with a test. 




[GitHub] [pinot] 61yao commented on pull request #9836: [multistage][bugfix] Move sending mailbox instance from global map to request context

Posted by GitBox <gi...@apache.org>.
61yao commented on PR #9836:
URL: https://github.com/apache/pinot/pull/9836#issuecomment-1322763088

   Discussed with @agavra offline. We will put more thought into this and figure out the right next step.




[GitHub] [pinot] agavra commented on pull request #9836: [multistage][bugfix] Move sending mailbox instance from global map to request context

Posted by GitBox <gi...@apache.org>.
agavra commented on PR #9836:
URL: https://github.com/apache/pinot/pull/9836#issuecomment-1322570283

   Thanks @61yao! I think you've identified the right problem to solve, but I think the abstractions in this PR aren't what we need.
   
   From a code ownership perspective, I don't think it makes sense for an operator chain to "own" the underlying physical resources it uses to send/receive data. That'll make it much more difficult in the future to share those resources, and it makes management of those resources happen via various callbacks/asynchronous actions - that in turn makes it harder to debug and harder to reason about.
   
   Instead, I think the current abstraction is pretty good (there's a centralized MailboxService that maintains the mailboxes and there's only one of them for the lifetime of a Pinot server). The problem you've identified is that we need some way to let that centralized service clean up resources that are no longer needed. I think there might be several different triggers for that:
   1. (as you've identified) an operator finished (either successfully or in error)
   2. there might be some kind of timeouts
   3. there might be some kind of admin operation that forces it to clean up those mailboxes
   
   The design here (decentralizing the mailbox ownership) would make 2/3 much more difficult - and it leaks information into places that don't need that information.
   
   As an aside, I don't think contention for the concurrent hashmap is a problem - so long as there isn't key contention (which there should almost never be) it will perform extremely fast (and access to it is almost certainly not a bottleneck compared to all the other things a query needs to do).
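
A sketch of how a centralized service could expose the three cleanup triggers listed above. `CentralMailboxRegistrySketch` and its methods are hypothetical and are not part of the real `MailboxService`; mailboxes are reduced to an owning request id and a deadline.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Predicate;

// Hypothetical central registry; one instance would live for the lifetime of the server.
class CentralMailboxRegistrySketch {
  private static final class Registration {
    final long _requestId;
    final long _deadlineMs;

    Registration(long requestId, long deadlineMs) {
      _requestId = requestId;
      _deadlineMs = deadlineMs;
    }
  }

  private final Map<String, Registration> _byMailboxId = new ConcurrentHashMap<>();

  void register(String mailboxId, long requestId, long deadlineMs) {
    _byMailboxId.put(mailboxId, new Registration(requestId, deadlineMs));
  }

  // Trigger 1: an operator finished, successfully or in error.
  int releaseRequest(long requestId) {
    return removeIf(r -> r._requestId == requestId);
  }

  // Trigger 2: timeouts, checked against a caller-supplied clock value.
  int releaseExpired(long nowMs) {
    return removeIf(r -> r._deadlineMs <= nowMs);
  }

  // Trigger 3: an admin operation that forces cleanup.
  int releaseAll() {
    return removeIf(r -> true);
  }

  int size() {
    return _byMailboxId.size();
  }

  // ConcurrentHashMap iterators are weakly consistent, so removing while iterating is safe.
  private int removeIf(Predicate<Registration> shouldRemove) {
    int removed = 0;
    for (Map.Entry<String, Registration> entry : _byMailboxId.entrySet()) {
      if (shouldRemove.test(entry.getValue()) && _byMailboxId.remove(entry.getKey(), entry.getValue())) {
        removed++;
      }
    }
    return removed;
  }
}
```

Each mailbox id maps to its own key, so concurrent requests touch different entries, which is the low-key-contention case described in the aside.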




[GitHub] [pinot] 61yao commented on pull request #9836: [multistage][POC] Move mailbox instance from global map to request context

Posted by GitBox <gi...@apache.org>.
61yao commented on PR #9836:
URL: https://github.com/apache/pinot/pull/9836#issuecomment-1344771307

   This PR has too many merge conflicts now. I'll just write a new one.

