You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by te...@apache.org on 2023/05/16 12:18:17 UTC
[pulsar] branch branch-2.11 updated: [fix][ml] Fix ledger left in OPEN state when enable `inactiveLedgerRollOverTimeMs` (#20276)
This is an automated email from the ASF dual-hosted git repository.
technoboy pushed a commit to branch branch-2.11
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.11 by this push:
new dacf5b5dc24 [fix][ml] Fix ledger left in OPEN state when enable `inactiveLedgerRollOverTimeMs` (#20276)
dacf5b5dc24 is described below
commit dacf5b5dc24f4e69f04d53c3c24a372631164996
Author: lifepuzzlefun <wj...@163.com>
AuthorDate: Tue May 16 17:19:46 2023 +0800
[fix][ml] Fix ledger left in OPEN state when enable `inactiveLedgerRollOverTimeMs` (#20276)
close `currentLegder` after roll current ledger if full
---
.../bookkeeper/mledger/impl/ManagedLedgerImpl.java | 31 +++++++++++++++++++---
.../bookkeeper/mledger/impl/ManagedLedgerTest.java | 15 +++++++++++
2 files changed, 42 insertions(+), 4 deletions(-)
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index 9e999ab5939..d4f24ba7c64 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -1724,15 +1724,19 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
+ "acked ledgerId %s", currentLedger.getId(), lh.getId());
if (rc == BKException.Code.OK) {
- log.debug("Successfully closed ledger {}", lh.getId());
+ if (log.isDebugEnabled()) {
+ log.debug("[{}] Successfully closed ledger {}, trigger by rollover full ledger",
+ name, lh.getId());
+ }
} else {
- log.warn("Error when closing ledger {}. Status={}", lh.getId(), BKException.getMessage(rc));
+ log.warn("[{}] Error when closing ledger {}, trigger by rollover full ledger, Status={}",
+ name, lh.getId(), BKException.getMessage(rc));
}
ledgerClosed(lh);
createLedgerAfterClosed();
}
- }, System.nanoTime());
+ }, null);
}
}
@@ -4242,7 +4246,26 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
long currentTimeMs = System.currentTimeMillis();
if (inactiveLedgerRollOverTimeMs > 0 && currentTimeMs > (lastAddEntryTimeMs + inactiveLedgerRollOverTimeMs)) {
log.info("[{}] Closing inactive ledger, last-add entry {}", name, lastAddEntryTimeMs);
- ledgerClosed(currentLedger);
+ if (STATE_UPDATER.compareAndSet(this, State.LedgerOpened, State.ClosingLedger)) {
+ LedgerHandle currentLedger = this.currentLedger;
+ currentLedger.asyncClose((rc, lh, o) -> {
+ checkArgument(currentLedger.getId() == lh.getId(), "ledgerId %s doesn't match with "
+ + "acked ledgerId %s", currentLedger.getId(), lh.getId());
+
+ if (rc == BKException.Code.OK) {
+ if (log.isDebugEnabled()) {
+ log.debug("[{}] Successfully closed ledger {}, trigger by inactive ledger check",
+ name, lh.getId());
+ }
+ } else {
+ log.warn("[{}] Error when closing ledger {}, trigger by inactive ledger check, Status={}",
+ name, lh.getId(), BKException.getMessage(rc));
+ }
+
+ ledgerClosed(lh);
+ // we do not create ledger here, since topic is inactive for a long time.
+ }, null);
+ }
}
}
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
index 45fd275a412..418f24db936 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
@@ -88,6 +88,7 @@ import org.apache.bookkeeper.client.LedgerHandle;
import org.apache.bookkeeper.client.PulsarMockBookKeeper;
import org.apache.bookkeeper.client.PulsarMockLedgerHandle;
import org.apache.bookkeeper.client.api.LedgerEntries;
+import org.apache.bookkeeper.client.api.LedgerMetadata;
import org.apache.bookkeeper.client.api.ReadHandle;
import org.apache.bookkeeper.conf.ClientConfiguration;
import org.apache.bookkeeper.mledger.AsyncCallbacks;
@@ -3866,12 +3867,26 @@ public class ManagedLedgerTest extends MockedBookKeeperTestCase {
ManagedLedgerImpl ledger = (ManagedLedgerImpl) factory.open("rollover_inactive", config);
ManagedCursor cursor = ledger.openCursor("c1");
+ List<Long> ledgerIds = new ArrayList<>();
+
int totalAddEntries = 5;
for (int i = 0; i < totalAddEntries; i++) {
String content = "entry"; // 5 bytes
ledger.checkInactiveLedgerAndRollOver();
ledger.addEntry(content.getBytes());
Thread.sleep(inactiveLedgerRollOverTimeMs * 5);
+
+ ledgerIds.add(ledger.currentLedger.getId());
+ }
+
+ Map<Long, PulsarMockLedgerHandle> ledgerMap = bkc.getLedgerMap();
+ // skip check last ledger, it should be open
+ for (int i = 0; i < ledgerIds.size() - 1; i++) {
+ long ledgerId = ledgerIds.get(i);
+ LedgerMetadata ledgerMetadata = ledgerMap.get(ledgerId).getLedgerMetadata();
+ if (ledgerMetadata != null) {
+ assertTrue(ledgerMetadata.isClosed());
+ }
}
List<LedgerInfo> ledgers = ledger.getLedgersInfoAsList();