You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2021/08/24 10:49:19 UTC

[GitHub] [pulsar] codelipenghui commented on a change in pull request #11387: [ManagedLedger] Pin executor and scheduled executor threads for ManagedLedgerImpl

codelipenghui commented on a change in pull request #11387:
URL: https://github.com/apache/pulsar/pull/11387#discussion_r694736671



##########
File path: managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
##########
@@ -2159,7 +2162,7 @@ void notifyCursors() {
                 break;
             }
 
-            executor.execute(safeRun(waitingCursor::notifyEntriesAvailable));
+            pinnedExecutor.execute(safeRun(waitingCursor::notifyEntriesAvailable));

Review comment:
       There are 2 places will call the `notifyCursors()` method, one is OpAddEntry.safeRun(), it already run the `pinnedExecutor` so don't need to jump again.
   
   Another one is the ledger closed, looks only need to change here.

##########
File path: managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
##########
@@ -252,7 +253,8 @@
     protected volatile State state = null;
 
     private final OrderedScheduler scheduledExecutor;
-    private final OrderedExecutor executor;
+    private final ScheduledExecutorService pinnedScheduledExecutor;
+    private final Executor pinnedExecutor;

Review comment:
       > Would it make sense to use the generic scheduledExecutor (just for deferring purposes) and then jump back into the same pinnedExecutor?
   
   @merlimat Do you mean scheduledExecutor.schedule(pinnedExecutor.execute() ...) ?
   Seems to be a feasible way right now :)

##########
File path: managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpReadEntry.java
##########
@@ -141,8 +141,8 @@ void checkReadCompletion() {
                 cursor.ledger.startReadOperationOnLedger(nextReadPosition, OpReadEntry.this);
             }
 
-            // Schedule next read in a different thread
-            cursor.ledger.getExecutor().execute(safeRun(() -> {
+            // Schedule next read
+            cursor.ledger.getPinnedExecutor().execute(safeRun(() -> {

Review comment:
       It looks like the stack should be `checkReadCompletion -> entryCache.asyncReadEntry0 -> checkReadCompletion ->  entryCache.asyncReadEntry0 -> checkReadCompletion ->  entryCache.asyncReadEntry0` and so on, if we have entries in the cache. 

##########
File path: managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpReadEntry.java
##########
@@ -93,7 +93,7 @@ public void readEntriesFailed(ManagedLedgerException exception, Object ctx) {
 
         if (!entries.isEmpty()) {
             // There were already some entries that were read before, we can return them
-            cursor.ledger.getExecutor().execute(safeRun(() -> {
+            cursor.ledger.getPinnedExecutor().execute(safeRun(() -> {

Review comment:
       I think the entry reading happens one by one, if we got the read entries failed here, this means we will not get a chance to add more elements to the list right(all the previous read operations are done)? 




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org