Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2021/05/26 01:17:15 UTC

[GitHub] [pulsar] codelipenghui opened a new pull request #10706: Async the method onManagedLedgerLastLedgerInitialize for ManagedLedgerInterceptor

codelipenghui opened a new pull request #10706:
URL: https://github.com/apache/pulsar/pull/10706


   Fixes #10703 
   
   ### Motivation
   
   Make the `onManagedLedgerLastLedgerInitialize` method of `ManagedLedgerInterceptor` asynchronous. The root cause of issue #10703 is that the ManagedLedger interceptor makes a blocking read call from within a thread used for async operations. The interceptor shouldn't make any blocking calls.
   
   Here is the detailed stack trace:
   
   ```
   "BookKeeperClientWorker-OrderedExecutor-0-0" #67 prio=5 os_prio=0 cpu=50.69ms elapsed=14871.45s allocated=4671K defined_classes=63 tid=0x00007f8cee18e560 nid=0x9b01 waiting on condition  [0x00007f8b8fbfa000]
      java.lang.Thread.State: WAITING (parking)
    at jdk.internal.misc.Unsafe.park(java.base@11.0.11/Native Method)
    - parking to wait for  <0x0000000602818fc8> (a java.util.concurrent.CompletableFuture$Signaller)
    at java.util.concurrent.locks.LockSupport.park(java.base@11.0.11/LockSupport.java:194)
    at java.util.concurrent.CompletableFuture$Signaller.block(java.base@11.0.11/CompletableFuture.java:1796)
    at java.util.concurrent.ForkJoinPool.managedBlock(java.base@11.0.11/ForkJoinPool.java:3128)
    at java.util.concurrent.CompletableFuture.waitingGet(java.base@11.0.11/CompletableFuture.java:1823)
    at java.util.concurrent.CompletableFuture.get(java.base@11.0.11/CompletableFuture.java:1998)
    at org.apache.bookkeeper.common.concurrent.FutureUtils.result(FutureUtils.java:72)
    at org.apache.bookkeeper.client.api.ReadHandle.read(ReadHandle.java:58)
    at org.apache.pulsar.broker.intercept.ManagedLedgerInterceptorImpl.onManagedLedgerLastLedgerInitialize(ManagedLedgerInterceptorImpl.java:89)
    at org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl$1.lambda$null$0(ManagedLedgerImpl.java:358)
    at org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl$1$$Lambda$800/0x00000008406bb040.run(Unknown Source)
    at org.apache.bookkeeper.mledger.util.SafeRun$1.safeRun(SafeRun.java:32)
    at org.apache.bookkeeper.common.util.SafeRunnable.run(SafeRunnable.java:36)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(java.base@11.0.11/ThreadPoolExecutor.java:1128)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(java.base@11.0.11/ThreadPoolExecutor.java:628)
    at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
    at java.lang.Thread.run(java.base@11.0.11/Thread.java:829)
   
      Locked ownable synchronizers:
    - <0x0000000601a92180> (a java.util.concurrent.ThreadPoolExecutor$Worker)
   ```
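
To illustrate the motivation, here is a minimal, self-contained sketch of one way a blocking wait on an async worker thread can stall. It does not use Pulsar or BookKeeper: the class `BlockingInCallbackSketch`, its method names, and the single-threaded executor are assumptions made purely for illustration, and the real thread model behind the stack trace above may differ.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class BlockingInCallbackSketch {

    // Helper: wait for a result from the calling (non-worker) thread.
    static <T> T await(CompletableFuture<T> future) {
        try {
            return future.get(5, TimeUnit.SECONDS);
        } catch (Exception e) {
            throw new IllegalStateException(e);
        }
    }

    // Blocking variant: a task running on the only worker thread waits for a
    // future that can only be completed by a task queued behind it.
    static boolean blockingVariantStalls() {
        ExecutorService worker = Executors.newSingleThreadExecutor();
        try {
            CompletableFuture<Boolean> stalled = new CompletableFuture<>();
            worker.execute(() -> {
                CompletableFuture<String> read = new CompletableFuture<>();
                // The "read" completes on the same worker thread, later.
                worker.execute(() -> read.complete("entry"));
                try {
                    // Bounded wait so the sketch terminates; an unbounded
                    // get() would park this thread indefinitely.
                    read.get(200, TimeUnit.MILLISECONDS);
                    stalled.complete(false);
                } catch (TimeoutException e) {
                    stalled.complete(true);
                } catch (Exception e) {
                    stalled.completeExceptionally(e);
                }
            });
            return await(stalled);
        } finally {
            worker.shutdownNow();
        }
    }

    // Async variant: register a callback and return, freeing the worker
    // thread so the queued completion can run.
    static String asyncVariant() {
        ExecutorService worker = Executors.newSingleThreadExecutor();
        try {
            CompletableFuture<String> result = new CompletableFuture<>();
            worker.execute(() -> {
                CompletableFuture<String> read = new CompletableFuture<>();
                worker.execute(() -> read.complete("entry"));
                read.whenComplete((value, ex) -> {
                    if (ex != null) {
                        result.completeExceptionally(ex);
                    } else {
                        result.complete(value);
                    }
                });
            });
            return await(result);
        } finally {
            worker.shutdownNow();
        }
    }
}
```

Under these assumptions, `blockingVariantStalls()` reports that the bounded wait timed out, while `asyncVariant()` yields the value as soon as the queued task runs.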
   
   ### Does this pull request potentially affect one of the following parts:
   
   *If `yes` was chosen, please highlight the changes*
   
     - Dependencies (does it add or upgrade a dependency): (no)
     - The public API: (no)
     - The schema: (no)
     - The default values of configurations: (no)
     - The wire protocol: (no)
     - The rest endpoints: (no)
     - The admin cli options: (no)
     - Anything that affects deployment: (no)
   
   ### Documentation
   
     - Does this pull request introduce a new feature? (no)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] codelipenghui commented on a change in pull request #10706: Async the method onManagedLedgerLastLedgerInitialize for ManagedLedgerInterceptor

Posted by GitBox <gi...@apache.org>.
codelipenghui commented on a change in pull request #10706:
URL: https://github.com/apache/pulsar/pull/10706#discussion_r639358396



##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/intercept/ManagedLedgerInterceptorImpl.java
##########
@@ -80,31 +80,47 @@ public void onManagedLedgerPropertiesInitialize(Map<String, String> propertiesMa
     }
 
     @Override
-    public void onManagedLedgerLastLedgerInitialize(String name, LedgerHandle lh) {
-        LedgerEntries ledgerEntries = null;
-        try {
-            for (BrokerEntryMetadataInterceptor interceptor : brokerEntryMetadataInterceptors) {
-                if (interceptor instanceof AppendIndexMetadataInterceptor && lh.getLastAddConfirmed() >= 0) {
-                    ledgerEntries =
-                            lh.read(lh.getLastAddConfirmed(), lh.getLastAddConfirmed());
-                    for (LedgerEntry entry : ledgerEntries) {
-                        BrokerEntryMetadata brokerEntryMetadata =
-                                Commands.parseBrokerEntryMetadataIfExist(entry.getEntryBuffer());
-                        if (brokerEntryMetadata != null && brokerEntryMetadata.hasIndex()) {
-                            ((AppendIndexMetadataInterceptor) interceptor)
-                                    .recoveryIndexGenerator(brokerEntryMetadata.getIndex());
+    public CompletableFuture<Void> onManagedLedgerLastLedgerInitialize(String name, LedgerHandle lh) {
+        CompletableFuture<Void> promise = new CompletableFuture<>();
+        boolean hasAppendIndexMetadataInterceptor = brokerEntryMetadataInterceptors.stream()
+                .anyMatch(interceptor -> interceptor instanceof AppendIndexMetadataInterceptor);
+        if (hasAppendIndexMetadataInterceptor && lh.getLastAddConfirmed() >= 0) {
+            lh.readAsync(lh.getLastAddConfirmed(), lh.getLastAddConfirmed()).whenComplete((entries, ex) -> {
+                if (ex != null) {
+                    log.error("[{}] Read last entry error.", name, ex);
+                    promise.completeExceptionally(ex);
+                } else {
+                    if (entries != null) {
+                        try {
+                            LedgerEntry ledgerEntry = entries.getEntry(lh.getLastAddConfirmed());
+                            if (ledgerEntry != null) {
+                                BrokerEntryMetadata brokerEntryMetadata =
+                                        Commands.parseBrokerEntryMetadataIfExist(ledgerEntry.getEntryBuffer());
+                                for (BrokerEntryMetadataInterceptor interceptor : brokerEntryMetadataInterceptors) {
+                                    if (interceptor instanceof AppendIndexMetadataInterceptor) {
+                                        if (brokerEntryMetadata != null && brokerEntryMetadata.hasIndex()) {
+                                            ((AppendIndexMetadataInterceptor) interceptor)
+                                                    .recoveryIndexGenerator(brokerEntryMetadata.getIndex());
+                                        }
+                                    }
+                                }
+                            }
+                            entries.close();
+                            promise.complete(null);
+                        } catch (Exception e) {
+                            log.error("[{}] Failed to recover the index generator from the last add confirmed entry.",
+                                    name, e);
+                            promise.completeExceptionally(e);
                         }
+                    } else {
+                        promise.complete(null);
                     }

Review comment:
       We are using `CompletableFuture<Void>`.







[GitHub] [pulsar] merlimat commented on a change in pull request #10706: Async the method onManagedLedgerLastLedgerInitialize for ManagedLedgerInterceptor

Posted by GitBox <gi...@apache.org>.
merlimat commented on a change in pull request #10706:
URL: https://github.com/apache/pulsar/pull/10706#discussion_r639337156



##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/intercept/ManagedLedgerInterceptorImpl.java
##########
@@ -80,31 +82,47 @@ public void onManagedLedgerPropertiesInitialize(Map<String, String> propertiesMa
     }
 
     @Override
-    public void onManagedLedgerLastLedgerInitialize(String name, LedgerHandle lh) {
-        LedgerEntries ledgerEntries = null;
-        try {
-            for (BrokerEntryMetadataInterceptor interceptor : brokerEntryMetadataInterceptors) {
-                if (interceptor instanceof AppendIndexMetadataInterceptor && lh.getLastAddConfirmed() >= 0) {
-                    ledgerEntries =
-                            lh.read(lh.getLastAddConfirmed(), lh.getLastAddConfirmed());
-                    for (LedgerEntry entry : ledgerEntries) {
-                        BrokerEntryMetadata brokerEntryMetadata =
-                                Commands.parseBrokerEntryMetadataIfExist(entry.getEntryBuffer());
-                        if (brokerEntryMetadata != null && brokerEntryMetadata.hasIndex()) {
-                            ((AppendIndexMetadataInterceptor) interceptor)
-                                    .recoveryIndexGenerator(brokerEntryMetadata.getIndex());
+    public CompletableFuture<Void> onManagedLedgerLastLedgerInitialize(String name, LedgerHandle lh) {
+        CompletableFuture<Void> promise = new CompletableFuture<>();
+        boolean hasAppendIndexMetadataInterceptor = brokerEntryMetadataInterceptors.stream()
+                .anyMatch(interceptor -> interceptor instanceof AppendIndexMetadataInterceptor);
+        if (hasAppendIndexMetadataInterceptor && lh.getLastAddConfirmed() >= 0) {
+            lh.readAsync(lh.getLastAddConfirmed(), lh.getLastAddConfirmed()).whenComplete((entries, ex) -> {
+                if (ex != null) {
+                    log.error("[{}] Read last entry error.", name, ex);
+                    promise.completeExceptionally(ex);
+                } else {
+                    if (entries != null) {
+                        try {
+                            LedgerEntry ledgerEntry = entries.getEntry(lh.getLastAddConfirmed());
+                            if (ledgerEntry != null) {
+                                for (BrokerEntryMetadataInterceptor interceptor : brokerEntryMetadataInterceptors) {
+                                    if (interceptor instanceof AppendIndexMetadataInterceptor) {
+                                        BrokerEntryMetadata brokerEntryMetadata =

Review comment:
       We can probably just do the parsing once







[GitHub] [pulsar] BewareMyPower commented on a change in pull request #10706: Async the method onManagedLedgerLastLedgerInitialize for ManagedLedgerInterceptor

Posted by GitBox <gi...@apache.org>.
BewareMyPower commented on a change in pull request #10706:
URL: https://github.com/apache/pulsar/pull/10706#discussion_r639357679



##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/intercept/ManagedLedgerInterceptorImpl.java
##########
@@ -80,31 +80,47 @@ public void onManagedLedgerPropertiesInitialize(Map<String, String> propertiesMa
     }
 
     @Override
-    public void onManagedLedgerLastLedgerInitialize(String name, LedgerHandle lh) {
-        LedgerEntries ledgerEntries = null;
-        try {
-            for (BrokerEntryMetadataInterceptor interceptor : brokerEntryMetadataInterceptors) {
-                if (interceptor instanceof AppendIndexMetadataInterceptor && lh.getLastAddConfirmed() >= 0) {
-                    ledgerEntries =
-                            lh.read(lh.getLastAddConfirmed(), lh.getLastAddConfirmed());
-                    for (LedgerEntry entry : ledgerEntries) {
-                        BrokerEntryMetadata brokerEntryMetadata =
-                                Commands.parseBrokerEntryMetadataIfExist(entry.getEntryBuffer());
-                        if (brokerEntryMetadata != null && brokerEntryMetadata.hasIndex()) {
-                            ((AppendIndexMetadataInterceptor) interceptor)
-                                    .recoveryIndexGenerator(brokerEntryMetadata.getIndex());
+    public CompletableFuture<Void> onManagedLedgerLastLedgerInitialize(String name, LedgerHandle lh) {
+        CompletableFuture<Void> promise = new CompletableFuture<>();
+        boolean hasAppendIndexMetadataInterceptor = brokerEntryMetadataInterceptors.stream()
+                .anyMatch(interceptor -> interceptor instanceof AppendIndexMetadataInterceptor);
+        if (hasAppendIndexMetadataInterceptor && lh.getLastAddConfirmed() >= 0) {
+            lh.readAsync(lh.getLastAddConfirmed(), lh.getLastAddConfirmed()).whenComplete((entries, ex) -> {
+                if (ex != null) {
+                    log.error("[{}] Read last entry error.", name, ex);
+                    promise.completeExceptionally(ex);
+                } else {
+                    if (entries != null) {
+                        try {
+                            LedgerEntry ledgerEntry = entries.getEntry(lh.getLastAddConfirmed());
+                            if (ledgerEntry != null) {
+                                BrokerEntryMetadata brokerEntryMetadata =
+                                        Commands.parseBrokerEntryMetadataIfExist(ledgerEntry.getEntryBuffer());
+                                for (BrokerEntryMetadataInterceptor interceptor : brokerEntryMetadataInterceptors) {
+                                    if (interceptor instanceof AppendIndexMetadataInterceptor) {
+                                        if (brokerEntryMetadata != null && brokerEntryMetadata.hasIndex()) {
+                                            ((AppendIndexMetadataInterceptor) interceptor)
+                                                    .recoveryIndexGenerator(brokerEntryMetadata.getIndex());
+                                        }
+                                    }
+                                }
+                            }
+                            entries.close();
+                            promise.complete(null);
+                        } catch (Exception e) {
+                            log.error("[{}] Failed to recover the index generator from the last add confirmed entry.",
+                                    name, e);
+                            promise.completeExceptionally(e);
                         }
+                    } else {
+                        promise.complete(null);
                     }

Review comment:
       When does `ReadHandle#readAsync` return null entries?







[GitHub] [pulsar] merlimat merged pull request #10706: Async the method onManagedLedgerLastLedgerInitialize for ManagedLedgerInterceptor

Posted by GitBox <gi...@apache.org>.
merlimat merged pull request #10706:
URL: https://github.com/apache/pulsar/pull/10706


   





[GitHub] [pulsar] merlimat commented on a change in pull request #10706: Async the method onManagedLedgerLastLedgerInitialize for ManagedLedgerInterceptor

Posted by GitBox <gi...@apache.org>.
merlimat commented on a change in pull request #10706:
URL: https://github.com/apache/pulsar/pull/10706#discussion_r639338576



##########
File path: managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
##########
@@ -355,9 +355,17 @@ public void operationComplete(ManagedLedgerInfo mlInfo, Stat stat) {
                                         .setTimestamp(clock.millis()).build();
                                 ledgers.put(id, info);
                                 if (managedLedgerInterceptor != null) {
-                                    managedLedgerInterceptor.onManagedLedgerLastLedgerInitialize(name, lh);
+                                    managedLedgerInterceptor.onManagedLedgerLastLedgerInitialize(name, lh)
+                                        .whenComplete((__, ex) -> {
+                                            if (ex != null) {
+                                                callback.initializeFailed(new ManagedLedgerInterceptException(ex));
+                                            } else {
+                                                initializeBookKeeper(callback);
+                                            }
+                                        });

Review comment:
       In general it's preferable to do: 
   ```suggestion
                                           .thenRun(() -> initializeBookKeeper(callback))
                                           .exceptionally(ex -> {
                                               callback.initializeFailed(new ManagedLedgerInterceptException(ex));
                                               return null;
                                           });
   ```
   
   The reason is that if `initializeBookKeeper()` throws an unexpected exception, the `exceptionally` stage is still invoked.
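
The difference between the two chaining styles can be seen in a small standalone sketch. It uses only `java.util.concurrent.CompletableFuture`. `ChainingSketch` and `nextStep()` are hypothetical names, with `nextStep()` standing in for a call such as `initializeBookKeeper(callback)` that throws unexpectedly.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

class ChainingSketch {

    // Hypothetical stand-in for the next initialization step; it fails
    // unexpectedly to show how each chaining style reacts.
    static void nextStep() {
        throw new IllegalStateException("unexpected failure in next step");
    }

    // Style used in the diff: branch on the exception inside whenComplete.
    static List<String> withWhenComplete() {
        List<String> events = new ArrayList<>();
        CompletableFuture.completedFuture(null)
                .whenComplete((ignored, ex) -> {
                    if (ex != null) {
                        events.add("failed");
                    } else {
                        // The exception thrown here only fails the future
                        // returned by whenComplete, which nobody observes.
                        nextStep();
                    }
                });
        return events;
    }

    // Suggested style: run the next step in thenRun and handle failures of
    // either the upstream future or the next step in exceptionally.
    static List<String> withThenRunExceptionally() {
        List<String> events = new ArrayList<>();
        CompletableFuture.completedFuture(null)
                .thenRun(ChainingSketch::nextStep)
                .exceptionally(ex -> {
                    events.add("failed");
                    return null;
                });
        return events;
    }
}
```

In the first variant the failure of `nextStep()` goes unreported, so `withWhenComplete()` returns an empty list. In the second, the failure reaches the handler and the list contains `"failed"`. Note that the `Throwable` passed to `exceptionally` in this chain is a `CompletionException` wrapping the original exception, so code that inspects the cause may need `getCause()`.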







[GitHub] [pulsar] BewareMyPower commented on a change in pull request #10706: Async the method onManagedLedgerLastLedgerInitialize for ManagedLedgerInterceptor

Posted by GitBox <gi...@apache.org>.
BewareMyPower commented on a change in pull request #10706:
URL: https://github.com/apache/pulsar/pull/10706#discussion_r639357679



##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/intercept/ManagedLedgerInterceptorImpl.java
##########
@@ -80,31 +80,47 @@ public void onManagedLedgerPropertiesInitialize(Map<String, String> propertiesMa
     }
 
     @Override
-    public void onManagedLedgerLastLedgerInitialize(String name, LedgerHandle lh) {
-        LedgerEntries ledgerEntries = null;
-        try {
-            for (BrokerEntryMetadataInterceptor interceptor : brokerEntryMetadataInterceptors) {
-                if (interceptor instanceof AppendIndexMetadataInterceptor && lh.getLastAddConfirmed() >= 0) {
-                    ledgerEntries =
-                            lh.read(lh.getLastAddConfirmed(), lh.getLastAddConfirmed());
-                    for (LedgerEntry entry : ledgerEntries) {
-                        BrokerEntryMetadata brokerEntryMetadata =
-                                Commands.parseBrokerEntryMetadataIfExist(entry.getEntryBuffer());
-                        if (brokerEntryMetadata != null && brokerEntryMetadata.hasIndex()) {
-                            ((AppendIndexMetadataInterceptor) interceptor)
-                                    .recoveryIndexGenerator(brokerEntryMetadata.getIndex());
+    public CompletableFuture<Void> onManagedLedgerLastLedgerInitialize(String name, LedgerHandle lh) {
+        CompletableFuture<Void> promise = new CompletableFuture<>();
+        boolean hasAppendIndexMetadataInterceptor = brokerEntryMetadataInterceptors.stream()
+                .anyMatch(interceptor -> interceptor instanceof AppendIndexMetadataInterceptor);
+        if (hasAppendIndexMetadataInterceptor && lh.getLastAddConfirmed() >= 0) {
+            lh.readAsync(lh.getLastAddConfirmed(), lh.getLastAddConfirmed()).whenComplete((entries, ex) -> {
+                if (ex != null) {
+                    log.error("[{}] Read last entry error.", name, ex);
+                    promise.completeExceptionally(ex);
+                } else {
+                    if (entries != null) {
+                        try {
+                            LedgerEntry ledgerEntry = entries.getEntry(lh.getLastAddConfirmed());
+                            if (ledgerEntry != null) {
+                                BrokerEntryMetadata brokerEntryMetadata =
+                                        Commands.parseBrokerEntryMetadataIfExist(ledgerEntry.getEntryBuffer());
+                                for (BrokerEntryMetadataInterceptor interceptor : brokerEntryMetadataInterceptors) {
+                                    if (interceptor instanceof AppendIndexMetadataInterceptor) {
+                                        if (brokerEntryMetadata != null && brokerEntryMetadata.hasIndex()) {
+                                            ((AppendIndexMetadataInterceptor) interceptor)
+                                                    .recoveryIndexGenerator(brokerEntryMetadata.getIndex());
+                                        }
+                                    }
+                                }
+                            }
+                            entries.close();
+                            promise.complete(null);
+                        } catch (Exception e) {
+                            log.error("[{}] Failed to recover the index generator from the last add confirmed entry.",
+                                    name, e);
+                            promise.completeExceptionally(e);
                         }
+                    } else {
+                        promise.complete(null);
                     }

Review comment:
       When does `ReadHandle#readAsync` return null entries? Is it an exceptional case?







[GitHub] [pulsar] BewareMyPower commented on a change in pull request #10706: Async the method onManagedLedgerLastLedgerInitialize for ManagedLedgerInterceptor

Posted by GitBox <gi...@apache.org>.
BewareMyPower commented on a change in pull request #10706:
URL: https://github.com/apache/pulsar/pull/10706#discussion_r639361210



##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/intercept/ManagedLedgerInterceptorImpl.java
##########
@@ -80,31 +80,47 @@ public void onManagedLedgerPropertiesInitialize(Map<String, String> propertiesMa
     }
 
     @Override
-    public void onManagedLedgerLastLedgerInitialize(String name, LedgerHandle lh) {
-        LedgerEntries ledgerEntries = null;
-        try {
-            for (BrokerEntryMetadataInterceptor interceptor : brokerEntryMetadataInterceptors) {
-                if (interceptor instanceof AppendIndexMetadataInterceptor && lh.getLastAddConfirmed() >= 0) {
-                    ledgerEntries =
-                            lh.read(lh.getLastAddConfirmed(), lh.getLastAddConfirmed());
-                    for (LedgerEntry entry : ledgerEntries) {
-                        BrokerEntryMetadata brokerEntryMetadata =
-                                Commands.parseBrokerEntryMetadataIfExist(entry.getEntryBuffer());
-                        if (brokerEntryMetadata != null && brokerEntryMetadata.hasIndex()) {
-                            ((AppendIndexMetadataInterceptor) interceptor)
-                                    .recoveryIndexGenerator(brokerEntryMetadata.getIndex());
+    public CompletableFuture<Void> onManagedLedgerLastLedgerInitialize(String name, LedgerHandle lh) {
+        CompletableFuture<Void> promise = new CompletableFuture<>();
+        boolean hasAppendIndexMetadataInterceptor = brokerEntryMetadataInterceptors.stream()
+                .anyMatch(interceptor -> interceptor instanceof AppendIndexMetadataInterceptor);
+        if (hasAppendIndexMetadataInterceptor && lh.getLastAddConfirmed() >= 0) {
+            lh.readAsync(lh.getLastAddConfirmed(), lh.getLastAddConfirmed()).whenComplete((entries, ex) -> {
+                if (ex != null) {
+                    log.error("[{}] Read last entry error.", name, ex);
+                    promise.completeExceptionally(ex);
+                } else {
+                    if (entries != null) {
+                        try {
+                            LedgerEntry ledgerEntry = entries.getEntry(lh.getLastAddConfirmed());
+                            if (ledgerEntry != null) {
+                                BrokerEntryMetadata brokerEntryMetadata =
+                                        Commands.parseBrokerEntryMetadataIfExist(ledgerEntry.getEntryBuffer());
+                                for (BrokerEntryMetadataInterceptor interceptor : brokerEntryMetadataInterceptors) {
+                                    if (interceptor instanceof AppendIndexMetadataInterceptor) {
+                                        if (brokerEntryMetadata != null && brokerEntryMetadata.hasIndex()) {
+                                            ((AppendIndexMetadataInterceptor) interceptor)
+                                                    .recoveryIndexGenerator(brokerEntryMetadata.getIndex());
+                                        }
+                                    }
+                                }
+                            }
+                            entries.close();
+                            promise.complete(null);
+                        } catch (Exception e) {
+                            log.error("[{}] Failed to recover the index generator from the last add confirmed entry.",
+                                    name, e);
+                            promise.completeExceptionally(e);
                         }
+                    } else {
+                        promise.complete(null);
                     }

Review comment:
       After reading the BookKeeper-side code, `entries` never seems to be null, though the null check here does no harm.







[GitHub] [pulsar] BewareMyPower commented on a change in pull request #10706: Async the method onManagedLedgerLastLedgerInitialize for ManagedLedgerInterceptor

Posted by GitBox <gi...@apache.org>.
BewareMyPower commented on a change in pull request #10706:
URL: https://github.com/apache/pulsar/pull/10706#discussion_r639358859



##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/intercept/ManagedLedgerInterceptorImpl.java
##########
@@ -80,31 +80,47 @@ public void onManagedLedgerPropertiesInitialize(Map<String, String> propertiesMa
     }
 
     @Override
-    public void onManagedLedgerLastLedgerInitialize(String name, LedgerHandle lh) {
-        LedgerEntries ledgerEntries = null;
-        try {
-            for (BrokerEntryMetadataInterceptor interceptor : brokerEntryMetadataInterceptors) {
-                if (interceptor instanceof AppendIndexMetadataInterceptor && lh.getLastAddConfirmed() >= 0) {
-                    ledgerEntries =
-                            lh.read(lh.getLastAddConfirmed(), lh.getLastAddConfirmed());
-                    for (LedgerEntry entry : ledgerEntries) {
-                        BrokerEntryMetadata brokerEntryMetadata =
-                                Commands.parseBrokerEntryMetadataIfExist(entry.getEntryBuffer());
-                        if (brokerEntryMetadata != null && brokerEntryMetadata.hasIndex()) {
-                            ((AppendIndexMetadataInterceptor) interceptor)
-                                    .recoveryIndexGenerator(brokerEntryMetadata.getIndex());
+    public CompletableFuture<Void> onManagedLedgerLastLedgerInitialize(String name, LedgerHandle lh) {
+        CompletableFuture<Void> promise = new CompletableFuture<>();
+        boolean hasAppendIndexMetadataInterceptor = brokerEntryMetadataInterceptors.stream()
+                .anyMatch(interceptor -> interceptor instanceof AppendIndexMetadataInterceptor);
+        if (hasAppendIndexMetadataInterceptor && lh.getLastAddConfirmed() >= 0) {
+            lh.readAsync(lh.getLastAddConfirmed(), lh.getLastAddConfirmed()).whenComplete((entries, ex) -> {
+                if (ex != null) {
+                    log.error("[{}] Read last entry error.", name, ex);
+                    promise.completeExceptionally(ex);
+                } else {
+                    if (entries != null) {
+                        try {
+                            LedgerEntry ledgerEntry = entries.getEntry(lh.getLastAddConfirmed());
+                            if (ledgerEntry != null) {
+                                BrokerEntryMetadata brokerEntryMetadata =
+                                        Commands.parseBrokerEntryMetadataIfExist(ledgerEntry.getEntryBuffer());
+                                for (BrokerEntryMetadataInterceptor interceptor : brokerEntryMetadataInterceptors) {
+                                    if (interceptor instanceof AppendIndexMetadataInterceptor) {
+                                        if (brokerEntryMetadata != null && brokerEntryMetadata.hasIndex()) {
+                                            ((AppendIndexMetadataInterceptor) interceptor)
+                                                    .recoveryIndexGenerator(brokerEntryMetadata.getIndex());
+                                        }
+                                    }
+                                }
+                            }
+                            entries.close();
+                            promise.complete(null);
+                        } catch (Exception e) {
+                            log.error("[{}] Failed to recover the index generator from the last add confirmed entry.",
+                                    name, e);
+                            promise.completeExceptionally(e);
                         }
+                    } else {
+                        promise.complete(null);
                     }

Review comment:
       I mean, when `entries` in the callback is null, is there something wrong with the read?







[GitHub] [pulsar] codelipenghui commented on a change in pull request #10706: Async the method onManagedLedgerLastLedgerInitialize for ManagedLedgerInterceptor

Posted by GitBox <gi...@apache.org>.
codelipenghui commented on a change in pull request #10706:
URL: https://github.com/apache/pulsar/pull/10706#discussion_r639373340



##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/intercept/ManagedLedgerInterceptorImpl.java
##########
@@ -80,31 +80,47 @@ public void onManagedLedgerPropertiesInitialize(Map<String, String> propertiesMa
     }
 
     @Override
-    public void onManagedLedgerLastLedgerInitialize(String name, LedgerHandle lh) {
-        LedgerEntries ledgerEntries = null;
-        try {
-            for (BrokerEntryMetadataInterceptor interceptor : brokerEntryMetadataInterceptors) {
-                if (interceptor instanceof AppendIndexMetadataInterceptor && lh.getLastAddConfirmed() >= 0) {
-                    ledgerEntries =
-                            lh.read(lh.getLastAddConfirmed(), lh.getLastAddConfirmed());
-                    for (LedgerEntry entry : ledgerEntries) {
-                        BrokerEntryMetadata brokerEntryMetadata =
-                                Commands.parseBrokerEntryMetadataIfExist(entry.getEntryBuffer());
-                        if (brokerEntryMetadata != null && brokerEntryMetadata.hasIndex()) {
-                            ((AppendIndexMetadataInterceptor) interceptor)
-                                    .recoveryIndexGenerator(brokerEntryMetadata.getIndex());
+    public CompletableFuture<Void> onManagedLedgerLastLedgerInitialize(String name, LedgerHandle lh) {
+        CompletableFuture<Void> promise = new CompletableFuture<>();
+        boolean hasAppendIndexMetadataInterceptor = brokerEntryMetadataInterceptors.stream()
+                .anyMatch(interceptor -> interceptor instanceof AppendIndexMetadataInterceptor);
+        if (hasAppendIndexMetadataInterceptor && lh.getLastAddConfirmed() >= 0) {
+            lh.readAsync(lh.getLastAddConfirmed(), lh.getLastAddConfirmed()).whenComplete((entries, ex) -> {
+                if (ex != null) {
+                    log.error("[{}] Read last entry error.", name, ex);
+                    promise.completeExceptionally(ex);
+                } else {
+                    if (entries != null) {
+                        try {
+                            LedgerEntry ledgerEntry = entries.getEntry(lh.getLastAddConfirmed());
+                            if (ledgerEntry != null) {
+                                BrokerEntryMetadata brokerEntryMetadata =
+                                        Commands.parseBrokerEntryMetadataIfExist(ledgerEntry.getEntryBuffer());
+                                for (BrokerEntryMetadataInterceptor interceptor : brokerEntryMetadataInterceptors) {
+                                    if (interceptor instanceof AppendIndexMetadataInterceptor) {
+                                        if (brokerEntryMetadata != null && brokerEntryMetadata.hasIndex()) {
+                                            ((AppendIndexMetadataInterceptor) interceptor)
+                                                    .recoveryIndexGenerator(brokerEntryMetadata.getIndex());
+                                        }
+                                    }
+                                }
+                            }
+                            entries.close();
+                            promise.complete(null);
+                        } catch (Exception e) {
+                            log.error("[{}] Failed to recover the index generator from the last add confirmed entry.",
+                                    name, e);
+                            promise.completeExceptionally(e);
                         }
+                    } else {
+                        promise.complete(null);
                     }

Review comment:
       Oh, I see your point now.







[GitHub] [pulsar] BewareMyPower commented on a change in pull request #10706: Async the method onManagedLedgerLastLedgerInitialize for ManagedLedgerInterceptor

Posted by GitBox <gi...@apache.org>.
BewareMyPower commented on a change in pull request #10706:
URL: https://github.com/apache/pulsar/pull/10706#discussion_r639357679



##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/intercept/ManagedLedgerInterceptorImpl.java
##########
@@ -80,31 +80,47 @@ public void onManagedLedgerPropertiesInitialize(Map<String, String> propertiesMa
     }
 
     @Override
-    public void onManagedLedgerLastLedgerInitialize(String name, LedgerHandle lh) {
-        LedgerEntries ledgerEntries = null;
-        try {
-            for (BrokerEntryMetadataInterceptor interceptor : brokerEntryMetadataInterceptors) {
-                if (interceptor instanceof AppendIndexMetadataInterceptor && lh.getLastAddConfirmed() >= 0) {
-                    ledgerEntries =
-                            lh.read(lh.getLastAddConfirmed(), lh.getLastAddConfirmed());
-                    for (LedgerEntry entry : ledgerEntries) {
-                        BrokerEntryMetadata brokerEntryMetadata =
-                                Commands.parseBrokerEntryMetadataIfExist(entry.getEntryBuffer());
-                        if (brokerEntryMetadata != null && brokerEntryMetadata.hasIndex()) {
-                            ((AppendIndexMetadataInterceptor) interceptor)
-                                    .recoveryIndexGenerator(brokerEntryMetadata.getIndex());
+    public CompletableFuture<Void> onManagedLedgerLastLedgerInitialize(String name, LedgerHandle lh) {
+        CompletableFuture<Void> promise = new CompletableFuture<>();
+        boolean hasAppendIndexMetadataInterceptor = brokerEntryMetadataInterceptors.stream()
+                .anyMatch(interceptor -> interceptor instanceof AppendIndexMetadataInterceptor);
+        if (hasAppendIndexMetadataInterceptor && lh.getLastAddConfirmed() >= 0) {
+            lh.readAsync(lh.getLastAddConfirmed(), lh.getLastAddConfirmed()).whenComplete((entries, ex) -> {
+                if (ex != null) {
+                    log.error("[{}] Read last entry error.", name, ex);
+                    promise.completeExceptionally(ex);
+                } else {
+                    if (entries != null) {
+                        try {
+                            LedgerEntry ledgerEntry = entries.getEntry(lh.getLastAddConfirmed());
+                            if (ledgerEntry != null) {
+                                BrokerEntryMetadata brokerEntryMetadata =
+                                        Commands.parseBrokerEntryMetadataIfExist(ledgerEntry.getEntryBuffer());
+                                for (BrokerEntryMetadataInterceptor interceptor : brokerEntryMetadataInterceptors) {
+                                    if (interceptor instanceof AppendIndexMetadataInterceptor) {
+                                        if (brokerEntryMetadata != null && brokerEntryMetadata.hasIndex()) {
+                                            ((AppendIndexMetadataInterceptor) interceptor)
+                                                    .recoveryIndexGenerator(brokerEntryMetadata.getIndex());
+                                        }
+                                    }
+                                }
+                            }
+                            entries.close();
+                            promise.complete(null);
+                        } catch (Exception e) {
+                            log.error("[{}] Failed to recover the index generator from the last add confirmed entry.",
+                                    name, e);
+                            promise.completeExceptionally(e);
                         }
+                    } else {
+                        promise.complete(null);
                     }

Review comment:
       When does `ReadHandle#readAsync` complete with a null `entries`? Is it an exceptional case?
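
To make the question concrete, here is a minimal sketch of the three outcomes the callback in the diff distinguishes: a failed read, a normal completion with a null value, and a normal completion with a value. It uses only `java.util.concurrent.CompletableFuture`; the class `NullResultSketch`, the method `recover`, and the result strings are hypothetical names for illustration, and the `read` argument merely stands in for `lh.readAsync(...)`. It does not show whether BookKeeper ever completes a read with null, which is the open question here.

```java
import java.util.concurrent.CompletableFuture;

public class NullResultSketch {

    /**
     * Hypothetical stand-in for the interceptor callback.
     * `read` plays the role of lh.readAsync(...); the Long plays the role of
     * the index parsed from the last confirmed entry.
     */
    static CompletableFuture<String> recover(CompletableFuture<Long> read) {
        CompletableFuture<String> promise = new CompletableFuture<>();
        read.whenComplete((index, ex) -> {
            if (ex != null) {
                // The read itself failed: propagate the failure to the caller.
                promise.completeExceptionally(ex);
            } else if (index == null) {
                // Normal completion without a value. The diff treats this as
                // "nothing to recover" and completes successfully.
                promise.complete("skipped");
            } else {
                // Normal completion with a value: recovery can proceed.
                promise.complete("recovered:" + index);
            }
        });
        return promise;
    }

    public static void main(String[] args) {
        System.out.println(recover(CompletableFuture.completedFuture(41L)).join());
        System.out.println(recover(CompletableFuture.<Long>completedFuture(null)).join());

        CompletableFuture<Long> failed = new CompletableFuture<>();
        failed.completeExceptionally(new IllegalStateException("read failed"));
        System.out.println(recover(failed).isCompletedExceptionally());
    }
}
```

If a null result should instead count as an error, the middle branch could call `promise.completeExceptionally(...)` rather than `promise.complete(...)`; which behavior is right depends on the answer to the question above.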



