You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by te...@apache.org on 2022/10/31 06:14:22 UTC

[pulsar] branch branch-2.11 updated (899ce511d74 -> 05efd10278e)

This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a change to branch branch-2.11
in repository https://gitbox.apache.org/repos/asf/pulsar.git


    from 899ce511d74 [fix][monitor] fix metrics string encoding (#18138)
     new 043cb9fdee0 When accumulating acks, update the batch index in batchDeletedIndexes and check whether it is greater than the batch index of the previous ack (#18042)
     new d612b8f304d Avoid unnecessary creation of BitSetRecyclable objects (#17998)
     new 2bf2f5b757c [improve][ml] Remove the redundant judgment logic of ManagedCursorImpl (#18205)
     new bd27df4b3a4 [fix][broker] Fix broker cache eviction of entries read by active cursors (#17273)
     new 05efd10278e [fix][ml] Persist correct markDeletePosition to prevent message loss (#18237)

The 5 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../mledger/impl/ManagedCursorContainer.java       | 139 ++++----
 .../bookkeeper/mledger/impl/ManagedCursorImpl.java | 117 +++---
 .../bookkeeper/mledger/impl/ManagedLedgerImpl.java | 155 ++++----
 .../mledger/impl/NonDurableCursorImpl.java         |   2 +-
 .../mledger/impl/EntryCacheManagerTest.java        |   5 +-
 .../mledger/impl/ManagedCursorContainerTest.java   |  71 ++--
 .../bookkeeper/mledger/impl/ManagedCursorTest.java | 181 +++++++++-
 .../bookkeeper/mledger/impl/ManagedLedgerTest.java | 396 ++++++++-------------
 .../bookkeeper/test/MockedBookKeeperTestCase.java  |   4 +
 .../pulsar/broker/transaction/TransactionTest.java |   4 +-
 10 files changed, 586 insertions(+), 488 deletions(-)


[pulsar] 02/05: Avoid unnecessary creation of BitSetRecyclable objects (#17998)

Posted by te...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch branch-2.11
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit d612b8f304deaca77fa94500c18bfcc1756d0d11
Author: LinChen <15...@qq.com>
AuthorDate: Thu Oct 13 09:51:48 2022 +0800

    Avoid unnecessary creation of BitSetRecyclable objects (#17998)
    
    Co-authored-by: leolinchen <le...@tencent.com>
---
 .../org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java    | 9 +++++----
 1 file changed, 5 insertions(+), 4 deletions(-)

diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
index ed861e6830f..3c3bad218d4 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
@@ -2168,11 +2168,12 @@ public class ManagedCursorImpl implements ManagedCursor {
                             individualDeletedMessages);
                     }
                 } else if (config.isDeletionAtBatchIndexLevelEnabled() && batchDeletedIndexes != null) {
-                    BitSetRecyclable bitSet = batchDeletedIndexes.computeIfAbsent(position, (v) ->
-                        BitSetRecyclable.create().resetWords(position.ackSet));
                     BitSetRecyclable givenBitSet = BitSetRecyclable.create().resetWords(position.ackSet);
-                    bitSet.and(givenBitSet);
-                    givenBitSet.recycle();
+                    BitSetRecyclable bitSet = batchDeletedIndexes.computeIfAbsent(position, (v) -> givenBitSet);
+                    if (givenBitSet != bitSet) {
+                        bitSet.and(givenBitSet);
+                        givenBitSet.recycle();
+                    }
                     if (bitSet.isEmpty()) {
                         PositionImpl previousPosition = ledger.getPreviousPosition(position);
                         individualDeletedMessages.addOpenClosed(previousPosition.getLedgerId(),


[pulsar] 01/05: When accumulating acks, update the batch index in batchDeletedIndexes and check whether it is greater than the batch index of the previous ack (#18042)

Posted by te...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch branch-2.11
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 043cb9fdee098f962c9940995fc57826d2db27d6
Author: LinChen <15...@qq.com>
AuthorDate: Fri Oct 21 18:22:02 2022 +0800

    When accumulating acks, update the batch index in batchDeletedIndexes and check whether it is greater than the batch index of the previous ack (#18042)
    
    Co-authored-by: leolinchen <le...@tencent.com>
---
 .../bookkeeper/mledger/impl/ManagedCursorImpl.java | 22 +++++++-
 .../bookkeeper/mledger/impl/ManagedCursorTest.java | 58 ++++++++++++++++++++++
 2 files changed, 79 insertions(+), 1 deletion(-)

diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
index 6de68580826..ed861e6830f 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
@@ -1828,7 +1828,27 @@ public class ManagedCursorImpl implements ManagedCursor {
 
         if (config.isDeletionAtBatchIndexLevelEnabled() && batchDeletedIndexes != null) {
             if (newPosition.ackSet != null) {
-                batchDeletedIndexes.put(newPosition, BitSetRecyclable.create().resetWords(newPosition.ackSet));
+                AtomicReference<BitSetRecyclable> bitSetRecyclable = new AtomicReference<>();
+                BitSetRecyclable givenBitSet = BitSetRecyclable.create().resetWords(newPosition.ackSet);
+                // In order to prevent the batch index recorded in batchDeletedIndexes from rolling back,
+                // only update batchDeletedIndexes when the submitted batch index is greater
+                // than the recorded index.
+                batchDeletedIndexes.compute(newPosition,
+                        (k, v) -> {
+                            if (v == null) {
+                                return givenBitSet;
+                            }
+                            if (givenBitSet.nextSetBit(0) > v.nextSetBit(0)) {
+                                bitSetRecyclable.set(v);
+                                return givenBitSet;
+                            } else {
+                                bitSetRecyclable.set(givenBitSet);
+                                return v;
+                            }
+                        });
+                if (bitSetRecyclable.get() != null) {
+                    bitSetRecyclable.get().recycle();
+                }
                 newPosition = ledger.getPreviousPosition(newPosition);
             }
             Map<PositionImpl, BitSetRecyclable> subMap = batchDeletedIndexes.subMap(PositionImpl.EARLIEST, newPosition);
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
index 052f6ac2d54..0e66e76d5c3 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
@@ -93,6 +93,8 @@ import org.apache.bookkeeper.mledger.proto.MLDataFormats;
 import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedCursorInfo;
 import org.apache.bookkeeper.mledger.proto.MLDataFormats.PositionInfo;
 import org.apache.bookkeeper.test.MockedBookKeeperTestCase;
+import org.apache.pulsar.common.api.proto.IntRange;
+import org.apache.pulsar.common.util.collections.BitSetRecyclable;
 import org.apache.pulsar.common.util.collections.LongPairRangeSet;
 import org.apache.pulsar.metadata.api.extended.SessionEvent;
 import org.apache.pulsar.metadata.impl.FaultInjectionMetadataStore;
@@ -3346,6 +3348,37 @@ public class ManagedCursorTest extends MockedBookKeeperTestCase {
         assertEquals(c1.getReadPosition(), positions[markDelete + 1]);
     }
 
+    @Test
+    public void testBatchIndexMarkdelete() throws ManagedLedgerException, InterruptedException {
+        ManagedLedger ledger = factory.open("test_batch_index_delete");
+        ManagedCursor cursor = ledger.openCursor("c1");
+
+        final int totalEntries = 100;
+        final Position[] positions = new Position[totalEntries];
+        for (int i = 0; i < totalEntries; i++) {
+            // add entry
+            positions[i] = ledger.addEntry(("entry-" + i).getBytes(Encoding));
+        }
+        assertEquals(cursor.getNumberOfEntries(), totalEntries);
+        markDeleteBatchIndex(cursor, positions[0], 10, 3);
+        List<IntRange> deletedIndexes = getAckedIndexRange(cursor.getDeletedBatchIndexesAsLongArray((PositionImpl) positions[0]), 10);
+        Assert.assertEquals(1, deletedIndexes.size());
+        Assert.assertEquals(0, deletedIndexes.get(0).getStart());
+        Assert.assertEquals(3, deletedIndexes.get(0).getEnd());
+
+        markDeleteBatchIndex(cursor, positions[0], 10, 4);
+        deletedIndexes = getAckedIndexRange(cursor.getDeletedBatchIndexesAsLongArray((PositionImpl) positions[0]), 10);
+        Assert.assertEquals(1, deletedIndexes.size());
+        Assert.assertEquals(0, deletedIndexes.get(0).getStart());
+        Assert.assertEquals(4, deletedIndexes.get(0).getEnd());
+
+        markDeleteBatchIndex(cursor, positions[0], 10, 2);
+        deletedIndexes = getAckedIndexRange(cursor.getDeletedBatchIndexesAsLongArray((PositionImpl) positions[0]), 10);
+        Assert.assertEquals(1, deletedIndexes.size());
+        Assert.assertEquals(0, deletedIndexes.get(0).getStart());
+        Assert.assertEquals(4, deletedIndexes.get(0).getEnd());
+    }
+
     @Test
     public void testBatchIndexDelete() throws ManagedLedgerException, InterruptedException {
         ManagedLedger ledger = factory.open("test_batch_index_delete");
@@ -3477,6 +3510,31 @@ public class ManagedCursorTest extends MockedBookKeeperTestCase {
         pos.ackSet = null;
     }
 
+    private void markDeleteBatchIndex(ManagedCursor cursor, Position position, int batchSize, int batchIndex
+    ) throws InterruptedException {
+        CountDownLatch latch = new CountDownLatch(1);
+        PositionImpl pos = (PositionImpl) position;
+        BitSetRecyclable bitSet = new BitSetRecyclable();
+        bitSet.set(0, batchSize);
+        bitSet.clear(0, batchIndex + 1);
+
+        pos.ackSet = bitSet.toLongArray();
+
+        cursor.asyncMarkDelete(pos, new MarkDeleteCallback() {
+            @Override
+            public void markDeleteFailed(ManagedLedgerException exception, Object ctx) {
+                latch.countDown();
+            }
+
+            @Override
+            public void markDeleteComplete(Object ctx) {
+                latch.countDown();
+            }
+        }, null);
+        latch.await();
+        pos.ackSet = null;
+    }
+
     private List<IntRange> getAckedIndexRange(long[] bitSetLongArray, int batchSize) {
         if (bitSetLongArray == null) {
             return null;


[pulsar] 04/05: [fix][broker] Fix broker cache eviction of entries read by active cursors (#17273)

Posted by te...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch branch-2.11
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit bd27df4b3a4c0cd27db61261e75226cd23b229c0
Author: Lari Hotari <lh...@users.noreply.github.com>
AuthorDate: Wed Aug 31 06:25:50 2022 +0300

    [fix][broker] Fix broker cache eviction of entries read by active cursors (#17273)
    
    * [fix][broker] Fix broken build caused by conflict between #17195 and #16605
    
    - #17195 changed the method signature that #16605 depended upon
    
    * [fix][broker] Keep sorted list of cursors ordered by read position of active cursors when cacheEvictionByMarkDeletedPosition=false
    
    Fixes #16054
    
    - calculate the sorted list of when a read position gets updated
    - this resolves #9958 in a proper way
      - #12045 broke the caching solution as explained in #16054
    - remove invalid tests
    - fix tests
    - add more tests to handle corner cases
    
    * Address review comment
    
    * Handle durable & non-durable in the correct way
    
    * Fix cache tests since now entries get evicted reactively
    
    * Address review comment about method names
    
    * Change signature for add method so that position must be passed
    
    - this is more consistent with cursorUpdated method where the position is passed
    
    * Update javadoc for ManagedCursorContainer
    
    * Address review comment
    
    * Simplify ManagedCursorContainer
    
    * Clarify javadoc
    
    * Ensure that cursors are tracked by making sure that initial position isn't null unintentionally
    
    * Prevent race in updating activeCursors
---
 .../mledger/impl/ManagedCursorContainer.java       | 139 ++++----
 .../bookkeeper/mledger/impl/ManagedCursorImpl.java |   9 +-
 .../bookkeeper/mledger/impl/ManagedLedgerImpl.java | 155 ++++----
 .../mledger/impl/NonDurableCursorImpl.java         |   2 +-
 .../mledger/impl/EntryCacheManagerTest.java        |   5 +-
 .../mledger/impl/ManagedCursorContainerTest.java   |  71 ++--
 .../bookkeeper/mledger/impl/ManagedLedgerTest.java | 396 ++++++++-------------
 .../bookkeeper/test/MockedBookKeeperTestCase.java  |   4 +
 .../pulsar/broker/transaction/TransactionTest.java |   4 +-
 9 files changed, 341 insertions(+), 444 deletions(-)

diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainer.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainer.java
index 5a96ee08de9..b5a5be733a1 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainer.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainer.java
@@ -30,17 +30,16 @@ import org.apache.bookkeeper.mledger.Position;
 import org.apache.commons.lang3.tuple.Pair;
 
 /**
- * Contains all the cursors for a ManagedLedger.
+ * Contains cursors for a ManagedLedger.
  *
  * <p/>The goal is to always know the slowest consumer and hence decide which is the oldest ledger we need to keep.
  *
- * <p/>This data structure maintains a list and a map of cursors. The map is used to relate a cursor name with an entry
- * in the linked-list. The list is a sorted double linked-list of cursors.
+ * <p/>This data structure maintains a heap and a map of cursors. The map is used to relate a cursor name with
+ * an entry index in the heap. The heap data structure sorts cursors in a binary tree which is represented
+ * in a single array. More details about heap implementations:
+ * https://en.wikipedia.org/wiki/Heap_(data_structure)#Implementation
  *
- * <p/>When a cursor is markDeleted, this list is updated and the cursor is moved in its new position.
- *
- * <p/>To minimize the moving around, the order is maintained using the ledgerId, but not the entryId, since we only
- * care about ledgers to be deleted.
+ * <p/>The heap is updated and kept sorted when a cursor is updated.
  *
  */
 public class ManagedCursorContainer implements Iterable<ManagedCursor> {
@@ -50,30 +49,18 @@ public class ManagedCursorContainer implements Iterable<ManagedCursor> {
         PositionImpl position;
         int idx;
 
-        Item(ManagedCursor cursor, int idx) {
+        Item(ManagedCursor cursor, PositionImpl position, int idx) {
             this.cursor = cursor;
-            this.position = (PositionImpl) cursor.getMarkDeletedPosition();
+            this.position = position;
             this.idx = idx;
         }
     }
 
-    public enum CursorType {
-        DurableCursor,
-        NonDurableCursor,
-        ALL
-    }
-
     public ManagedCursorContainer() {
-        cursorType = CursorType.DurableCursor;
-    }
 
-    public ManagedCursorContainer(CursorType cursorType) {
-        this.cursorType = cursorType;
     }
 
-    private final CursorType cursorType;
-
-    // Used to keep track of slowest cursor. Contains all of all active cursors.
+    // Used to keep track of slowest cursor.
     private final ArrayList<Item> heap = new ArrayList();
 
     // Maps a cursor to its position in the heap
@@ -81,46 +68,35 @@ public class ManagedCursorContainer implements Iterable<ManagedCursor> {
 
     private final StampedLock rwLock = new StampedLock();
 
-    public void add(ManagedCursor cursor) {
+    private int durableCursorCount;
+
+
+    /**
+     * Add a cursor to the container. The cursor will be optionally tracked for the slowest reader when
+     * a position is passed as the second argument. It is expected that the position is updated with
+     * {@link #cursorUpdated(ManagedCursor, Position)} method when the position changes.
+     *
+     * @param cursor cursor to add
+     * @param position position of the cursor to use for ordering, pass null if the cursor's position shouldn't be
+     *                 tracked for the slowest reader.
+     */
+    public void add(ManagedCursor cursor, Position position) {
         long stamp = rwLock.writeLock();
         try {
-            // Append a new entry at the end of the list
-            Item item = new Item(cursor, heap.size());
+            Item item = new Item(cursor, (PositionImpl) position, position != null ? heap.size() : -1);
             cursors.put(cursor.getName(), item);
-
-            if (shouldTrackInHeap(cursor)) {
+            if (position != null) {
                 heap.add(item);
                 siftUp(item);
             }
+            if (cursor.isDurable()) {
+                durableCursorCount++;
+            }
         } finally {
             rwLock.unlockWrite(stamp);
         }
     }
 
-    private boolean shouldTrackInHeap(ManagedCursor cursor) {
-        return CursorType.ALL.equals(cursorType)
-                || (cursor.isDurable() && CursorType.DurableCursor.equals(cursorType))
-                || (!cursor.isDurable() && CursorType.NonDurableCursor.equals(cursorType));
-    }
-
-    public PositionImpl getSlowestReadPositionForActiveCursors() {
-        long stamp = rwLock.readLock();
-        try {
-            return heap.isEmpty() ? null : (PositionImpl) heap.get(0).cursor.getReadPosition();
-        } finally {
-            rwLock.unlockRead(stamp);
-        }
-    }
-
-    public PositionImpl getSlowestMarkDeletedPositionForActiveCursors() {
-        long stamp = rwLock.readLock();
-        try {
-            return heap.isEmpty() ? null : (PositionImpl) heap.get(0).cursor.getMarkDeletedPosition();
-        } finally {
-            rwLock.unlockRead(stamp);
-        }
-    }
-
     public ManagedCursor get(String name) {
         long stamp = rwLock.readLock();
         try {
@@ -131,17 +107,25 @@ public class ManagedCursorContainer implements Iterable<ManagedCursor> {
         }
     }
 
-    public void removeCursor(String name) {
+    public boolean removeCursor(String name) {
         long stamp = rwLock.writeLock();
         try {
             Item item = cursors.remove(name);
-            if (item != null && shouldTrackInHeap(item.cursor)) {
-                // Move the item to the right end of the heap to be removed
-                Item lastItem = heap.get(heap.size() - 1);
-                swap(item, lastItem);
-                heap.remove(item.idx);
-                // Update the heap
-                siftDown(lastItem);
+            if (item != null) {
+                if (item.idx >= 0) {
+                    // Move the item to the right end of the heap to be removed
+                    Item lastItem = heap.get(heap.size() - 1);
+                    swap(item, lastItem);
+                    heap.remove(item.idx);
+                    // Update the heap
+                    siftDown(lastItem);
+                }
+                if (item.cursor.isDurable()) {
+                    durableCursorCount--;
+                }
+                return true;
+            } else {
+                return false;
             }
         } finally {
             rwLock.unlockWrite(stamp);
@@ -149,10 +133,15 @@ public class ManagedCursorContainer implements Iterable<ManagedCursor> {
     }
 
     /**
-     * Signal that a cursor position has been updated and that the container must re-order the cursor list.
+     * Signal that a cursor position has been updated and that the container must re-order the cursor heap
+     * tracking the slowest reader.
+     * Only those cursors are tracked and can be updated which were added to the container with the
+     * {@link #add(ManagedCursor, Position)} method that specified the initial position in the position
+     * parameter.
      *
-     * @param cursor
-     * @return a pair of positions, representing the previous slowest consumer and the new slowest consumer (after the
+     * @param cursor the cursor to update the position for
+     * @param newPosition the updated position for the cursor
+     * @return a pair of positions, representing the previous slowest reader and the new slowest reader (after the
      *         update).
      */
     public Pair<PositionImpl, PositionImpl> cursorUpdated(ManagedCursor cursor, Position newPosition) {
@@ -161,35 +150,33 @@ public class ManagedCursorContainer implements Iterable<ManagedCursor> {
         long stamp = rwLock.writeLock();
         try {
             Item item = cursors.get(cursor.getName());
-            if (item == null) {
+            if (item == null || item.idx == -1) {
                 return null;
             }
 
+            PositionImpl previousSlowestConsumer = heap.get(0).position;
 
-            if (shouldTrackInHeap(item.cursor)) {
-                PositionImpl previousSlowestConsumer = heap.get(0).position;
-
+            item.position = (PositionImpl) newPosition;
+            if (heap.size() > 1) {
                 // When the cursor moves forward, we need to push it toward the
                 // bottom of the tree and push it up if a reset was done
 
-                item.position = (PositionImpl) newPosition;
                 if (item.idx == 0 || getParent(item).position.compareTo(item.position) <= 0) {
                     siftDown(item);
                 } else {
                     siftUp(item);
                 }
-
-                PositionImpl newSlowestConsumer = heap.get(0).position;
-                return Pair.of(previousSlowestConsumer, newSlowestConsumer);
             }
-            return null;
+
+            PositionImpl newSlowestConsumer = heap.get(0).position;
+            return Pair.of(previousSlowestConsumer, newSlowestConsumer);
         } finally {
             rwLock.unlockWrite(stamp);
         }
     }
 
     /**
-     * Get the slowest reader position, meaning older acknowledged position between all the cursors.
+     * Get the slowest reader position for the cursors that are ordered.
      *
      * @return the slowest reader position
      */
@@ -237,18 +224,18 @@ public class ManagedCursorContainer implements Iterable<ManagedCursor> {
      */
     public boolean hasDurableCursors() {
         long stamp = rwLock.tryOptimisticRead();
-        boolean isEmpty = heap.isEmpty();
+        int count = durableCursorCount;
         if (!rwLock.validate(stamp)) {
             // Fallback to read lock
             stamp = rwLock.readLock();
             try {
-                isEmpty = heap.isEmpty();
+                count = durableCursorCount;
             } finally {
                 rwLock.unlockRead(stamp);
             }
         }
 
-        return !isEmpty;
+        return count > 0;
     }
 
     @Override
@@ -291,7 +278,7 @@ public class ManagedCursorContainer implements Iterable<ManagedCursor> {
 
             @Override
             public void remove() {
-                throw new IllegalArgumentException("Cannot remove ManagedCursor form container");
+                throw new IllegalArgumentException("Cannot remove ManagedCursor from container");
             }
         };
     }
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
index 7d1f4e0fa23..da855197df6 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
@@ -635,6 +635,7 @@ public class ManagedCursorImpl implements ManagedCursor {
         persistentMarkDeletePosition = position;
         inProgressMarkDeletePersistPosition = null;
         readPosition = ledger.getNextValidPosition(position);
+        ledger.onCursorReadPositionUpdated(this, readPosition);
         lastMarkDeleteEntry = new MarkDeleteEntry(markDeletePosition, properties, null, null);
         // assign cursor-ledger so, it can be deleted when new ledger will be switched
         this.cursorLedger = recoveredFromCursorLedger;
@@ -1215,6 +1216,7 @@ public class ManagedCursorImpl implements ManagedCursor {
                                 ledger.getName(), newPosition, oldReadPosition, name);
                     }
                     readPosition = newPosition;
+                    ledger.onCursorReadPositionUpdated(ManagedCursorImpl.this, newPosition);
                 } finally {
                     lock.writeLock().unlock();
                 }
@@ -1708,6 +1710,7 @@ public class ManagedCursorImpl implements ManagedCursor {
 
     void initializeCursorPosition(Pair<PositionImpl, Long> lastPositionCounter) {
         readPosition = ledger.getNextValidPosition(lastPositionCounter.getLeft());
+        ledger.onCursorReadPositionUpdated(this, readPosition);
         markDeletePosition = lastPositionCounter.getLeft();
         lastMarkDeleteEntry = new MarkDeleteEntry(markDeletePosition, getProperties(), null, null);
         persistentMarkDeletePosition = null;
@@ -1782,6 +1785,7 @@ public class ManagedCursorImpl implements ManagedCursor {
                     log.debug("[{}] Moved read position from: {} to: {}, and new mark-delete position {}",
                             ledger.getName(), currentReadPosition, newReadPosition, markDeletePosition);
                 }
+                ledger.onCursorReadPositionUpdated(ManagedCursorImpl.this, newReadPosition);
                 return newReadPosition;
             } else {
                 return currentReadPosition;
@@ -2021,7 +2025,7 @@ public class ManagedCursorImpl implements ManagedCursor {
                     lock.writeLock().unlock();
                 }
 
-                ledger.updateCursor(ManagedCursorImpl.this, mdEntry.newPosition);
+                ledger.onCursorMarkDeletePositionUpdated(ManagedCursorImpl.this, mdEntry.newPosition);
 
                 decrementPendingMarkDeleteCount();
 
@@ -2384,6 +2388,7 @@ public class ManagedCursorImpl implements ManagedCursor {
             log.info("[{}-{}] Rewind from {} to {}", ledger.getName(), name, oldReadPosition, newReadPosition);
 
             readPosition = newReadPosition;
+            ledger.onCursorReadPositionUpdated(ManagedCursorImpl.this, newReadPosition);
         } finally {
             lock.writeLock().unlock();
         }
@@ -2401,6 +2406,7 @@ public class ManagedCursorImpl implements ManagedCursor {
                 newReadPosition = ledger.getNextValidPosition(markDeletePosition);
             }
             readPosition = newReadPosition;
+            ledger.onCursorReadPositionUpdated(ManagedCursorImpl.this, newReadPosition);
         } finally {
             lock.writeLock().unlock();
         }
@@ -2601,6 +2607,7 @@ public class ManagedCursorImpl implements ManagedCursor {
         if (this.markDeletePosition == null
                 || ((PositionImpl) newReadPositionInt).compareTo(this.markDeletePosition) > 0) {
             this.readPosition = (PositionImpl) newReadPositionInt;
+            ledger.onCursorReadPositionUpdated(this, newReadPositionInt);
         }
     }
 
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index 230deb27d17..3c6af75dd08 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -163,10 +163,13 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
     protected final NavigableMap<Long, LedgerInfo> ledgers = new ConcurrentSkipListMap<>();
     private volatile Stat ledgersStat;
 
+    // contains all cursors, where durable cursors are ordered by mark delete position
     private final ManagedCursorContainer cursors = new ManagedCursorContainer();
+    // contains active cursors eligible for caching,
+    // ordered by read position (when cacheEvictionByMarkDeletedPosition=false) or by mark delete position
+    // (when cacheEvictionByMarkDeletedPosition=true)
     private final ManagedCursorContainer activeCursors = new ManagedCursorContainer();
-    private final ManagedCursorContainer nonDurableActiveCursors =
-            new ManagedCursorContainer(ManagedCursorContainer.CursorType.NonDurableCursor);
+
 
     // Ever increasing counter of entries added
     @VisibleForTesting
@@ -555,7 +558,7 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
                                 log.info("[{}] Recovery for cursor {} completed. pos={} -- todo={}", name, cursorName,
                                         cursor.getMarkDeletedPosition(), cursorCount.get() - 1);
                                 cursor.setActive();
-                                cursors.add(cursor);
+                                addCursor(cursor);
 
                                 if (cursorCount.decrementAndGet() == 0) {
                                     // The initialization is now completed, register the jmx mbean
@@ -589,7 +592,7 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
                                         cursorName, cursor.getMarkDeletedPosition(), cursorCount.get() - 1);
                                 cursor.setActive();
                                 synchronized (ManagedLedgerImpl.this) {
-                                    cursors.add(cursor);
+                                    addCursor(cursor);
                                     uninitializedCursors.remove(cursor.getName()).complete(cursor);
                                 }
                             }
@@ -616,6 +619,17 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
         });
     }
 
+    private void addCursor(ManagedCursorImpl cursor) {
+        Position positionForOrdering = null;
+        if (cursor.isDurable()) {
+            positionForOrdering = cursor.getMarkDeletedPosition();
+            if (positionForOrdering == null) {
+                positionForOrdering = PositionImpl.EARLIEST;
+            }
+        }
+        cursors.add(cursor, positionForOrdering);
+    }
+
     @Override
     public String getName() {
         return name;
@@ -956,7 +970,7 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
                         : getFirstPositionAndCounter());
 
                 synchronized (ManagedLedgerImpl.this) {
-                    cursors.add(cursor);
+                    addCursor(cursor);
                     uninitializedCursors.remove(cursorName).complete(cursor);
                 }
                 callback.openCursorComplete(cursor, ctx);
@@ -984,6 +998,7 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
             return;
         } else if (!cursor.isDurable()) {
             cursors.removeCursor(consumerName);
+            deactivateCursorByName(consumerName);
             callback.deleteCursorComplete(ctx);
             return;
         }
@@ -995,17 +1010,7 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
             public void operationComplete(Void result, Stat stat) {
                 cursor.asyncDeleteCursorLedger();
                 cursors.removeCursor(consumerName);
-
-                // Redo invalidation of entries in cache
-                PositionImpl slowestConsumerPosition = cursors.getSlowestReaderPosition();
-                if (slowestConsumerPosition != null) {
-                    if (log.isDebugEnabled()) {
-                        log.debug("Doing cache invalidation up to {}", slowestConsumerPosition);
-                    }
-                    entryCache.invalidateEntries(slowestConsumerPosition);
-                } else {
-                    entryCache.clear();
-                }
+                deactivateCursorByName(consumerName);
 
                 trimConsumedLedgersInBackground();
 
@@ -1088,7 +1093,7 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
 
         log.info("[{}] Opened new cursor: {}", name, cursor);
         synchronized (this) {
-            cursors.add(cursor);
+            addCursor(cursor);
         }
 
         return cursor;
@@ -2178,62 +2183,36 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
         return result;
     }
 
-    void discardEntriesFromCache(ManagedCursorImpl cursor, PositionImpl newPosition) {
-        Pair<PositionImpl, PositionImpl> pair = activeCursors.cursorUpdated(cursor, newPosition);
-        if (pair != null) {
-            entryCache.invalidateEntries(pair.getRight());
+    void doCacheEviction(long maxTimestamp) {
+        if (entryCache.getSize() > 0) {
+            entryCache.invalidateEntriesBeforeTimestamp(maxTimestamp);
         }
     }
 
-    public PositionImpl getEvictionPosition(){
-        PositionImpl evictionPos;
-        if (config.isCacheEvictionByMarkDeletedPosition()) {
-            PositionImpl earlierMarkDeletedPosition = getEarlierMarkDeletedPositionForActiveCursors();
-            evictionPos = earlierMarkDeletedPosition != null ? earlierMarkDeletedPosition.getNext() : null;
-        } else {
-            // Always remove all entries already read by active cursors
-            evictionPos = getEarlierReadPositionForActiveCursors();
-        }
-        return evictionPos;
-    }
-    void doCacheEviction(long maxTimestamp) {
+    // slowest reader position is earliest mark delete position when cacheEvictionByMarkDeletedPosition=true
+    // it is the earliest read position when cacheEvictionByMarkDeletedPosition=false
+    private void invalidateEntriesUpToSlowestReaderPosition() {
         if (entryCache.getSize() <= 0) {
             return;
         }
-        PositionImpl evictionPos = getEvictionPosition();
-        if (evictionPos != null) {
-            entryCache.invalidateEntries(evictionPos);
-        }
-
-        // Remove entries older than the cutoff threshold
-        entryCache.invalidateEntriesBeforeTimestamp(maxTimestamp);
-    }
-
-    private PositionImpl getEarlierReadPositionForActiveCursors() {
-        PositionImpl nonDurablePosition = nonDurableActiveCursors.getSlowestReadPositionForActiveCursors();
-        PositionImpl durablePosition = activeCursors.getSlowestReadPositionForActiveCursors();
-        if (nonDurablePosition == null) {
-            return durablePosition;
-        }
-        if (durablePosition == null) {
-            return nonDurablePosition;
+        if (!activeCursors.isEmpty()) {
+            PositionImpl evictionPos = activeCursors.getSlowestReaderPosition();
+            if (evictionPos != null) {
+                entryCache.invalidateEntries(evictionPos);
+            }
+        } else {
+            entryCache.clear();
         }
-        return durablePosition.compareTo(nonDurablePosition) > 0 ? nonDurablePosition : durablePosition;
     }
 
-    private PositionImpl getEarlierMarkDeletedPositionForActiveCursors() {
-        PositionImpl nonDurablePosition = nonDurableActiveCursors.getSlowestMarkDeletedPositionForActiveCursors();
-        PositionImpl durablePosition = activeCursors.getSlowestMarkDeletedPositionForActiveCursors();
-        if (nonDurablePosition == null) {
-            return durablePosition;
+    void onCursorMarkDeletePositionUpdated(ManagedCursorImpl cursor, PositionImpl newPosition) {
+        if (config.isCacheEvictionByMarkDeletedPosition()) {
+            updateActiveCursor(cursor, newPosition);
         }
-        if (durablePosition == null) {
-            return nonDurablePosition;
+        if (!cursor.isDurable()) {
+            // non-durable cursors aren't tracked for trimming
+            return;
         }
-        return durablePosition.compareTo(nonDurablePosition) > 0 ? nonDurablePosition : durablePosition;
-    }
-
-    void updateCursor(ManagedCursorImpl cursor, PositionImpl newPosition) {
         Pair<PositionImpl, PositionImpl> pair = cursors.cursorUpdated(cursor, newPosition);
         if (pair == null) {
             // Cursor has been removed in the meantime
@@ -2255,6 +2234,20 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
         }
     }
 
+    private void updateActiveCursor(ManagedCursorImpl cursor, Position newPosition) {
+        Pair<PositionImpl, PositionImpl> slowestPositions = activeCursors.cursorUpdated(cursor, newPosition);
+        if (slowestPositions != null
+                && !slowestPositions.getLeft().equals(slowestPositions.getRight())) {
+            invalidateEntriesUpToSlowestReaderPosition();
+        }
+    }
+
+    public void onCursorReadPositionUpdated(ManagedCursorImpl cursor, Position newReadPosition) {
+        if (!config.isCacheEvictionByMarkDeletedPosition()) {
+            updateActiveCursor(cursor, newReadPosition);
+        }
+    }
+
     PositionImpl startReadOperationOnLedger(PositionImpl position) {
         Long ledgerId = ledgers.ceilingKey(position.getLedgerId());
         if (ledgerId != null && ledgerId != position.getLedgerId()) {
@@ -2315,7 +2308,7 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
             if (!lastAckedPosition.equals((PositionImpl) cursor.getMarkDeletedPosition())) {
                 try {
                     log.info("Reset cursor:{} to {} since ledger consumed completely", cursor, lastAckedPosition);
-                    updateCursor((ManagedCursorImpl) cursor, lastAckedPosition);
+                    onCursorMarkDeletePositionUpdated((ManagedCursorImpl) cursor, lastAckedPosition);
                 } catch (Exception e) {
                     log.warn("Failed to reset cursor: {} from {} to {}. Trimming thread will retry next time.",
                             cursor, cursor.getMarkDeletedPosition(), lastAckedPosition);
@@ -3499,34 +3492,32 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
     }
 
     public void activateCursor(ManagedCursor cursor) {
-        if (activeCursors.get(cursor.getName()) == null) {
-            activeCursors.add(cursor);
-        }
-        if (!cursor.isDurable() && nonDurableActiveCursors.get(cursor.getName()) == null) {
-            nonDurableActiveCursors.add(cursor);
+        synchronized (activeCursors) {
+            if (activeCursors.get(cursor.getName()) == null) {
+                Position positionForOrdering = config.isCacheEvictionByMarkDeletedPosition()
+                        ? cursor.getMarkDeletedPosition()
+                        : cursor.getReadPosition();
+                if (positionForOrdering == null) {
+                    positionForOrdering = PositionImpl.EARLIEST;
+                }
+                activeCursors.add(cursor, positionForOrdering);
+            }
         }
     }
 
     public void deactivateCursor(ManagedCursor cursor) {
+        deactivateCursorByName(cursor.getName());
+    }
+
+    private void deactivateCursorByName(String cursorName) {
         synchronized (activeCursors) {
-            if (activeCursors.get(cursor.getName()) != null) {
-                activeCursors.removeCursor(cursor.getName());
-                if (!activeCursors.hasDurableCursors()) {
-                    // cleanup cache if there is no active subscription
-                    entryCache.clear();
-                } else {
-                    // if removed subscription was the slowest subscription : update cursor and let it clear cache:
-                    // till new slowest-cursor's read-position
-                    discardEntriesFromCache((ManagedCursorImpl) activeCursors.getSlowestReader(),
-                            getPreviousPosition((PositionImpl) activeCursors.getSlowestReader().getReadPosition()));
-                }
-            }
-            if (!cursor.isDurable()) {
-                nonDurableActiveCursors.removeCursor(cursor.getName());
+            if (activeCursors.removeCursor(cursorName)) {
+                invalidateEntriesUpToSlowestReaderPosition();
             }
         }
     }
 
+
     public void removeWaitingCursor(ManagedCursor cursor) {
         this.waitingCursors.remove(cursor);
     }
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorImpl.java
index de64cb67362..918cc22978b 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorImpl.java
@@ -104,7 +104,7 @@ public class NonDurableCursorImpl extends ManagedCursorImpl {
         MarkDeleteEntry mdEntry = new MarkDeleteEntry(newPosition, properties, callback, ctx);
         lastMarkDeleteEntry = mdEntry;
         // it is important to advance cursor so the retention can kick in as expected.
-        ledger.updateCursor(NonDurableCursorImpl.this, mdEntry.newPosition);
+        ledger.onCursorMarkDeletePositionUpdated(NonDurableCursorImpl.this, mdEntry.newPosition);
 
         callback.markDeleteComplete(ctx);
     }
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/EntryCacheManagerTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/EntryCacheManagerTest.java
index 5b34dc3eb59..982acfdedd2 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/EntryCacheManagerTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/EntryCacheManagerTest.java
@@ -324,7 +324,7 @@ public class EntryCacheManagerTest extends MockedBookKeeperTestCase {
         assertEquals(entries.size(), 10);
 
         factory2.getMbean().refreshStats(1, TimeUnit.SECONDS);
-        assertEquals(factory2.getMbean().getCacheUsedSize(), 70);
+        assertEquals(factory2.getMbean().getCacheUsedSize(), 0);
         assertEquals(factory2.getMbean().getCacheHitsRate(), 10.0);
         assertEquals(factory2.getMbean().getCacheMissesRate(), 0.0);
         assertEquals(factory2.getMbean().getCacheHitsThroughput(), 70.0);
@@ -332,11 +332,10 @@ public class EntryCacheManagerTest extends MockedBookKeeperTestCase {
 
         PositionImpl pos = (PositionImpl) entries.get(entries.size() - 1).getPosition();
         c2.setReadPosition(pos);
-        ledger.discardEntriesFromCache(c2, pos);
         entries.forEach(Entry::release);
 
         factory2.getMbean().refreshStats(1, TimeUnit.SECONDS);
-        assertEquals(factory2.getMbean().getCacheUsedSize(), 7);
+        assertEquals(factory2.getMbean().getCacheUsedSize(), 0);
         assertEquals(factory2.getMbean().getCacheHitsRate(), 0.0);
         assertEquals(factory2.getMbean().getCacheMissesRate(), 0.0);
         assertEquals(factory2.getMbean().getCacheHitsThroughput(), 0.0);
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainerTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainerTest.java
index 1d9315ee296..6446eca5ae0 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainerTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainerTest.java
@@ -402,41 +402,40 @@ public class ManagedCursorContainerTest {
 
     @Test
     public void testSlowestReadPositionForActiveCursors() throws Exception {
-        ManagedCursorContainer container =
-                new ManagedCursorContainer(ManagedCursorContainer.CursorType.NonDurableCursor);
-        assertNull(container.getSlowestReadPositionForActiveCursors());
+        ManagedCursorContainer container = new ManagedCursorContainer();
+        assertNull(container.getSlowestReaderPosition());
 
         // Add no durable cursor
         PositionImpl position = PositionImpl.get(5,5);
         ManagedCursor cursor1 = spy(new MockManagedCursor(container, "test1", position));
         doReturn(false).when(cursor1).isDurable();
         doReturn(position).when(cursor1).getReadPosition();
-        container.add(cursor1);
-        assertEquals(container.getSlowestReadPositionForActiveCursors(), new PositionImpl(5, 5));
+        container.add(cursor1, position);
+        assertEquals(container.getSlowestReaderPosition(), new PositionImpl(5, 5));
 
         // Add no durable cursor
         position = PositionImpl.get(1,1);
         ManagedCursor cursor2 = spy(new MockManagedCursor(container, "test2", position));
         doReturn(false).when(cursor2).isDurable();
         doReturn(position).when(cursor2).getReadPosition();
-        container.add(cursor2);
-        assertEquals(container.getSlowestReadPositionForActiveCursors(), new PositionImpl(1, 1));
+        container.add(cursor2, position);
+        assertEquals(container.getSlowestReaderPosition(), new PositionImpl(1, 1));
 
         // Move forward cursor, cursor1 = 5:5, cursor2 = 5:6, slowest is 5:5
         position = PositionImpl.get(5,6);
         container.cursorUpdated(cursor2, position);
         doReturn(position).when(cursor2).getReadPosition();
-        assertEquals(container.getSlowestReadPositionForActiveCursors(), new PositionImpl(5, 5));
+        assertEquals(container.getSlowestReaderPosition(), new PositionImpl(5, 5));
 
         // Move forward cursor, cursor1 = 5:8, cursor2 = 5:6, slowest is 5:6
         position = PositionImpl.get(5,8);
         doReturn(position).when(cursor1).getReadPosition();
         container.cursorUpdated(cursor1, position);
-        assertEquals(container.getSlowestReadPositionForActiveCursors(), new PositionImpl(5, 6));
+        assertEquals(container.getSlowestReaderPosition(), new PositionImpl(5, 6));
 
         // Remove cursor, only cursor1 left, cursor1 = 5:8
         container.removeCursor(cursor2.getName());
-        assertEquals(container.getSlowestReadPositionForActiveCursors(), new PositionImpl(5, 8));
+        assertEquals(container.getSlowestReaderPosition(), new PositionImpl(5, 8));
     }
 
     @Test
@@ -445,25 +444,25 @@ public class ManagedCursorContainerTest {
         assertNull(container.getSlowestReaderPosition());
 
         ManagedCursor cursor1 = new MockManagedCursor(container, "test1", new PositionImpl(5, 5));
-        container.add(cursor1);
+        container.add(cursor1, cursor1.getMarkDeletedPosition());
         assertEquals(container.getSlowestReaderPosition(), new PositionImpl(5, 5));
 
         ManagedCursor cursor2 = new MockManagedCursor(container, "test2", new PositionImpl(2, 2));
-        container.add(cursor2);
+        container.add(cursor2, cursor2.getMarkDeletedPosition());
         assertEquals(container.getSlowestReaderPosition(), new PositionImpl(2, 2));
 
         ManagedCursor cursor3 = new MockManagedCursor(container, "test3", new PositionImpl(2, 0));
-        container.add(cursor3);
+        container.add(cursor3, cursor3.getMarkDeletedPosition());
         assertEquals(container.getSlowestReaderPosition(), new PositionImpl(2, 0));
 
         assertEquals(container.toString(), "[test1=5:5, test2=2:2, test3=2:0]");
 
         ManagedCursor cursor4 = new MockManagedCursor(container, "test4", new PositionImpl(4, 0));
-        container.add(cursor4);
+        container.add(cursor4, cursor4.getMarkDeletedPosition());
         assertEquals(container.getSlowestReaderPosition(), new PositionImpl(2, 0));
 
         ManagedCursor cursor5 = new MockManagedCursor(container, "test5", new PositionImpl(3, 5));
-        container.add(cursor5);
+        container.add(cursor5, cursor5.getMarkDeletedPosition());
         assertEquals(container.getSlowestReaderPosition(), new PositionImpl(2, 0));
 
         cursor3.markDelete(new PositionImpl(3, 0));
@@ -488,7 +487,7 @@ public class ManagedCursorContainerTest {
         assertFalse(container.hasDurableCursors());
 
         ManagedCursor cursor6 = new MockManagedCursor(container, "test6", new PositionImpl(6, 5));
-        container.add(cursor6);
+        container.add(cursor6, cursor6.getMarkDeletedPosition());
         assertEquals(container.getSlowestReaderPosition(), new PositionImpl(6, 5));
 
         assertEquals(container.toString(), "[test6=6:5]");
@@ -499,11 +498,11 @@ public class ManagedCursorContainerTest {
         ManagedCursorContainer container = new ManagedCursorContainer();
 
         ManagedCursor cursor1 = new MockManagedCursor(container, "test1", new PositionImpl(5, 5));
-        container.add(cursor1);
+        container.add(cursor1, cursor1.getMarkDeletedPosition());
         assertEquals(container.getSlowestReaderPosition(), new PositionImpl(5, 5));
 
         MockManagedCursor cursor2 = new MockManagedCursor(container, "test2", new PositionImpl(2, 2));
-        container.add(cursor2);
+        container.add(cursor2, cursor2.getMarkDeletedPosition());
         assertEquals(container.getSlowestReaderPosition(), new PositionImpl(2, 2));
 
         cursor2.position = new PositionImpl(8, 8);
@@ -521,17 +520,17 @@ public class ManagedCursorContainerTest {
         ManagedCursorContainer container = new ManagedCursorContainer();
 
         ManagedCursor cursor1 = new MockManagedCursor(container, "test1", new PositionImpl(5, 5));
-        container.add(cursor1);
+        container.add(cursor1, cursor1.getMarkDeletedPosition());
         assertEquals(container.getSlowestReaderPosition(), new PositionImpl(5, 5));
         assertEquals(container.get("test1"), cursor1);
 
         MockManagedCursor cursor2 = new MockManagedCursor(container, "test2", new PositionImpl(2, 2));
-        container.add(cursor2);
+        container.add(cursor2, cursor2.getMarkDeletedPosition());
         assertEquals(container.getSlowestReaderPosition(), new PositionImpl(2, 2));
         assertEquals(container.get("test2"), cursor2);
 
         MockManagedCursor cursor3 = new MockManagedCursor(container, "test3", new PositionImpl(1, 1));
-        container.add(cursor3);
+        container.add(cursor3, cursor3.getMarkDeletedPosition());
         assertEquals(container.getSlowestReaderPosition(), new PositionImpl(1, 1));
         assertEquals(container.get("test3"), cursor3);
 
@@ -563,11 +562,11 @@ public class ManagedCursorContainerTest {
         ManagedCursor cursor4 = new MockManagedCursor(container, "test4", new PositionImpl(6, 4));
         ManagedCursor cursor5 = new MockManagedCursor(container, "test5", new PositionImpl(7, 0));
 
-        container.add(cursor1);
-        container.add(cursor2);
-        container.add(cursor3);
-        container.add(cursor4);
-        container.add(cursor5);
+        container.add(cursor1, cursor1.getMarkDeletedPosition());
+        container.add(cursor2, cursor2.getMarkDeletedPosition());
+        container.add(cursor3, cursor3.getMarkDeletedPosition());
+        container.add(cursor4, cursor4.getMarkDeletedPosition());
+        container.add(cursor5, cursor5.getMarkDeletedPosition());
 
         assertEquals(container.getSlowestReaderPosition(), new PositionImpl(5, 1));
         container.removeCursor("test2");
@@ -597,11 +596,11 @@ public class ManagedCursorContainerTest {
         MockManagedCursor c4 = new MockManagedCursor(container, "test4", new PositionImpl(6, 4));
         MockManagedCursor c5 = new MockManagedCursor(container, "test5", new PositionImpl(7, 0));
 
-        container.add(c1);
-        container.add(c2);
-        container.add(c3);
-        container.add(c4);
-        container.add(c5);
+        container.add(c1, c1.getMarkDeletedPosition());
+        container.add(c2, c2.getMarkDeletedPosition());
+        container.add(c3, c3.getMarkDeletedPosition());
+        container.add(c4, c4.getMarkDeletedPosition());
+        container.add(c5, c5.getMarkDeletedPosition());
 
         assertEquals(container.getSlowestReaderPosition(), new PositionImpl(5, 1));
 
@@ -662,11 +661,11 @@ public class ManagedCursorContainerTest {
         MockManagedCursor c4 = new MockManagedCursor(container, "test4", new PositionImpl(6, 4));
         MockManagedCursor c5 = new MockManagedCursor(container, "test5", new PositionImpl(7, 0));
 
-        container.add(c1);
-        container.add(c2);
-        container.add(c3);
-        container.add(c4);
-        container.add(c5);
+        container.add(c1, c1.getMarkDeletedPosition());
+        container.add(c2, c2.getMarkDeletedPosition());
+        container.add(c3, c3.getMarkDeletedPosition());
+        container.add(c4, c4.getMarkDeletedPosition());
+        container.add(c5, c5.getMarkDeletedPosition());
 
         assertEquals(container.getSlowestReaderPosition(), new PositionImpl(5, 1));
 
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
index 7eee82be9dd..7adf5f9abb4 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
@@ -106,6 +106,7 @@ import org.apache.bookkeeper.mledger.ManagedLedgerException.ManagedLedgerNotFoun
 import org.apache.bookkeeper.mledger.ManagedLedgerException.MetaStoreException;
 import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
 import org.apache.bookkeeper.mledger.ManagedLedgerFactoryConfig;
+import org.apache.bookkeeper.mledger.ManagedLedgerFactoryMXBean;
 import org.apache.bookkeeper.mledger.Position;
 import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl.VoidCallback;
 import org.apache.bookkeeper.mledger.impl.MetaStore.MetaStoreCallback;
@@ -296,265 +297,120 @@ public class ManagedLedgerTest extends MockedBookKeeperTestCase {
     }
 
     @Test
-    public void testDoCacheEviction() throws Throwable {
-        CompletableFuture<Boolean> result = new CompletableFuture<>();
-        ManagedLedgerConfig config = new ManagedLedgerConfig();
-        config.setCacheEvictionByMarkDeletedPosition(true);
-        factory.updateCacheEvictionTimeThreshold(TimeUnit.MILLISECONDS
-                .toNanos(30000));
-        factory.asyncOpen("my_test_ledger", config, new OpenLedgerCallback() {
-            @Override
-            public void openLedgerComplete(ManagedLedger ledger, Object ctx) {
-                ledger.asyncOpenCursor("test-cursor", new OpenCursorCallback() {
-                    @Override
-                    public void openCursorComplete(ManagedCursor cursor, Object ctx) {
-                        ManagedLedger ledger = (ManagedLedger) ctx;
-                        String message1 = "test";
-                        ledger.asyncAddEntry(message1.getBytes(Encoding), new AddEntryCallback() {
-                            @Override
-                            public void addComplete(Position position, ByteBuf entryData, Object ctx) {
-                                try {
-                                    @SuppressWarnings("unchecked")
-                                    Pair<ManagedLedger, ManagedCursor> pair = (Pair<ManagedLedger, ManagedCursor>) ctx;
-                                    ManagedLedger ledger = pair.getLeft();
-                                    ManagedCursor cursor = pair.getRight();
-                                    if (((ManagedLedgerImpl) ledger).getCacheSize() != message1.getBytes(Encoding).length) {
-                                        result.complete(false);
-                                        return;
-                                    }
-
-                                    ManagedLedgerImpl ledgerImpl = (ManagedLedgerImpl) ledger;
-                                    ledgerImpl.getActiveCursors().removeCursor(cursor.getName());
-                                    assertNull(ledgerImpl.getEvictionPosition());
-                                    assertTrue(ledgerImpl.getCacheSize() == message1.getBytes(Encoding).length);
-                                    ledgerImpl.doCacheEviction(System.nanoTime());
-                                    assertTrue(ledgerImpl.getCacheSize() <= 0);
-                                    result.complete(true);
-                                } catch (Throwable e) {
-                                    result.completeExceptionally(e);
-                                }
-                            }
-
-                            @Override
-                            public void addFailed(ManagedLedgerException exception, Object ctx) {
-                                result.completeExceptionally(exception);
-                            }
-                        }, Pair.of(ledger, cursor));
-                    }
-
-                    @Override
-                    public void openCursorFailed(ManagedLedgerException exception, Object ctx) {
-                        result.completeExceptionally(exception);
-                    }
-                }, ledger);
-            }
+    public void shouldKeepEntriesInCacheByEarliestReadPosition() throws ManagedLedgerException, InterruptedException {
+        // This test case reproduces issue #16054
 
-            @Override
-            public void openLedgerFailed(ManagedLedgerException exception, Object ctx) {
-                result.completeExceptionally(exception);
-            }
-        }, null, null);
-        assertTrue(result.get());
-
-        log.info("Test completed");
-    }
-
-    @Test
-    public void testCacheEvictionByMarkDeletedPosition() throws Throwable {
-        CompletableFuture<Boolean> result = new CompletableFuture<>();
         ManagedLedgerConfig config = new ManagedLedgerConfig();
-        config.setCacheEvictionByMarkDeletedPosition(true);
         factory.updateCacheEvictionTimeThreshold(TimeUnit.MILLISECONDS
                 .toNanos(30000));
-        factory.asyncOpen("my_test_ledger", config, new OpenLedgerCallback() {
-            @Override
-            public void openLedgerComplete(ManagedLedger ledger, Object ctx) {
-                ledger.asyncOpenCursor("test-cursor", new OpenCursorCallback() {
-                    @Override
-                    public void openCursorComplete(ManagedCursor cursor, Object ctx) {
-                        ManagedLedger ledger = (ManagedLedger) ctx;
-                        String message1 = "test";
-                        ledger.asyncAddEntry(message1.getBytes(Encoding), new AddEntryCallback() {
-                            @Override
-                            public void addComplete(Position position, ByteBuf entryData, Object ctx) {
-                                @SuppressWarnings("unchecked")
-                                Pair<ManagedLedger, ManagedCursor> pair = (Pair<ManagedLedger, ManagedCursor>) ctx;
-                                ManagedLedger ledger = pair.getLeft();
-                                ManagedCursor cursor = pair.getRight();
-                                if (((ManagedLedgerImpl) ledger).getCacheSize() != message1.getBytes(Encoding).length) {
-                                    result.complete(false);
-                                    return;
-                                }
-                                cursor.asyncReadEntries(1, new ReadEntriesCallback() {
-                                    @Override
-                                    public void readEntriesComplete(List<Entry> entries, Object ctx) {
-                                        ManagedCursor cursor = (ManagedCursor) ctx;
-                                        assertEquals(entries.size(), 1);
-                                        Entry entry = entries.get(0);
-                                        final Position position = entry.getPosition();
-                                        if (!message1.equals(new String(entry.getDataAndRelease(), Encoding))) {
-                                            result.complete(false);
-                                            return;
-                                        }
-                                        ((ManagedLedgerImpl) ledger).doCacheEviction(
-                                                System.nanoTime() - TimeUnit.MILLISECONDS.toNanos(30000));
-                                        if (((ManagedLedgerImpl) ledger).getCacheSize() != message1.getBytes(Encoding).length) {
-                                            result.complete(false);
-                                            return;
-                                        }
 
-                                        log.debug("Mark-Deleting to position {}", position);
-                                        cursor.asyncMarkDelete(position, new MarkDeleteCallback() {
-                                            @Override
-                                            public void markDeleteComplete(Object ctx) {
-                                                log.debug("Mark delete complete");
-                                                ManagedCursor cursor = (ManagedCursor) ctx;
-                                                if (cursor.hasMoreEntries()) {
-                                                    result.complete(false);
-                                                    return;
-                                                }
-                                                ((ManagedLedgerImpl) ledger).doCacheEviction(
-                                                        System.nanoTime() - TimeUnit.MILLISECONDS.toNanos(30000));
-                                                result.complete(((ManagedLedgerImpl) ledger).getCacheSize() == 0);
-                                            }
+        // GIVEN an opened ledger with 10 opened cursors
 
-                                            @Override
-                                            public void markDeleteFailed(ManagedLedgerException exception, Object ctx) {
-                                                result.completeExceptionally(exception);
-                                            }
+        ManagedLedger ledger = factory.open("test_ledger_for_shouldKeepEntriesInCacheByEarliestReadPosition",
+                config);
+        List<ManagedCursor> cursors = new ArrayList<>();
+        for (int i = 0; i < 10; i++) {
+            ManagedCursor cursor = ledger.openCursor("c" + i);
+            cursors.add(cursor);
+        }
 
-                                        }, cursor);
-                                    }
+        ManagedLedgerFactoryMXBean cacheStats = factory.getCacheStats();
+        int insertedEntriesCountBefore = (int) cacheStats.getCacheInsertedEntriesCount();
 
-                                    @Override
-                                    public void readEntriesFailed(ManagedLedgerException exception, Object ctx) {
-                                        result.completeExceptionally(exception);
-                                    }
-                                }, cursor, PositionImpl.LATEST);
-                            }
+        // AND 100 added entries
 
-                            @Override
-                            public void addFailed(ManagedLedgerException exception, Object ctx) {
-                                result.completeExceptionally(exception);
-                            }
-                        }, Pair.of(ledger, cursor));
-                    }
+        for (int i = 0; i < 100; i++) {
+            String content = "entry-" + i;
+            ledger.addEntry(content.getBytes());
+        }
 
-                    @Override
-                    public void openCursorFailed(ManagedLedgerException exception, Object ctx) {
-                        result.completeExceptionally(exception);
-                    }
+        int insertedEntriesCount =
+                (int) cacheStats.getCacheInsertedEntriesCount() - insertedEntriesCountBefore;
+        // EXPECT that 100 entries should have been inserted to the cache
+        assertEquals(insertedEntriesCount, 100);
 
-                }, ledger);
-            }
+        int evictedEntriesCountBefore = (int) cacheStats.getCacheEvictedEntriesCount();
 
-            @Override
-            public void openLedgerFailed(ManagedLedgerException exception, Object ctx) {
-                result.completeExceptionally(exception);
-            }
-        }, null, null);
+        // WHEN entries are read for the cursors so that the farthest cursor has most entries read
+        for (int i = 0; i < 10; i++) {
+            ManagedCursor cursor = cursors.get(i);
+            // read entries farther of the  earliest cursor
+            List<Entry> entries = cursor.readEntries(20 - i);
+            // mark delete the least for the earliest cursor
+            cursor.markDelete(entries.get(i).getPosition());
+            entries.forEach(Entry::release);
+        }
 
-        assertTrue(result.get());
+        // THEN it is expected that the cache evicts entries to the earliest read position
+        Thread.sleep(2 * factory.getConfig().getCacheEvictionIntervalMs());
+        int evictedEntriesCount =
+                (int) cacheStats.getCacheEvictedEntriesCount() - evictedEntriesCountBefore;
+        assertEquals(evictedEntriesCount, 11,
+                "It is expected that the cache evicts entries to the earliest read position");
 
-        log.info("Test completed");
+        ledger.close();
     }
 
     @Test
-    public void testCacheEvictionByReadPosition() throws Throwable {
-        CompletableFuture<Boolean> result = new CompletableFuture<>();
+    public void shouldKeepEntriesInCacheByEarliestMarkDeletePosition() throws ManagedLedgerException, InterruptedException {
+        // This test case reproduces issue #16054
+
         ManagedLedgerConfig config = new ManagedLedgerConfig();
+        config.setCacheEvictionByMarkDeletedPosition(true);
         factory.updateCacheEvictionTimeThreshold(TimeUnit.MILLISECONDS
                 .toNanos(30000));
-        factory.asyncOpen("my_test_ledger", config, new OpenLedgerCallback() {
-            @Override
-            public void openLedgerComplete(ManagedLedger ledger, Object ctx) {
-                ledger.asyncOpenCursor("test-cursor", new OpenCursorCallback() {
-                    @Override
-                    public void openCursorComplete(ManagedCursor cursor, Object ctx) {
-                        ManagedLedger ledger = (ManagedLedger) ctx;
-                        String message1 = "test";
-                        ledger.asyncAddEntry(message1.getBytes(Encoding), new AddEntryCallback() {
-                            @Override
-                            public void addComplete(Position position, ByteBuf entryData, Object ctx) {
-                                @SuppressWarnings("unchecked")
-                                Pair<ManagedLedger, ManagedCursor> pair = (Pair<ManagedLedger, ManagedCursor>) ctx;
-                                ManagedLedger ledger = pair.getLeft();
-                                ManagedCursor cursor = pair.getRight();
-                                if (((ManagedLedgerImpl) ledger).getCacheSize() != message1.getBytes(Encoding).length) {
-                                    result.complete(false);
-                                    return;
-                                }
 
-                                cursor.asyncReadEntries(1, new ReadEntriesCallback() {
-                                    @Override
-                                    public void readEntriesComplete(List<Entry> entries, Object ctx) {
-                                        ManagedCursor cursor = (ManagedCursor) ctx;
-                                        assertEquals(entries.size(), 1);
-                                        Entry entry = entries.get(0);
-                                        final Position position = entry.getPosition();
-                                        if (!message1.equals(new String(entry.getDataAndRelease(), Encoding))) {
-                                            result.complete(false);
-                                            return;
-                                        }
-                                        ((ManagedLedgerImpl) ledger).doCacheEviction(
-                                                System.nanoTime() - TimeUnit.MILLISECONDS.toNanos(30000));
-                                        if (((ManagedLedgerImpl) ledger).getCacheSize() != 0) {
-                                            result.complete(false);
-                                            return;
-                                        }
+        // GIVEN an opened ledger with 10 opened cursors
 
-                                        log.debug("Mark-Deleting to position {}", position);
-                                        cursor.asyncMarkDelete(position, new MarkDeleteCallback() {
-                                            @Override
-                                            public void markDeleteComplete(Object ctx) {
-                                                log.debug("Mark delete complete");
-                                                ManagedCursor cursor = (ManagedCursor) ctx;
-                                                if (cursor.hasMoreEntries()) {
-                                                    result.complete(false);
-                                                    return;
-                                                }
-                                                result.complete(((ManagedLedgerImpl) ledger).getCacheSize() == 0);
-                                            }
+        ManagedLedger ledger = factory.open("test_ledger_for_shouldKeepEntriesInCacheByEarliestMarkDeletePosition",
+                config);
+        List<ManagedCursor> cursors = new ArrayList<>();
+        for (int i = 0; i < 10; i++) {
+            ManagedCursor cursor = ledger.openCursor("c" + i);
+            cursors.add(cursor);
+        }
 
-                                            @Override
-                                            public void markDeleteFailed(ManagedLedgerException exception, Object ctx) {
-                                                result.completeExceptionally(exception);
-                                            }
+        ManagedLedgerFactoryMXBean cacheStats = factory.getCacheStats();
+        int insertedEntriesCountBefore = (int) cacheStats.getCacheInsertedEntriesCount();
 
-                                        }, cursor);
-                                    }
+        // AND 100 added entries
 
-                                    @Override
-                                    public void readEntriesFailed(ManagedLedgerException exception, Object ctx) {
-                                        result.completeExceptionally(exception);
-                                    }
-                                }, cursor, PositionImpl.LATEST);
-                            }
+        for (int i = 0; i < 100; i++) {
+            String content = "entry-" + i;
+            ledger.addEntry(content.getBytes());
+        }
 
-                            @Override
-                            public void addFailed(ManagedLedgerException exception, Object ctx) {
-                                result.completeExceptionally(exception);
-                            }
-                        }, Pair.of(ledger, cursor));
-                    }
+        int insertedEntriesCount =
+                (int) cacheStats.getCacheInsertedEntriesCount() - insertedEntriesCountBefore;
+        // EXPECT that 100 entries should have been inserted to the cache
+        assertEquals(insertedEntriesCount, 100);
 
-                    @Override
-                    public void openCursorFailed(ManagedLedgerException exception, Object ctx) {
-                        result.completeExceptionally(exception);
-                    }
+        int evictedEntriesCountBefore = (int) cacheStats.getCacheEvictedEntriesCount();
 
-                }, ledger);
-            }
+        // WHEN entries are read for the cursors so that the farthest cursor has most entries read
+        Position lastMarkDeletePos = null;
+        for (int i = 0; i < 10; i++) {
+            ManagedCursor cursor = cursors.get(i);
+            // read 50 (+ index) entries for each cursor
+            List<Entry> entries = cursor.readEntries(50 + (5 * i));
+            // mark delete the most for the earliest cursor
+            lastMarkDeletePos = entries.get(20 - i).getPosition();
+            cursor.markDelete(lastMarkDeletePos);
+            entries.forEach(Entry::release);
+        }
 
-            @Override
-            public void openLedgerFailed(ManagedLedgerException exception, Object ctx) {
-                result.completeExceptionally(exception);
-            }
-        }, null, null);
+        Thread.sleep(1000 + 2 * factory.getConfig().getCacheEvictionIntervalMs());
 
-        assertTrue(result.get());
+        ManagedCursorContainer activeCursors = (ManagedCursorContainer) ledger.getActiveCursors();
+        assertEquals(activeCursors.getSlowestReaderPosition(), lastMarkDeletePos);
 
-        log.info("Test completed");
+        // THEN it is expected that the cache evicts entries to the earliest read position
+        int evictedEntriesCount =
+                (int) cacheStats.getCacheEvictedEntriesCount() - evictedEntriesCountBefore;
+        assertEquals(evictedEntriesCount, 11,
+                "It is expected that the cache evicts entries to the earliest read position");
+
+        ledger.close();
     }
 
     @Test(timeOut = 20000)
@@ -1783,10 +1639,14 @@ public class ManagedLedgerTest extends MockedBookKeeperTestCase {
 
     @Test
     public void invalidateConsumedEntriesFromCache() throws Exception {
-        ManagedLedgerImpl ledger = (ManagedLedgerImpl) factory.open("my_test_ledger");
+        ManagedLedgerConfig config = new ManagedLedgerConfig();
+        ManagedLedgerImpl ledger =
+                (ManagedLedgerImpl) factory.open("my_test_ledger_for_invalidateConsumedEntriesFromCache",
+                        config);
 
         EntryCacheManager cacheManager = factory.getEntryCacheManager();
         EntryCache entryCache = ledger.entryCache;
+        entryCache.clear();
 
         ManagedCursorImpl c1 = (ManagedCursorImpl) ledger.openCursor("c1");
         ManagedCursorImpl c2 = (ManagedCursorImpl) ledger.openCursor("c2");
@@ -1799,28 +1659,78 @@ public class ManagedLedgerTest extends MockedBookKeeperTestCase {
         assertEquals(entryCache.getSize(), 7 * 4);
         assertEquals(cacheManager.getSize(), entryCache.getSize());
 
+
         c2.setReadPosition(p3);
-        ledger.discardEntriesFromCache(c2, p2);
 
         assertEquals(entryCache.getSize(), 7 * 4);
         assertEquals(cacheManager.getSize(), entryCache.getSize());
 
         c1.setReadPosition(p2);
-        ledger.discardEntriesFromCache(c1, p2);
         assertEquals(entryCache.getSize(), 7 * 3);
         assertEquals(cacheManager.getSize(), entryCache.getSize());
 
         c1.setReadPosition(p3);
-        ledger.discardEntriesFromCache(c1, p3);
-        assertEquals(entryCache.getSize(), 7 * 3);
+        assertEquals(entryCache.getSize(), 7 * 2);
         assertEquals(cacheManager.getSize(), entryCache.getSize());
 
         ledger.deactivateCursor(c1);
-        assertEquals(entryCache.getSize(), 7 * 3); // as c2.readPosition=p3 => Cache contains p3,p4
+        assertEquals(entryCache.getSize(), 7 * 2); // as c2.readPosition=p3 => Cache contains p3,p4
+        assertEquals(cacheManager.getSize(), entryCache.getSize());
+
+        c2.setReadPosition(p4);
+        assertEquals(entryCache.getSize(), 7);
+        assertEquals(cacheManager.getSize(), entryCache.getSize());
+
+        ledger.deactivateCursor(c2);
+        assertEquals(entryCache.getSize(), 0);
+        assertEquals(cacheManager.getSize(), entryCache.getSize());
+    }
+
+    @Test
+    public void invalidateEntriesFromCacheByMarkDeletePosition() throws Exception {
+        ManagedLedgerConfig config = new ManagedLedgerConfig();
+        config.setCacheEvictionByMarkDeletedPosition(true);
+        ManagedLedgerImpl ledger =
+                (ManagedLedgerImpl) factory.open("my_test_ledger_for_invalidateEntriesFromCacheByMarkDeletePosition",
+                        config);
+
+        EntryCacheManager cacheManager = factory.getEntryCacheManager();
+        EntryCache entryCache = ledger.entryCache;
+        entryCache.clear();
+
+        ManagedCursorImpl c1 = (ManagedCursorImpl) ledger.openCursor("c1");
+        ManagedCursorImpl c2 = (ManagedCursorImpl) ledger.openCursor("c2");
+
+        PositionImpl p1 = (PositionImpl) ledger.addEntry("entry-1".getBytes());
+        PositionImpl p2 = (PositionImpl) ledger.addEntry("entry-2".getBytes());
+        PositionImpl p3 = (PositionImpl) ledger.addEntry("entry-3".getBytes());
+        PositionImpl p4 = (PositionImpl) ledger.addEntry("entry-4".getBytes());
+
+        assertEquals(entryCache.getSize(), 7 * 4);
         assertEquals(cacheManager.getSize(), entryCache.getSize());
 
+
         c2.setReadPosition(p4);
-        ledger.discardEntriesFromCache(c2, p4);
+        c2.markDelete(p3);
+
+        assertEquals(entryCache.getSize(), 7 * 4);
+        assertEquals(cacheManager.getSize(), entryCache.getSize());
+
+        c1.setReadPosition(p3);
+        c1.markDelete(p2);
+        assertEquals(entryCache.getSize(), 7 * 3);
+        assertEquals(cacheManager.getSize(), entryCache.getSize());
+
+        c1.setReadPosition(p4);
+        c1.markDelete(p3);
+        assertEquals(entryCache.getSize(), 7 * 2);
+        assertEquals(cacheManager.getSize(), entryCache.getSize());
+
+        ledger.deactivateCursor(c1);
+        assertEquals(entryCache.getSize(), 7 * 2);
+        assertEquals(cacheManager.getSize(), entryCache.getSize());
+
+        c2.markDelete(p4);
         assertEquals(entryCache.getSize(), 7);
         assertEquals(cacheManager.getSize(), entryCache.getSize());
 
@@ -2104,10 +2014,10 @@ public class ManagedLedgerTest extends MockedBookKeeperTestCase {
 
         ledger.addEntry("data".getBytes());
         ledger.addEntry("data".getBytes());
-        
+
         Awaitility.await().untilAsserted(() -> {
             assertEquals(ledger.getLedgersInfoAsList().size(), 2);
-        });   
+        });
     }
 
     @Test
@@ -2729,8 +2639,8 @@ public class ManagedLedgerTest extends MockedBookKeeperTestCase {
         }
 
         // (3) Validate: cache should remove all entries read by both active cursors
-        log.info("expected, found : {}, {}", (5 * (totalInsertedEntries)), entryCache.getSize());
-        assertEquals((5 * totalInsertedEntries), entryCache.getSize());
+        log.info("expected, found : {}, {}", 5 * (totalInsertedEntries - readEntries), entryCache.getSize());
+        assertEquals(entryCache.getSize(), 5 * (totalInsertedEntries - readEntries));
 
         final int remainingEntries = totalInsertedEntries - readEntries;
         entries1 = cursor1.readEntries(remainingEntries);
@@ -2744,7 +2654,7 @@ public class ManagedLedgerTest extends MockedBookKeeperTestCase {
 
         // (4) Validate: cursor2 is active cursor and has not read these entries yet: so, cache should not remove these
         // entries
-        assertEquals((5 * totalInsertedEntries), entryCache.getSize());
+        assertEquals(entryCache.getSize(), 5 * (totalInsertedEntries - readEntries));
 
         ledger.deactivateCursor(cursor1);
         ledger.deactivateCursor(cursor2);
@@ -2769,7 +2679,7 @@ public class ManagedLedgerTest extends MockedBookKeeperTestCase {
         }
 
         // (1) Validate: cache not stores entries as no active cursor
-        assertEquals(0, entryCache.getSize());
+        assertEquals(entryCache.getSize(), 0);
 
         // Open Cursor also adds cursor into activeCursor-container
         ManagedCursor cursor1 = ledger.openCursor("c1");
@@ -2782,7 +2692,7 @@ public class ManagedLedgerTest extends MockedBookKeeperTestCase {
         }
 
         // (2) Validate: cache stores entries as active cursor has not read message
-        assertEquals((5 * totalInsertedEntries), entryCache.getSize());
+        assertEquals(entryCache.getSize(), 5 * totalInsertedEntries);
 
         // read 20 entries
         List<Entry> entries1 = cursor1.readEntries(totalInsertedEntries);
@@ -2793,7 +2703,7 @@ public class ManagedLedgerTest extends MockedBookKeeperTestCase {
 
         // (3) Validate: cache discards all entries after all cursors are deactivated
         ledger.deactivateCursor(cursor1);
-        assertEquals(0, entryCache.getSize());
+        assertEquals(entryCache.getSize(), 0);
 
         ledger.close();
     }
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/test/MockedBookKeeperTestCase.java b/managed-ledger/src/test/java/org/apache/bookkeeper/test/MockedBookKeeperTestCase.java
index 0fd8902f825..1e960c32bf6 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/test/MockedBookKeeperTestCase.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/test/MockedBookKeeperTestCase.java
@@ -26,6 +26,7 @@ import java.util.concurrent.TimeUnit;
 import lombok.SneakyThrows;
 import org.apache.bookkeeper.client.PulsarMockBookKeeper;
 import org.apache.bookkeeper.common.util.OrderedScheduler;
+import org.apache.bookkeeper.mledger.ManagedLedgerFactoryConfig;
 import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl;
 import org.apache.pulsar.metadata.api.MetadataStoreConfig;
 import org.apache.pulsar.metadata.api.MetadataStoreException;
@@ -79,6 +80,9 @@ public abstract class MockedBookKeeperTestCase {
             throw e;
         }
 
+        ManagedLedgerFactoryConfig managedLedgerFactoryConfig = new ManagedLedgerFactoryConfig();
+        // increase default cache eviction interval so that caching could be tested with less flakyness
+        managedLedgerFactoryConfig.setCacheEvictionIntervalMs(200);
         factory = new ManagedLedgerFactoryImpl(metadataStore, bkc);
 
         setUpTestCase();
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
index 7a3eabdb318..307244a6447 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
@@ -581,7 +581,7 @@ public class TransactionTest extends TransactionTestBase {
         field.setAccessible(true);
         ManagedCursorContainer managedCursors = (ManagedCursorContainer) field.get(persistentTopic.getManagedLedger());
         managedCursors.removeCursor("transaction-buffer-sub");
-        managedCursors.add(managedCursor);
+        managedCursors.add(managedCursor, managedCursor.getMarkDeletedPosition());
 
         doAnswer(invocation -> {
             AsyncCallbacks.ReadEntriesCallback callback = invocation.getArgument(1);
@@ -600,7 +600,7 @@ public class TransactionTest extends TransactionTestBase {
             return null;
         }).when(managedCursor).asyncReadEntries(anyInt(), any(), any(), any());
 
-        managedCursors.add(managedCursor);
+        managedCursors.add(managedCursor, managedCursor.getMarkDeletedPosition());
         TransactionBuffer buffer3 = new TopicTransactionBuffer(persistentTopic);
         Awaitility.await().atMost(30, TimeUnit.SECONDS).untilAsserted(() ->
                 assertEquals(buffer3.getStats(false).state, "Ready"));


[pulsar] 05/05: [fix][ml] Persist correct markDeletePosition to prevent message loss (#18237)

Posted by te...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch branch-2.11
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 05efd10278eb1a3df3f24b9a2add390283196b09
Author: Michael Marshall <mm...@apache.org>
AuthorDate: Sun Oct 30 22:48:43 2022 -0700

    [fix][ml] Persist correct markDeletePosition to prevent message loss (#18237)
---
 .../bookkeeper/mledger/impl/ManagedCursorImpl.java |  60 +++++-----
 .../bookkeeper/mledger/impl/ManagedCursorTest.java | 123 ++++++++++++++++++++-
 2 files changed, 153 insertions(+), 30 deletions(-)

diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
index da855197df6..4f1a376771c 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
@@ -1153,29 +1153,33 @@ public class ManagedCursorImpl implements ManagedCursor {
         return firstLedgerId == null ? null : new PositionImpl(firstLedgerId, 0);
     }
 
-    protected void internalResetCursor(PositionImpl position, AsyncCallbacks.ResetCursorCallback resetCursorCallback) {
-        if (position.equals(PositionImpl.EARLIEST)) {
-            position = ledger.getFirstPosition();
-        } else if (position.equals(PositionImpl.LATEST)) {
-            position = ledger.getLastPosition().getNext();
+    protected void internalResetCursor(PositionImpl proposedReadPosition,
+                                       AsyncCallbacks.ResetCursorCallback resetCursorCallback) {
+        final PositionImpl newReadPosition;
+        if (proposedReadPosition.equals(PositionImpl.EARLIEST)) {
+            newReadPosition = ledger.getFirstPosition();
+        } else if (proposedReadPosition.equals(PositionImpl.LATEST)) {
+            newReadPosition = ledger.getLastPosition().getNext();
+        } else {
+            newReadPosition = proposedReadPosition;
         }
 
-        log.info("[{}] Initiate reset position to {} on cursor {}", ledger.getName(), position, name);
+        log.info("[{}] Initiate reset readPosition to {} on cursor {}", ledger.getName(), newReadPosition, name);
 
         synchronized (pendingMarkDeleteOps) {
             if (!RESET_CURSOR_IN_PROGRESS_UPDATER.compareAndSet(this, FALSE, TRUE)) {
-                log.error("[{}] reset requested - position [{}], previous reset in progress - cursor {}",
-                        ledger.getName(), position, name);
+                log.error("[{}] reset requested - readPosition [{}], previous reset in progress - cursor {}",
+                        ledger.getName(), newReadPosition, name);
                 resetCursorCallback.resetFailed(
                         new ManagedLedgerException.ConcurrentFindCursorPositionException("reset already in progress"),
-                        position);
+                        newReadPosition);
                 return;
             }
         }
 
         final AsyncCallbacks.ResetCursorCallback callback = resetCursorCallback;
 
-        final PositionImpl newPosition = position;
+        final PositionImpl newMarkDeletePosition = ledger.getPreviousPosition(newReadPosition);
 
         VoidCallback finalCallback = new VoidCallback() {
             @Override
@@ -1184,8 +1188,6 @@ public class ManagedCursorImpl implements ManagedCursor {
                 // modify mark delete and read position since we are able to persist new position for cursor
                 lock.writeLock().lock();
                 try {
-                    PositionImpl newMarkDeletePosition = ledger.getPreviousPosition(newPosition);
-
                     if (markDeletePosition.compareTo(newMarkDeletePosition) >= 0) {
                         MSG_CONSUMED_COUNTER_UPDATER.addAndGet(cursorImpl(), -getNumberOfEntries(
                                 Range.closedOpen(newMarkDeletePosition, markDeletePosition)));
@@ -1200,34 +1202,34 @@ public class ManagedCursorImpl implements ManagedCursor {
                     if (config.isDeletionAtBatchIndexLevelEnabled()) {
                         batchDeletedIndexes.values().forEach(BitSetRecyclable::recycle);
                         batchDeletedIndexes.clear();
-                        long[] resetWords = newPosition.ackSet;
+                        long[] resetWords = newReadPosition.ackSet;
                         if (resetWords != null) {
                             BitSetRecyclable ackSet = BitSetRecyclable.create().resetWords(resetWords);
-                            batchDeletedIndexes.put(newPosition, ackSet);
+                            batchDeletedIndexes.put(newReadPosition, ackSet);
                         }
                     }
 
                     PositionImpl oldReadPosition = readPosition;
-                    if (oldReadPosition.compareTo(newPosition) >= 0) {
-                        log.info("[{}] reset position to {} before current read position {} on cursor {}",
-                                ledger.getName(), newPosition, oldReadPosition, name);
+                    if (oldReadPosition.compareTo(newReadPosition) >= 0) {
+                        log.info("[{}] reset readPosition to {} before current read readPosition {} on cursor {}",
+                                ledger.getName(), newReadPosition, oldReadPosition, name);
                     } else {
-                        log.info("[{}] reset position to {} skipping from current read position {} on cursor {}",
-                                ledger.getName(), newPosition, oldReadPosition, name);
+                        log.info("[{}] reset readPosition to {} skipping from current read readPosition {} on "
+                                        + "cursor {}", ledger.getName(), newReadPosition, oldReadPosition, name);
                     }
-                    readPosition = newPosition;
-                    ledger.onCursorReadPositionUpdated(ManagedCursorImpl.this, newPosition);
+                    readPosition = newReadPosition;
+                    ledger.onCursorReadPositionUpdated(ManagedCursorImpl.this, newReadPosition);
                 } finally {
                     lock.writeLock().unlock();
                 }
                 synchronized (pendingMarkDeleteOps) {
                     pendingMarkDeleteOps.clear();
                     if (!RESET_CURSOR_IN_PROGRESS_UPDATER.compareAndSet(ManagedCursorImpl.this, TRUE, FALSE)) {
-                        log.error("[{}] expected reset position [{}], but another reset in progress on cursor {}",
-                                ledger.getName(), newPosition, name);
+                        log.error("[{}] expected reset readPosition [{}], but another reset in progress on cursor {}",
+                                ledger.getName(), newReadPosition, name);
                     }
                 }
-                callback.resetComplete(newPosition);
+                callback.resetComplete(newReadPosition);
                 updateLastActive();
             }
 
@@ -1235,20 +1237,20 @@ public class ManagedCursorImpl implements ManagedCursor {
             public void operationFailed(ManagedLedgerException exception) {
                 synchronized (pendingMarkDeleteOps) {
                     if (!RESET_CURSOR_IN_PROGRESS_UPDATER.compareAndSet(ManagedCursorImpl.this, TRUE, FALSE)) {
-                        log.error("[{}] expected reset position [{}], but another reset in progress on cursor {}",
-                                ledger.getName(), newPosition, name);
+                        log.error("[{}] expected reset readPosition [{}], but another reset in progress on cursor {}",
+                                ledger.getName(), newReadPosition, name);
                     }
                 }
                 callback.resetFailed(new ManagedLedgerException.InvalidCursorPositionException(
-                        "unable to persist position for cursor reset " + newPosition.toString()), newPosition);
+                        "unable to persist readPosition for cursor reset " + newReadPosition), newReadPosition);
             }
 
         };
 
         persistentMarkDeletePosition = null;
         inProgressMarkDeletePersistPosition = null;
-        lastMarkDeleteEntry = new MarkDeleteEntry(newPosition, getProperties(), null, null);
-        internalAsyncMarkDelete(newPosition, isCompactionCursor() ? getProperties() : Collections.emptyMap(),
+        lastMarkDeleteEntry = new MarkDeleteEntry(newMarkDeletePosition, getProperties(), null, null);
+        internalAsyncMarkDelete(newMarkDeletePosition, isCompactionCursor() ? getProperties() : Collections.emptyMap(),
                 new MarkDeleteCallback() {
             @Override
             public void markDeleteComplete(Object ctx) {
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
index 0e66e76d5c3..1a8feea1e0d 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
@@ -93,6 +93,7 @@ import org.apache.bookkeeper.mledger.proto.MLDataFormats;
 import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedCursorInfo;
 import org.apache.bookkeeper.mledger.proto.MLDataFormats.PositionInfo;
 import org.apache.bookkeeper.test.MockedBookKeeperTestCase;
+import org.apache.pulsar.common.api.proto.CommandSubscribe;
 import org.apache.pulsar.common.api.proto.IntRange;
 import org.apache.pulsar.common.util.collections.BitSetRecyclable;
 import org.apache.pulsar.common.util.collections.LongPairRangeSet;
@@ -676,7 +677,7 @@ public class ManagedCursorTest extends MockedBookKeeperTestCase {
         }
         assertTrue(moveStatus.get());
         PositionImpl earliestPos = new PositionImpl(actualEarliest.getLedgerId(), -1);
-        assertEquals(earliestPos, cursor.getReadPosition());
+        assertEquals(cursor.getReadPosition(), earliestPos);
         moveStatus.set(false);
 
         // reset to one after last entry in a ledger should point to the first entry in the next ledger
@@ -3283,6 +3284,126 @@ public class ManagedCursorTest extends MockedBookKeeperTestCase {
         });
     }
 
+    @Test(timeOut = 20000)
+    public void testRecoverCursorAfterResetToLatestForNewEntry() throws Exception {
+        ManagedLedger ml = factory.open("testRecoverCursorAfterResetToLatestForNewEntry");
+        ManagedCursorImpl c = (ManagedCursorImpl) ml.openCursor("sub", CommandSubscribe.InitialPosition.Latest);
+
+        // A new cursor starts out with these values. The rest of the test assumes this, so we assert it here.
+        assertEquals(c.getMarkDeletedPosition().getEntryId(), -1);
+        assertEquals(c.getReadPosition().getEntryId(), 0);
+        assertEquals(ml.getLastConfirmedEntry().getEntryId(), -1);
+
+        c.resetCursor(PositionImpl.LATEST);
+
+        // A reset cursor starts out with these values. The rest of the test assumes this, so we assert it here.
+        assertEquals(c.getMarkDeletedPosition().getEntryId(), -1);
+        assertEquals(c.getReadPosition().getEntryId(), 0);
+        assertEquals(ml.getLastConfirmedEntry().getEntryId(), -1);
+
+        final Position markDeleteBeforeRecover = c.getMarkDeletedPosition();
+        final Position readPositionBeforeRecover = c.getReadPosition();
+
+        // Trigger the lastConfirmedEntry to move forward
+        ml.addEntry(new byte[1]);
+
+        ManagedCursorInfo info = ManagedCursorInfo.newBuilder()
+                .setCursorsLedgerId(c.getCursorLedger())
+                .setMarkDeleteLedgerId(markDeleteBeforeRecover.getLedgerId())
+                .setMarkDeleteEntryId(markDeleteBeforeRecover.getEntryId())
+                .setLastActive(0L)
+                .build();
+
+        CountDownLatch latch = new CountDownLatch(1);
+        AtomicBoolean failed = new AtomicBoolean(false);
+        c.recoverFromLedger(info, new VoidCallback() {
+            @Override
+            public void operationComplete() {
+                latch.countDown();
+            }
+
+            @Override
+            public void operationFailed(ManagedLedgerException exception) {
+                failed.set(true);
+                latch.countDown();
+            }
+        });
+
+        latch.await();
+        if (failed.get()) {
+            fail("Cursor recovery should not fail");
+        }
+        assertEquals(c.getMarkDeletedPosition(), markDeleteBeforeRecover);
+        assertEquals(c.getReadPosition(), readPositionBeforeRecover);
+        assertEquals(c.getNumberOfEntries(), 1L);
+    }
+
+    @Test(timeOut = 20000)
+    public void testRecoverCursorAfterResetToLatestForMultipleEntries() throws Exception {
+        ManagedLedger ml = factory.open("testRecoverCursorAfterResetToLatestForMultipleEntries");
+        ManagedCursorImpl c = (ManagedCursorImpl) ml.openCursor("sub", CommandSubscribe.InitialPosition.Latest);
+
+        // A new cursor starts out with these values. The rest of the test assumes this, so we assert it here.
+        assertEquals(c.getMarkDeletedPosition().getEntryId(), -1);
+        assertEquals(c.getReadPosition().getEntryId(), 0);
+        assertEquals(ml.getLastConfirmedEntry().getEntryId(), -1);
+
+        c.resetCursor(PositionImpl.LATEST);
+
+        // A reset cursor starts out with these values. The rest of the test assumes this, so we assert it here.
+        assertEquals(c.getMarkDeletedPosition().getEntryId(), -1);
+        assertEquals(c.getReadPosition().getEntryId(), 0);
+        assertEquals(ml.getLastConfirmedEntry().getEntryId(), -1);
+
+        // Trigger the lastConfirmedEntry to move forward
+        ml.addEntry(new byte[1]);
+        ml.addEntry(new byte[1]);
+        ml.addEntry(new byte[1]);
+        ml.addEntry(new byte[1]);
+
+        c.resetCursor(PositionImpl.LATEST);
+
+        assertEquals(c.getMarkDeletedPosition().getEntryId(), 3);
+        assertEquals(c.getReadPosition().getEntryId(), 4);
+        assertEquals(ml.getLastConfirmedEntry().getEntryId(), 3);
+
+        // Publish messages to move the lastConfirmedEntry field forward
+        ml.addEntry(new byte[1]);
+        ml.addEntry(new byte[1]);
+
+        final Position markDeleteBeforeRecover = c.getMarkDeletedPosition();
+        final Position readPositionBeforeRecover = c.getReadPosition();
+
+        ManagedCursorInfo info = ManagedCursorInfo.newBuilder()
+                .setCursorsLedgerId(c.getCursorLedger())
+                .setMarkDeleteLedgerId(markDeleteBeforeRecover.getLedgerId())
+                .setMarkDeleteEntryId(markDeleteBeforeRecover.getEntryId())
+                .setLastActive(0L)
+                .build();
+
+        CountDownLatch latch = new CountDownLatch(1);
+        AtomicBoolean failed = new AtomicBoolean(false);
+        c.recoverFromLedger(info, new VoidCallback() {
+            @Override
+            public void operationComplete() {
+                latch.countDown();
+            }
+
+            @Override
+            public void operationFailed(ManagedLedgerException exception) {
+                failed.set(true);
+                latch.countDown();
+            }
+        });
+
+        latch.await();
+        if (failed.get()) {
+            fail("Cursor recovery should not fail");
+        }
+        assertEquals(c.getMarkDeletedPosition(), markDeleteBeforeRecover);
+        assertEquals(c.getReadPosition(), readPositionBeforeRecover);
+        assertEquals(c.getNumberOfEntries(), 2L);
+    }
     @Test
     void testAlwaysInactive() throws Exception {
         ManagedLedger ml = factory.open("testAlwaysInactive");


[pulsar] 03/05: [improve][ml] Remove the redundant judgment logic of ManagedCursorImpl (#18205)

Posted by te...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch branch-2.11
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 2bf2f5b757c272922fbf2bd05749331cc8c50d03
Author: HuangZeGui <hz...@126.com>
AuthorDate: Thu Oct 27 11:57:20 2022 +0800

    [improve][ml] Remove the redundant judgment logic of ManagedCursorImpl (#18205)
---
 .../bookkeeper/mledger/impl/ManagedCursorImpl.java    | 19 +++++++++----------
 1 file changed, 9 insertions(+), 10 deletions(-)

diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
index 3c3bad218d4..7d1f4e0fa23 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
@@ -536,7 +536,7 @@ public class ManagedCursorImpl implements ManagedCursor {
                 if (positionInfo.getIndividualDeletedMessagesCount() > 0) {
                     recoverIndividualDeletedMessages(positionInfo.getIndividualDeletedMessagesList());
                 }
-                if (config.isDeletionAtBatchIndexLevelEnabled() && batchDeletedIndexes != null
+                if (config.isDeletionAtBatchIndexLevelEnabled()
                     && positionInfo.getBatchedEntryDeletionIndexInfoCount() > 0) {
                     recoverBatchDeletedIndexes(positionInfo.getBatchedEntryDeletionIndexInfoList());
                 }
@@ -1196,7 +1196,7 @@ public class ManagedCursorImpl implements ManagedCursor {
                     lastMarkDeleteEntry = new MarkDeleteEntry(newMarkDeletePosition, isCompactionCursor()
                             ? getProperties() : Collections.emptyMap(), null, null);
                     individualDeletedMessages.clear();
-                    if (config.isDeletionAtBatchIndexLevelEnabled() && batchDeletedIndexes != null) {
+                    if (config.isDeletionAtBatchIndexLevelEnabled()) {
                         batchDeletedIndexes.values().forEach(BitSetRecyclable::recycle);
                         batchDeletedIndexes.clear();
                         long[] resetWords = newPosition.ackSet;
@@ -1826,7 +1826,7 @@ public class ManagedCursorImpl implements ManagedCursor {
 
         PositionImpl newPosition = (PositionImpl) position;
 
-        if (config.isDeletionAtBatchIndexLevelEnabled() && batchDeletedIndexes != null) {
+        if (config.isDeletionAtBatchIndexLevelEnabled()) {
             if (newPosition.ackSet != null) {
                 AtomicReference<BitSetRecyclable> bitSetRecyclable = new AtomicReference<>();
                 BitSetRecyclable givenBitSet = BitSetRecyclable.create().resetWords(newPosition.ackSet);
@@ -2009,7 +2009,7 @@ public class ManagedCursorImpl implements ManagedCursor {
                 try {
                     individualDeletedMessages.removeAtMost(mdEntry.newPosition.getLedgerId(),
                             mdEntry.newPosition.getEntryId());
-                    if (config.isDeletionAtBatchIndexLevelEnabled() && batchDeletedIndexes != null) {
+                    if (config.isDeletionAtBatchIndexLevelEnabled()) {
                         Map<PositionImpl, BitSetRecyclable> subMap = batchDeletedIndexes.subMap(PositionImpl.EARLIEST,
                                 false, PositionImpl.get(mdEntry.newPosition.getLedgerId(),
                                 mdEntry.newPosition.getEntryId()), true);
@@ -2138,7 +2138,7 @@ public class ManagedCursorImpl implements ManagedCursor {
 
                 if (individualDeletedMessages.contains(position.getLedgerId(), position.getEntryId())
                     || position.compareTo(markDeletePosition) <= 0) {
-                    if (config.isDeletionAtBatchIndexLevelEnabled() && batchDeletedIndexes != null) {
+                    if (config.isDeletionAtBatchIndexLevelEnabled()) {
                         BitSetRecyclable bitSetRecyclable = batchDeletedIndexes.remove(position);
                         if (bitSetRecyclable != null) {
                             bitSetRecyclable.recycle();
@@ -2150,7 +2150,7 @@ public class ManagedCursorImpl implements ManagedCursor {
                     continue;
                 }
                 if (position.ackSet == null) {
-                    if (config.isDeletionAtBatchIndexLevelEnabled() && batchDeletedIndexes != null) {
+                    if (config.isDeletionAtBatchIndexLevelEnabled()) {
                         BitSetRecyclable bitSetRecyclable = batchDeletedIndexes.remove(position);
                         if (bitSetRecyclable != null) {
                             bitSetRecyclable.recycle();
@@ -2167,7 +2167,7 @@ public class ManagedCursorImpl implements ManagedCursor {
                         log.debug("[{}] [{}] Individually deleted messages: {}", ledger.getName(), name,
                             individualDeletedMessages);
                     }
-                } else if (config.isDeletionAtBatchIndexLevelEnabled() && batchDeletedIndexes != null) {
+                } else if (config.isDeletionAtBatchIndexLevelEnabled()) {
                     BitSetRecyclable givenBitSet = BitSetRecyclable.create().resetWords(position.ackSet);
                     BitSetRecyclable bitSet = batchDeletedIndexes.computeIfAbsent(position, (v) -> givenBitSet);
                     if (givenBitSet != bitSet) {
@@ -2807,8 +2807,7 @@ public class ManagedCursorImpl implements ManagedCursor {
     private List<MLDataFormats.BatchedEntryDeletionIndexInfo> buildBatchEntryDeletionIndexInfoList() {
         lock.readLock().lock();
         try {
-            if (!config.isDeletionAtBatchIndexLevelEnabled() || batchDeletedIndexes == null
-                    || batchDeletedIndexes.isEmpty()) {
+            if (!config.isDeletionAtBatchIndexLevelEnabled() || batchDeletedIndexes.isEmpty()) {
                 return Collections.emptyList();
             }
             MLDataFormats.NestedPositionInfo.Builder nestedPositionBuilder = MLDataFormats.NestedPositionInfo
@@ -3260,7 +3259,7 @@ public class ManagedCursorImpl implements ManagedCursor {
 
     @Override
     public long[] getDeletedBatchIndexesAsLongArray(PositionImpl position) {
-        if (config.isDeletionAtBatchIndexLevelEnabled() && batchDeletedIndexes != null) {
+        if (config.isDeletionAtBatchIndexLevelEnabled()) {
             BitSetRecyclable bitSet = batchDeletedIndexes.get(position);
             return bitSet == null ? null : bitSet.toLongArray();
         } else {