You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2021/05/26 02:31:03 UTC

[GitHub] [pulsar] codelipenghui commented on a change in pull request #10706: Async the method onManagedLedgerLastLedgerInitialize for ManagedLedgerInterceptor

codelipenghui commented on a change in pull request #10706:
URL: https://github.com/apache/pulsar/pull/10706#discussion_r639358396



##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/intercept/ManagedLedgerInterceptorImpl.java
##########
@@ -80,31 +80,47 @@ public void onManagedLedgerPropertiesInitialize(Map<String, String> propertiesMa
     }
 
     @Override
-    public void onManagedLedgerLastLedgerInitialize(String name, LedgerHandle lh) {
-        LedgerEntries ledgerEntries = null;
-        try {
-            for (BrokerEntryMetadataInterceptor interceptor : brokerEntryMetadataInterceptors) {
-                if (interceptor instanceof AppendIndexMetadataInterceptor && lh.getLastAddConfirmed() >= 0) {
-                    ledgerEntries =
-                            lh.read(lh.getLastAddConfirmed(), lh.getLastAddConfirmed());
-                    for (LedgerEntry entry : ledgerEntries) {
-                        BrokerEntryMetadata brokerEntryMetadata =
-                                Commands.parseBrokerEntryMetadataIfExist(entry.getEntryBuffer());
-                        if (brokerEntryMetadata != null && brokerEntryMetadata.hasIndex()) {
-                            ((AppendIndexMetadataInterceptor) interceptor)
-                                    .recoveryIndexGenerator(brokerEntryMetadata.getIndex());
+    public CompletableFuture<Void> onManagedLedgerLastLedgerInitialize(String name, LedgerHandle lh) {
+        CompletableFuture<Void> promise = new CompletableFuture<>();
+        boolean hasAppendIndexMetadataInterceptor = brokerEntryMetadataInterceptors.stream()
+                .anyMatch(interceptor -> interceptor instanceof AppendIndexMetadataInterceptor);
+        if (hasAppendIndexMetadataInterceptor && lh.getLastAddConfirmed() >= 0) {
+            lh.readAsync(lh.getLastAddConfirmed(), lh.getLastAddConfirmed()).whenComplete((entries, ex) -> {
+                if (ex != null) {
+                    log.error("[{}] Read last entry error.", name, ex);
+                    promise.completeExceptionally(ex);
+                } else {
+                    if (entries != null) {
+                        try {
+                            LedgerEntry ledgerEntry = entries.getEntry(lh.getLastAddConfirmed());
+                            if (ledgerEntry != null) {
+                                BrokerEntryMetadata brokerEntryMetadata =
+                                        Commands.parseBrokerEntryMetadataIfExist(ledgerEntry.getEntryBuffer());
+                                for (BrokerEntryMetadataInterceptor interceptor : brokerEntryMetadataInterceptors) {
+                                    if (interceptor instanceof AppendIndexMetadataInterceptor) {
+                                        if (brokerEntryMetadata != null && brokerEntryMetadata.hasIndex()) {
+                                            ((AppendIndexMetadataInterceptor) interceptor)
+                                                    .recoveryIndexGenerator(brokerEntryMetadata.getIndex());
+                                        }
+                                    }
+                                }
+                            }
+                            entries.close();
+                            promise.complete(null);
+                        } catch (Exception e) {
+                            log.error("[{}] Failed to recover the index generator from the last add confirmed entry.",
+                                    name, e);
+                            promise.completeExceptionally(e);
                         }
+                    } else {
+                        promise.complete(null);
                     }

Review comment:
       We are using CompletableFuture(Void)




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org