You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by xy...@apache.org on 2022/07/29 07:07:27 UTC

[pulsar] 04/09: [fix][broker] fix No such ledger exception (#16420)

This is an automated email from the ASF dual-hosted git repository.

xyz pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 0dc2cede6c76d23e02de039961977e38314a8d20
Author: LinChen <15...@qq.com>
AuthorDate: Fri Jul 8 21:36:12 2022 +0800

    [fix][broker] fix No such ledger exception (#16420)
    
    * fix No such ledger exception
    
    * move currentLedgerEntries and currentLedgerSize
    
    * move ledgers.put  to operationComplete
    
    * fix ledgers.put
    
    * use ledgersTmp to write zookkeeper,after sucess,update ldgers
    
    * update updateLedgersListAfterRollover
    
    * put lh to ledgersTmp
    
    * extend buildManagedLedgerInfo
    
    * getManagedLedgerInfo(newLedger) after get metadataMutex
    
    (cherry picked from commit 2e5fbbc09af28d1ba4d159ee0cdb71ec7735bd29)
---
 .../bookkeeper/mledger/impl/ManagedLedgerImpl.java | 32 ++++++++++++++--------
 1 file changed, 20 insertions(+), 12 deletions(-)

diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index 44e0ed96a25..3a35d71736e 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -1417,11 +1417,7 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
             lastLedgerCreationFailureTimestamp = clock.millis();
         } else {
             log.info("[{}] Created new ledger {}", name, lh.getId());
-            ledgers.put(lh.getId(), LedgerInfo.newBuilder().setLedgerId(lh.getId()).setTimestamp(0).build());
-            currentLedger = lh;
-            currentLedgerEntries = 0;
-            currentLedgerSize = 0;
-
+            LedgerInfo newLedger = LedgerInfo.newBuilder().setLedgerId(lh.getId()).setTimestamp(0).build();
             final MetaStoreCallback<Void> cb = new MetaStoreCallback<Void>() {
                 @Override
                 public void operationComplete(Void v, Stat stat) {
@@ -1429,6 +1425,10 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
                         log.debug("[{}] Updating of ledgers list after create complete. version={}", name, stat);
                     }
                     ledgersStat = stat;
+                    ledgers.put(lh.getId(), newLedger);
+                    currentLedger = lh;
+                    currentLedgerEntries = 0;
+                    currentLedgerSize = 0;
                     metadataMutex.unlock();
                     updateLedgersIdsComplete(stat);
                     synchronized (ManagedLedgerImpl.this) {
@@ -1443,8 +1443,6 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
                 @Override
                 public void operationFailed(MetaStoreException e) {
                     log.warn("[{}] Error updating meta data with the new list of ledgers: {}", name, e.getMessage());
-                    // Remove the ledger, since we failed to update the list
-                    ledgers.remove(lh.getId());
                     mbean.startDataLedgerDeleteOp();
                     bookKeeper.asyncDeleteLedger(lh.getId(), (rc1, ctx1) -> {
                         mbean.endDataLedgerDeleteOp();
@@ -1479,21 +1477,22 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
                 }
             };
 
-            updateLedgersListAfterRollover(cb);
+            updateLedgersListAfterRollover(cb, newLedger);
         }
     }
-
-    private void updateLedgersListAfterRollover(MetaStoreCallback<Void> callback) {
+    private void updateLedgersListAfterRollover(MetaStoreCallback<Void> callback, LedgerInfo newLedger) {
         if (!metadataMutex.tryLock()) {
             // Defer update for later
-            scheduledExecutor.schedule(() -> updateLedgersListAfterRollover(callback), 100, TimeUnit.MILLISECONDS);
+            scheduledExecutor.schedule(() -> updateLedgersListAfterRollover(callback, newLedger),
+                    100, TimeUnit.MILLISECONDS);
             return;
         }
 
         if (log.isDebugEnabled()) {
             log.debug("[{}] Updating ledgers ids with new ledger. version={}", name, ledgersStat);
         }
-        store.asyncUpdateLedgerIds(name, getManagedLedgerInfo(), ledgersStat, callback);
+        ManagedLedgerInfo mlInfo = getManagedLedgerInfo(newLedger);
+        store.asyncUpdateLedgerIds(name, mlInfo, ledgersStat, callback);
     }
 
     public synchronized void updateLedgersIdsComplete(Stat stat) {
@@ -3483,8 +3482,17 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
         return mlInfo.build();
     }
 
+    private ManagedLedgerInfo getManagedLedgerInfo(LedgerInfo newLedger) {
+        ManagedLedgerInfo.Builder mlInfo = ManagedLedgerInfo.newBuilder().addAllLedgerInfo(ledgers.values())
+                .addLedgerInfo(newLedger);
+        return buildManagedLedgerInfo(mlInfo);
+    }
     private ManagedLedgerInfo buildManagedLedgerInfo(Map<Long, LedgerInfo> ledgers) {
         ManagedLedgerInfo.Builder mlInfo = ManagedLedgerInfo.newBuilder().addAllLedgerInfo(ledgers.values());
+        return buildManagedLedgerInfo(mlInfo);
+    }
+
+    private ManagedLedgerInfo buildManagedLedgerInfo(ManagedLedgerInfo.Builder mlInfo) {
         if (state == State.Terminated) {
             mlInfo.setTerminatedPosition(NestedPositionInfo.newBuilder().setLedgerId(lastConfirmedEntry.getLedgerId())
                     .setEntryId(lastConfirmedEntry.getEntryId()));