You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2022/05/06 14:29:52 UTC

[pulsar] branch master updated: [PIP 81] Part-1 Split createNewMetadataLedger into multiple methods for reuse (#15425)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new ac6bd3c71e2 [PIP 81] Part-1  Split createNewMetadataLedger into multiple methods for reuse (#15425)
ac6bd3c71e2 is described below

commit ac6bd3c71e24b90826438cd44395ca21a849067f
Author: feynmanlin <31...@qq.com>
AuthorDate: Fri May 6 22:29:43 2022 +0800

    [PIP 81] Part-1  Split createNewMetadataLedger into multiple methods for reuse (#15425)
    
    ### Motivation
    It is difficult to get a code review (CR) for PIP 81 (#10729) because that PR contains so many modifications.
    So I am splitting that PR into multiple sub-PRs to make the review easier.
    
    ### Modifications
    I split the logic in `createNewMetadataLedger` into multiple methods to facilitate subsequent reuse:
    1. Move the ledger-creation logic into a separate method, `doCreateNewMetadataLedger`.
    2. `switchToNewLedger` does not need an extra layer of callback wrapping, so the redundant wrapper is removed.
    3. When persisting data fails, the newly created ledger must be deleted; this logic is moved into `deleteLedgerAsync`.
    
    ### Verifying this change
    This modification does not change any existing behavior, so no new unit tests are required; the existing unit tests must still pass.
---
 .../bookkeeper/mledger/impl/ManagedCursorImpl.java | 107 +++++++++++----------
 1 file changed, 58 insertions(+), 49 deletions(-)

diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
index 9ac58f11508..e70f3083d86 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
@@ -46,6 +46,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
@@ -59,7 +60,6 @@ import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 import org.apache.bookkeeper.client.AsyncCallback.CloseCallback;
-import org.apache.bookkeeper.client.AsyncCallback.DeleteCallback;
 import org.apache.bookkeeper.client.AsyncCallback.OpenCallback;
 import org.apache.bookkeeper.client.BKException;
 import org.apache.bookkeeper.client.BookKeeper;
@@ -2532,10 +2532,45 @@ public class ManagedCursorImpl implements ManagedCursor {
 
     void createNewMetadataLedger(final VoidCallback callback) {
         ledger.mbean.startCursorLedgerCreateOp();
+        doCreateNewMetadataLedger().thenAccept(newLedgerHandle -> {
+            if (newLedgerHandle == null) {
+                return;
+            }
+            MarkDeleteEntry mdEntry = lastMarkDeleteEntry;
+            // Created the ledger, now write the last position content
+            persistPositionToLedger(newLedgerHandle, mdEntry, new VoidCallback() {
+                @Override
+                public void operationComplete() {
+                    if (log.isDebugEnabled()) {
+                        log.debug("[{}] Persisted position {} for cursor {}", ledger.getName(),
+                                mdEntry.newPosition, name);
+                    }
+                    switchToNewLedger(newLedgerHandle, callback);
+                }
+
+                @Override
+                public void operationFailed(ManagedLedgerException exception) {
+                    log.warn("[{}] Failed to persist position {} for cursor {}", ledger.getName(),
+                            mdEntry.newPosition, name);
+
+                    deleteLedgerAsync(newLedgerHandle);
+                    callback.operationFailed(exception);
+                }
+            });
+        }).whenComplete((result, e) -> {
+            ledger.mbean.endCursorLedgerCreateOp();
+            if (e != null) {
+                callback.operationFailed(createManagedLedgerException(e));
+            }
+        });
+    }
 
+    private CompletableFuture<LedgerHandle> doCreateNewMetadataLedger() {
+        CompletableFuture<LedgerHandle> future = new CompletableFuture<>();
         ledger.asyncCreateLedger(bookkeeper, config, digestType, (rc, lh, ctx) -> {
 
             if (ledger.checkAndCompleteLedgerOpTask(rc, lh, ctx)) {
+                future.complete(null);
                 return;
             }
 
@@ -2544,63 +2579,35 @@ public class ManagedCursorImpl implements ManagedCursor {
                 if (rc != BKException.Code.OK) {
                     log.warn("[{}] Error creating ledger for cursor {}: {}", ledger.getName(), name,
                             BKException.getMessage(rc));
-                    callback.operationFailed(new ManagedLedgerException(BKException.getMessage(rc)));
+                    future.completeExceptionally(new ManagedLedgerException(BKException.getMessage(rc)));
                     return;
                 }
 
                 if (log.isDebugEnabled()) {
                     log.debug("[{}] Created ledger {} for cursor {}", ledger.getName(), lh.getId(), name);
                 }
-                // Created the ledger, now write the last position
-                // content
-                MarkDeleteEntry mdEntry = lastMarkDeleteEntry;
-                persistPositionToLedger(lh, mdEntry, new VoidCallback() {
-                    @Override
-                    public void operationComplete() {
-                        if (log.isDebugEnabled()) {
-                            log.debug("[{}] Persisted position {} for cursor {}", ledger.getName(),
-                                    mdEntry.newPosition, name);
-                        }
-                        switchToNewLedger(lh, new VoidCallback() {
-                            @Override
-                            public void operationComplete() {
-                                callback.operationComplete();
-                            }
-
-                            @Override
-                            public void operationFailed(ManagedLedgerException exception) {
-                                // it means it failed to switch the newly created ledger so, it should be
-                                // deleted to prevent leak
-                                bookkeeper.asyncDeleteLedger(lh.getId(), (int rc, Object ctx) -> {
-                                    if (rc != BKException.Code.OK) {
-                                        log.warn("[{}] Failed to delete orphan ledger {}", ledger.getName(),
-                                                lh.getId());
-                                    }
-                                }, null);
-                                callback.operationFailed(exception);
-                            }
-                        });
-                    }
-
-                    @Override
-                    public void operationFailed(ManagedLedgerException exception) {
-                        log.warn("[{}] Failed to persist position {} for cursor {}", ledger.getName(),
-                                mdEntry.newPosition, name);
-
-                        ledger.mbean.startCursorLedgerDeleteOp();
-                        bookkeeper.asyncDeleteLedger(lh.getId(), new DeleteCallback() {
-                            @Override
-                            public void deleteComplete(int rc, Object ctx) {
-                                ledger.mbean.endCursorLedgerDeleteOp();
-                            }
-                        }, null);
-                        callback.operationFailed(exception);
-                    }
-                });
+                future.complete(lh);
             }));
         }, LedgerMetadataUtils.buildAdditionalMetadataForCursor(name));
+
+        return future;
     }
 
+    private CompletableFuture<Void> deleteLedgerAsync(LedgerHandle ledgerHandle) {
+        ledger.mbean.startCursorLedgerDeleteOp();
+        CompletableFuture<Void> future = new CompletableFuture<>();
+        bookkeeper.asyncDeleteLedger(ledgerHandle.getId(), (int rc, Object ctx) -> {
+            future.complete(null);
+            ledger.mbean.endCursorLedgerDeleteOp();
+            if (rc != BKException.Code.OK) {
+                log.warn("[{}] Failed to delete orphan ledger {}", ledger.getName(),
+                        ledgerHandle.getId());
+            }
+        }, null);
+        return future;
+    }
+
+
     private List<LongProperty> buildPropertiesMap(Map<String, Long> properties) {
         if (properties.isEmpty()) {
             return Collections.emptyList();
@@ -2786,7 +2793,9 @@ public class ManagedCursorImpl implements ManagedCursor {
             @Override
             public void operationFailed(MetaStoreException e) {
                 log.warn("[{}] Failed to update consumer {}", ledger.getName(), name, e);
-                callback.operationFailed(e);
+                // it means it failed to switch the newly created ledger so, it should be
+                // deleted to prevent leak
+                deleteLedgerAsync(lh).thenRun(() -> callback.operationFailed(e));
             }
         }, false);
     }