You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by ba...@apache.org on 2024/04/15 07:14:01 UTC
(pulsar) branch branch-3.2 updated: [fix][broker] Create new ledger after the current ledger is closed (#22034)
This is an automated email from the ASF dual-hosted git repository.
baodi pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-3.2 by this push:
new f73a1cb8c49 [fix][broker] Create new ledger after the current ledger is closed (#22034)
f73a1cb8c49 is described below
commit f73a1cb8c49a9193714c1dd57d9cab6f5a30e11d
Author: Xiangying Meng <55...@users.noreply.github.com>
AuthorDate: Fri Mar 22 11:52:47 2024 +0800
[fix][broker] Create new ledger after the current ledger is closed (#22034)
(cherry picked from commit d0ca9835cf972ce156bd4a1fc5d109482330857d)
---
.../bookkeeper/mledger/impl/ManagedCursorImpl.java | 2 +-
.../bookkeeper/mledger/impl/ManagedLedgerImpl.java | 22 ++--
.../bookkeeper/mledger/impl/ManagedCursorTest.java | 33 ++++--
.../mledger/impl/ManagedLedgerFactoryTest.java | 2 +-
.../bookkeeper/mledger/impl/ManagedLedgerTest.java | 111 +++++++++++++++-----
.../mledger/impl/NonDurableCursorTest.java | 17 ++--
.../mledger/impl/ShadowManagedLedgerImplTest.java | 5 +-
.../broker/service/BacklogQuotaManagerTest.java | 13 +--
.../broker/service/BrokerBkEnsemblesTests.java | 12 +--
.../broker/service/BrokerBookieIsolationTest.java | 112 ++++++++++++++-------
.../broker/service/ConsumedLedgersTrimTest.java | 6 +-
.../client/impl/ProducerConsumerInternalTest.java | 44 ++++++++
12 files changed, 275 insertions(+), 104 deletions(-)
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
index 9bbcda327f0..9bb7b02ce4e 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
@@ -1278,7 +1278,7 @@ public class ManagedCursorImpl implements ManagedCursor {
if (proposedReadPosition.equals(PositionImpl.EARLIEST)) {
newReadPosition = ledger.getFirstPosition();
} else if (proposedReadPosition.equals(PositionImpl.LATEST)) {
- newReadPosition = ledger.getLastPosition().getNext();
+ newReadPosition = ledger.getNextValidPosition(ledger.getLastPosition());
} else {
newReadPosition = proposedReadPosition;
}
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index 426ac8df218..1274b347263 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -1755,10 +1755,7 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
maybeOffloadInBackground(NULL_OFFLOAD_PROMISE);
- if (!pendingAddEntries.isEmpty()) {
- // Need to create a new ledger to write pending entries
- createLedgerAfterClosed();
- }
+ createLedgerAfterClosed();
}
@Override
@@ -1813,7 +1810,6 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
}
ledgerClosed(lh);
- createLedgerAfterClosed();
}
}, null);
}
@@ -2649,7 +2645,16 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
} else {
PositionImpl slowestReaderPosition = cursors.getSlowestReaderPosition();
if (slowestReaderPosition != null) {
- slowestReaderLedgerId = slowestReaderPosition.getLedgerId();
+ // The slowest reader position is the mark delete position.
+ // If the slowest reader position point the last entry in the ledger x,
+ // the slowestReaderLedgerId should be x + 1 and the ledger x could be deleted.
+ LedgerInfo ledgerInfo = ledgers.get(slowestReaderPosition.getLedgerId());
+ if (ledgerInfo != null && ledgerInfo.getLedgerId() != currentLedger.getId()
+ && ledgerInfo.getEntries() == slowestReaderPosition.getEntryId() + 1) {
+ slowestReaderLedgerId = slowestReaderPosition.getLedgerId() + 1;
+ } else {
+ slowestReaderLedgerId = slowestReaderPosition.getLedgerId();
+ }
} else {
promise.completeExceptionally(new ManagedLedgerException("Couldn't find reader position"));
trimmerMutex.unlock();
@@ -3693,7 +3698,11 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
PositionImpl skippedPosition = position.getPositionAfterEntries(skippedEntryNum);
while (!isValidPosition(skippedPosition)) {
Long nextLedgerId = ledgers.ceilingKey(skippedPosition.getLedgerId() + 1);
+ // This means it has jumped to the last position
if (nextLedgerId == null) {
+ if (currentLedgerEntries == 0) {
+ return PositionImpl.get(currentLedger.getId(), 0);
+ }
return lastConfirmedEntry.getNext();
}
skippedPosition = PositionImpl.get(nextLedgerId, 0);
@@ -4485,7 +4494,6 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
}
ledgerClosed(lh);
- createLedgerAfterClosed();
// we do not create ledger here, since topic is inactive for a long time.
}, null);
return true;
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
index 644f53c3a52..c9bd64171c1 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
@@ -234,15 +234,16 @@ public class ManagedCursorTest extends MockedBookKeeperTestCase {
@Test
void testPersistentMarkDeleteIfCreateCursorLedgerFailed() throws Exception {
- final int entryCount = 10;
+ final int entryCount = 9;
final String cursorName = "c1";
final String mlName = "ml_test";
- final ManagedLedgerConfig mlConfig = new ManagedLedgerConfig().setMaxEntriesPerLedger(1);
+ // Avoid creating new empty ledger after the last ledger is full and remove fail future.
+ final ManagedLedgerConfig mlConfig = new ManagedLedgerConfig().setMaxEntriesPerLedger(2);
ManagedLedgerImpl ml = (ManagedLedgerImpl) factory.open(mlName, mlConfig);
ManagedCursor cursor = ml.openCursor("c1");
Position lastEntry = null;
- for (int i = 0; i < 10; i++) {
+ for (int i = 0; i < entryCount; i++) {
lastEntry = ml.addEntry(("entry-" + i).getBytes(Encoding));
}
@@ -809,7 +810,7 @@ public class ManagedCursorTest extends MockedBookKeeperTestCase {
assertEquals(firstInNext, cursor.getReadPosition());
moveStatus.set(false);
- // reset to a non exist larger ledger should point to the first non-exist entry in the last ledger
+ // reset to a non exist larger ledger should point to the first non-exist entry in the next ledger
PositionImpl latest = new PositionImpl(last.getLedgerId() + 2, 0);
try {
cursor.resetCursor(latest);
@@ -818,11 +819,13 @@ public class ManagedCursorTest extends MockedBookKeeperTestCase {
log.warn("error in reset cursor", e.getCause());
}
assertTrue(moveStatus.get());
- PositionImpl lastPos = new PositionImpl(last.getLedgerId(), last.getEntryId() + 1);
- assertEquals(lastPos, cursor.getReadPosition());
+ PositionImpl lastPos = new PositionImpl(last.getLedgerId() + 1, 0);
+ Awaitility.await().untilAsserted(() -> {
+ assertEquals(lastPos, cursor.getReadPosition());
+ });
moveStatus.set(false);
- // reset to latest should point to the first non-exist entry in the last ledger
+ // reset to latest should point to the first non-exist entry in the next ledger
PositionImpl anotherLast = PositionImpl.LATEST;
try {
cursor.resetCursor(anotherLast);
@@ -1701,7 +1704,7 @@ public class ManagedCursorTest extends MockedBookKeeperTestCase {
@Test(timeOut = 20000, dataProvider = "useOpenRangeSet")
void testSkipEntries(boolean useOpenRangeSet) throws Exception {
- ManagedLedger ledger = factory.open("my_test_ledger", new ManagedLedgerConfig()
+ ManagedLedgerImpl ledger = (ManagedLedgerImpl) factory.open("my_test_ledger", new ManagedLedgerConfig()
.setUnackedRangesOpenCacheSetEnabled(useOpenRangeSet).setMaxEntriesPerLedger(2));
Position pos;
@@ -1715,6 +1718,11 @@ public class ManagedCursorTest extends MockedBookKeeperTestCase {
pos = ledger.addEntry("dummy-entry-1".getBytes(Encoding));
pos = ledger.addEntry("dummy-entry-2".getBytes(Encoding));
+ // Wait new empty ledger created completely.
+ Awaitility.await().untilAsserted(() -> {
+ assertEquals(ledger.ledgers.size(), 2);
+ });
+
// skip entries in same ledger
c1.skipEntries(1, IndividualDeletedEntries.Exclude);
assertEquals(c1.getNumberOfEntries(), 1);
@@ -1722,7 +1730,7 @@ public class ManagedCursorTest extends MockedBookKeeperTestCase {
// skip entries until end of ledger
c1.skipEntries(1, IndividualDeletedEntries.Exclude);
assertEquals(c1.getNumberOfEntries(), 0);
- assertEquals(c1.getReadPosition(), pos.getNext());
+ assertEquals(c1.getReadPosition(), new PositionImpl(ledger.currentLedger.getId(), 0));
assertEquals(c1.getMarkDeletedPosition(), pos);
// skip entries across ledgers
@@ -1737,7 +1745,10 @@ public class ManagedCursorTest extends MockedBookKeeperTestCase {
c1.skipEntries(10, IndividualDeletedEntries.Exclude);
assertEquals(c1.getNumberOfEntries(), 0);
assertFalse(c1.hasMoreEntries());
- assertEquals(c1.getReadPosition(), pos.getNext());
+ // We can not check the ledger id because a cursor leger can be created.
+ Awaitility.await().untilAsserted(() -> {
+ assertEquals(c1.getReadPosition().getEntryId(), 0);
+ });
assertEquals(c1.getMarkDeletedPosition(), pos);
}
@@ -1759,7 +1770,7 @@ public class ManagedCursorTest extends MockedBookKeeperTestCase {
c1.skipEntries(3, IndividualDeletedEntries.Exclude);
assertEquals(c1.getNumberOfEntries(), 0);
- assertEquals(c1.getReadPosition(), pos5.getNext());
+ assertEquals(c1.getReadPosition(), new PositionImpl(pos5.getLedgerId() + 1, 0));
assertEquals(c1.getMarkDeletedPosition(), pos5);
pos1 = ledger.addEntry("dummy-entry-1".getBytes(Encoding));
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryTest.java
index 4f2c3e17877..a953b140aba 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryTest.java
@@ -54,7 +54,7 @@ public class ManagedLedgerFactoryTest extends MockedBookKeeperTestCase {
ManagedLedgerInfo info = factory.getManagedLedgerInfo("testGetManagedLedgerInfo");
- assertEquals(info.ledgers.size(), 4);
+ assertEquals(info.ledgers.size(), 5);
assertEquals(info.ledgers.get(0).ledgerId, 3);
assertEquals(info.ledgers.get(1).ledgerId, 4);
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
index 9ad1e9ff48e..2139a662962 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
@@ -36,6 +36,7 @@ import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertNotEquals;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertSame;
@@ -1121,9 +1122,13 @@ public class ManagedLedgerTest extends MockedBookKeeperTestCase {
cursor.markDelete(lastPosition);
- while (ledger.getNumberOfEntries() != 2) {
- Thread.sleep(10);
- }
+ Awaitility.await().untilAsserted(() -> {
+ // The number of entries in the ledger should not contain the entry in the mark delete position.
+ // last position is the position of entry-3.
+ // cursor.markDelete(lastPosition);
+ // only entry-4 is left in the ledger.
+ assertEquals(ledger.getNumberOfEntries(), 1);
+ });
}
@Test(timeOut = 20000)
@@ -2437,7 +2442,7 @@ public class ManagedLedgerTest extends MockedBookKeeperTestCase {
Awaitility.await().untilAsserted(() -> {
assertTrue(ml.getTotalSize() <= retentionSizeInMB * 1024 * 1024);
- assertEquals(ml.getLedgersInfoAsList().size(), 5);
+ assertEquals(ml.getLedgersInfoAsList().size(), 6);
});
}
@@ -2695,9 +2700,17 @@ public class ManagedLedgerTest extends MockedBookKeeperTestCase {
assertEquals(ledger.getNextValidPosition((PositionImpl) c1.getMarkDeletedPosition()), p1);
assertEquals(ledger.getNextValidPosition(p1), p2);
- assertEquals(ledger.getNextValidPosition(p3), PositionImpl.get(p3.getLedgerId(), p3.getEntryId() + 1));
- assertEquals(ledger.getNextValidPosition(PositionImpl.get(p3.getLedgerId(), p3.getEntryId() + 1)), PositionImpl.get(p3.getLedgerId(), p3.getEntryId() + 1));
- assertEquals(ledger.getNextValidPosition(PositionImpl.get(p3.getLedgerId() + 1, p3.getEntryId() + 1)), PositionImpl.get(p3.getLedgerId(), p3.getEntryId() + 1));
+ Awaitility.await().untilAsserted(() -> {
+ assertEquals(ledger.getNextValidPosition(p3), PositionImpl.get(p3.getLedgerId() + 1, 0));
+ });
+ Awaitility.await().untilAsserted(() -> {
+ assertEquals(ledger.getNextValidPosition(PositionImpl.get(p3.getLedgerId(), p3.getEntryId() + 1)),
+ PositionImpl.get(p3.getLedgerId() + 1, 0));
+ });
+ Awaitility.await().untilAsserted(() -> {
+ assertEquals(ledger.getNextValidPosition(PositionImpl.get(p3.getLedgerId() + 1, p3.getEntryId() + 1)),
+ PositionImpl.get(p3.getLedgerId() + 1, 0));
+ });
}
/**
@@ -3036,19 +3049,22 @@ public class ManagedLedgerTest extends MockedBookKeeperTestCase {
String content = "entry" + i; // 5 bytes
ledger.addEntry(content.getBytes());
}
+ Awaitility.await().untilAsserted(() -> {
+ assertEquals(ledger.currentLedgerSize, 0);
+ assertEquals(ledger.ledgers.size(), 1);
+ });
// Open Cursor also adds cursor into activeCursor-container
ManagedCursor latestCursor = ledger.openCursor("c1", InitialPosition.Latest);
ManagedCursor earliestCursor = ledger.openCursor("c2", InitialPosition.Earliest);
// Since getReadPosition returns the next position, we decrease the entryId by 1
- PositionImpl p1 = (PositionImpl) latestCursor.getReadPosition();
PositionImpl p2 = (PositionImpl) earliestCursor.getReadPosition();
Pair<PositionImpl, Long> latestPositionAndCounter = ledger.getLastPositionAndCounter();
Pair<PositionImpl, Long> earliestPositionAndCounter = ledger.getFirstPositionAndCounter();
-
- assertEquals(latestPositionAndCounter.getLeft().getNext(), p1);
- assertEquals(earliestPositionAndCounter.getLeft().getNext(), p2);
+ // The read position is the valid next position of the last position instead of the next position.
+ assertEquals(ledger.getNextValidPosition(latestPositionAndCounter.getLeft()), latestCursor.getReadPosition());
+ assertEquals(ledger.getNextValidPosition(earliestPositionAndCounter.getLeft()), p2);
assertEquals(latestPositionAndCounter.getRight().longValue(), totalInsertedEntries);
assertEquals(earliestPositionAndCounter.getRight().longValue(), totalInsertedEntries - earliestCursor.getNumberOfEntriesInBacklog(false));
@@ -3471,7 +3487,8 @@ public class ManagedLedgerTest extends MockedBookKeeperTestCase {
ledger.addEntry(new byte[1024 * 1024]);
}
- Awaitility.await().untilAsserted(() -> Assert.assertEquals(ledger.getLedgersInfoAsList().size(), msgNum / 2));
+ Awaitility.await().untilAsserted(() -> Assert.assertEquals(ledger.getLedgersInfoAsList().size(),
+ msgNum / 2 + 1));
List<Entry> entries = cursor.readEntries(msgNum);
Assert.assertEquals(msgNum, entries.size());
@@ -3486,6 +3503,9 @@ public class ManagedLedgerTest extends MockedBookKeeperTestCase {
stateUpdater.setAccessible(true);
stateUpdater.set(ledger, ManagedLedgerImpl.State.LedgerOpened);
ledger.rollCurrentLedgerIfFull();
+ CompletableFuture<Void> completableFuture = new CompletableFuture<>();
+ ledger.trimConsumedLedgersInBackground(completableFuture);
+ completableFuture.get();
Awaitility.await().untilAsserted(() -> Assert.assertEquals(ledger.getLedgersInfoAsList().size(), 1));
Awaitility.await().untilAsserted(() -> Assert.assertEquals(ledger.getTotalSize(), 0));
}
@@ -3651,8 +3671,12 @@ public class ManagedLedgerTest extends MockedBookKeeperTestCase {
}
List<Entry> entryList = cursor.readEntries(3);
assertEquals(entryList.size(), 3);
- assertEquals(ledger.ledgers.size(), 3);
- assertEquals(ledger.ledgerCache.size(), 2);
+ Awaitility.await().untilAsserted(() -> {
+ log.error("ledger.ledgerCache.size() : " + ledger.ledgerCache.size());
+ assertEquals(ledger.ledgerCache.size(), 3);
+ assertEquals(ledger.ledgers.size(), 4);
+ });
+
cursor.clearBacklog();
cursor2.clearBacklog();
ledger.trimConsumedLedgersInBackground(Futures.NULL_PROMISE);
@@ -3681,15 +3705,15 @@ public class ManagedLedgerTest extends MockedBookKeeperTestCase {
}
List<Entry> entryList = cursor.readEntries(entries);
assertEquals(entryList.size(), entries);
- assertEquals(ledger.ledgers.size(), entries);
- assertEquals(ledger.ledgerCache.size(), entries - 1);
+ assertEquals(ledger.ledgers.size() - 1, entries);
+ assertEquals(ledger.ledgerCache.size() - 1, entries - 1);
cursor.clearBacklog();
ledger.trimConsumedLedgersInBackground(Futures.NULL_PROMISE);
ledger.trimConsumedLedgersInBackground(Futures.NULL_PROMISE);
// Cleanup fails because ManagedLedgerNotFoundException is thrown
Awaitility.await().untilAsserted(() -> {
- assertEquals(ledger.ledgers.size(), entries);
- assertEquals(ledger.ledgerCache.size(), entries - 1);
+ assertEquals(ledger.ledgers.size() - 1, entries);
+ assertEquals(ledger.ledgerCache.size() - 1, entries - 1);
});
// The lock is released even if an ManagedLedgerNotFoundException occurs, so it can be called repeatedly
Awaitility.await().untilAsserted(() ->
@@ -3715,13 +3739,13 @@ public class ManagedLedgerTest extends MockedBookKeeperTestCase {
}
List<Entry> entryList = cursor.readEntries(3);
assertEquals(entryList.size(), 3);
- assertEquals(ledger.ledgers.size(), 3);
- assertEquals(ledger.ledgerCache.size(), 2);
+ assertEquals(ledger.ledgers.size(), 4);
+ assertEquals(ledger.ledgerCache.size(), 3);
cursor.clearBacklog();
cursor2.clearBacklog();
ledger.trimConsumedLedgersInBackground(Futures.NULL_PROMISE);
Awaitility.await().untilAsserted(() -> {
- assertEquals(ledger.ledgers.size(), 3);
+ assertEquals(ledger.ledgers.size(), 4);
assertEquals(ledger.ledgerCache.size(), 0);
});
@@ -3729,11 +3753,11 @@ public class ManagedLedgerTest extends MockedBookKeeperTestCase {
ManagedCursor cursor3 = ledger.openCursor("test-cursor3", InitialPosition.Earliest);
entryList = cursor3.readEntries(3);
assertEquals(entryList.size(), 3);
- assertEquals(ledger.ledgerCache.size(), 2);
+ assertEquals(ledger.ledgerCache.size(), 3);
cursor3.clearBacklog();
ledger.trimConsumedLedgersInBackground(Futures.NULL_PROMISE);
Awaitility.await().untilAsserted(() -> {
- assertEquals(ledger.ledgers.size(), 3);
+ assertEquals(ledger.ledgers.size(), 4);
assertEquals(ledger.ledgerCache.size(), 0);
});
@@ -4256,4 +4280,45 @@ public class ManagedLedgerTest extends MockedBookKeeperTestCase {
verify(ledgerOffloader, times(0))
.deleteOffloaded(eq(ledgerInfo.getLedgerId()), any(), anyMap());
}
+
+
+ @DataProvider(name = "closeLedgerByAddEntry")
+ public Object[][] closeLedgerByAddEntry() {
+ return new Object[][] {{Boolean.TRUE}, {Boolean.FALSE}};
+ }
+
+ @Test(dataProvider = "closeLedgerByAddEntry")
+ public void testDeleteCurrentLedgerWhenItIsClosed(boolean closeLedgerByAddEntry) throws Exception {
+ // Setup: Open a manageLedger with one initial entry.
+ ManagedLedgerConfig config = new ManagedLedgerConfig();
+ config.setMaxEntriesPerLedger(10);
+ ManagedLedgerImpl ml = spy((ManagedLedgerImpl) factory.open("testDeleteCurrentLedgerWhenItIsClosed",
+ config));
+ assertEquals(ml.ledgers.size(), 1);
+ ml.addEntry(new byte[4]);
+ // Act: Trigger the rollover of the current ledger.
+ long currentLedgerID = ml.currentLedger.getId();
+ ml.config.setMaximumRolloverTime(10, TimeUnit.MILLISECONDS);
+ Thread.sleep(10);
+ if (closeLedgerByAddEntry) {
+ // Detect the current ledger is full before written entry and close the ledger after writing completely.
+ ml.addEntry(new byte[4]);
+ } else {
+ // Detect the current ledger is full by the timed task. (Imitate: the timed task `checkLedgerRollTask` call
+ // `rollCurrentLedgerIfFull` periodically).
+ ml.rollCurrentLedgerIfFull();
+ // the ledger closing in the `rollCurrentLedgerIfFull` is async, so the wait is needed.
+ Awaitility.await().untilAsserted(() -> assertEquals(ml.ledgers.size(), 2));
+ }
+ // Act: Trigger trimming to delete the previous current ledger.
+ ml.internalTrimLedgers(false, Futures.NULL_PROMISE);
+ // Verify: A new ledger will be opened after the current ledger is closed and the previous current ledger can be
+ // deleted.
+ Awaitility.await().untilAsserted(() -> {
+ assertEquals(ml.state, ManagedLedgerImpl.State.LedgerOpened);
+ assertEquals(ml.ledgers.size(), 1);
+ assertNotEquals(currentLedgerID, ml.currentLedger.getId());
+ assertEquals(ml.currentLedgerEntries, 0);
+ });
+ }
}
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorTest.java
index 1e1f7df0a46..82141bfd0ee 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorTest.java
@@ -589,12 +589,12 @@ public class NonDurableCursorTest extends MockedBookKeeperTestCase {
/* Position p1 = */ ledger.addEntry("entry-1".getBytes());
/* Position p2 = */ ledger.addEntry("entry-2".getBytes());
- Position p3 = ledger.addEntry("entry-3".getBytes());
+ /* Position p3 = */ ledger.addEntry("entry-3".getBytes());
Thread.sleep(300);
ManagedCursor c1 = ledger.newNonDurableCursor(PositionImpl.EARLIEST);
- assertEquals(c1.getReadPosition(), p3);
- assertEquals(c1.getMarkDeletedPosition(), new PositionImpl(5, -1));
+ assertEquals(c1.getReadPosition(), new PositionImpl(6, 0));
+ assertEquals(c1.getMarkDeletedPosition(), new PositionImpl(6, -1));
}
@Test // (timeOut = 20000)
@@ -723,9 +723,10 @@ public class NonDurableCursorTest extends MockedBookKeeperTestCase {
CompletableFuture<Void> promise = new CompletableFuture<>();
ledger.internalTrimConsumedLedgers(promise);
promise.join();
-
- assertEquals(nonDurableCursor.getNumberOfEntries(), 6);
- assertEquals(nonDurableCursor.getNumberOfEntriesInBacklog(true), 6);
+ // The mark delete position has moved to position 4:1, and the ledger 4 only has one entry,
+ // so the ledger 4 can be deleted. nonDurableCursor should has the same backlog with durable cursor.
+ assertEquals(nonDurableCursor.getNumberOfEntries(), 5);
+ assertEquals(nonDurableCursor.getNumberOfEntriesInBacklog(true), 5);
c1.close();
ledger.deleteCursor(c1.getName());
@@ -733,8 +734,8 @@ public class NonDurableCursorTest extends MockedBookKeeperTestCase {
ledger.internalTrimConsumedLedgers(promise);
promise.join();
- assertEquals(nonDurableCursor.getNumberOfEntries(), 1);
- assertEquals(nonDurableCursor.getNumberOfEntriesInBacklog(true), 1);
+ assertEquals(nonDurableCursor.getNumberOfEntries(), 0);
+ assertEquals(nonDurableCursor.getNumberOfEntriesInBacklog(true), 0);
ledger.close();
}
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ShadowManagedLedgerImplTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ShadowManagedLedgerImplTest.java
index 4482e9944c0..cc4b3f24811 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ShadowManagedLedgerImplTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ShadowManagedLedgerImplTest.java
@@ -148,7 +148,10 @@ public class ShadowManagedLedgerImplTest extends MockedBookKeeperTestCase {
newPos = sourceML.addEntry(data); // new ledger rolled.
newPos = sourceML.addEntry(data);
- Awaitility.await().untilAsserted(() -> assertEquals(shadowML.ledgers.size(), 5));
+ Awaitility.await().untilAsserted(() -> {
+ assertEquals(shadowML.ledgers.size(), 6);
+ assertEquals(shadowML.currentLedgerEntries, 0);
+ });
assertEquals(future.get(), fakePos);
// LCE should be updated.
log.info("3.Source.LCE={},Shadow.LCE={}", sourceML.lastConfirmedEntry, shadowML.lastConfirmedEntry);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BacklogQuotaManagerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BacklogQuotaManagerTest.java
index 3c829b02cb8..048b4fd699b 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BacklogQuotaManagerTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BacklogQuotaManagerTest.java
@@ -228,7 +228,7 @@ public class BacklogQuotaManagerTest {
// non-durable mes should still
assertEquals(stats.getSubscriptions().size(), 1);
long nonDurableSubscriptionBacklog = stats.getSubscriptions().values().iterator().next().getMsgBacklog();
- assertEquals(nonDurableSubscriptionBacklog, MAX_ENTRIES_PER_LEDGER,
+ assertEquals(nonDurableSubscriptionBacklog, 0,
"non-durable subscription backlog is [" + nonDurableSubscriptionBacklog + "]");
MessageIdImpl msgId = null;
@@ -254,9 +254,6 @@ public class BacklogQuotaManagerTest {
// check there is only one ledger left
assertEquals(internalStats.ledgers.size(), 1);
-
- // check if its the expected ledger id given MAX_ENTRIES_PER_LEDGER
- assertEquals(internalStats.ledgers.get(0).ledgerId, finalMsgId.getLedgerId());
});
// check reader can still read with out error
@@ -303,10 +300,10 @@ public class BacklogQuotaManagerTest {
TopicStats stats = getTopicStats(topic1);
// overall backlogSize should be zero because we only have readers
assertEquals(stats.getBacklogSize(), 0, "backlog size is [" + stats.getBacklogSize() + "]");
- // non-durable mes should still
assertEquals(stats.getSubscriptions().size(), 1);
long nonDurableSubscriptionBacklog = stats.getSubscriptions().values().iterator().next().getMsgBacklog();
- assertEquals(nonDurableSubscriptionBacklog, MAX_ENTRIES_PER_LEDGER,
+ // All the full ledgers should be deleted.
+ assertEquals(nonDurableSubscriptionBacklog, 0,
"non-durable subscription backlog is [" + nonDurableSubscriptionBacklog + "]");
MessageIdImpl messageId = null;
try {
@@ -327,8 +324,8 @@ public class BacklogQuotaManagerTest {
// check there is only one ledger left
assertEquals(internalStats.ledgers.size(), 1);
- // check if its the expected ledger id given MAX_ENTRIES_PER_LEDGER
- assertEquals(internalStats.ledgers.get(0).ledgerId, finalMessageId.getLedgerId());
+ // check if it's the expected ledger id given MAX_ENTRIES_PER_LEDGER
+ assertEquals(internalStats.ledgers.get(0).ledgerId, finalMessageId.getLedgerId() + 1);
});
// check reader can still read with out error
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBkEnsemblesTests.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBkEnsemblesTests.java
index 40649a41640..42b9358911a 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBkEnsemblesTests.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBkEnsemblesTests.java
@@ -191,9 +191,9 @@ public class BrokerBkEnsemblesTests extends BkEnsemblesTestBase {
.build();
final String ns1 = "prop/usc/crash-broker";
- final int totalMessages = 100;
+ final int totalMessages = 99;
final int totalDataLedgers = 5;
- final int entriesPerLedger = totalMessages / totalDataLedgers;
+ final int entriesPerLedger = 20;
try {
admin.namespaces().createNamespace(ns1);
@@ -273,9 +273,9 @@ public class BrokerBkEnsemblesTests extends BkEnsemblesTestBase {
retryStrategically((test) -> config.isAutoSkipNonRecoverableData(), 5, 100);
- // (5) consumer will be able to consume 20 messages from last non-deleted ledger
+ // (5) consumer will be able to consume 19 messages from last non-deleted ledger
consumer = client.newConsumer().topic(topic1).subscriptionName("my-subscriber-name").subscribe();
- for (int i = 0; i < entriesPerLedger; i++) {
+ for (int i = 0; i < entriesPerLedger - 1; i++) {
msg = consumer.receive();
System.out.println(i);
consumer.acknowledge(msg);
@@ -296,9 +296,9 @@ public class BrokerBkEnsemblesTests extends BkEnsemblesTestBase {
.statsInterval(0, TimeUnit.SECONDS)
.build();
- final int totalMessages = 100;
+ final int totalMessages = 99;
final int totalDataLedgers = 5;
- final int entriesPerLedger = totalMessages / totalDataLedgers;
+ final int entriesPerLedger = 20;
final String tenant = "prop";
try {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBookieIsolationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBookieIsolationTest.java
index 89686c65add..19aa3ae0bd1 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBookieIsolationTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBookieIsolationTest.java
@@ -231,28 +231,43 @@ public class BrokerBookieIsolationTest {
LedgerManager ledgerManager = getLedgerManager(bookie1);
// namespace: ns1
- ManagedLedgerImpl ml = (ManagedLedgerImpl) topic1.getManagedLedger();
- assertEquals(ml.getLedgersInfoAsList().size(), totalLedgers);
+ ManagedLedgerImpl ml1 = (ManagedLedgerImpl) topic1.getManagedLedger();
+ // totalLedgers = totalPublish / totalEntriesPerLedger. (totalPublish = 100, totalEntriesPerLedger = 20.)
+ // The last ledger is full, a new empty ledger will be created.
+ // The ledger is created async, so adding a wait is needed.
+ Awaitility.await().untilAsserted(() -> {
+ assertEquals(ml1.getLedgersInfoAsList().size(), totalLedgers + 1);
+ assertEquals(ml1.getCurrentLedgerEntries(), 0);
+ });
// validate ledgers' ensemble with affinity bookies
- assertAffinityBookies(ledgerManager, ml.getLedgersInfoAsList(), defaultBookies);
+ assertAffinityBookies(ledgerManager, ml1.getLedgersInfoAsList(), defaultBookies);
// namespace: ns2
- ml = (ManagedLedgerImpl) topic2.getManagedLedger();
- assertEquals(ml.getLedgersInfoAsList().size(), totalLedgers);
+ ManagedLedgerImpl ml2 = (ManagedLedgerImpl) topic2.getManagedLedger();
+ Awaitility.await().untilAsserted(() -> {
+ assertEquals(ml2.getLedgersInfoAsList().size(), totalLedgers + 1);
+ assertEquals(ml2.getCurrentLedgerEntries(), 0);
+ });
// validate ledgers' ensemble with affinity bookies
- assertAffinityBookies(ledgerManager, ml.getLedgersInfoAsList(), isolatedBookies);
+ assertAffinityBookies(ledgerManager, ml2.getLedgersInfoAsList(), isolatedBookies);
// namespace: ns3
- ml = (ManagedLedgerImpl) topic3.getManagedLedger();
- assertEquals(ml.getLedgersInfoAsList().size(), totalLedgers);
+ ManagedLedgerImpl ml3 = (ManagedLedgerImpl) topic3.getManagedLedger();
+ Awaitility.await().untilAsserted(() -> {
+ assertEquals(ml3.getLedgersInfoAsList().size(), totalLedgers + 1);
+ assertEquals(ml3.getCurrentLedgerEntries(), 0);
+ });
// validate ledgers' ensemble with affinity bookies
- assertAffinityBookies(ledgerManager, ml.getLedgersInfoAsList(), isolatedBookies);
+ assertAffinityBookies(ledgerManager, ml3.getLedgersInfoAsList(), isolatedBookies);
// namespace: ns4
- ml = (ManagedLedgerImpl) topic4.getManagedLedger();
- assertEquals(ml.getLedgersInfoAsList().size(), totalLedgers);
+ ManagedLedgerImpl ml4 = (ManagedLedgerImpl) topic4.getManagedLedger();
+ Awaitility.await().untilAsserted(() -> {
+ assertEquals(ml4.getLedgersInfoAsList().size(), totalLedgers + 1);
+ assertEquals(ml4.getCurrentLedgerEntries(), 0);
+ });
// validate ledgers' ensemble with affinity bookies
- assertAffinityBookies(ledgerManager, ml.getLedgersInfoAsList(), isolatedBookies);
+ assertAffinityBookies(ledgerManager, ml4.getLedgersInfoAsList(), isolatedBookies);
ManagedLedgerClientFactory mlFactory =
(ManagedLedgerClientFactory) pulsarService.getManagedLedgerClientFactory();
@@ -389,11 +404,14 @@ public class BrokerBookieIsolationTest {
ManagedLedgerImpl ml2 = (ManagedLedgerImpl) topic2.getManagedLedger();
// namespace: ns2
- assertEquals(ml2.getLedgersInfoAsList().size(), totalLedgers);
-
+ Awaitility.await().untilAsserted(() -> {
+ assertEquals(ml2.getLedgersInfoAsList().size(), totalLedgers + 1);
+ assertEquals(ml2.getCurrentLedgerEntries(), 0);
+ });
List<LedgerInfo> ledgers = ml2.getLedgersInfoAsList();
// validate ledgers' ensemble with affinity bookies
- for (int i=1; i<ledgers.size();i++) {
+ // The second ledger will be created after the first ledger is full and the isolationGroup has not been set.
+ for (int i=2; i<ledgers.size();i++) {
LedgerInfo lInfo = ledgers.get(i);
long ledgerId = lInfo.getLedgerId();
CompletableFuture<Versioned<LedgerMetadata>> ledgerMetaFuture = ledgerManager.readLedgerMetadata(ledgerId);
@@ -530,28 +548,40 @@ public class BrokerBookieIsolationTest {
LedgerManager ledgerManager = getLedgerManager(bookie1);
// namespace: ns1
- ManagedLedgerImpl ml = (ManagedLedgerImpl) topic1.getManagedLedger();
- assertEquals(ml.getLedgersInfoAsList().size(), totalLedgers);
+ ManagedLedgerImpl ml1 = (ManagedLedgerImpl) topic1.getManagedLedger();
+ Awaitility.await().untilAsserted(() -> {
+ assertEquals(ml1.getLedgersInfoAsList().size(), totalLedgers + 1);
+ assertEquals(ml1.getCurrentLedgerEntries(), 0);
+ });
// validate ledgers' ensemble with affinity bookies
- assertAffinityBookies(ledgerManager, ml.getLedgersInfoAsList(), defaultBookies);
+ assertAffinityBookies(ledgerManager, ml1.getLedgersInfoAsList(), defaultBookies);
// namespace: ns2
- ml = (ManagedLedgerImpl) topic2.getManagedLedger();
- assertEquals(ml.getLedgersInfoAsList().size(), totalLedgers);
+ ManagedLedgerImpl ml2 = (ManagedLedgerImpl) topic2.getManagedLedger();
+ Awaitility.await().untilAsserted(() -> {
+ assertEquals(ml2.getLedgersInfoAsList().size(), totalLedgers + 1);
+ assertEquals(ml2.getCurrentLedgerEntries(), 0);
+ });
// validate ledgers' ensemble with affinity bookies
- assertAffinityBookies(ledgerManager, ml.getLedgersInfoAsList(), isolatedBookies);
+ assertAffinityBookies(ledgerManager, ml2.getLedgersInfoAsList(), isolatedBookies);
// namespace: ns3
- ml = (ManagedLedgerImpl) topic3.getManagedLedger();
- assertEquals(ml.getLedgersInfoAsList().size(), totalLedgers);
+ ManagedLedgerImpl ml3 = (ManagedLedgerImpl) topic3.getManagedLedger();
+ Awaitility.await().untilAsserted(() -> {
+ assertEquals(ml3.getLedgersInfoAsList().size(), totalLedgers + 1);
+ assertEquals(ml3.getCurrentLedgerEntries(), 0);
+ });
// validate ledgers' ensemble with affinity bookies
- assertAffinityBookies(ledgerManager, ml.getLedgersInfoAsList(), isolatedBookies);
+ assertAffinityBookies(ledgerManager, ml3.getLedgersInfoAsList(), isolatedBookies);
// namespace: ns4
- ml = (ManagedLedgerImpl) topic4.getManagedLedger();
- assertEquals(ml.getLedgersInfoAsList().size(), totalLedgers);
+ ManagedLedgerImpl ml4 = (ManagedLedgerImpl) topic4.getManagedLedger();
+ Awaitility.await().untilAsserted(() -> {
+ assertEquals(ml4.getLedgersInfoAsList().size(), totalLedgers + 1);
+ assertEquals(ml4.getCurrentLedgerEntries(), 0);
+ });
// validate ledgers' ensemble with affinity bookies
- assertAffinityBookies(ledgerManager, ml.getLedgersInfoAsList(), isolatedBookies);
+ assertAffinityBookies(ledgerManager, ml4.getLedgersInfoAsList(), isolatedBookies);
ManagedLedgerClientFactory mlFactory =
(ManagedLedgerClientFactory) pulsarService.getManagedLedgerClientFactory();
@@ -689,22 +719,32 @@ public class BrokerBookieIsolationTest {
LedgerManager ledgerManager = getLedgerManager(bookie1);
// namespace: ns1
- ManagedLedgerImpl ml = (ManagedLedgerImpl) topic1.getManagedLedger();
- assertEquals(ml.getLedgersInfoAsList().size(), totalLedgers);
+ ManagedLedgerImpl ml1 = (ManagedLedgerImpl) topic1.getManagedLedger();
+ Awaitility.await().untilAsserted(() -> {
+ assertEquals(ml1.getLedgersInfoAsList().size(), totalLedgers + 1);
+ assertEquals(ml1.getCurrentLedgerEntries(), 0);
+ });
+
// validate ledgers' ensemble with affinity bookies
- assertAffinityBookies(ledgerManager, ml.getLedgersInfoAsList(), defaultBookies);
+ assertAffinityBookies(ledgerManager, ml1.getLedgersInfoAsList(), defaultBookies);
// namespace: ns2
- ml = (ManagedLedgerImpl) topic2.getManagedLedger();
- assertEquals(ml.getLedgersInfoAsList().size(), totalLedgers);
+ ManagedLedgerImpl ml2 = (ManagedLedgerImpl) topic2.getManagedLedger();
+ Awaitility.await().untilAsserted(() -> {
+ assertEquals(ml2.getLedgersInfoAsList().size(), totalLedgers + 1);
+ assertEquals(ml2.getCurrentLedgerEntries(), 0);
+ });
// validate ledgers' ensemble with affinity bookies
- assertAffinityBookies(ledgerManager, ml.getLedgersInfoAsList(), isolatedBookies);
+ assertAffinityBookies(ledgerManager, ml2.getLedgersInfoAsList(), isolatedBookies);
// namespace: ns3
- ml = (ManagedLedgerImpl) topic3.getManagedLedger();
- assertEquals(ml.getLedgersInfoAsList().size(), totalLedgers);
+ ManagedLedgerImpl ml3 = (ManagedLedgerImpl) topic3.getManagedLedger();
+ Awaitility.await().untilAsserted(() -> {
+ assertEquals(ml3.getLedgersInfoAsList().size(), totalLedgers + 1);
+ assertEquals(ml3.getCurrentLedgerEntries(), 0);
+ });
// validate ledgers' ensemble with affinity bookies
- assertAffinityBookies(ledgerManager, ml.getLedgersInfoAsList(), isolatedBookies);
+ assertAffinityBookies(ledgerManager, ml3.getLedgersInfoAsList(), isolatedBookies);
ManagedLedgerClientFactory mlFactory =
(ManagedLedgerClientFactory) pulsarService.getManagedLedgerClientFactory();
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ConsumedLedgersTrimTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ConsumedLedgersTrimTest.java
index 80db4c30f45..30867dd2cb4 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ConsumedLedgersTrimTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ConsumedLedgersTrimTest.java
@@ -97,11 +97,13 @@ public class ConsumedLedgersTrimTest extends BrokerTestBase {
}
ManagedLedgerImpl managedLedger = (ManagedLedgerImpl) persistentTopic.getManagedLedger();
- Assert.assertEquals(managedLedger.getLedgersInfoAsList().size(), msgNum / 2);
+ Awaitility.await().untilAsserted(() -> {
+ Assert.assertEquals(managedLedger.getLedgersInfoAsList().size() - 1, msgNum / 2);
+ });
//no traffic, unconsumed ledger will be retained
Thread.sleep(1200);
- Assert.assertEquals(managedLedger.getLedgersInfoAsList().size(), msgNum / 2);
+ Assert.assertEquals(managedLedger.getLedgersInfoAsList().size() - 1, msgNum / 2);
for (int i = 0; i < msgNum; i++) {
Message<byte[]> msg = consumer.receive(2, TimeUnit.SECONDS);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerConsumerInternalTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerConsumerInternalTest.java
index 4ec81070306..a06085d3d46 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerConsumerInternalTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerConsumerInternalTest.java
@@ -20,6 +20,7 @@ package org.apache.pulsar.client.impl;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertNotNull;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
@@ -29,11 +30,14 @@ import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.broker.BrokerTestUtil;
import org.apache.pulsar.broker.service.ServerCnx;
import org.apache.pulsar.client.api.BatcherBuilder;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerConsumerBase;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.common.api.proto.CommandCloseProducer;
+import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
import org.awaitility.Awaitility;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
@@ -186,4 +190,44 @@ public class ProducerConsumerInternalTest extends ProducerConsumerBase {
future.thenAccept(msgId -> log.info("msg-1 done: {} (msgId: {})", System.nanoTime(), msgId));
future.get();
}
+
+
+ @Test
+ public void testRetentionPolicyByProducingMessages() throws Exception {
+ // Setup: configure the entries per ledger and retention polices.
+ final int maxEntriesPerLedger = 10, messagesCount = 10;
+ final String topicName = BrokerTestUtil.newUniqueName("persistent://my-property/my-ns/tp_");
+ pulsar.getConfiguration().setManagedLedgerMaxEntriesPerLedger(maxEntriesPerLedger);
+ pulsar.getConfiguration().setManagedLedgerMinLedgerRolloverTimeMinutes(0);
+ pulsar.getConfiguration().setDefaultRetentionTimeInMinutes(0);
+ pulsar.getConfiguration().setDefaultRetentionSizeInMB(0);
+
+ @Cleanup
+ Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName)
+ .sendTimeout(1, TimeUnit.SECONDS)
+ .enableBatching(false)
+ .create();
+
+ @Cleanup
+ Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName)
+ .subscriptionName("my-sub")
+ .subscribe();
+ // Act: prepare a full ledger data and ack them.
+ for (int i = 0; i < messagesCount; i++) {
+ producer.newMessage().sendAsync();
+ }
+ for (int i = 0; i < messagesCount; i++) {
+ Message<byte[]> message = consumer.receive();
+ assertNotNull(message);
+ consumer.acknowledge(message);
+ }
+ // Verify: a new empty ledger will be created after the current ledger is fulled.
+ // And the previous consumed ledgers will be deleted
+ Awaitility.await().untilAsserted(() -> {
+ admin.topics().trimTopic(topicName);
+ PersistentTopicInternalStats internalStats = admin.topics().getInternalStatsAsync(topicName).get();
+ assertEquals(internalStats.currentLedgerEntries, 0);
+ assertEquals(internalStats.ledgers.size(), 1);
+ });
+ }
}