You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by te...@apache.org on 2023/05/16 12:18:17 UTC

[pulsar] branch branch-2.11 updated: [fix][ml] Fix ledger left in OPEN state when enable `inactiveLedgerRollOverTimeMs` (#20276)

This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch branch-2.11
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.11 by this push:
     new dacf5b5dc24 [fix][ml] Fix ledger left in OPEN state when enable `inactiveLedgerRollOverTimeMs` (#20276)
dacf5b5dc24 is described below

commit dacf5b5dc24f4e69f04d53c3c24a372631164996
Author: lifepuzzlefun <wj...@163.com>
AuthorDate: Tue May 16 17:19:46 2023 +0800

    [fix][ml] Fix ledger left in OPEN state when enable `inactiveLedgerRollOverTimeMs` (#20276)
    
    Close `currentLedger` after rolling over the current ledger if it is full.
---
 .../bookkeeper/mledger/impl/ManagedLedgerImpl.java | 31 +++++++++++++++++++---
 .../bookkeeper/mledger/impl/ManagedLedgerTest.java | 15 +++++++++++
 2 files changed, 42 insertions(+), 4 deletions(-)

diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index 9e999ab5939..d4f24ba7c64 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -1724,15 +1724,19 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
                                   + "acked ledgerId %s", currentLedger.getId(), lh.getId());
 
                     if (rc == BKException.Code.OK) {
-                        log.debug("Successfully closed ledger {}", lh.getId());
+                        if (log.isDebugEnabled()) {
+                            log.debug("[{}] Successfully closed ledger {}, trigger by rollover full ledger",
+                                    name, lh.getId());
+                        }
                     } else {
-                        log.warn("Error when closing ledger {}. Status={}", lh.getId(), BKException.getMessage(rc));
+                        log.warn("[{}] Error when closing ledger {}, trigger by rollover full ledger, Status={}",
+                                name, lh.getId(), BKException.getMessage(rc));
                     }
 
                     ledgerClosed(lh);
                     createLedgerAfterClosed();
                 }
-            }, System.nanoTime());
+            }, null);
         }
     }
 
@@ -4242,7 +4246,26 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
         long currentTimeMs = System.currentTimeMillis();
         if (inactiveLedgerRollOverTimeMs > 0 && currentTimeMs > (lastAddEntryTimeMs + inactiveLedgerRollOverTimeMs)) {
             log.info("[{}] Closing inactive ledger, last-add entry {}", name, lastAddEntryTimeMs);
-            ledgerClosed(currentLedger);
+            if (STATE_UPDATER.compareAndSet(this, State.LedgerOpened, State.ClosingLedger)) {
+                LedgerHandle currentLedger = this.currentLedger;
+                currentLedger.asyncClose((rc, lh, o) -> {
+                    checkArgument(currentLedger.getId() == lh.getId(), "ledgerId %s doesn't match with "
+                            + "acked ledgerId %s", currentLedger.getId(), lh.getId());
+
+                    if (rc == BKException.Code.OK) {
+                        if (log.isDebugEnabled()) {
+                            log.debug("[{}] Successfully closed ledger {}, trigger by inactive ledger check",
+                                    name, lh.getId());
+                        }
+                    } else {
+                        log.warn("[{}] Error when closing ledger {}, trigger by inactive ledger check, Status={}",
+                                name, lh.getId(), BKException.getMessage(rc));
+                    }
+
+                    ledgerClosed(lh);
+                    // we do not create ledger here, since topic is inactive for a long time.
+                }, null);
+            }
         }
     }
 
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
index 45fd275a412..418f24db936 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
@@ -88,6 +88,7 @@ import org.apache.bookkeeper.client.LedgerHandle;
 import org.apache.bookkeeper.client.PulsarMockBookKeeper;
 import org.apache.bookkeeper.client.PulsarMockLedgerHandle;
 import org.apache.bookkeeper.client.api.LedgerEntries;
+import org.apache.bookkeeper.client.api.LedgerMetadata;
 import org.apache.bookkeeper.client.api.ReadHandle;
 import org.apache.bookkeeper.conf.ClientConfiguration;
 import org.apache.bookkeeper.mledger.AsyncCallbacks;
@@ -3866,12 +3867,26 @@ public class ManagedLedgerTest extends MockedBookKeeperTestCase {
         ManagedLedgerImpl ledger = (ManagedLedgerImpl) factory.open("rollover_inactive", config);
         ManagedCursor cursor = ledger.openCursor("c1");
 
+        List<Long> ledgerIds = new ArrayList<>();
+
         int totalAddEntries = 5;
         for (int i = 0; i < totalAddEntries; i++) {
             String content = "entry"; // 5 bytes
             ledger.checkInactiveLedgerAndRollOver();
             ledger.addEntry(content.getBytes());
             Thread.sleep(inactiveLedgerRollOverTimeMs * 5);
+
+            ledgerIds.add(ledger.currentLedger.getId());
+        }
+
+        Map<Long, PulsarMockLedgerHandle> ledgerMap = bkc.getLedgerMap();
+        // skip check last ledger, it should be open
+        for (int i = 0; i < ledgerIds.size() - 1; i++) {
+            long ledgerId = ledgerIds.get(i);
+            LedgerMetadata ledgerMetadata = ledgerMap.get(ledgerId).getLedgerMetadata();
+            if (ledgerMetadata != null) {
+                assertTrue(ledgerMetadata.isClosed());
+            }
         }
 
         List<LedgerInfo> ledgers = ledger.getLedgersInfoAsList();