You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by lh...@apache.org on 2022/08/31 03:25:58 UTC

[pulsar] branch master updated: [fix][broker] Fix broker cache eviction of entries read by active cursors (#17273)

This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 856ef155ba8 [fix][broker] Fix broker cache eviction of entries read by active cursors (#17273)
856ef155ba8 is described below

commit 856ef155ba8ba47f651e68fb09fc63fa08cf4e9d
Author: Lari Hotari <lh...@users.noreply.github.com>
AuthorDate: Wed Aug 31 06:25:50 2022 +0300

    [fix][broker] Fix broker cache eviction of entries read by active cursors (#17273)
    
    * [fix][broker] Fix broken build caused by conflict between #17195 and #16605
    
    - #17195 changed the method signature that #16605 depended upon
    
    * [fix][broker] Keep sorted list of cursors ordered by read position of active cursors when cacheEvictionByMarkDeletedPosition=false
    
    Fixes #16054
    
    - recalculate the sorted list of cursors when a read position gets updated
    - this resolves #9958 in a proper way
      - #12045 broke the caching solution as explained in #16054
    - remove invalid tests
    - fix tests
    - add more tests to handle corner cases
    
    * Address review comment
    
    * Handle durable & non-durable in the correct way
    
    * Fix cache tests since now entries get evicted reactively
    
    * Address review comment about method names
    
    * Change signature for add method so that position must be passed
    
    - this is more consistent with cursorUpdated method where the position is passed
    
    * Update javadoc for ManagedCursorContainer
    
    * Address review comment
    
    * Simplify ManagedCursorContainer
    
    * Clarify javadoc
    
    * Ensure that cursors are tracked by making sure that initial position isn't null unintentionally
    
    * Prevent race in updating activeCursors
---
 .../mledger/impl/ManagedCursorContainer.java       | 139 ++++----
 .../bookkeeper/mledger/impl/ManagedCursorImpl.java |   9 +-
 .../bookkeeper/mledger/impl/ManagedLedgerImpl.java | 155 ++++----
 .../mledger/impl/NonDurableCursorImpl.java         |   2 +-
 .../mledger/impl/EntryCacheManagerTest.java        |   5 +-
 .../mledger/impl/ManagedCursorContainerTest.java   |  71 ++--
 .../bookkeeper/mledger/impl/ManagedLedgerTest.java | 396 ++++++++-------------
 .../bookkeeper/test/MockedBookKeeperTestCase.java  |   4 +
 .../pulsar/broker/transaction/TransactionTest.java |   4 +-
 9 files changed, 341 insertions(+), 444 deletions(-)

diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainer.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainer.java
index 5a96ee08de9..b5a5be733a1 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainer.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainer.java
@@ -30,17 +30,16 @@ import org.apache.bookkeeper.mledger.Position;
 import org.apache.commons.lang3.tuple.Pair;
 
 /**
- * Contains all the cursors for a ManagedLedger.
+ * Contains cursors for a ManagedLedger.
  *
  * <p/>The goal is to always know the slowest consumer and hence decide which is the oldest ledger we need to keep.
  *
- * <p/>This data structure maintains a list and a map of cursors. The map is used to relate a cursor name with an entry
- * in the linked-list. The list is a sorted double linked-list of cursors.
+ * <p/>This data structure maintains a heap and a map of cursors. The map is used to relate a cursor name with
+ * an entry index in the heap. The heap data structure sorts cursors in a binary tree which is represented
+ * in a single array. More details about heap implementations:
+ * https://en.wikipedia.org/wiki/Heap_(data_structure)#Implementation
  *
- * <p/>When a cursor is markDeleted, this list is updated and the cursor is moved in its new position.
- *
- * <p/>To minimize the moving around, the order is maintained using the ledgerId, but not the entryId, since we only
- * care about ledgers to be deleted.
+ * <p/>The heap is updated and kept sorted when a cursor is updated.
  *
  */
 public class ManagedCursorContainer implements Iterable<ManagedCursor> {
@@ -50,30 +49,18 @@ public class ManagedCursorContainer implements Iterable<ManagedCursor> {
         PositionImpl position;
         int idx;
 
-        Item(ManagedCursor cursor, int idx) {
+        Item(ManagedCursor cursor, PositionImpl position, int idx) {
             this.cursor = cursor;
-            this.position = (PositionImpl) cursor.getMarkDeletedPosition();
+            this.position = position;
             this.idx = idx;
         }
     }
 
-    public enum CursorType {
-        DurableCursor,
-        NonDurableCursor,
-        ALL
-    }
-
     public ManagedCursorContainer() {
-        cursorType = CursorType.DurableCursor;
-    }
 
-    public ManagedCursorContainer(CursorType cursorType) {
-        this.cursorType = cursorType;
     }
 
-    private final CursorType cursorType;
-
-    // Used to keep track of slowest cursor. Contains all of all active cursors.
+    // Used to keep track of slowest cursor.
     private final ArrayList<Item> heap = new ArrayList();
 
     // Maps a cursor to its position in the heap
@@ -81,46 +68,35 @@ public class ManagedCursorContainer implements Iterable<ManagedCursor> {
 
     private final StampedLock rwLock = new StampedLock();
 
-    public void add(ManagedCursor cursor) {
+    private int durableCursorCount;
+
+
+    /**
+     * Add a cursor to the container. The cursor will be optionally tracked for the slowest reader when
+     * a position is passed as the second argument. It is expected that the position is updated with
+     * {@link #cursorUpdated(ManagedCursor, Position)} method when the position changes.
+     *
+     * @param cursor cursor to add
+     * @param position position of the cursor to use for ordering, pass null if the cursor's position shouldn't be
+     *                 tracked for the slowest reader.
+     */
+    public void add(ManagedCursor cursor, Position position) {
         long stamp = rwLock.writeLock();
         try {
-            // Append a new entry at the end of the list
-            Item item = new Item(cursor, heap.size());
+            Item item = new Item(cursor, (PositionImpl) position, position != null ? heap.size() : -1);
             cursors.put(cursor.getName(), item);
-
-            if (shouldTrackInHeap(cursor)) {
+            if (position != null) {
                 heap.add(item);
                 siftUp(item);
             }
+            if (cursor.isDurable()) {
+                durableCursorCount++;
+            }
         } finally {
             rwLock.unlockWrite(stamp);
         }
     }
 
-    private boolean shouldTrackInHeap(ManagedCursor cursor) {
-        return CursorType.ALL.equals(cursorType)
-                || (cursor.isDurable() && CursorType.DurableCursor.equals(cursorType))
-                || (!cursor.isDurable() && CursorType.NonDurableCursor.equals(cursorType));
-    }
-
-    public PositionImpl getSlowestReadPositionForActiveCursors() {
-        long stamp = rwLock.readLock();
-        try {
-            return heap.isEmpty() ? null : (PositionImpl) heap.get(0).cursor.getReadPosition();
-        } finally {
-            rwLock.unlockRead(stamp);
-        }
-    }
-
-    public PositionImpl getSlowestMarkDeletedPositionForActiveCursors() {
-        long stamp = rwLock.readLock();
-        try {
-            return heap.isEmpty() ? null : (PositionImpl) heap.get(0).cursor.getMarkDeletedPosition();
-        } finally {
-            rwLock.unlockRead(stamp);
-        }
-    }
-
     public ManagedCursor get(String name) {
         long stamp = rwLock.readLock();
         try {
@@ -131,17 +107,25 @@ public class ManagedCursorContainer implements Iterable<ManagedCursor> {
         }
     }
 
-    public void removeCursor(String name) {
+    public boolean removeCursor(String name) {
         long stamp = rwLock.writeLock();
         try {
             Item item = cursors.remove(name);
-            if (item != null && shouldTrackInHeap(item.cursor)) {
-                // Move the item to the right end of the heap to be removed
-                Item lastItem = heap.get(heap.size() - 1);
-                swap(item, lastItem);
-                heap.remove(item.idx);
-                // Update the heap
-                siftDown(lastItem);
+            if (item != null) {
+                if (item.idx >= 0) {
+                    // Move the item to the right end of the heap to be removed
+                    Item lastItem = heap.get(heap.size() - 1);
+                    swap(item, lastItem);
+                    heap.remove(item.idx);
+                    // Update the heap
+                    siftDown(lastItem);
+                }
+                if (item.cursor.isDurable()) {
+                    durableCursorCount--;
+                }
+                return true;
+            } else {
+                return false;
             }
         } finally {
             rwLock.unlockWrite(stamp);
@@ -149,10 +133,15 @@ public class ManagedCursorContainer implements Iterable<ManagedCursor> {
     }
 
     /**
-     * Signal that a cursor position has been updated and that the container must re-order the cursor list.
+     * Signal that a cursor position has been updated and that the container must re-order the cursor heap
+     * tracking the slowest reader.
+     * Only those cursors are tracked and can be updated which were added to the container with the
+     * {@link #add(ManagedCursor, Position)} method that specified the initial position in the position
+     * parameter.
      *
-     * @param cursor
-     * @return a pair of positions, representing the previous slowest consumer and the new slowest consumer (after the
+     * @param cursor the cursor to update the position for
+     * @param newPosition the updated position for the cursor
+     * @return a pair of positions, representing the previous slowest reader and the new slowest reader (after the
      *         update).
      */
     public Pair<PositionImpl, PositionImpl> cursorUpdated(ManagedCursor cursor, Position newPosition) {
@@ -161,35 +150,33 @@ public class ManagedCursorContainer implements Iterable<ManagedCursor> {
         long stamp = rwLock.writeLock();
         try {
             Item item = cursors.get(cursor.getName());
-            if (item == null) {
+            if (item == null || item.idx == -1) {
                 return null;
             }
 
+            PositionImpl previousSlowestConsumer = heap.get(0).position;
 
-            if (shouldTrackInHeap(item.cursor)) {
-                PositionImpl previousSlowestConsumer = heap.get(0).position;
-
+            item.position = (PositionImpl) newPosition;
+            if (heap.size() > 1) {
                 // When the cursor moves forward, we need to push it toward the
                 // bottom of the tree and push it up if a reset was done
 
-                item.position = (PositionImpl) newPosition;
                 if (item.idx == 0 || getParent(item).position.compareTo(item.position) <= 0) {
                     siftDown(item);
                 } else {
                     siftUp(item);
                 }
-
-                PositionImpl newSlowestConsumer = heap.get(0).position;
-                return Pair.of(previousSlowestConsumer, newSlowestConsumer);
             }
-            return null;
+
+            PositionImpl newSlowestConsumer = heap.get(0).position;
+            return Pair.of(previousSlowestConsumer, newSlowestConsumer);
         } finally {
             rwLock.unlockWrite(stamp);
         }
     }
 
     /**
-     * Get the slowest reader position, meaning older acknowledged position between all the cursors.
+     * Get the slowest reader position for the cursors that are ordered.
      *
      * @return the slowest reader position
      */
@@ -237,18 +224,18 @@ public class ManagedCursorContainer implements Iterable<ManagedCursor> {
      */
     public boolean hasDurableCursors() {
         long stamp = rwLock.tryOptimisticRead();
-        boolean isEmpty = heap.isEmpty();
+        int count = durableCursorCount;
         if (!rwLock.validate(stamp)) {
             // Fallback to read lock
             stamp = rwLock.readLock();
             try {
-                isEmpty = heap.isEmpty();
+                count = durableCursorCount;
             } finally {
                 rwLock.unlockRead(stamp);
             }
         }
 
-        return !isEmpty;
+        return count > 0;
     }
 
     @Override
@@ -291,7 +278,7 @@ public class ManagedCursorContainer implements Iterable<ManagedCursor> {
 
             @Override
             public void remove() {
-                throw new IllegalArgumentException("Cannot remove ManagedCursor form container");
+                throw new IllegalArgumentException("Cannot remove ManagedCursor from container");
             }
         };
     }
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
index de85ac92ee8..df1c805a6d5 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
@@ -635,6 +635,7 @@ public class ManagedCursorImpl implements ManagedCursor {
         persistentMarkDeletePosition = position;
         inProgressMarkDeletePersistPosition = null;
         readPosition = ledger.getNextValidPosition(position);
+        ledger.onCursorReadPositionUpdated(this, readPosition);
         lastMarkDeleteEntry = new MarkDeleteEntry(markDeletePosition, properties, null, null);
         // assign cursor-ledger so, it can be deleted when new ledger will be switched
         this.cursorLedger = recoveredFromCursorLedger;
@@ -1215,6 +1216,7 @@ public class ManagedCursorImpl implements ManagedCursor {
                                 ledger.getName(), newPosition, oldReadPosition, name);
                     }
                     readPosition = newPosition;
+                    ledger.onCursorReadPositionUpdated(ManagedCursorImpl.this, newPosition);
                 } finally {
                     lock.writeLock().unlock();
                 }
@@ -1701,6 +1703,7 @@ public class ManagedCursorImpl implements ManagedCursor {
 
     void initializeCursorPosition(Pair<PositionImpl, Long> lastPositionCounter) {
         readPosition = ledger.getNextValidPosition(lastPositionCounter.getLeft());
+        ledger.onCursorReadPositionUpdated(this, readPosition);
         markDeletePosition = lastPositionCounter.getLeft();
         lastMarkDeleteEntry = new MarkDeleteEntry(markDeletePosition, getProperties(), null, null);
         persistentMarkDeletePosition = null;
@@ -1775,6 +1778,7 @@ public class ManagedCursorImpl implements ManagedCursor {
                     log.debug("[{}] Moved read position from: {} to: {}, and new mark-delete position {}",
                             ledger.getName(), currentReadPosition, newReadPosition, markDeletePosition);
                 }
+                ledger.onCursorReadPositionUpdated(ManagedCursorImpl.this, newReadPosition);
                 return newReadPosition;
             } else {
                 return currentReadPosition;
@@ -1994,7 +1998,7 @@ public class ManagedCursorImpl implements ManagedCursor {
                     lock.writeLock().unlock();
                 }
 
-                ledger.updateCursor(ManagedCursorImpl.this, mdEntry.newPosition);
+                ledger.onCursorMarkDeletePositionUpdated(ManagedCursorImpl.this, mdEntry.newPosition);
 
                 decrementPendingMarkDeleteCount();
 
@@ -2356,6 +2360,7 @@ public class ManagedCursorImpl implements ManagedCursor {
             log.info("[{}-{}] Rewind from {} to {}", ledger.getName(), name, oldReadPosition, newReadPosition);
 
             readPosition = newReadPosition;
+            ledger.onCursorReadPositionUpdated(ManagedCursorImpl.this, newReadPosition);
         } finally {
             lock.writeLock().unlock();
         }
@@ -2373,6 +2378,7 @@ public class ManagedCursorImpl implements ManagedCursor {
                 newReadPosition = ledger.getNextValidPosition(markDeletePosition);
             }
             readPosition = newReadPosition;
+            ledger.onCursorReadPositionUpdated(ManagedCursorImpl.this, newReadPosition);
         } finally {
             lock.writeLock().unlock();
         }
@@ -2573,6 +2579,7 @@ public class ManagedCursorImpl implements ManagedCursor {
         if (this.markDeletePosition == null
                 || ((PositionImpl) newReadPositionInt).compareTo(this.markDeletePosition) > 0) {
             this.readPosition = (PositionImpl) newReadPositionInt;
+            ledger.onCursorReadPositionUpdated(this, newReadPositionInt);
         }
     }
 
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index 0ca576fe006..84a7377841c 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -164,10 +164,13 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
     protected final NavigableMap<Long, LedgerInfo> ledgers = new ConcurrentSkipListMap<>();
     private volatile Stat ledgersStat;
 
+    // contains all cursors, where durable cursors are ordered by mark delete position
     private final ManagedCursorContainer cursors = new ManagedCursorContainer();
+    // contains active cursors eligible for caching,
+    // ordered by read position (when cacheEvictionByMarkDeletedPosition=false) or by mark delete position
+    // (when cacheEvictionByMarkDeletedPosition=true)
     private final ManagedCursorContainer activeCursors = new ManagedCursorContainer();
-    private final ManagedCursorContainer nonDurableActiveCursors =
-            new ManagedCursorContainer(ManagedCursorContainer.CursorType.NonDurableCursor);
+
 
     // Ever increasing counter of entries added
     @VisibleForTesting
@@ -558,7 +561,7 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
                                 log.info("[{}] Recovery for cursor {} completed. pos={} -- todo={}", name, cursorName,
                                         cursor.getMarkDeletedPosition(), cursorCount.get() - 1);
                                 cursor.setActive();
-                                cursors.add(cursor);
+                                addCursor(cursor);
 
                                 if (cursorCount.decrementAndGet() == 0) {
                                     // The initialization is now completed, register the jmx mbean
@@ -592,7 +595,7 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
                                         cursorName, cursor.getMarkDeletedPosition(), cursorCount.get() - 1);
                                 cursor.setActive();
                                 synchronized (ManagedLedgerImpl.this) {
-                                    cursors.add(cursor);
+                                    addCursor(cursor);
                                     uninitializedCursors.remove(cursor.getName()).complete(cursor);
                                 }
                             }
@@ -619,6 +622,17 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
         });
     }
 
+    private void addCursor(ManagedCursorImpl cursor) {
+        Position positionForOrdering = null;
+        if (cursor.isDurable()) {
+            positionForOrdering = cursor.getMarkDeletedPosition();
+            if (positionForOrdering == null) {
+                positionForOrdering = PositionImpl.EARLIEST;
+            }
+        }
+        cursors.add(cursor, positionForOrdering);
+    }
+
     @Override
     public String getName() {
         return name;
@@ -960,7 +974,7 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
                         : getFirstPositionAndCounter());
 
                 synchronized (ManagedLedgerImpl.this) {
-                    cursors.add(cursor);
+                    addCursor(cursor);
                     uninitializedCursors.remove(cursorName).complete(cursor);
                 }
                 callback.openCursorComplete(cursor, ctx);
@@ -988,6 +1002,7 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
             return;
         } else if (!cursor.isDurable()) {
             cursors.removeCursor(consumerName);
+            deactivateCursorByName(consumerName);
             callback.deleteCursorComplete(ctx);
             return;
         }
@@ -999,17 +1014,7 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
             public void operationComplete(Void result, Stat stat) {
                 cursor.asyncDeleteCursorLedger();
                 cursors.removeCursor(consumerName);
-
-                // Redo invalidation of entries in cache
-                PositionImpl slowestConsumerPosition = cursors.getSlowestReaderPosition();
-                if (slowestConsumerPosition != null) {
-                    if (log.isDebugEnabled()) {
-                        log.debug("Doing cache invalidation up to {}", slowestConsumerPosition);
-                    }
-                    entryCache.invalidateEntries(slowestConsumerPosition);
-                } else {
-                    entryCache.clear();
-                }
+                deactivateCursorByName(consumerName);
 
                 trimConsumedLedgersInBackground();
 
@@ -1092,7 +1097,7 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
 
         log.info("[{}] Opened new cursor: {}", name, cursor);
         synchronized (this) {
-            cursors.add(cursor);
+            addCursor(cursor);
         }
 
         return cursor;
@@ -2187,62 +2192,36 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
         return result;
     }
 
-    void discardEntriesFromCache(ManagedCursorImpl cursor, PositionImpl newPosition) {
-        Pair<PositionImpl, PositionImpl> pair = activeCursors.cursorUpdated(cursor, newPosition);
-        if (pair != null) {
-            entryCache.invalidateEntries(pair.getRight());
+    void doCacheEviction(long maxTimestamp) {
+        if (entryCache.getSize() > 0) {
+            entryCache.invalidateEntriesBeforeTimestamp(maxTimestamp);
         }
     }
 
-    public PositionImpl getEvictionPosition(){
-        PositionImpl evictionPos;
-        if (config.isCacheEvictionByMarkDeletedPosition()) {
-            PositionImpl earlierMarkDeletedPosition = getEarlierMarkDeletedPositionForActiveCursors();
-            evictionPos = earlierMarkDeletedPosition != null ? earlierMarkDeletedPosition.getNext() : null;
-        } else {
-            // Always remove all entries already read by active cursors
-            evictionPos = getEarlierReadPositionForActiveCursors();
-        }
-        return evictionPos;
-    }
-    void doCacheEviction(long maxTimestamp) {
+    // slowest reader position is earliest mark delete position when cacheEvictionByMarkDeletedPosition=true
+    // it is the earliest read position when cacheEvictionByMarkDeletedPosition=false
+    private void invalidateEntriesUpToSlowestReaderPosition() {
         if (entryCache.getSize() <= 0) {
             return;
         }
-        PositionImpl evictionPos = getEvictionPosition();
-        if (evictionPos != null) {
-            entryCache.invalidateEntries(evictionPos);
-        }
-
-        // Remove entries older than the cutoff threshold
-        entryCache.invalidateEntriesBeforeTimestamp(maxTimestamp);
-    }
-
-    private PositionImpl getEarlierReadPositionForActiveCursors() {
-        PositionImpl nonDurablePosition = nonDurableActiveCursors.getSlowestReadPositionForActiveCursors();
-        PositionImpl durablePosition = activeCursors.getSlowestReadPositionForActiveCursors();
-        if (nonDurablePosition == null) {
-            return durablePosition;
-        }
-        if (durablePosition == null) {
-            return nonDurablePosition;
+        if (!activeCursors.isEmpty()) {
+            PositionImpl evictionPos = activeCursors.getSlowestReaderPosition();
+            if (evictionPos != null) {
+                entryCache.invalidateEntries(evictionPos);
+            }
+        } else {
+            entryCache.clear();
         }
-        return durablePosition.compareTo(nonDurablePosition) > 0 ? nonDurablePosition : durablePosition;
     }
 
-    private PositionImpl getEarlierMarkDeletedPositionForActiveCursors() {
-        PositionImpl nonDurablePosition = nonDurableActiveCursors.getSlowestMarkDeletedPositionForActiveCursors();
-        PositionImpl durablePosition = activeCursors.getSlowestMarkDeletedPositionForActiveCursors();
-        if (nonDurablePosition == null) {
-            return durablePosition;
+    void onCursorMarkDeletePositionUpdated(ManagedCursorImpl cursor, PositionImpl newPosition) {
+        if (config.isCacheEvictionByMarkDeletedPosition()) {
+            updateActiveCursor(cursor, newPosition);
         }
-        if (durablePosition == null) {
-            return nonDurablePosition;
+        if (!cursor.isDurable()) {
+            // non-durable cursors aren't tracked for trimming
+            return;
         }
-        return durablePosition.compareTo(nonDurablePosition) > 0 ? nonDurablePosition : durablePosition;
-    }
-
-    void updateCursor(ManagedCursorImpl cursor, PositionImpl newPosition) {
         Pair<PositionImpl, PositionImpl> pair = cursors.cursorUpdated(cursor, newPosition);
         if (pair == null) {
             // Cursor has been removed in the meantime
@@ -2264,6 +2243,20 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
         }
     }
 
+    private void updateActiveCursor(ManagedCursorImpl cursor, Position newPosition) {
+        Pair<PositionImpl, PositionImpl> slowestPositions = activeCursors.cursorUpdated(cursor, newPosition);
+        if (slowestPositions != null
+                && !slowestPositions.getLeft().equals(slowestPositions.getRight())) {
+            invalidateEntriesUpToSlowestReaderPosition();
+        }
+    }
+
+    public void onCursorReadPositionUpdated(ManagedCursorImpl cursor, Position newReadPosition) {
+        if (!config.isCacheEvictionByMarkDeletedPosition()) {
+            updateActiveCursor(cursor, newReadPosition);
+        }
+    }
+
     PositionImpl startReadOperationOnLedger(PositionImpl position) {
         Long ledgerId = ledgers.ceilingKey(position.getLedgerId());
         if (ledgerId != null && ledgerId != position.getLedgerId()) {
@@ -2324,7 +2317,7 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
             if (!lastAckedPosition.equals((PositionImpl) cursor.getMarkDeletedPosition())) {
                 try {
                     log.info("Reset cursor:{} to {} since ledger consumed completely", cursor, lastAckedPosition);
-                    updateCursor((ManagedCursorImpl) cursor, lastAckedPosition);
+                    onCursorMarkDeletePositionUpdated((ManagedCursorImpl) cursor, lastAckedPosition);
                 } catch (Exception e) {
                     log.warn("Failed to reset cursor: {} from {} to {}. Trimming thread will retry next time.",
                             cursor, cursor.getMarkDeletedPosition(), lastAckedPosition);
@@ -3515,34 +3508,32 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
     }
 
     public void activateCursor(ManagedCursor cursor) {
-        if (activeCursors.get(cursor.getName()) == null) {
-            activeCursors.add(cursor);
-        }
-        if (!cursor.isDurable() && nonDurableActiveCursors.get(cursor.getName()) == null) {
-            nonDurableActiveCursors.add(cursor);
+        synchronized (activeCursors) {
+            if (activeCursors.get(cursor.getName()) == null) {
+                Position positionForOrdering = config.isCacheEvictionByMarkDeletedPosition()
+                        ? cursor.getMarkDeletedPosition()
+                        : cursor.getReadPosition();
+                if (positionForOrdering == null) {
+                    positionForOrdering = PositionImpl.EARLIEST;
+                }
+                activeCursors.add(cursor, positionForOrdering);
+            }
         }
     }
 
     public void deactivateCursor(ManagedCursor cursor) {
+        deactivateCursorByName(cursor.getName());
+    }
+
+    private void deactivateCursorByName(String cursorName) {
         synchronized (activeCursors) {
-            if (activeCursors.get(cursor.getName()) != null) {
-                activeCursors.removeCursor(cursor.getName());
-                if (!activeCursors.hasDurableCursors()) {
-                    // cleanup cache if there is no active subscription
-                    entryCache.clear();
-                } else {
-                    // if removed subscription was the slowest subscription : update cursor and let it clear cache:
-                    // till new slowest-cursor's read-position
-                    discardEntriesFromCache((ManagedCursorImpl) activeCursors.getSlowestReader(),
-                            getPreviousPosition((PositionImpl) activeCursors.getSlowestReader().getReadPosition()));
-                }
-            }
-            if (!cursor.isDurable()) {
-                nonDurableActiveCursors.removeCursor(cursor.getName());
+            if (activeCursors.removeCursor(cursorName)) {
+                invalidateEntriesUpToSlowestReaderPosition();
             }
         }
     }
 
+
     public void removeWaitingCursor(ManagedCursor cursor) {
         this.waitingCursors.remove(cursor);
     }
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorImpl.java
index de64cb67362..918cc22978b 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorImpl.java
@@ -104,7 +104,7 @@ public class NonDurableCursorImpl extends ManagedCursorImpl {
         MarkDeleteEntry mdEntry = new MarkDeleteEntry(newPosition, properties, callback, ctx);
         lastMarkDeleteEntry = mdEntry;
         // it is important to advance cursor so the retention can kick in as expected.
-        ledger.updateCursor(NonDurableCursorImpl.this, mdEntry.newPosition);
+        ledger.onCursorMarkDeletePositionUpdated(NonDurableCursorImpl.this, mdEntry.newPosition);
 
         callback.markDeleteComplete(ctx);
     }
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/EntryCacheManagerTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/EntryCacheManagerTest.java
index 5b34dc3eb59..982acfdedd2 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/EntryCacheManagerTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/EntryCacheManagerTest.java
@@ -324,7 +324,7 @@ public class EntryCacheManagerTest extends MockedBookKeeperTestCase {
         assertEquals(entries.size(), 10);
 
         factory2.getMbean().refreshStats(1, TimeUnit.SECONDS);
-        assertEquals(factory2.getMbean().getCacheUsedSize(), 70);
+        assertEquals(factory2.getMbean().getCacheUsedSize(), 0);
         assertEquals(factory2.getMbean().getCacheHitsRate(), 10.0);
         assertEquals(factory2.getMbean().getCacheMissesRate(), 0.0);
         assertEquals(factory2.getMbean().getCacheHitsThroughput(), 70.0);
@@ -332,11 +332,10 @@ public class EntryCacheManagerTest extends MockedBookKeeperTestCase {
 
         PositionImpl pos = (PositionImpl) entries.get(entries.size() - 1).getPosition();
         c2.setReadPosition(pos);
-        ledger.discardEntriesFromCache(c2, pos);
         entries.forEach(Entry::release);
 
         factory2.getMbean().refreshStats(1, TimeUnit.SECONDS);
-        assertEquals(factory2.getMbean().getCacheUsedSize(), 7);
+        assertEquals(factory2.getMbean().getCacheUsedSize(), 0);
         assertEquals(factory2.getMbean().getCacheHitsRate(), 0.0);
         assertEquals(factory2.getMbean().getCacheMissesRate(), 0.0);
         assertEquals(factory2.getMbean().getCacheHitsThroughput(), 0.0);
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainerTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainerTest.java
index 1d9315ee296..6446eca5ae0 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainerTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainerTest.java
@@ -402,41 +402,40 @@ public class ManagedCursorContainerTest {
 
     @Test
     public void testSlowestReadPositionForActiveCursors() throws Exception {
-        ManagedCursorContainer container =
-                new ManagedCursorContainer(ManagedCursorContainer.CursorType.NonDurableCursor);
-        assertNull(container.getSlowestReadPositionForActiveCursors());
+        ManagedCursorContainer container = new ManagedCursorContainer();
+        assertNull(container.getSlowestReaderPosition());
 
         // Add no durable cursor
         PositionImpl position = PositionImpl.get(5,5);
         ManagedCursor cursor1 = spy(new MockManagedCursor(container, "test1", position));
         doReturn(false).when(cursor1).isDurable();
         doReturn(position).when(cursor1).getReadPosition();
-        container.add(cursor1);
-        assertEquals(container.getSlowestReadPositionForActiveCursors(), new PositionImpl(5, 5));
+        container.add(cursor1, position);
+        assertEquals(container.getSlowestReaderPosition(), new PositionImpl(5, 5));
 
         // Add no durable cursor
         position = PositionImpl.get(1,1);
         ManagedCursor cursor2 = spy(new MockManagedCursor(container, "test2", position));
         doReturn(false).when(cursor2).isDurable();
         doReturn(position).when(cursor2).getReadPosition();
-        container.add(cursor2);
-        assertEquals(container.getSlowestReadPositionForActiveCursors(), new PositionImpl(1, 1));
+        container.add(cursor2, position);
+        assertEquals(container.getSlowestReaderPosition(), new PositionImpl(1, 1));
 
         // Move forward cursor, cursor1 = 5:5, cursor2 = 5:6, slowest is 5:5
         position = PositionImpl.get(5,6);
         container.cursorUpdated(cursor2, position);
         doReturn(position).when(cursor2).getReadPosition();
-        assertEquals(container.getSlowestReadPositionForActiveCursors(), new PositionImpl(5, 5));
+        assertEquals(container.getSlowestReaderPosition(), new PositionImpl(5, 5));
 
         // Move forward cursor, cursor1 = 5:8, cursor2 = 5:6, slowest is 5:6
         position = PositionImpl.get(5,8);
         doReturn(position).when(cursor1).getReadPosition();
         container.cursorUpdated(cursor1, position);
-        assertEquals(container.getSlowestReadPositionForActiveCursors(), new PositionImpl(5, 6));
+        assertEquals(container.getSlowestReaderPosition(), new PositionImpl(5, 6));
 
         // Remove cursor, only cursor1 left, cursor1 = 5:8
         container.removeCursor(cursor2.getName());
-        assertEquals(container.getSlowestReadPositionForActiveCursors(), new PositionImpl(5, 8));
+        assertEquals(container.getSlowestReaderPosition(), new PositionImpl(5, 8));
     }
 
     @Test
@@ -445,25 +444,25 @@ public class ManagedCursorContainerTest {
         assertNull(container.getSlowestReaderPosition());
 
         ManagedCursor cursor1 = new MockManagedCursor(container, "test1", new PositionImpl(5, 5));
-        container.add(cursor1);
+        container.add(cursor1, cursor1.getMarkDeletedPosition());
         assertEquals(container.getSlowestReaderPosition(), new PositionImpl(5, 5));
 
         ManagedCursor cursor2 = new MockManagedCursor(container, "test2", new PositionImpl(2, 2));
-        container.add(cursor2);
+        container.add(cursor2, cursor2.getMarkDeletedPosition());
         assertEquals(container.getSlowestReaderPosition(), new PositionImpl(2, 2));
 
         ManagedCursor cursor3 = new MockManagedCursor(container, "test3", new PositionImpl(2, 0));
-        container.add(cursor3);
+        container.add(cursor3, cursor3.getMarkDeletedPosition());
         assertEquals(container.getSlowestReaderPosition(), new PositionImpl(2, 0));
 
         assertEquals(container.toString(), "[test1=5:5, test2=2:2, test3=2:0]");
 
         ManagedCursor cursor4 = new MockManagedCursor(container, "test4", new PositionImpl(4, 0));
-        container.add(cursor4);
+        container.add(cursor4, cursor4.getMarkDeletedPosition());
         assertEquals(container.getSlowestReaderPosition(), new PositionImpl(2, 0));
 
         ManagedCursor cursor5 = new MockManagedCursor(container, "test5", new PositionImpl(3, 5));
-        container.add(cursor5);
+        container.add(cursor5, cursor5.getMarkDeletedPosition());
         assertEquals(container.getSlowestReaderPosition(), new PositionImpl(2, 0));
 
         cursor3.markDelete(new PositionImpl(3, 0));
@@ -488,7 +487,7 @@ public class ManagedCursorContainerTest {
         assertFalse(container.hasDurableCursors());
 
         ManagedCursor cursor6 = new MockManagedCursor(container, "test6", new PositionImpl(6, 5));
-        container.add(cursor6);
+        container.add(cursor6, cursor6.getMarkDeletedPosition());
         assertEquals(container.getSlowestReaderPosition(), new PositionImpl(6, 5));
 
         assertEquals(container.toString(), "[test6=6:5]");
@@ -499,11 +498,11 @@ public class ManagedCursorContainerTest {
         ManagedCursorContainer container = new ManagedCursorContainer();
 
         ManagedCursor cursor1 = new MockManagedCursor(container, "test1", new PositionImpl(5, 5));
-        container.add(cursor1);
+        container.add(cursor1, cursor1.getMarkDeletedPosition());
         assertEquals(container.getSlowestReaderPosition(), new PositionImpl(5, 5));
 
         MockManagedCursor cursor2 = new MockManagedCursor(container, "test2", new PositionImpl(2, 2));
-        container.add(cursor2);
+        container.add(cursor2, cursor2.getMarkDeletedPosition());
         assertEquals(container.getSlowestReaderPosition(), new PositionImpl(2, 2));
 
         cursor2.position = new PositionImpl(8, 8);
@@ -521,17 +520,17 @@ public class ManagedCursorContainerTest {
         ManagedCursorContainer container = new ManagedCursorContainer();
 
         ManagedCursor cursor1 = new MockManagedCursor(container, "test1", new PositionImpl(5, 5));
-        container.add(cursor1);
+        container.add(cursor1, cursor1.getMarkDeletedPosition());
         assertEquals(container.getSlowestReaderPosition(), new PositionImpl(5, 5));
         assertEquals(container.get("test1"), cursor1);
 
         MockManagedCursor cursor2 = new MockManagedCursor(container, "test2", new PositionImpl(2, 2));
-        container.add(cursor2);
+        container.add(cursor2, cursor2.getMarkDeletedPosition());
         assertEquals(container.getSlowestReaderPosition(), new PositionImpl(2, 2));
         assertEquals(container.get("test2"), cursor2);
 
         MockManagedCursor cursor3 = new MockManagedCursor(container, "test3", new PositionImpl(1, 1));
-        container.add(cursor3);
+        container.add(cursor3, cursor3.getMarkDeletedPosition());
         assertEquals(container.getSlowestReaderPosition(), new PositionImpl(1, 1));
         assertEquals(container.get("test3"), cursor3);
 
@@ -563,11 +562,11 @@ public class ManagedCursorContainerTest {
         ManagedCursor cursor4 = new MockManagedCursor(container, "test4", new PositionImpl(6, 4));
         ManagedCursor cursor5 = new MockManagedCursor(container, "test5", new PositionImpl(7, 0));
 
-        container.add(cursor1);
-        container.add(cursor2);
-        container.add(cursor3);
-        container.add(cursor4);
-        container.add(cursor5);
+        container.add(cursor1, cursor1.getMarkDeletedPosition());
+        container.add(cursor2, cursor2.getMarkDeletedPosition());
+        container.add(cursor3, cursor3.getMarkDeletedPosition());
+        container.add(cursor4, cursor4.getMarkDeletedPosition());
+        container.add(cursor5, cursor5.getMarkDeletedPosition());
 
         assertEquals(container.getSlowestReaderPosition(), new PositionImpl(5, 1));
         container.removeCursor("test2");
@@ -597,11 +596,11 @@ public class ManagedCursorContainerTest {
         MockManagedCursor c4 = new MockManagedCursor(container, "test4", new PositionImpl(6, 4));
         MockManagedCursor c5 = new MockManagedCursor(container, "test5", new PositionImpl(7, 0));
 
-        container.add(c1);
-        container.add(c2);
-        container.add(c3);
-        container.add(c4);
-        container.add(c5);
+        container.add(c1, c1.getMarkDeletedPosition());
+        container.add(c2, c2.getMarkDeletedPosition());
+        container.add(c3, c3.getMarkDeletedPosition());
+        container.add(c4, c4.getMarkDeletedPosition());
+        container.add(c5, c5.getMarkDeletedPosition());
 
         assertEquals(container.getSlowestReaderPosition(), new PositionImpl(5, 1));
 
@@ -662,11 +661,11 @@ public class ManagedCursorContainerTest {
         MockManagedCursor c4 = new MockManagedCursor(container, "test4", new PositionImpl(6, 4));
         MockManagedCursor c5 = new MockManagedCursor(container, "test5", new PositionImpl(7, 0));
 
-        container.add(c1);
-        container.add(c2);
-        container.add(c3);
-        container.add(c4);
-        container.add(c5);
+        container.add(c1, c1.getMarkDeletedPosition());
+        container.add(c2, c2.getMarkDeletedPosition());
+        container.add(c3, c3.getMarkDeletedPosition());
+        container.add(c4, c4.getMarkDeletedPosition());
+        container.add(c5, c5.getMarkDeletedPosition());
 
         assertEquals(container.getSlowestReaderPosition(), new PositionImpl(5, 1));
 
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
index 649d72e3522..dc1af3a60cc 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
@@ -106,6 +106,7 @@ import org.apache.bookkeeper.mledger.ManagedLedgerException.ManagedLedgerNotFoun
 import org.apache.bookkeeper.mledger.ManagedLedgerException.MetaStoreException;
 import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
 import org.apache.bookkeeper.mledger.ManagedLedgerFactoryConfig;
+import org.apache.bookkeeper.mledger.ManagedLedgerFactoryMXBean;
 import org.apache.bookkeeper.mledger.Position;
 import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl.VoidCallback;
 import org.apache.bookkeeper.mledger.impl.MetaStore.MetaStoreCallback;
@@ -296,265 +297,120 @@ public class ManagedLedgerTest extends MockedBookKeeperTestCase {
     }
 
     @Test
-    public void testDoCacheEviction() throws Throwable {
-        CompletableFuture<Boolean> result = new CompletableFuture<>();
-        ManagedLedgerConfig config = new ManagedLedgerConfig();
-        config.setCacheEvictionByMarkDeletedPosition(true);
-        factory.updateCacheEvictionTimeThreshold(TimeUnit.MILLISECONDS
-                .toNanos(30000));
-        factory.asyncOpen("my_test_ledger", config, new OpenLedgerCallback() {
-            @Override
-            public void openLedgerComplete(ManagedLedger ledger, Object ctx) {
-                ledger.asyncOpenCursor("test-cursor", new OpenCursorCallback() {
-                    @Override
-                    public void openCursorComplete(ManagedCursor cursor, Object ctx) {
-                        ManagedLedger ledger = (ManagedLedger) ctx;
-                        String message1 = "test";
-                        ledger.asyncAddEntry(message1.getBytes(Encoding), new AddEntryCallback() {
-                            @Override
-                            public void addComplete(Position position, ByteBuf entryData, Object ctx) {
-                                try {
-                                    @SuppressWarnings("unchecked")
-                                    Pair<ManagedLedger, ManagedCursor> pair = (Pair<ManagedLedger, ManagedCursor>) ctx;
-                                    ManagedLedger ledger = pair.getLeft();
-                                    ManagedCursor cursor = pair.getRight();
-                                    if (((ManagedLedgerImpl) ledger).getCacheSize() != message1.getBytes(Encoding).length) {
-                                        result.complete(false);
-                                        return;
-                                    }
-
-                                    ManagedLedgerImpl ledgerImpl = (ManagedLedgerImpl) ledger;
-                                    ledgerImpl.getActiveCursors().removeCursor(cursor.getName());
-                                    assertNull(ledgerImpl.getEvictionPosition());
-                                    assertTrue(ledgerImpl.getCacheSize() == message1.getBytes(Encoding).length);
-                                    ledgerImpl.doCacheEviction(System.nanoTime());
-                                    assertTrue(ledgerImpl.getCacheSize() <= 0);
-                                    result.complete(true);
-                                } catch (Throwable e) {
-                                    result.completeExceptionally(e);
-                                }
-                            }
-
-                            @Override
-                            public void addFailed(ManagedLedgerException exception, Object ctx) {
-                                result.completeExceptionally(exception);
-                            }
-                        }, Pair.of(ledger, cursor));
-                    }
-
-                    @Override
-                    public void openCursorFailed(ManagedLedgerException exception, Object ctx) {
-                        result.completeExceptionally(exception);
-                    }
-                }, ledger);
-            }
+    public void shouldKeepEntriesInCacheByEarliestReadPosition() throws ManagedLedgerException, InterruptedException {
+        // This test case reproduces issue #16054
 
-            @Override
-            public void openLedgerFailed(ManagedLedgerException exception, Object ctx) {
-                result.completeExceptionally(exception);
-            }
-        }, null, null);
-        assertTrue(result.get());
-
-        log.info("Test completed");
-    }
-
-    @Test
-    public void testCacheEvictionByMarkDeletedPosition() throws Throwable {
-        CompletableFuture<Boolean> result = new CompletableFuture<>();
         ManagedLedgerConfig config = new ManagedLedgerConfig();
-        config.setCacheEvictionByMarkDeletedPosition(true);
         factory.updateCacheEvictionTimeThreshold(TimeUnit.MILLISECONDS
                 .toNanos(30000));
-        factory.asyncOpen("my_test_ledger", config, new OpenLedgerCallback() {
-            @Override
-            public void openLedgerComplete(ManagedLedger ledger, Object ctx) {
-                ledger.asyncOpenCursor("test-cursor", new OpenCursorCallback() {
-                    @Override
-                    public void openCursorComplete(ManagedCursor cursor, Object ctx) {
-                        ManagedLedger ledger = (ManagedLedger) ctx;
-                        String message1 = "test";
-                        ledger.asyncAddEntry(message1.getBytes(Encoding), new AddEntryCallback() {
-                            @Override
-                            public void addComplete(Position position, ByteBuf entryData, Object ctx) {
-                                @SuppressWarnings("unchecked")
-                                Pair<ManagedLedger, ManagedCursor> pair = (Pair<ManagedLedger, ManagedCursor>) ctx;
-                                ManagedLedger ledger = pair.getLeft();
-                                ManagedCursor cursor = pair.getRight();
-                                if (((ManagedLedgerImpl) ledger).getCacheSize() != message1.getBytes(Encoding).length) {
-                                    result.complete(false);
-                                    return;
-                                }
-                                cursor.asyncReadEntries(1, new ReadEntriesCallback() {
-                                    @Override
-                                    public void readEntriesComplete(List<Entry> entries, Object ctx) {
-                                        ManagedCursor cursor = (ManagedCursor) ctx;
-                                        assertEquals(entries.size(), 1);
-                                        Entry entry = entries.get(0);
-                                        final Position position = entry.getPosition();
-                                        if (!message1.equals(new String(entry.getDataAndRelease(), Encoding))) {
-                                            result.complete(false);
-                                            return;
-                                        }
-                                        ((ManagedLedgerImpl) ledger).doCacheEviction(
-                                                System.nanoTime() - TimeUnit.MILLISECONDS.toNanos(30000));
-                                        if (((ManagedLedgerImpl) ledger).getCacheSize() != message1.getBytes(Encoding).length) {
-                                            result.complete(false);
-                                            return;
-                                        }
 
-                                        log.debug("Mark-Deleting to position {}", position);
-                                        cursor.asyncMarkDelete(position, new MarkDeleteCallback() {
-                                            @Override
-                                            public void markDeleteComplete(Object ctx) {
-                                                log.debug("Mark delete complete");
-                                                ManagedCursor cursor = (ManagedCursor) ctx;
-                                                if (cursor.hasMoreEntries()) {
-                                                    result.complete(false);
-                                                    return;
-                                                }
-                                                ((ManagedLedgerImpl) ledger).doCacheEviction(
-                                                        System.nanoTime() - TimeUnit.MILLISECONDS.toNanos(30000));
-                                                result.complete(((ManagedLedgerImpl) ledger).getCacheSize() == 0);
-                                            }
+        // GIVEN an opened ledger with 10 opened cursors
 
-                                            @Override
-                                            public void markDeleteFailed(ManagedLedgerException exception, Object ctx) {
-                                                result.completeExceptionally(exception);
-                                            }
+        ManagedLedger ledger = factory.open("test_ledger_for_shouldKeepEntriesInCacheByEarliestReadPosition",
+                config);
+        List<ManagedCursor> cursors = new ArrayList<>();
+        for (int i = 0; i < 10; i++) {
+            ManagedCursor cursor = ledger.openCursor("c" + i);
+            cursors.add(cursor);
+        }
 
-                                        }, cursor);
-                                    }
+        ManagedLedgerFactoryMXBean cacheStats = factory.getCacheStats();
+        int insertedEntriesCountBefore = (int) cacheStats.getCacheInsertedEntriesCount();
 
-                                    @Override
-                                    public void readEntriesFailed(ManagedLedgerException exception, Object ctx) {
-                                        result.completeExceptionally(exception);
-                                    }
-                                }, cursor, PositionImpl.LATEST);
-                            }
+        // AND 100 added entries
 
-                            @Override
-                            public void addFailed(ManagedLedgerException exception, Object ctx) {
-                                result.completeExceptionally(exception);
-                            }
-                        }, Pair.of(ledger, cursor));
-                    }
+        for (int i = 0; i < 100; i++) {
+            String content = "entry-" + i;
+            ledger.addEntry(content.getBytes());
+        }
 
-                    @Override
-                    public void openCursorFailed(ManagedLedgerException exception, Object ctx) {
-                        result.completeExceptionally(exception);
-                    }
+        int insertedEntriesCount =
+                (int) cacheStats.getCacheInsertedEntriesCount() - insertedEntriesCountBefore;
+        // EXPECT that 100 entries should have been inserted to the cache
+        assertEquals(insertedEntriesCount, 100);
 
-                }, ledger);
-            }
+        int evictedEntriesCountBefore = (int) cacheStats.getCacheEvictedEntriesCount();
 
-            @Override
-            public void openLedgerFailed(ManagedLedgerException exception, Object ctx) {
-                result.completeExceptionally(exception);
-            }
-        }, null, null);
+        // WHEN entries are read for the cursors so that the farthest cursor has most entries read
+        for (int i = 0; i < 10; i++) {
+            ManagedCursor cursor = cursors.get(i);
+            // the earliest cursor (i = 0) reads the farthest: cursor i reads 20 - i entries
+            List<Entry> entries = cursor.readEntries(20 - i);
+            // mark delete the least for the earliest cursor
+            cursor.markDelete(entries.get(i).getPosition());
+            entries.forEach(Entry::release);
+        }
 
-        assertTrue(result.get());
+        // THEN it is expected that the cache evicts entries to the earliest read position
+        Thread.sleep(2 * factory.getConfig().getCacheEvictionIntervalMs());
+        int evictedEntriesCount =
+                (int) cacheStats.getCacheEvictedEntriesCount() - evictedEntriesCountBefore;
+        assertEquals(evictedEntriesCount, 11,
+                "It is expected that the cache evicts entries to the earliest read position");
 
-        log.info("Test completed");
+        ledger.close();
     }
 
     @Test
-    public void testCacheEvictionByReadPosition() throws Throwable {
-        CompletableFuture<Boolean> result = new CompletableFuture<>();
+    public void shouldKeepEntriesInCacheByEarliestMarkDeletePosition() throws ManagedLedgerException, InterruptedException {
+        // This test case reproduces issue #16054
+
         ManagedLedgerConfig config = new ManagedLedgerConfig();
+        config.setCacheEvictionByMarkDeletedPosition(true);
         factory.updateCacheEvictionTimeThreshold(TimeUnit.MILLISECONDS
                 .toNanos(30000));
-        factory.asyncOpen("my_test_ledger", config, new OpenLedgerCallback() {
-            @Override
-            public void openLedgerComplete(ManagedLedger ledger, Object ctx) {
-                ledger.asyncOpenCursor("test-cursor", new OpenCursorCallback() {
-                    @Override
-                    public void openCursorComplete(ManagedCursor cursor, Object ctx) {
-                        ManagedLedger ledger = (ManagedLedger) ctx;
-                        String message1 = "test";
-                        ledger.asyncAddEntry(message1.getBytes(Encoding), new AddEntryCallback() {
-                            @Override
-                            public void addComplete(Position position, ByteBuf entryData, Object ctx) {
-                                @SuppressWarnings("unchecked")
-                                Pair<ManagedLedger, ManagedCursor> pair = (Pair<ManagedLedger, ManagedCursor>) ctx;
-                                ManagedLedger ledger = pair.getLeft();
-                                ManagedCursor cursor = pair.getRight();
-                                if (((ManagedLedgerImpl) ledger).getCacheSize() != message1.getBytes(Encoding).length) {
-                                    result.complete(false);
-                                    return;
-                                }
 
-                                cursor.asyncReadEntries(1, new ReadEntriesCallback() {
-                                    @Override
-                                    public void readEntriesComplete(List<Entry> entries, Object ctx) {
-                                        ManagedCursor cursor = (ManagedCursor) ctx;
-                                        assertEquals(entries.size(), 1);
-                                        Entry entry = entries.get(0);
-                                        final Position position = entry.getPosition();
-                                        if (!message1.equals(new String(entry.getDataAndRelease(), Encoding))) {
-                                            result.complete(false);
-                                            return;
-                                        }
-                                        ((ManagedLedgerImpl) ledger).doCacheEviction(
-                                                System.nanoTime() - TimeUnit.MILLISECONDS.toNanos(30000));
-                                        if (((ManagedLedgerImpl) ledger).getCacheSize() != 0) {
-                                            result.complete(false);
-                                            return;
-                                        }
+        // GIVEN an opened ledger with 10 opened cursors
 
-                                        log.debug("Mark-Deleting to position {}", position);
-                                        cursor.asyncMarkDelete(position, new MarkDeleteCallback() {
-                                            @Override
-                                            public void markDeleteComplete(Object ctx) {
-                                                log.debug("Mark delete complete");
-                                                ManagedCursor cursor = (ManagedCursor) ctx;
-                                                if (cursor.hasMoreEntries()) {
-                                                    result.complete(false);
-                                                    return;
-                                                }
-                                                result.complete(((ManagedLedgerImpl) ledger).getCacheSize() == 0);
-                                            }
+        ManagedLedger ledger = factory.open("test_ledger_for_shouldKeepEntriesInCacheByEarliestMarkDeletePosition",
+                config);
+        List<ManagedCursor> cursors = new ArrayList<>();
+        for (int i = 0; i < 10; i++) {
+            ManagedCursor cursor = ledger.openCursor("c" + i);
+            cursors.add(cursor);
+        }
 
-                                            @Override
-                                            public void markDeleteFailed(ManagedLedgerException exception, Object ctx) {
-                                                result.completeExceptionally(exception);
-                                            }
+        ManagedLedgerFactoryMXBean cacheStats = factory.getCacheStats();
+        int insertedEntriesCountBefore = (int) cacheStats.getCacheInsertedEntriesCount();
 
-                                        }, cursor);
-                                    }
+        // AND 100 added entries
 
-                                    @Override
-                                    public void readEntriesFailed(ManagedLedgerException exception, Object ctx) {
-                                        result.completeExceptionally(exception);
-                                    }
-                                }, cursor, PositionImpl.LATEST);
-                            }
+        for (int i = 0; i < 100; i++) {
+            String content = "entry-" + i;
+            ledger.addEntry(content.getBytes());
+        }
 
-                            @Override
-                            public void addFailed(ManagedLedgerException exception, Object ctx) {
-                                result.completeExceptionally(exception);
-                            }
-                        }, Pair.of(ledger, cursor));
-                    }
+        int insertedEntriesCount =
+                (int) cacheStats.getCacheInsertedEntriesCount() - insertedEntriesCountBefore;
+        // EXPECT that 100 entries should have been inserted to the cache
+        assertEquals(insertedEntriesCount, 100);
 
-                    @Override
-                    public void openCursorFailed(ManagedLedgerException exception, Object ctx) {
-                        result.completeExceptionally(exception);
-                    }
+        int evictedEntriesCountBefore = (int) cacheStats.getCacheEvictedEntriesCount();
 
-                }, ledger);
-            }
+        // WHEN entries are read for the cursors so that the farthest cursor has most entries read
+        Position lastMarkDeletePos = null;
+        for (int i = 0; i < 10; i++) {
+            ManagedCursor cursor = cursors.get(i);
+            // read 50 + (5 * index) entries for each cursor
+            List<Entry> entries = cursor.readEntries(50 + (5 * i));
+            // mark delete the most for the earliest cursor
+            lastMarkDeletePos = entries.get(20 - i).getPosition();
+            cursor.markDelete(lastMarkDeletePos);
+            entries.forEach(Entry::release);
+        }
 
-            @Override
-            public void openLedgerFailed(ManagedLedgerException exception, Object ctx) {
-                result.completeExceptionally(exception);
-            }
-        }, null, null);
+        Thread.sleep(1000 + 2 * factory.getConfig().getCacheEvictionIntervalMs());
 
-        assertTrue(result.get());
+        ManagedCursorContainer activeCursors = (ManagedCursorContainer) ledger.getActiveCursors();
+        assertEquals(activeCursors.getSlowestReaderPosition(), lastMarkDeletePos);
 
-        log.info("Test completed");
+        // THEN it is expected that the cache evicts entries to the earliest mark delete position
+        int evictedEntriesCount =
+                (int) cacheStats.getCacheEvictedEntriesCount() - evictedEntriesCountBefore;
+        assertEquals(evictedEntriesCount, 11,
+                "It is expected that the cache evicts entries to the earliest read position");
+
+        ledger.close();
     }
 
     @Test(timeOut = 20000)
@@ -1783,10 +1639,14 @@ public class ManagedLedgerTest extends MockedBookKeeperTestCase {
 
     @Test
     public void invalidateConsumedEntriesFromCache() throws Exception {
-        ManagedLedgerImpl ledger = (ManagedLedgerImpl) factory.open("my_test_ledger");
+        ManagedLedgerConfig config = new ManagedLedgerConfig();
+        ManagedLedgerImpl ledger =
+                (ManagedLedgerImpl) factory.open("my_test_ledger_for_invalidateConsumedEntriesFromCache",
+                        config);
 
         EntryCacheManager cacheManager = factory.getEntryCacheManager();
         EntryCache entryCache = ledger.entryCache;
+        entryCache.clear();
 
         ManagedCursorImpl c1 = (ManagedCursorImpl) ledger.openCursor("c1");
         ManagedCursorImpl c2 = (ManagedCursorImpl) ledger.openCursor("c2");
@@ -1799,28 +1659,78 @@ public class ManagedLedgerTest extends MockedBookKeeperTestCase {
         assertEquals(entryCache.getSize(), 7 * 4);
         assertEquals(cacheManager.getSize(), entryCache.getSize());
 
+
         c2.setReadPosition(p3);
-        ledger.discardEntriesFromCache(c2, p2);
 
         assertEquals(entryCache.getSize(), 7 * 4);
         assertEquals(cacheManager.getSize(), entryCache.getSize());
 
         c1.setReadPosition(p2);
-        ledger.discardEntriesFromCache(c1, p2);
         assertEquals(entryCache.getSize(), 7 * 3);
         assertEquals(cacheManager.getSize(), entryCache.getSize());
 
         c1.setReadPosition(p3);
-        ledger.discardEntriesFromCache(c1, p3);
-        assertEquals(entryCache.getSize(), 7 * 3);
+        assertEquals(entryCache.getSize(), 7 * 2);
         assertEquals(cacheManager.getSize(), entryCache.getSize());
 
         ledger.deactivateCursor(c1);
-        assertEquals(entryCache.getSize(), 7 * 3); // as c2.readPosition=p3 => Cache contains p3,p4
+        assertEquals(entryCache.getSize(), 7 * 2); // as c2.readPosition=p3 => Cache contains p3,p4
+        assertEquals(cacheManager.getSize(), entryCache.getSize());
+
+        c2.setReadPosition(p4);
+        assertEquals(entryCache.getSize(), 7);
+        assertEquals(cacheManager.getSize(), entryCache.getSize());
+
+        ledger.deactivateCursor(c2);
+        assertEquals(entryCache.getSize(), 0);
+        assertEquals(cacheManager.getSize(), entryCache.getSize());
+    }
+
+    @Test
+    public void invalidateEntriesFromCacheByMarkDeletePosition() throws Exception {
+        ManagedLedgerConfig config = new ManagedLedgerConfig();
+        config.setCacheEvictionByMarkDeletedPosition(true);
+        ManagedLedgerImpl ledger =
+                (ManagedLedgerImpl) factory.open("my_test_ledger_for_invalidateEntriesFromCacheByMarkDeletePosition",
+                        config);
+
+        EntryCacheManager cacheManager = factory.getEntryCacheManager();
+        EntryCache entryCache = ledger.entryCache;
+        entryCache.clear();
+
+        ManagedCursorImpl c1 = (ManagedCursorImpl) ledger.openCursor("c1");
+        ManagedCursorImpl c2 = (ManagedCursorImpl) ledger.openCursor("c2");
+
+        PositionImpl p1 = (PositionImpl) ledger.addEntry("entry-1".getBytes());
+        PositionImpl p2 = (PositionImpl) ledger.addEntry("entry-2".getBytes());
+        PositionImpl p3 = (PositionImpl) ledger.addEntry("entry-3".getBytes());
+        PositionImpl p4 = (PositionImpl) ledger.addEntry("entry-4".getBytes());
+
+        assertEquals(entryCache.getSize(), 7 * 4);
         assertEquals(cacheManager.getSize(), entryCache.getSize());
 
+
         c2.setReadPosition(p4);
-        ledger.discardEntriesFromCache(c2, p4);
+        c2.markDelete(p3);
+
+        assertEquals(entryCache.getSize(), 7 * 4);
+        assertEquals(cacheManager.getSize(), entryCache.getSize());
+
+        c1.setReadPosition(p3);
+        c1.markDelete(p2);
+        assertEquals(entryCache.getSize(), 7 * 3);
+        assertEquals(cacheManager.getSize(), entryCache.getSize());
+
+        c1.setReadPosition(p4);
+        c1.markDelete(p3);
+        assertEquals(entryCache.getSize(), 7 * 2);
+        assertEquals(cacheManager.getSize(), entryCache.getSize());
+
+        ledger.deactivateCursor(c1);
+        assertEquals(entryCache.getSize(), 7 * 2);
+        assertEquals(cacheManager.getSize(), entryCache.getSize());
+
+        c2.markDelete(p4);
         assertEquals(entryCache.getSize(), 7);
         assertEquals(cacheManager.getSize(), entryCache.getSize());
 
@@ -2104,10 +2014,10 @@ public class ManagedLedgerTest extends MockedBookKeeperTestCase {
 
         ledger.addEntry("data".getBytes());
         ledger.addEntry("data".getBytes());
-        
+
         Awaitility.await().untilAsserted(() -> {
             assertEquals(ledger.getLedgersInfoAsList().size(), 2);
-        });   
+        });
     }
 
     @Test
@@ -2712,8 +2622,8 @@ public class ManagedLedgerTest extends MockedBookKeeperTestCase {
         }
 
         // (3) Validate: cache should remove all entries read by both active cursors
-        log.info("expected, found : {}, {}", (5 * (totalInsertedEntries)), entryCache.getSize());
-        assertEquals((5 * totalInsertedEntries), entryCache.getSize());
+        log.info("expected, found : {}, {}", 5 * (totalInsertedEntries - readEntries), entryCache.getSize());
+        assertEquals(entryCache.getSize(), 5 * (totalInsertedEntries - readEntries));
 
         final int remainingEntries = totalInsertedEntries - readEntries;
         entries1 = cursor1.readEntries(remainingEntries);
@@ -2727,7 +2637,7 @@ public class ManagedLedgerTest extends MockedBookKeeperTestCase {
 
         // (4) Validate: cursor2 is active cursor and has not read these entries yet: so, cache should not remove these
         // entries
-        assertEquals((5 * totalInsertedEntries), entryCache.getSize());
+        assertEquals(entryCache.getSize(), 5 * (totalInsertedEntries - readEntries));
 
         ledger.deactivateCursor(cursor1);
         ledger.deactivateCursor(cursor2);
@@ -2752,7 +2662,7 @@ public class ManagedLedgerTest extends MockedBookKeeperTestCase {
         }
 
         // (1) Validate: cache not stores entries as no active cursor
-        assertEquals(0, entryCache.getSize());
+        assertEquals(entryCache.getSize(), 0);
 
         // Open Cursor also adds cursor into activeCursor-container
         ManagedCursor cursor1 = ledger.openCursor("c1");
@@ -2765,7 +2675,7 @@ public class ManagedLedgerTest extends MockedBookKeeperTestCase {
         }
 
         // (2) Validate: cache stores entries as active cursor has not read message
-        assertEquals((5 * totalInsertedEntries), entryCache.getSize());
+        assertEquals(entryCache.getSize(), 5 * totalInsertedEntries);
 
         // read 20 entries
         List<Entry> entries1 = cursor1.readEntries(totalInsertedEntries);
@@ -2776,7 +2686,7 @@ public class ManagedLedgerTest extends MockedBookKeeperTestCase {
 
         // (3) Validate: cache discards all entries after all cursors are deactivated
         ledger.deactivateCursor(cursor1);
-        assertEquals(0, entryCache.getSize());
+        assertEquals(entryCache.getSize(), 0);
 
         ledger.close();
     }
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/test/MockedBookKeeperTestCase.java b/managed-ledger/src/test/java/org/apache/bookkeeper/test/MockedBookKeeperTestCase.java
index 0fd8902f825..1e960c32bf6 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/test/MockedBookKeeperTestCase.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/test/MockedBookKeeperTestCase.java
@@ -26,6 +26,7 @@ import java.util.concurrent.TimeUnit;
 import lombok.SneakyThrows;
 import org.apache.bookkeeper.client.PulsarMockBookKeeper;
 import org.apache.bookkeeper.common.util.OrderedScheduler;
+import org.apache.bookkeeper.mledger.ManagedLedgerFactoryConfig;
 import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl;
 import org.apache.pulsar.metadata.api.MetadataStoreConfig;
 import org.apache.pulsar.metadata.api.MetadataStoreException;
@@ -79,6 +80,9 @@ public abstract class MockedBookKeeperTestCase {
             throw e;
         }
 
+        ManagedLedgerFactoryConfig managedLedgerFactoryConfig = new ManagedLedgerFactoryConfig();
+        // increase default cache eviction interval so that caching could be tested with less flakiness
+        managedLedgerFactoryConfig.setCacheEvictionIntervalMs(200);
         factory = new ManagedLedgerFactoryImpl(metadataStore, bkc);
 
         setUpTestCase();
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
index c377043df8d..d0674721c00 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
@@ -622,7 +622,7 @@ public class TransactionTest extends TransactionTestBase {
         field.setAccessible(true);
         ManagedCursorContainer managedCursors = (ManagedCursorContainer) field.get(persistentTopic.getManagedLedger());
         managedCursors.removeCursor("transaction-buffer-sub");
-        managedCursors.add(managedCursor);
+        managedCursors.add(managedCursor, managedCursor.getMarkDeletedPosition());
 
         doAnswer(invocation -> {
             AsyncCallbacks.ReadEntriesCallback callback = invocation.getArgument(1);
@@ -641,7 +641,7 @@ public class TransactionTest extends TransactionTestBase {
             return null;
         }).when(managedCursor).asyncReadEntries(anyInt(), any(), any(), any());
 
-        managedCursors.add(managedCursor);
+        managedCursors.add(managedCursor, managedCursor.getMarkDeletedPosition());
         TransactionBuffer buffer3 = new TopicTransactionBuffer(persistentTopic);
         Awaitility.await().atMost(30, TimeUnit.SECONDS).untilAsserted(() ->
                 assertEquals(buffer3.getStats(false).state, "Ready"));