You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by ba...@apache.org on 2024/04/15 07:14:01 UTC

(pulsar) branch branch-3.2 updated: [fix][broker] Create new ledger after the current ledger is closed (#22034)

This is an automated email from the ASF dual-hosted git repository.

baodi pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-3.2 by this push:
     new f73a1cb8c49 [fix][broker] Create new ledger after the current ledger is closed (#22034)
f73a1cb8c49 is described below

commit f73a1cb8c49a9193714c1dd57d9cab6f5a30e11d
Author: Xiangying Meng <55...@users.noreply.github.com>
AuthorDate: Fri Mar 22 11:52:47 2024 +0800

    [fix][broker] Create new ledger after the current ledger is closed (#22034)
    
    (cherry picked from commit d0ca9835cf972ce156bd4a1fc5d109482330857d)
---
 .../bookkeeper/mledger/impl/ManagedCursorImpl.java |   2 +-
 .../bookkeeper/mledger/impl/ManagedLedgerImpl.java |  22 ++--
 .../bookkeeper/mledger/impl/ManagedCursorTest.java |  33 ++++--
 .../mledger/impl/ManagedLedgerFactoryTest.java     |   2 +-
 .../bookkeeper/mledger/impl/ManagedLedgerTest.java | 111 +++++++++++++++-----
 .../mledger/impl/NonDurableCursorTest.java         |  17 ++--
 .../mledger/impl/ShadowManagedLedgerImplTest.java  |   5 +-
 .../broker/service/BacklogQuotaManagerTest.java    |  13 +--
 .../broker/service/BrokerBkEnsemblesTests.java     |  12 +--
 .../broker/service/BrokerBookieIsolationTest.java  | 112 ++++++++++++++-------
 .../broker/service/ConsumedLedgersTrimTest.java    |   6 +-
 .../client/impl/ProducerConsumerInternalTest.java  |  44 ++++++++
 12 files changed, 275 insertions(+), 104 deletions(-)

diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
index 9bbcda327f0..9bb7b02ce4e 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
@@ -1278,7 +1278,7 @@ public class ManagedCursorImpl implements ManagedCursor {
         if (proposedReadPosition.equals(PositionImpl.EARLIEST)) {
             newReadPosition = ledger.getFirstPosition();
         } else if (proposedReadPosition.equals(PositionImpl.LATEST)) {
-            newReadPosition = ledger.getLastPosition().getNext();
+            newReadPosition = ledger.getNextValidPosition(ledger.getLastPosition());
         } else {
             newReadPosition = proposedReadPosition;
         }
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index 426ac8df218..1274b347263 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -1755,10 +1755,7 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
 
         maybeOffloadInBackground(NULL_OFFLOAD_PROMISE);
 
-        if (!pendingAddEntries.isEmpty()) {
-            // Need to create a new ledger to write pending entries
-            createLedgerAfterClosed();
-        }
+        createLedgerAfterClosed();
     }
 
     @Override
@@ -1813,7 +1810,6 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
                     }
 
                     ledgerClosed(lh);
-                    createLedgerAfterClosed();
                 }
             }, null);
         }
@@ -2649,7 +2645,16 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
             } else {
                 PositionImpl slowestReaderPosition = cursors.getSlowestReaderPosition();
                 if (slowestReaderPosition != null) {
-                    slowestReaderLedgerId = slowestReaderPosition.getLedgerId();
+                    // The slowest reader position is the mark delete position.
+                    // If the slowest reader position points to the last entry in the ledger x,
+                    // the slowestReaderLedgerId should be x + 1 and the ledger x could be deleted.
+                    LedgerInfo ledgerInfo = ledgers.get(slowestReaderPosition.getLedgerId());
+                    if (ledgerInfo != null && ledgerInfo.getLedgerId() != currentLedger.getId()
+                            && ledgerInfo.getEntries() == slowestReaderPosition.getEntryId() + 1) {
+                        slowestReaderLedgerId = slowestReaderPosition.getLedgerId() + 1;
+                    } else {
+                        slowestReaderLedgerId = slowestReaderPosition.getLedgerId();
+                    }
                 } else {
                     promise.completeExceptionally(new ManagedLedgerException("Couldn't find reader position"));
                     trimmerMutex.unlock();
@@ -3693,7 +3698,11 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
         PositionImpl skippedPosition = position.getPositionAfterEntries(skippedEntryNum);
         while (!isValidPosition(skippedPosition)) {
             Long nextLedgerId = ledgers.ceilingKey(skippedPosition.getLedgerId() + 1);
+            // This means it has jumped to the last position
             if (nextLedgerId == null) {
+                if (currentLedgerEntries == 0) {
+                    return PositionImpl.get(currentLedger.getId(), 0);
+                }
                 return lastConfirmedEntry.getNext();
             }
             skippedPosition = PositionImpl.get(nextLedgerId, 0);
@@ -4485,7 +4494,6 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
                     }
 
                     ledgerClosed(lh);
-                    createLedgerAfterClosed();
                     // we do not create ledger here, since topic is inactive for a long time.
                 }, null);
                 return true;
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
index 644f53c3a52..c9bd64171c1 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
@@ -234,15 +234,16 @@ public class ManagedCursorTest extends MockedBookKeeperTestCase {
 
     @Test
     void testPersistentMarkDeleteIfCreateCursorLedgerFailed() throws Exception {
-        final int entryCount = 10;
+        final int entryCount = 9;
         final String cursorName = "c1";
         final String mlName = "ml_test";
-        final ManagedLedgerConfig mlConfig = new ManagedLedgerConfig().setMaxEntriesPerLedger(1);
+        // Avoid creating new empty ledger after the last ledger is full and remove fail future.
+        final ManagedLedgerConfig mlConfig = new ManagedLedgerConfig().setMaxEntriesPerLedger(2);
         ManagedLedgerImpl ml = (ManagedLedgerImpl) factory.open(mlName, mlConfig);
 
         ManagedCursor cursor = ml.openCursor("c1");
         Position lastEntry = null;
-        for (int i = 0; i < 10; i++) {
+        for (int i = 0; i < entryCount; i++) {
             lastEntry = ml.addEntry(("entry-" + i).getBytes(Encoding));
         }
 
@@ -809,7 +810,7 @@ public class ManagedCursorTest extends MockedBookKeeperTestCase {
         assertEquals(firstInNext, cursor.getReadPosition());
         moveStatus.set(false);
 
-        // reset to a non exist larger ledger should point to the first non-exist entry in the last ledger
+        // reset to a non exist larger ledger should point to the first non-exist entry in the next ledger
         PositionImpl latest = new PositionImpl(last.getLedgerId() + 2, 0);
         try {
             cursor.resetCursor(latest);
@@ -818,11 +819,13 @@ public class ManagedCursorTest extends MockedBookKeeperTestCase {
             log.warn("error in reset cursor", e.getCause());
         }
         assertTrue(moveStatus.get());
-        PositionImpl lastPos = new PositionImpl(last.getLedgerId(), last.getEntryId() + 1);
-        assertEquals(lastPos, cursor.getReadPosition());
+        PositionImpl lastPos = new PositionImpl(last.getLedgerId() + 1, 0);
+        Awaitility.await().untilAsserted(() -> {
+            assertEquals(lastPos, cursor.getReadPosition());
+        });
         moveStatus.set(false);
 
-        // reset to latest should point to the first non-exist entry in the last ledger
+        // reset to latest should point to the first non-exist entry in the next ledger
         PositionImpl anotherLast = PositionImpl.LATEST;
         try {
             cursor.resetCursor(anotherLast);
@@ -1701,7 +1704,7 @@ public class ManagedCursorTest extends MockedBookKeeperTestCase {
 
     @Test(timeOut = 20000, dataProvider = "useOpenRangeSet")
     void testSkipEntries(boolean useOpenRangeSet) throws Exception {
-        ManagedLedger ledger = factory.open("my_test_ledger", new ManagedLedgerConfig()
+        ManagedLedgerImpl ledger = (ManagedLedgerImpl) factory.open("my_test_ledger", new ManagedLedgerConfig()
                 .setUnackedRangesOpenCacheSetEnabled(useOpenRangeSet).setMaxEntriesPerLedger(2));
         Position pos;
 
@@ -1715,6 +1718,11 @@ public class ManagedCursorTest extends MockedBookKeeperTestCase {
         pos = ledger.addEntry("dummy-entry-1".getBytes(Encoding));
         pos = ledger.addEntry("dummy-entry-2".getBytes(Encoding));
 
+        // Wait until the new empty ledger has been created.
+        Awaitility.await().untilAsserted(() -> {
+            assertEquals(ledger.ledgers.size(), 2);
+        });
+
         // skip entries in same ledger
         c1.skipEntries(1, IndividualDeletedEntries.Exclude);
         assertEquals(c1.getNumberOfEntries(), 1);
@@ -1722,7 +1730,7 @@ public class ManagedCursorTest extends MockedBookKeeperTestCase {
         // skip entries until end of ledger
         c1.skipEntries(1, IndividualDeletedEntries.Exclude);
         assertEquals(c1.getNumberOfEntries(), 0);
-        assertEquals(c1.getReadPosition(), pos.getNext());
+        assertEquals(c1.getReadPosition(), new PositionImpl(ledger.currentLedger.getId(), 0));
         assertEquals(c1.getMarkDeletedPosition(), pos);
 
         // skip entries across ledgers
@@ -1737,7 +1745,10 @@ public class ManagedCursorTest extends MockedBookKeeperTestCase {
         c1.skipEntries(10, IndividualDeletedEntries.Exclude);
         assertEquals(c1.getNumberOfEntries(), 0);
         assertFalse(c1.hasMoreEntries());
-        assertEquals(c1.getReadPosition(), pos.getNext());
+        // We cannot check the ledger id because a cursor ledger can be created.
+        Awaitility.await().untilAsserted(() -> {
+            assertEquals(c1.getReadPosition().getEntryId(), 0);
+        });
         assertEquals(c1.getMarkDeletedPosition(), pos);
     }
 
@@ -1759,7 +1770,7 @@ public class ManagedCursorTest extends MockedBookKeeperTestCase {
 
         c1.skipEntries(3, IndividualDeletedEntries.Exclude);
         assertEquals(c1.getNumberOfEntries(), 0);
-        assertEquals(c1.getReadPosition(), pos5.getNext());
+        assertEquals(c1.getReadPosition(), new PositionImpl(pos5.getLedgerId() + 1, 0));
         assertEquals(c1.getMarkDeletedPosition(), pos5);
 
         pos1 = ledger.addEntry("dummy-entry-1".getBytes(Encoding));
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryTest.java
index 4f2c3e17877..a953b140aba 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryTest.java
@@ -54,7 +54,7 @@ public class ManagedLedgerFactoryTest extends MockedBookKeeperTestCase {
 
         ManagedLedgerInfo info = factory.getManagedLedgerInfo("testGetManagedLedgerInfo");
 
-        assertEquals(info.ledgers.size(), 4);
+        assertEquals(info.ledgers.size(), 5);
 
         assertEquals(info.ledgers.get(0).ledgerId, 3);
         assertEquals(info.ledgers.get(1).ledgerId, 4);
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
index 9ad1e9ff48e..2139a662962 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
@@ -36,6 +36,7 @@ import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertNotEquals;
 import static org.testng.Assert.assertNotNull;
 import static org.testng.Assert.assertNull;
 import static org.testng.Assert.assertSame;
@@ -1121,9 +1122,13 @@ public class ManagedLedgerTest extends MockedBookKeeperTestCase {
 
         cursor.markDelete(lastPosition);
 
-        while (ledger.getNumberOfEntries() != 2) {
-            Thread.sleep(10);
-        }
+        Awaitility.await().untilAsserted(() -> {
+            // The number of entries in the ledger should not contain the entry in the mark delete position.
+            // last position is the position of entry-3.
+            // cursor.markDelete(lastPosition);
+            // only entry-4 is left in the ledger.
+            assertEquals(ledger.getNumberOfEntries(), 1);
+        });
     }
 
     @Test(timeOut = 20000)
@@ -2437,7 +2442,7 @@ public class ManagedLedgerTest extends MockedBookKeeperTestCase {
 
         Awaitility.await().untilAsserted(() -> {
             assertTrue(ml.getTotalSize() <= retentionSizeInMB * 1024 * 1024);
-            assertEquals(ml.getLedgersInfoAsList().size(), 5);
+            assertEquals(ml.getLedgersInfoAsList().size(), 6);
         });
     }
 
@@ -2695,9 +2700,17 @@ public class ManagedLedgerTest extends MockedBookKeeperTestCase {
 
         assertEquals(ledger.getNextValidPosition((PositionImpl) c1.getMarkDeletedPosition()), p1);
         assertEquals(ledger.getNextValidPosition(p1), p2);
-        assertEquals(ledger.getNextValidPosition(p3), PositionImpl.get(p3.getLedgerId(), p3.getEntryId() + 1));
-        assertEquals(ledger.getNextValidPosition(PositionImpl.get(p3.getLedgerId(), p3.getEntryId() + 1)), PositionImpl.get(p3.getLedgerId(), p3.getEntryId() + 1));
-        assertEquals(ledger.getNextValidPosition(PositionImpl.get(p3.getLedgerId() + 1, p3.getEntryId() + 1)), PositionImpl.get(p3.getLedgerId(), p3.getEntryId() + 1));
+        Awaitility.await().untilAsserted(() -> {
+            assertEquals(ledger.getNextValidPosition(p3), PositionImpl.get(p3.getLedgerId() + 1, 0));
+        });
+        Awaitility.await().untilAsserted(() -> {
+            assertEquals(ledger.getNextValidPosition(PositionImpl.get(p3.getLedgerId(), p3.getEntryId() + 1)),
+                    PositionImpl.get(p3.getLedgerId() + 1, 0));
+        });
+        Awaitility.await().untilAsserted(() -> {
+            assertEquals(ledger.getNextValidPosition(PositionImpl.get(p3.getLedgerId() + 1, p3.getEntryId() + 1)),
+                    PositionImpl.get(p3.getLedgerId() + 1, 0));
+        });
     }
 
     /**
@@ -3036,19 +3049,22 @@ public class ManagedLedgerTest extends MockedBookKeeperTestCase {
             String content = "entry" + i; // 5 bytes
             ledger.addEntry(content.getBytes());
         }
+        Awaitility.await().untilAsserted(() -> {
+            assertEquals(ledger.currentLedgerSize, 0);
+            assertEquals(ledger.ledgers.size(), 1);
+        });
         // Open Cursor also adds cursor into activeCursor-container
         ManagedCursor latestCursor = ledger.openCursor("c1", InitialPosition.Latest);
         ManagedCursor earliestCursor = ledger.openCursor("c2", InitialPosition.Earliest);
 
         // Since getReadPosition returns the next position, we decrease the entryId by 1
-        PositionImpl p1 = (PositionImpl) latestCursor.getReadPosition();
         PositionImpl p2 = (PositionImpl) earliestCursor.getReadPosition();
 
         Pair<PositionImpl, Long> latestPositionAndCounter = ledger.getLastPositionAndCounter();
         Pair<PositionImpl, Long> earliestPositionAndCounter = ledger.getFirstPositionAndCounter();
-
-        assertEquals(latestPositionAndCounter.getLeft().getNext(), p1);
-        assertEquals(earliestPositionAndCounter.getLeft().getNext(), p2);
+        // The read position is the valid next position of the last position instead of the next position.
+        assertEquals(ledger.getNextValidPosition(latestPositionAndCounter.getLeft()), latestCursor.getReadPosition());
+        assertEquals(ledger.getNextValidPosition(earliestPositionAndCounter.getLeft()), p2);
 
         assertEquals(latestPositionAndCounter.getRight().longValue(), totalInsertedEntries);
         assertEquals(earliestPositionAndCounter.getRight().longValue(), totalInsertedEntries - earliestCursor.getNumberOfEntriesInBacklog(false));
@@ -3471,7 +3487,8 @@ public class ManagedLedgerTest extends MockedBookKeeperTestCase {
             ledger.addEntry(new byte[1024 * 1024]);
         }
 
-        Awaitility.await().untilAsserted(() -> Assert.assertEquals(ledger.getLedgersInfoAsList().size(), msgNum / 2));
+        Awaitility.await().untilAsserted(() -> Assert.assertEquals(ledger.getLedgersInfoAsList().size(),
+                msgNum / 2 + 1));
         List<Entry> entries = cursor.readEntries(msgNum);
         Assert.assertEquals(msgNum, entries.size());
 
@@ -3486,6 +3503,9 @@ public class ManagedLedgerTest extends MockedBookKeeperTestCase {
         stateUpdater.setAccessible(true);
         stateUpdater.set(ledger, ManagedLedgerImpl.State.LedgerOpened);
         ledger.rollCurrentLedgerIfFull();
+        CompletableFuture<Void> completableFuture = new CompletableFuture<>();
+        ledger.trimConsumedLedgersInBackground(completableFuture);
+        completableFuture.get();
         Awaitility.await().untilAsserted(() -> Assert.assertEquals(ledger.getLedgersInfoAsList().size(), 1));
         Awaitility.await().untilAsserted(() -> Assert.assertEquals(ledger.getTotalSize(), 0));
     }
@@ -3651,8 +3671,12 @@ public class ManagedLedgerTest extends MockedBookKeeperTestCase {
         }
         List<Entry> entryList = cursor.readEntries(3);
         assertEquals(entryList.size(), 3);
-        assertEquals(ledger.ledgers.size(), 3);
-        assertEquals(ledger.ledgerCache.size(), 2);
+        Awaitility.await().untilAsserted(() -> {
+            log.error("ledger.ledgerCache.size() : " + ledger.ledgerCache.size());
+            assertEquals(ledger.ledgerCache.size(), 3);
+            assertEquals(ledger.ledgers.size(), 4);
+        });
+
         cursor.clearBacklog();
         cursor2.clearBacklog();
         ledger.trimConsumedLedgersInBackground(Futures.NULL_PROMISE);
@@ -3681,15 +3705,15 @@ public class ManagedLedgerTest extends MockedBookKeeperTestCase {
         }
         List<Entry> entryList = cursor.readEntries(entries);
         assertEquals(entryList.size(), entries);
-        assertEquals(ledger.ledgers.size(), entries);
-        assertEquals(ledger.ledgerCache.size(), entries - 1);
+        assertEquals(ledger.ledgers.size() - 1, entries);
+        assertEquals(ledger.ledgerCache.size() - 1, entries - 1);
         cursor.clearBacklog();
         ledger.trimConsumedLedgersInBackground(Futures.NULL_PROMISE);
         ledger.trimConsumedLedgersInBackground(Futures.NULL_PROMISE);
         // Cleanup fails because ManagedLedgerNotFoundException is thrown
         Awaitility.await().untilAsserted(() -> {
-            assertEquals(ledger.ledgers.size(), entries);
-            assertEquals(ledger.ledgerCache.size(), entries - 1);
+            assertEquals(ledger.ledgers.size() - 1, entries);
+            assertEquals(ledger.ledgerCache.size() - 1, entries - 1);
         });
         // The lock is released even if an ManagedLedgerNotFoundException occurs, so it can be called repeatedly
         Awaitility.await().untilAsserted(() ->
@@ -3715,13 +3739,13 @@ public class ManagedLedgerTest extends MockedBookKeeperTestCase {
         }
         List<Entry> entryList = cursor.readEntries(3);
         assertEquals(entryList.size(), 3);
-        assertEquals(ledger.ledgers.size(), 3);
-        assertEquals(ledger.ledgerCache.size(), 2);
+        assertEquals(ledger.ledgers.size(), 4);
+        assertEquals(ledger.ledgerCache.size(), 3);
         cursor.clearBacklog();
         cursor2.clearBacklog();
         ledger.trimConsumedLedgersInBackground(Futures.NULL_PROMISE);
         Awaitility.await().untilAsserted(() -> {
-            assertEquals(ledger.ledgers.size(), 3);
+            assertEquals(ledger.ledgers.size(), 4);
             assertEquals(ledger.ledgerCache.size(), 0);
         });
 
@@ -3729,11 +3753,11 @@ public class ManagedLedgerTest extends MockedBookKeeperTestCase {
         ManagedCursor cursor3 = ledger.openCursor("test-cursor3", InitialPosition.Earliest);
         entryList = cursor3.readEntries(3);
         assertEquals(entryList.size(), 3);
-        assertEquals(ledger.ledgerCache.size(), 2);
+        assertEquals(ledger.ledgerCache.size(), 3);
         cursor3.clearBacklog();
         ledger.trimConsumedLedgersInBackground(Futures.NULL_PROMISE);
         Awaitility.await().untilAsserted(() -> {
-            assertEquals(ledger.ledgers.size(), 3);
+            assertEquals(ledger.ledgers.size(), 4);
             assertEquals(ledger.ledgerCache.size(), 0);
         });
 
@@ -4256,4 +4280,45 @@ public class ManagedLedgerTest extends MockedBookKeeperTestCase {
         verify(ledgerOffloader, times(0))
             .deleteOffloaded(eq(ledgerInfo.getLedgerId()), any(), anyMap());
     }
+
+
+    @DataProvider(name = "closeLedgerByAddEntry")
+    public Object[][] closeLedgerByAddEntry() {
+        return new Object[][] {{Boolean.TRUE}, {Boolean.FALSE}};
+    }
+
+    @Test(dataProvider = "closeLedgerByAddEntry")
+    public void testDeleteCurrentLedgerWhenItIsClosed(boolean closeLedgerByAddEntry) throws Exception {
+        // Setup: Open a managed ledger with one initial entry.
+        ManagedLedgerConfig config = new ManagedLedgerConfig();
+        config.setMaxEntriesPerLedger(10);
+        ManagedLedgerImpl ml = spy((ManagedLedgerImpl) factory.open("testDeleteCurrentLedgerWhenItIsClosed",
+                config));
+        assertEquals(ml.ledgers.size(), 1);
+        ml.addEntry(new byte[4]);
+        // Act: Trigger the rollover of the current ledger.
+        long currentLedgerID = ml.currentLedger.getId();
+        ml.config.setMaximumRolloverTime(10, TimeUnit.MILLISECONDS);
+        Thread.sleep(10);
+        if (closeLedgerByAddEntry) {
+            // Detect that the current ledger is full before writing the entry and close it after the write completes.
+            ml.addEntry(new byte[4]);
+        } else {
+            // Detect the current ledger is full by the timed task. (Imitate: the timed task `checkLedgerRollTask` call
+            // `rollCurrentLedgerIfFull` periodically).
+            ml.rollCurrentLedgerIfFull();
+            // the ledger closing in the `rollCurrentLedgerIfFull` is async, so the wait is needed.
+            Awaitility.await().untilAsserted(() -> assertEquals(ml.ledgers.size(), 2));
+        }
+        // Act: Trigger trimming to delete the previous current ledger.
+        ml.internalTrimLedgers(false, Futures.NULL_PROMISE);
+        // Verify: A new ledger will be opened after the current ledger is closed and the previous current ledger can be
+        // deleted.
+        Awaitility.await().untilAsserted(() -> {
+            assertEquals(ml.state, ManagedLedgerImpl.State.LedgerOpened);
+            assertEquals(ml.ledgers.size(), 1);
+            assertNotEquals(currentLedgerID, ml.currentLedger.getId());
+            assertEquals(ml.currentLedgerEntries, 0);
+        });
+    }
 }
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorTest.java
index 1e1f7df0a46..82141bfd0ee 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorTest.java
@@ -589,12 +589,12 @@ public class NonDurableCursorTest extends MockedBookKeeperTestCase {
 
         /* Position p1 = */ ledger.addEntry("entry-1".getBytes());
         /* Position p2 = */ ledger.addEntry("entry-2".getBytes());
-        Position p3 = ledger.addEntry("entry-3".getBytes());
+        /* Position p3 = */ ledger.addEntry("entry-3".getBytes());
 
         Thread.sleep(300);
         ManagedCursor c1 = ledger.newNonDurableCursor(PositionImpl.EARLIEST);
-        assertEquals(c1.getReadPosition(), p3);
-        assertEquals(c1.getMarkDeletedPosition(), new PositionImpl(5, -1));
+        assertEquals(c1.getReadPosition(), new PositionImpl(6, 0));
+        assertEquals(c1.getMarkDeletedPosition(), new PositionImpl(6, -1));
     }
 
     @Test // (timeOut = 20000)
@@ -723,9 +723,10 @@ public class NonDurableCursorTest extends MockedBookKeeperTestCase {
         CompletableFuture<Void> promise = new CompletableFuture<>();
         ledger.internalTrimConsumedLedgers(promise);
         promise.join();
-
-        assertEquals(nonDurableCursor.getNumberOfEntries(), 6);
-        assertEquals(nonDurableCursor.getNumberOfEntriesInBacklog(true), 6);
+        // The mark delete position has moved to position 4:1, and the ledger 4 only has one entry,
+        // so the ledger 4 can be deleted. nonDurableCursor should have the same backlog as the durable cursor.
+        assertEquals(nonDurableCursor.getNumberOfEntries(), 5);
+        assertEquals(nonDurableCursor.getNumberOfEntriesInBacklog(true), 5);
 
         c1.close();
         ledger.deleteCursor(c1.getName());
@@ -733,8 +734,8 @@ public class NonDurableCursorTest extends MockedBookKeeperTestCase {
         ledger.internalTrimConsumedLedgers(promise);
         promise.join();
 
-        assertEquals(nonDurableCursor.getNumberOfEntries(), 1);
-        assertEquals(nonDurableCursor.getNumberOfEntriesInBacklog(true), 1);
+        assertEquals(nonDurableCursor.getNumberOfEntries(), 0);
+        assertEquals(nonDurableCursor.getNumberOfEntriesInBacklog(true), 0);
 
         ledger.close();
     }
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ShadowManagedLedgerImplTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ShadowManagedLedgerImplTest.java
index 4482e9944c0..cc4b3f24811 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ShadowManagedLedgerImplTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ShadowManagedLedgerImplTest.java
@@ -148,7 +148,10 @@ public class ShadowManagedLedgerImplTest extends MockedBookKeeperTestCase {
 
             newPos = sourceML.addEntry(data); // new ledger rolled.
             newPos = sourceML.addEntry(data);
-            Awaitility.await().untilAsserted(() -> assertEquals(shadowML.ledgers.size(), 5));
+            Awaitility.await().untilAsserted(() -> {
+                assertEquals(shadowML.ledgers.size(), 6);
+                assertEquals(shadowML.currentLedgerEntries, 0);
+            });
             assertEquals(future.get(), fakePos);
             // LCE should be updated.
             log.info("3.Source.LCE={},Shadow.LCE={}", sourceML.lastConfirmedEntry, shadowML.lastConfirmedEntry);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BacklogQuotaManagerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BacklogQuotaManagerTest.java
index 3c829b02cb8..048b4fd699b 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BacklogQuotaManagerTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BacklogQuotaManagerTest.java
@@ -228,7 +228,7 @@ public class BacklogQuotaManagerTest {
             // non-durable mes should still
             assertEquals(stats.getSubscriptions().size(), 1);
             long nonDurableSubscriptionBacklog = stats.getSubscriptions().values().iterator().next().getMsgBacklog();
-            assertEquals(nonDurableSubscriptionBacklog, MAX_ENTRIES_PER_LEDGER,
+            assertEquals(nonDurableSubscriptionBacklog, 0,
               "non-durable subscription backlog is [" + nonDurableSubscriptionBacklog + "]");
 
             MessageIdImpl msgId = null;
@@ -254,9 +254,6 @@ public class BacklogQuotaManagerTest {
 
                 // check there is only one ledger left
                 assertEquals(internalStats.ledgers.size(), 1);
-
-                // check if its the expected ledger id given MAX_ENTRIES_PER_LEDGER
-                assertEquals(internalStats.ledgers.get(0).ledgerId, finalMsgId.getLedgerId());
             });
 
             // check reader can still read with out error
@@ -303,10 +300,10 @@ public class BacklogQuotaManagerTest {
             TopicStats stats = getTopicStats(topic1);
             // overall backlogSize should be zero because we only have readers
             assertEquals(stats.getBacklogSize(), 0, "backlog size is [" + stats.getBacklogSize() + "]");
-            // non-durable mes should still
             assertEquals(stats.getSubscriptions().size(), 1);
             long nonDurableSubscriptionBacklog = stats.getSubscriptions().values().iterator().next().getMsgBacklog();
-            assertEquals(nonDurableSubscriptionBacklog, MAX_ENTRIES_PER_LEDGER,
+            // All the full ledgers should be deleted.
+            assertEquals(nonDurableSubscriptionBacklog, 0,
               "non-durable subscription backlog is [" + nonDurableSubscriptionBacklog + "]");
             MessageIdImpl messageId = null;
             try {
@@ -327,8 +324,8 @@ public class BacklogQuotaManagerTest {
                 // check there is only one ledger left
                 assertEquals(internalStats.ledgers.size(), 1);
 
-                // check if its the expected ledger id given MAX_ENTRIES_PER_LEDGER
-                assertEquals(internalStats.ledgers.get(0).ledgerId, finalMessageId.getLedgerId());
+                // check if it's the expected ledger id given MAX_ENTRIES_PER_LEDGER
+                assertEquals(internalStats.ledgers.get(0).ledgerId, finalMessageId.getLedgerId() + 1);
             });
             // check reader can still read with out error
 
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBkEnsemblesTests.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBkEnsemblesTests.java
index 40649a41640..42b9358911a 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBkEnsemblesTests.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBkEnsemblesTests.java
@@ -191,9 +191,9 @@ public class BrokerBkEnsemblesTests extends BkEnsemblesTestBase {
                 .build();
 
         final String ns1 = "prop/usc/crash-broker";
-        final int totalMessages = 100;
+        final int totalMessages = 99;
         final int totalDataLedgers = 5;
-        final int entriesPerLedger = totalMessages / totalDataLedgers;
+        final int entriesPerLedger = 20;
 
         try {
             admin.namespaces().createNamespace(ns1);
@@ -273,9 +273,9 @@ public class BrokerBkEnsemblesTests extends BkEnsemblesTestBase {
 
         retryStrategically((test) -> config.isAutoSkipNonRecoverableData(), 5, 100);
 
-        // (5) consumer will be able to consume 20 messages from last non-deleted ledger
+        // (5) consumer will be able to consume 19 messages from last non-deleted ledger
         consumer = client.newConsumer().topic(topic1).subscriptionName("my-subscriber-name").subscribe();
-        for (int i = 0; i < entriesPerLedger; i++) {
+        for (int i = 0; i < entriesPerLedger - 1; i++) {
             msg = consumer.receive();
             System.out.println(i);
             consumer.acknowledge(msg);
@@ -296,9 +296,9 @@ public class BrokerBkEnsemblesTests extends BkEnsemblesTestBase {
                 .statsInterval(0, TimeUnit.SECONDS)
                 .build();
 
-        final int totalMessages = 100;
+        final int totalMessages = 99;
         final int totalDataLedgers = 5;
-        final int entriesPerLedger = totalMessages / totalDataLedgers;
+        final int entriesPerLedger = 20;
 
         final String tenant = "prop";
         try {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBookieIsolationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBookieIsolationTest.java
index 89686c65add..19aa3ae0bd1 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBookieIsolationTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBookieIsolationTest.java
@@ -231,28 +231,43 @@ public class BrokerBookieIsolationTest {
         LedgerManager ledgerManager = getLedgerManager(bookie1);
 
         // namespace: ns1
-        ManagedLedgerImpl ml = (ManagedLedgerImpl) topic1.getManagedLedger();
-        assertEquals(ml.getLedgersInfoAsList().size(), totalLedgers);
+        ManagedLedgerImpl ml1 = (ManagedLedgerImpl) topic1.getManagedLedger();
+        // totalLedgers = totalPublish / totalEntriesPerLedger. (totalPublish = 100, totalEntriesPerLedger = 20.)
+        // The last ledger is full, a new empty ledger will be created.
+        // The ledger is created async, so adding a wait is needed.
+        Awaitility.await().untilAsserted(() -> {
+            assertEquals(ml1.getLedgersInfoAsList().size(), totalLedgers + 1);
+            assertEquals(ml1.getCurrentLedgerEntries(), 0);
+        });
         // validate ledgers' ensemble with affinity bookies
-        assertAffinityBookies(ledgerManager, ml.getLedgersInfoAsList(), defaultBookies);
+        assertAffinityBookies(ledgerManager, ml1.getLedgersInfoAsList(), defaultBookies);
 
         // namespace: ns2
-        ml = (ManagedLedgerImpl) topic2.getManagedLedger();
-        assertEquals(ml.getLedgersInfoAsList().size(), totalLedgers);
+        ManagedLedgerImpl ml2 = (ManagedLedgerImpl) topic2.getManagedLedger();
+        Awaitility.await().untilAsserted(() -> {
+            assertEquals(ml2.getLedgersInfoAsList().size(), totalLedgers + 1);
+            assertEquals(ml2.getCurrentLedgerEntries(), 0);
+        });
         // validate ledgers' ensemble with affinity bookies
-        assertAffinityBookies(ledgerManager, ml.getLedgersInfoAsList(), isolatedBookies);
+        assertAffinityBookies(ledgerManager, ml2.getLedgersInfoAsList(), isolatedBookies);
 
         // namespace: ns3
-        ml = (ManagedLedgerImpl) topic3.getManagedLedger();
-        assertEquals(ml.getLedgersInfoAsList().size(), totalLedgers);
+        ManagedLedgerImpl ml3 = (ManagedLedgerImpl) topic3.getManagedLedger();
+        Awaitility.await().untilAsserted(() -> {
+            assertEquals(ml3.getLedgersInfoAsList().size(), totalLedgers + 1);
+            assertEquals(ml3.getCurrentLedgerEntries(), 0);
+        });
         // validate ledgers' ensemble with affinity bookies
-        assertAffinityBookies(ledgerManager, ml.getLedgersInfoAsList(), isolatedBookies);
+        assertAffinityBookies(ledgerManager, ml3.getLedgersInfoAsList(), isolatedBookies);
 
         // namespace: ns4
-        ml = (ManagedLedgerImpl) topic4.getManagedLedger();
-        assertEquals(ml.getLedgersInfoAsList().size(), totalLedgers);
+        ManagedLedgerImpl ml4 = (ManagedLedgerImpl) topic4.getManagedLedger();
+        Awaitility.await().untilAsserted(() -> {
+            assertEquals(ml4.getLedgersInfoAsList().size(), totalLedgers + 1);
+            assertEquals(ml4.getCurrentLedgerEntries(), 0);
+        });
         // validate ledgers' ensemble with affinity bookies
-        assertAffinityBookies(ledgerManager, ml.getLedgersInfoAsList(), isolatedBookies);
+        assertAffinityBookies(ledgerManager, ml4.getLedgersInfoAsList(), isolatedBookies);
 
         ManagedLedgerClientFactory mlFactory =
             (ManagedLedgerClientFactory) pulsarService.getManagedLedgerClientFactory();
@@ -389,11 +404,14 @@ public class BrokerBookieIsolationTest {
 
         ManagedLedgerImpl ml2 = (ManagedLedgerImpl) topic2.getManagedLedger();
         // namespace: ns2
-        assertEquals(ml2.getLedgersInfoAsList().size(), totalLedgers);
-
+        Awaitility.await().untilAsserted(() -> {
+            assertEquals(ml2.getLedgersInfoAsList().size(), totalLedgers + 1);
+            assertEquals(ml2.getCurrentLedgerEntries(), 0);
+        });
         List<LedgerInfo> ledgers = ml2.getLedgersInfoAsList();
         // validate ledgers' ensemble with affinity bookies
-        for (int i=1; i<ledgers.size();i++) {
+        // The second ledger is created once the first is full, before the isolationGroup is set, so start at index 2.
+        for (int i=2; i<ledgers.size();i++) {
             LedgerInfo lInfo = ledgers.get(i);
             long ledgerId = lInfo.getLedgerId();
             CompletableFuture<Versioned<LedgerMetadata>> ledgerMetaFuture = ledgerManager.readLedgerMetadata(ledgerId);
@@ -530,28 +548,40 @@ public class BrokerBookieIsolationTest {
         LedgerManager ledgerManager = getLedgerManager(bookie1);
 
         // namespace: ns1
-        ManagedLedgerImpl ml = (ManagedLedgerImpl) topic1.getManagedLedger();
-        assertEquals(ml.getLedgersInfoAsList().size(), totalLedgers);
+        ManagedLedgerImpl ml1 = (ManagedLedgerImpl) topic1.getManagedLedger();
+        Awaitility.await().untilAsserted(() -> {
+            assertEquals(ml1.getLedgersInfoAsList().size(), totalLedgers + 1);
+            assertEquals(ml1.getCurrentLedgerEntries(), 0);
+        });
         // validate ledgers' ensemble with affinity bookies
-        assertAffinityBookies(ledgerManager, ml.getLedgersInfoAsList(), defaultBookies);
+        assertAffinityBookies(ledgerManager, ml1.getLedgersInfoAsList(), defaultBookies);
 
         // namespace: ns2
-        ml = (ManagedLedgerImpl) topic2.getManagedLedger();
-        assertEquals(ml.getLedgersInfoAsList().size(), totalLedgers);
+        ManagedLedgerImpl ml2 = (ManagedLedgerImpl) topic2.getManagedLedger();
+        Awaitility.await().untilAsserted(() -> {
+            assertEquals(ml2.getLedgersInfoAsList().size(), totalLedgers + 1);
+            assertEquals(ml2.getCurrentLedgerEntries(), 0);
+        });
         // validate ledgers' ensemble with affinity bookies
-        assertAffinityBookies(ledgerManager, ml.getLedgersInfoAsList(), isolatedBookies);
+        assertAffinityBookies(ledgerManager, ml2.getLedgersInfoAsList(), isolatedBookies);
 
         // namespace: ns3
-        ml = (ManagedLedgerImpl) topic3.getManagedLedger();
-        assertEquals(ml.getLedgersInfoAsList().size(), totalLedgers);
+        ManagedLedgerImpl ml3 = (ManagedLedgerImpl) topic3.getManagedLedger();
+        Awaitility.await().untilAsserted(() -> {
+            assertEquals(ml3.getLedgersInfoAsList().size(), totalLedgers + 1);
+            assertEquals(ml3.getCurrentLedgerEntries(), 0);
+        });
         // validate ledgers' ensemble with affinity bookies
-        assertAffinityBookies(ledgerManager, ml.getLedgersInfoAsList(), isolatedBookies);
+        assertAffinityBookies(ledgerManager, ml3.getLedgersInfoAsList(), isolatedBookies);
 
         // namespace: ns4
-        ml = (ManagedLedgerImpl) topic4.getManagedLedger();
-        assertEquals(ml.getLedgersInfoAsList().size(), totalLedgers);
+        ManagedLedgerImpl ml4 = (ManagedLedgerImpl) topic4.getManagedLedger();
+        Awaitility.await().untilAsserted(() -> {
+            assertEquals(ml4.getLedgersInfoAsList().size(), totalLedgers + 1);
+            assertEquals(ml4.getCurrentLedgerEntries(), 0);
+        });
         // validate ledgers' ensemble with affinity bookies
-        assertAffinityBookies(ledgerManager, ml.getLedgersInfoAsList(), isolatedBookies);
+        assertAffinityBookies(ledgerManager, ml4.getLedgersInfoAsList(), isolatedBookies);
 
         ManagedLedgerClientFactory mlFactory =
                 (ManagedLedgerClientFactory) pulsarService.getManagedLedgerClientFactory();
@@ -689,22 +719,32 @@ public class BrokerBookieIsolationTest {
         LedgerManager ledgerManager = getLedgerManager(bookie1);
 
         // namespace: ns1
-        ManagedLedgerImpl ml = (ManagedLedgerImpl) topic1.getManagedLedger();
-        assertEquals(ml.getLedgersInfoAsList().size(), totalLedgers);
+        ManagedLedgerImpl ml1 = (ManagedLedgerImpl) topic1.getManagedLedger();
+        Awaitility.await().untilAsserted(() -> {
+            assertEquals(ml1.getLedgersInfoAsList().size(), totalLedgers + 1);
+            assertEquals(ml1.getCurrentLedgerEntries(), 0);
+        });
+
         // validate ledgers' ensemble with affinity bookies
-        assertAffinityBookies(ledgerManager, ml.getLedgersInfoAsList(), defaultBookies);
+        assertAffinityBookies(ledgerManager, ml1.getLedgersInfoAsList(), defaultBookies);
 
         // namespace: ns2
-        ml = (ManagedLedgerImpl) topic2.getManagedLedger();
-        assertEquals(ml.getLedgersInfoAsList().size(), totalLedgers);
+        ManagedLedgerImpl ml2 = (ManagedLedgerImpl) topic2.getManagedLedger();
+        Awaitility.await().untilAsserted(() -> {
+            assertEquals(ml2.getLedgersInfoAsList().size(), totalLedgers + 1);
+            assertEquals(ml2.getCurrentLedgerEntries(), 0);
+        });
         // validate ledgers' ensemble with affinity bookies
-        assertAffinityBookies(ledgerManager, ml.getLedgersInfoAsList(), isolatedBookies);
+        assertAffinityBookies(ledgerManager, ml2.getLedgersInfoAsList(), isolatedBookies);
 
         // namespace: ns3
-        ml = (ManagedLedgerImpl) topic3.getManagedLedger();
-        assertEquals(ml.getLedgersInfoAsList().size(), totalLedgers);
+        ManagedLedgerImpl ml3 = (ManagedLedgerImpl) topic3.getManagedLedger();
+        Awaitility.await().untilAsserted(() -> {
+            assertEquals(ml3.getLedgersInfoAsList().size(), totalLedgers + 1);
+            assertEquals(ml3.getCurrentLedgerEntries(), 0);
+        });
         // validate ledgers' ensemble with affinity bookies
-        assertAffinityBookies(ledgerManager, ml.getLedgersInfoAsList(), isolatedBookies);
+        assertAffinityBookies(ledgerManager, ml3.getLedgersInfoAsList(), isolatedBookies);
 
         ManagedLedgerClientFactory mlFactory =
             (ManagedLedgerClientFactory) pulsarService.getManagedLedgerClientFactory();
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ConsumedLedgersTrimTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ConsumedLedgersTrimTest.java
index 80db4c30f45..30867dd2cb4 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ConsumedLedgersTrimTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ConsumedLedgersTrimTest.java
@@ -97,11 +97,13 @@ public class ConsumedLedgersTrimTest extends BrokerTestBase {
         }
 
         ManagedLedgerImpl managedLedger = (ManagedLedgerImpl) persistentTopic.getManagedLedger();
-        Assert.assertEquals(managedLedger.getLedgersInfoAsList().size(), msgNum / 2);
+        Awaitility.await().untilAsserted(() -> {
+            Assert.assertEquals(managedLedger.getLedgersInfoAsList().size() - 1, msgNum / 2);
+        });
 
         //no traffic, unconsumed ledger will be retained
         Thread.sleep(1200);
-        Assert.assertEquals(managedLedger.getLedgersInfoAsList().size(), msgNum / 2);
+        Assert.assertEquals(managedLedger.getLedgersInfoAsList().size() - 1, msgNum / 2);
 
         for (int i = 0; i < msgNum; i++) {
             Message<byte[]> msg = consumer.receive(2, TimeUnit.SECONDS);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerConsumerInternalTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerConsumerInternalTest.java
index 4ec81070306..a06085d3d46 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerConsumerInternalTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerConsumerInternalTest.java
@@ -20,6 +20,7 @@ package org.apache.pulsar.client.impl;
 
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertNotNull;
 
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CountDownLatch;
@@ -29,11 +30,14 @@ import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.broker.BrokerTestUtil;
 import org.apache.pulsar.broker.service.ServerCnx;
 import org.apache.pulsar.client.api.BatcherBuilder;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.ProducerConsumerBase;
 import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.common.api.proto.CommandCloseProducer;
+import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
 import org.awaitility.Awaitility;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
@@ -186,4 +190,44 @@ public class ProducerConsumerInternalTest extends ProducerConsumerBase {
         future.thenAccept(msgId -> log.info("msg-1 done: {} (msgId: {})", System.nanoTime(), msgId));
         future.get();
     }
+
+
+    @Test
+    public void testRetentionPolicyByProducingMessages() throws Exception {
+        // Setup: configure the entries per ledger and retention policies.
+        final int maxEntriesPerLedger = 10, messagesCount = 10;
+        final String topicName = BrokerTestUtil.newUniqueName("persistent://my-property/my-ns/tp_");
+        pulsar.getConfiguration().setManagedLedgerMaxEntriesPerLedger(maxEntriesPerLedger);
+        pulsar.getConfiguration().setManagedLedgerMinLedgerRolloverTimeMinutes(0);
+        pulsar.getConfiguration().setDefaultRetentionTimeInMinutes(0);
+        pulsar.getConfiguration().setDefaultRetentionSizeInMB(0);
+
+        @Cleanup
+        Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName)
+                .sendTimeout(1, TimeUnit.SECONDS)
+                .enableBatching(false)
+                .create();
+
+        @Cleanup
+        Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName)
+                .subscriptionName("my-sub")
+                .subscribe();
+        // Act: prepare a full ledger data and ack them.
+        for (int i = 0; i < messagesCount; i++) {
+            producer.newMessage().sendAsync();
+        }
+        for (int i = 0; i < messagesCount; i++) {
+            Message<byte[]> message = consumer.receive();
+            assertNotNull(message);
+            consumer.acknowledge(message);
+        }
+        // Verify: a new empty ledger will be created after the current ledger is full.
+        // And the previously consumed ledgers will be deleted.
+        Awaitility.await().untilAsserted(() -> {
+            admin.topics().trimTopic(topicName);
+            PersistentTopicInternalStats internalStats = admin.topics().getInternalStatsAsync(topicName).get();
+            assertEquals(internalStats.currentLedgerEntries, 0);
+            assertEquals(internalStats.ledgers.size(), 1);
+        });
+    }
 }