You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by eo...@apache.org on 2022/09/02 13:09:43 UTC

[pulsar] branch master updated: [managed-ledger] Do not send duplicate reads to BK/offloaders (#17241)

This is an automated email from the ASF dual-hosted git repository.

eolivelli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 3a3a993b093 [managed-ledger] Do not send duplicate reads to BK/offloaders (#17241)
3a3a993b093 is described below

commit 3a3a993b093291f2721a20eb5a981a3b7db557d9
Author: Enrico Olivelli <eo...@apache.org>
AuthorDate: Fri Sep 2 15:09:34 2022 +0200

    [managed-ledger] Do not send duplicate reads to BK/offloaders (#17241)
---
 .../mledger/impl/cache/PendingReadsManager.java    | 447 ++++++++++++++++++++
 .../mledger/impl/cache/RangeEntryCacheImpl.java    | 106 +++--
 .../impl/cache/PendingReadsManagerTest.java        | 462 +++++++++++++++++++++
 .../client/api/MessageDispatchThrottlingTest.java  |   8 +
 4 files changed, 986 insertions(+), 37 deletions(-)

diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/PendingReadsManager.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/PendingReadsManager.java
new file mode 100644
index 00000000000..4c374d8ace6
--- /dev/null
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/PendingReadsManager.java
@@ -0,0 +1,447 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.mledger.impl.cache;
+
+import static org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.createManagedLedgerException;
+import io.prometheus.client.Counter;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+import lombok.AllArgsConstructor;
+import lombok.Value;
+import org.apache.bookkeeper.client.api.ReadHandle;
+import org.apache.bookkeeper.mledger.AsyncCallbacks;
+import org.apache.bookkeeper.mledger.Entry;
+import org.apache.bookkeeper.mledger.ManagedLedgerException;
+import org.apache.bookkeeper.mledger.impl.EntryImpl;
+
+/**
+ * PendingReadsManager tries to prevent sending duplicate reads to BK.
+ */
+public class PendingReadsManager {
+
+    private static final Counter COUNT_ENTRIES_READ_FROM_BK = Counter
+            .build()
+            .name("pulsar_ml_cache_pendingreads_entries_read")
+            .help("Total number of entries read from BK")
+            .register();
+
+    private static final Counter COUNT_ENTRIES_NOTREAD_FROM_BK = Counter
+            .build()
+            .name("pulsar_ml_cache_pendingreads_entries_notread")
+            .help("Total number of entries not read from BK")
+            .register();
+
+    private static final Counter COUNT_PENDING_READS_MATCHED = Counter
+            .build()
+            .name("pulsar_ml_cache_pendingreads_matched")
+            .help("Pending reads reused with perfect range match")
+            .register();
+    private static final Counter COUNT_PENDING_READS_MATCHED_INCLUDED = Counter
+            .build()
+            .name("pulsar_ml_cache_pendingreads_matched_included")
+            .help("Pending reads reused by attaching to a read with a larger range")
+            .register();
+    private static final Counter COUNT_PENDING_READS_MISSED = Counter
+            .build()
+            .name("pulsar_ml_cache_pendingreads_missed")
+            .help("Pending reads that didn't find a match")
+            .register();
+
+    private static final Counter COUNT_PENDING_READS_MATCHED_OVERLAPPING_MISS_LEFT = Counter
+            .build()
+            .name("pulsar_ml_cache_pendingreads_matched_overlapping_miss_left")
+            .help("Pending reads that didn't find a match but they partially overlap with another read")
+            .register();
+
+    private static final Counter COUNT_PENDING_READS_MATCHED_BUT_OVERLAPPING_MISS_RIGHT = Counter
+            .build()
+            .name("pulsar_ml_cache_pendingreads_matched_overlapping_miss_right")
+            .help("Pending reads that didn't find a match but they partially overlap with another read")
+            .register();
+
+    private static final Counter COUNT_PENDING_READS_MATCHED_BUT_OVERLAPPING_MISS_BOTH = Counter
+            .build()
+            .name("pulsar_ml_cache_pendingreads_matched_overlapping_miss_both")
+            .help("Pending reads that didn't find a match but they partially overlap with another read")
+            .register();
+
+    private final RangeEntryCacheImpl rangeEntryCache;
+    private final ConcurrentHashMap<Long, ConcurrentHashMap<PendingReadKey, PendingRead>> cachedPendingReads =
+            new ConcurrentHashMap<>();
+
+    /**
+     * Creates a manager bound to the given cache.
+     *
+     * @param rangeEntryCache the cache used to issue the storage reads and to obtain the callback executor
+     */
+    public PendingReadsManager(RangeEntryCacheImpl rangeEntryCache) {
+        this.rangeEntryCache = rangeEntryCache;
+    }
+
+    /**
+     * Inclusive range of entry ids {@code [startEntry, endEntry]} that identifies a read on one ledger.
+     */
+    @Value
+    private static class PendingReadKey {
+        private final long startEntry;
+        private final long endEntry;
+
+        /**
+         * @return the number of entries in the range (both ends are inclusive)
+         */
+        long size() {
+            return endEntry - startEntry + 1;
+        }
+
+        /**
+         * @param other the range to test
+         * @return true when {@code other} lies entirely inside this range
+         */
+        boolean includes(PendingReadKey other) {
+            return startEntry <= other.startEntry && other.endEntry <= endEntry;
+        }
+
+        /**
+         * Symmetric intersection test between two inclusive ranges.
+         *
+         * @param other the range to test
+         * @return true when the two ranges share at least one entry id, including the case
+         *         in which one range strictly contains the other
+         */
+        boolean overlaps(PendingReadKey other) {
+            return startEntry <= other.endEntry && other.startEntry <= endEntry;
+        }
+
+        /**
+         * Computes the part of this range that lies before the start of {@code other}.
+         *
+         * @param other the range that covers the right-hand part of this one
+         * @return the uncovered prefix of this range, or null when {@code other} does not start
+         *         strictly inside this range
+         */
+        PendingReadKey reminderOnLeft(PendingReadKey other) {
+            //   S******-----E
+            //          S----------E
+            if (other.startEntry <= endEntry
+                    && other.startEntry > startEntry) {
+                return new PendingReadKey(startEntry, other.startEntry - 1);
+            }
+            return null;
+        }
+
+        /**
+         * Computes the part of this range that lies after the end of {@code other}.
+         *
+         * @param other the range that covers the left-hand part of this one
+         * @return the uncovered suffix of this range, or null when {@code other} does not end
+         *         strictly inside this range
+         */
+        PendingReadKey reminderOnRight(PendingReadKey other) {
+            //          S-----*******E
+            //   S-----------E
+            if (startEntry <= other.endEntry
+                    && other.endEntry < endEntry) {
+                return new PendingReadKey(other.endEntry + 1, endEntry);
+            }
+            return null;
+        }
+
+    }
+
+    /**
+     * A caller waiting on a pending read: its callback, the context object to hand back to it,
+     * and the inclusive range of entry ids it asked for.
+     */
+    @AllArgsConstructor
+    private static final class ReadEntriesCallbackWithContext {
+        final AsyncCallbacks.ReadEntriesCallback callback;
+        final Object ctx;
+        final long startEntry;
+        final long endEntry;
+    }
+
+    /**
+     * Result of a lookup among the pending reads of a ledger: the read to attach to, plus the
+     * sub-ranges (if any) of the requested range that this read does not cover.
+     */
+    @AllArgsConstructor
+    private static final class FindPendingReadOutcome {
+        final PendingRead pendingRead;
+        final PendingReadKey missingOnLeft;
+        final PendingReadKey missingOnRight;
+
+        /**
+         * @return true when at least one side of the requested range still has to be read separately
+         */
+        boolean needsAdditionalReads() {
+            if (missingOnLeft != null) {
+                return true;
+            }
+            return missingOnRight != null;
+        }
+    }
+
+    /**
+     * Picks the pending read that should serve {@code key}, registering a new one when nothing fits.
+     *
+     * <p>Candidates are considered in this order: a read with exactly the same range, a read whose
+     * range includes the requested one, a partially overlapping read that misses only the right
+     * side, one that misses only the left side, one that misses both sides. If no candidate is
+     * found a new {@link PendingRead} is stored in {@code ledgerCache} and {@code created} is set.
+     * The matching counters are updated on every path.
+     *
+     * @param key the requested range
+     * @param ledgerCache the pending reads of the ledger, also used as the lock for the lookup
+     * @param created set to true when the returned read was created by this call
+     * @return the selected read together with the parts of {@code key} it does not cover
+     */
+    private FindPendingReadOutcome findPendingRead(PendingReadKey key, Map<PendingReadKey,
+            PendingRead> ledgerCache, AtomicBoolean created) {
+        synchronized (ledgerCache) {
+            PendingRead exactMatch = ledgerCache.get(key);
+            if (exactMatch != null) {
+                COUNT_PENDING_READS_MATCHED.inc(key.size());
+                COUNT_ENTRIES_NOTREAD_FROM_BK.inc(key.size());
+                return new FindPendingReadOutcome(exactMatch, null, null);
+            }
+
+            // best partial candidates seen so far, one slot per kind of gap
+            FindPendingReadOutcome leftGapOnly = null;
+            FindPendingReadOutcome rightGapOnly = null;
+            FindPendingReadOutcome gapOnBothSides = null;
+
+            for (Map.Entry<PendingReadKey, PendingRead> candidate : ledgerCache.entrySet()) {
+                PendingReadKey candidateKey = candidate.getKey();
+                PendingRead candidateRead = candidate.getValue();
+                if (candidateKey.includes(key)) {
+                    COUNT_PENDING_READS_MATCHED_INCLUDED.inc(key.size());
+                    COUNT_ENTRIES_NOTREAD_FROM_BK.inc(key.size());
+                    return new FindPendingReadOutcome(candidateRead, null, null);
+                }
+                if (!candidateKey.overlaps(key)) {
+                    continue;
+                }
+                PendingReadKey leftGap = key.reminderOnLeft(candidateKey);
+                PendingReadKey rightGap = key.reminderOnRight(candidateKey);
+                if (leftGap != null && rightGap != null) {
+                    gapOnBothSides = new FindPendingReadOutcome(candidateRead, leftGap, rightGap);
+                } else if (rightGap != null) {
+                    rightGapOnly = new FindPendingReadOutcome(candidateRead, null, rightGap);
+                } else if (leftGap != null) {
+                    leftGapOnly = new FindPendingReadOutcome(candidateRead, leftGap, null);
+                }
+            }
+
+            if (rightGapOnly != null) {
+                long reused = key.size() - rightGapOnly.missingOnRight.size();
+                COUNT_PENDING_READS_MATCHED_BUT_OVERLAPPING_MISS_RIGHT.inc(reused);
+                COUNT_ENTRIES_NOTREAD_FROM_BK.inc(reused);
+                return rightGapOnly;
+            }
+            if (leftGapOnly != null) {
+                long reused = key.size() - leftGapOnly.missingOnLeft.size();
+                COUNT_PENDING_READS_MATCHED_OVERLAPPING_MISS_LEFT.inc(reused);
+                COUNT_ENTRIES_NOTREAD_FROM_BK.inc(reused);
+                return leftGapOnly;
+            }
+            if (gapOnBothSides != null) {
+                long reused = key.size()
+                        - gapOnBothSides.missingOnRight.size()
+                        - gapOnBothSides.missingOnLeft.size();
+                COUNT_ENTRIES_NOTREAD_FROM_BK.inc(reused);
+                COUNT_PENDING_READS_MATCHED_BUT_OVERLAPPING_MISS_BOTH.inc(reused);
+                return gapOnBothSides;
+            }
+
+            // nothing reusable: register a brand new pending read for the whole range
+            created.set(true);
+            PendingRead freshRead = new PendingRead(key, ledgerCache);
+            ledgerCache.put(key, freshRead);
+            long toRead = key.size();
+            COUNT_PENDING_READS_MISSED.inc(toRead);
+            COUNT_ENTRIES_READ_FROM_BK.inc(toRead);
+            return new FindPendingReadOutcome(freshRead, null, null);
+        }
+    }
+
+    /**
+     * One in-flight read from storage together with the callbacks waiting for its result.
+     * Callbacks can be added until the read completes; after that {@link #addListener} refuses
+     * new registrations and the instance is removed from {@code ledgerCache}.
+     */
+    private class PendingRead {
+        final PendingReadKey key;
+        final Map<PendingReadKey, PendingRead> ledgerCache;
+        final List<ReadEntriesCallbackWithContext> callbacks = new ArrayList<>(1);
+        boolean completed = false;
+
+        public PendingRead(PendingReadKey key,
+                           Map<PendingReadKey, PendingRead> ledgerCache) {
+            this.key = key;
+            this.ledgerCache = ledgerCache;
+        }
+
+        /**
+         * Selects the entries whose id is within {@code [startEntry, endEntry]} and releases the others.
+         *
+         * @param list the entries returned by the read
+         * @param startEntry first entry id to keep (inclusive)
+         * @param endEntry last entry id to keep (inclusive)
+         * @return a new list with the retained entries, in the order of {@code list}
+         */
+        private List<EntryImpl> keepEntries(List<EntryImpl> list, long startEntry, long endEntry) {
+            // the range is inclusive on both ends, so it holds (end - start + 1) entries
+            List<EntryImpl> result = new ArrayList<>((int) (endEntry - startEntry + 1));
+            for (EntryImpl entry : list) {
+                long entryId = entry.getEntryId();
+                if (startEntry <= entryId && entryId <= endEntry) {
+                    result.add(entry);
+                } else {
+                    entry.release();
+                }
+            }
+            return result;
+        }
+
+        /**
+         * Binds this pending read to the future of the actual storage read.
+         *
+         * <p>On success the registered callbacks are invoked on the managed ledger executor thread
+         * chosen by ledger name. A single callback receives the original entries (filtered to its
+         * range when it differs from the key); with several callbacks each one receives its own
+         * copies and the originals are released afterwards. On failure every callback is notified
+         * with a {@link ManagedLedgerException}.
+         *
+         * @param handle the future produced by the read from storage
+         */
+        public void attach(CompletableFuture<List<EntryImpl>> handle) {
+            // when the future is done remove this from the map
+            // new reads will go to a new instance
+            // this is required because we are going to do refcount management
+            // on the results of the callback
+            handle.whenComplete((___, error) -> {
+                synchronized (PendingRead.this) {
+                    completed = true;
+                    synchronized (ledgerCache) {
+                        ledgerCache.remove(key, this);
+                    }
+                }
+            });
+
+            handle.thenAcceptAsync(entriesToReturn -> {
+                synchronized (PendingRead.this) {
+                    if (callbacks.size() == 1) {
+                        ReadEntriesCallbackWithContext first = callbacks.get(0);
+                        if (first.startEntry == key.startEntry
+                                && first.endEntry == key.endEntry) {
+                            // perfect match, no copy, this is the most common case
+                            first.callback.readEntriesComplete((List) entriesToReturn,
+                                    first.ctx);
+                        } else {
+                            first.callback.readEntriesComplete(
+                                    (List) keepEntries(entriesToReturn, first.startEntry, first.endEntry),
+                                    first.ctx);
+                        }
+                    } else {
+                        // several waiters: each one gets its own copies, so that it can release
+                        // them independently from the others
+                        for (ReadEntriesCallbackWithContext callback : callbacks) {
+                            long callbackStartEntry = callback.startEntry;
+                            long callbackEndEntry = callback.endEntry;
+                            List<EntryImpl> copy = new ArrayList<>((int) (callbackEndEntry - callbackStartEntry + 1));
+                            for (EntryImpl entry : entriesToReturn) {
+                                long entryId = entry.getEntryId();
+                                if (callbackStartEntry <= entryId && entryId <= callbackEndEntry) {
+                                    EntryImpl entryCopy = EntryImpl.create(entry);
+                                    copy.add(entryCopy);
+                                }
+                            }
+                            callback.callback.readEntriesComplete((List) copy, callback.ctx);
+                        }
+                        // the originals were only the source of the copies
+                        for (EntryImpl entry : entriesToReturn) {
+                            entry.release();
+                        }
+                    }
+                }
+            }, rangeEntryCache.getManagedLedger().getExecutor()
+                    .chooseThread(rangeEntryCache.getManagedLedger().getName())).exceptionally(exception -> {
+                synchronized (PendingRead.this) {
+                    for (ReadEntriesCallbackWithContext callback : callbacks) {
+                        ManagedLedgerException mlException = createManagedLedgerException(exception);
+                        callback.callback.readEntriesFailed(mlException, callback.ctx);
+                    }
+                }
+                return null;
+            });
+        }
+
+        /**
+         * Registers a callback interested in {@code [startEntry, endEntry]}.
+         *
+         * @param callback the callback to notify
+         * @param ctx the context object passed back to the callback
+         * @param startEntry first requested entry id (inclusive)
+         * @param endEntry last requested entry id (inclusive)
+         * @return false when the read has already completed and the callback was not registered
+         */
+        synchronized boolean addListener(AsyncCallbacks.ReadEntriesCallback callback,
+                                         Object ctx, long startEntry, long endEntry) {
+            if (completed) {
+                return false;
+            }
+            callbacks.add(new ReadEntriesCallbackWithContext(callback, ctx, startEntry, endEntry));
+            return true;
+        }
+    }
+
+
+    /**
+     * Reads {@code [firstEntry, lastEntry]} from the given ledger, reusing a pending read on the
+     * same ledger whenever one covers (fully or partially) the requested range.
+     *
+     * <p>When the selected pending read covers only part of the range, the missing left and/or
+     * right parts are read through {@code rangeEntryCache.asyncReadEntry0} once the shared read
+     * has completed, and the pieces are concatenated in entry order before the caller is notified.
+     * If one of those additional reads fails, the entries obtained so far are released before the
+     * failure is reported, because the caller never receives them.
+     *
+     * @param lh the ledger handle
+     * @param firstEntry first entry id to read (inclusive)
+     * @param lastEntry last entry id to read (inclusive)
+     * @param shouldCacheEntry passed through to the underlying reads
+     * @param callback the callback to notify with the result
+     * @param ctx the context object passed back to the callback
+     */
+    void readEntries(ReadHandle lh, long firstEntry, long lastEntry, boolean shouldCacheEntry,
+                     final AsyncCallbacks.ReadEntriesCallback callback, Object ctx) {
+
+
+        final PendingReadKey key = new PendingReadKey(firstEntry, lastEntry);
+
+        Map<PendingReadKey, PendingRead> pendingReadsForLedger =
+                cachedPendingReads.computeIfAbsent(lh.getId(), (l) -> new ConcurrentHashMap<>());
+
+        boolean listenerAdded = false;
+        // addListener fails when the selected read completed in the meantime: look up again
+        while (!listenerAdded) {
+            AtomicBoolean createdByThisThread = new AtomicBoolean();
+            FindPendingReadOutcome findBestCandidateOutcome = findPendingRead(key,
+                    pendingReadsForLedger, createdByThisThread);
+            PendingRead pendingRead = findBestCandidateOutcome.pendingRead;
+            if (findBestCandidateOutcome.needsAdditionalReads()) {
+                AsyncCallbacks.ReadEntriesCallback wrappedCallback = new AsyncCallbacks.ReadEntriesCallback() {
+                    @Override
+                    public void readEntriesComplete(List<Entry> entries, Object ctx) {
+                        PendingReadKey missingOnLeft = findBestCandidateOutcome.missingOnLeft;
+                        PendingReadKey missingOnRight = findBestCandidateOutcome.missingOnRight;
+                        if (missingOnRight != null && missingOnLeft != null) {
+                            // read the left part first, then the right part
+                            AsyncCallbacks.ReadEntriesCallback readFromLeftCallback =
+                                    new AsyncCallbacks.ReadEntriesCallback() {
+                                @Override
+                                public void readEntriesComplete(List<Entry> entriesFromLeft, Object dummyCtx1) {
+                                    AsyncCallbacks.ReadEntriesCallback readFromRightCallback =
+                                            new AsyncCallbacks.ReadEntriesCallback() {
+                                        @Override
+                                        public void readEntriesComplete(List<Entry> entriesFromRight,
+                                                                        Object dummyCtx2) {
+                                            List<Entry> finalResult =
+                                                    new ArrayList<>(entriesFromLeft.size()
+                                                            + entries.size() + entriesFromRight.size());
+                                            finalResult.addAll(entriesFromLeft);
+                                            finalResult.addAll(entries);
+                                            finalResult.addAll(entriesFromRight);
+                                            callback.readEntriesComplete(finalResult, ctx);
+                                        }
+
+                                        @Override
+                                        public void readEntriesFailed(ManagedLedgerException exception,
+                                                                      Object dummyCtx3) {
+                                            // the caller will never see these entries: release them
+                                            entries.forEach(Entry::release);
+                                            entriesFromLeft.forEach(Entry::release);
+                                            callback.readEntriesFailed(exception, ctx);
+                                        }
+                                    };
+                                    rangeEntryCache.asyncReadEntry0(lh,
+                                            missingOnRight.startEntry, missingOnRight.endEntry,
+                                            shouldCacheEntry, readFromRightCallback, null);
+                                }
+
+                                @Override
+                                public void readEntriesFailed(ManagedLedgerException exception, Object dummyCtx4) {
+                                    // the caller will never see these entries: release them
+                                    entries.forEach(Entry::release);
+                                    callback.readEntriesFailed(exception, ctx);
+                                }
+                            };
+                            rangeEntryCache.asyncReadEntry0(lh, missingOnLeft.startEntry, missingOnLeft.endEntry,
+                                    shouldCacheEntry, readFromLeftCallback, null);
+                        } else if (missingOnLeft != null) {
+                            AsyncCallbacks.ReadEntriesCallback readFromLeftCallback =
+                                    new AsyncCallbacks.ReadEntriesCallback() {
+
+                                        @Override
+                                        public void readEntriesComplete(List<Entry> entriesFromLeft,
+                                                                        Object dummyCtx5) {
+                                            List<Entry> finalResult =
+                                                    new ArrayList<>(entriesFromLeft.size() + entries.size());
+                                            finalResult.addAll(entriesFromLeft);
+                                            finalResult.addAll(entries);
+                                            callback.readEntriesComplete(finalResult, ctx);
+                                        }
+
+                                        @Override
+                                        public void readEntriesFailed(ManagedLedgerException exception,
+                                                                      Object dummyCtx6) {
+                                            // the caller will never see these entries: release them
+                                            entries.forEach(Entry::release);
+                                            callback.readEntriesFailed(exception, ctx);
+                                        }
+                                    };
+                            rangeEntryCache.asyncReadEntry0(lh, missingOnLeft.startEntry, missingOnLeft.endEntry,
+                                    shouldCacheEntry, readFromLeftCallback, null);
+                        } else if (missingOnRight != null) {
+                            AsyncCallbacks.ReadEntriesCallback readFromRightCallback =
+                                    new AsyncCallbacks.ReadEntriesCallback() {
+
+                                        @Override
+                                        public void readEntriesComplete(List<Entry> entriesFromRight,
+                                                                        Object dummyCtx7) {
+                                            List<Entry> finalResult =
+                                                    new ArrayList<>(entriesFromRight.size() + entries.size());
+                                            finalResult.addAll(entries);
+                                            finalResult.addAll(entriesFromRight);
+                                            callback.readEntriesComplete(finalResult, ctx);
+                                        }
+
+                                        @Override
+                                        public void readEntriesFailed(ManagedLedgerException exception,
+                                                                      Object dummyCtx8) {
+                                            // the caller will never see these entries: release them
+                                            entries.forEach(Entry::release);
+                                            callback.readEntriesFailed(exception, ctx);
+                                        }
+                                    };
+                            rangeEntryCache.asyncReadEntry0(lh, missingOnRight.startEntry, missingOnRight.endEntry,
+                                    shouldCacheEntry, readFromRightCallback, null);
+                        }
+                    }
+
+                    @Override
+                    public void readEntriesFailed(ManagedLedgerException exception, Object ctx) {
+                        callback.readEntriesFailed(exception, ctx);
+                    }
+                };
+                listenerAdded = pendingRead.addListener(wrappedCallback, ctx, key.startEntry, key.endEntry);
+            } else {
+                listenerAdded = pendingRead.addListener(callback, ctx, key.startEntry, key.endEntry);
+            }
+
+
+            // only the thread that registered the pending read starts the actual read
+            if (createdByThisThread.get()) {
+                CompletableFuture<List<EntryImpl>> readResult = rangeEntryCache.readFromStorage(lh, firstEntry,
+                        lastEntry, shouldCacheEntry);
+                pendingRead.attach(readResult);
+            }
+        }
+    }
+
+    /**
+     * Drops the pending-read maps of all ledgers.
+     */
+    void clear() {
+        cachedPendingReads.clear();
+    }
+
+    /**
+     * Drops the pending-read map of one ledger.
+     *
+     * @param id the ledger id
+     */
+    void invalidateLedger(long id) {
+        cachedPendingReads.remove(id);
+    }
+}
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/RangeEntryCacheImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/RangeEntryCacheImpl.java
index f8f5c328cd9..163728e0f04 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/RangeEntryCacheImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/RangeEntryCacheImpl.java
@@ -21,12 +21,14 @@ package org.apache.bookkeeper.mledger.impl.cache;
 import static com.google.common.base.Preconditions.checkArgument;
 import static java.util.Objects.requireNonNull;
 import static org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.createManagedLedgerException;
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Lists;
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.PooledByteBufAllocator;
 import java.util.Collection;
 import java.util.Iterator;
 import java.util.List;
+import java.util.concurrent.CompletableFuture;
 import org.apache.bookkeeper.client.api.BKException;
 import org.apache.bookkeeper.client.api.LedgerEntry;
 import org.apache.bookkeeper.client.api.ReadHandle;
@@ -48,15 +50,18 @@ import org.slf4j.LoggerFactory;
 public class RangeEntryCacheImpl implements EntryCache {
 
     private final RangeEntryCacheManagerImpl manager;
-    private final ManagedLedgerImpl ml;
+    final ManagedLedgerImpl ml;
     private ManagedLedgerInterceptor interceptor;
     private final RangeCache<PositionImpl, EntryImpl> entries;
     private final boolean copyEntries;
+    private final PendingReadsManager pendingReadsManager;
+
 
     private static final double MB = 1024 * 1024;
 
     public RangeEntryCacheImpl(RangeEntryCacheManagerImpl manager, ManagedLedgerImpl ml, boolean copyEntries) {
         this.manager = manager;
+        this.pendingReadsManager = new PendingReadsManager(this);
         this.ml = ml;
         this.interceptor = ml.getManagedLedgerInterceptor();
         this.entries = new RangeCache<>(EntryImpl::getLength, EntryImpl::getTimestamp);
@@ -67,6 +72,11 @@ public class RangeEntryCacheImpl implements EntryCache {
         }
     }
 
+    @VisibleForTesting
+    ManagedLedgerImpl getManagedLedger() {
+        return ml;
+    }
+
     @Override
     public String getName() {
         return ml.getName();
@@ -185,6 +195,7 @@ public class RangeEntryCacheImpl implements EntryCache {
         }
 
         manager.entriesRemoved(sizeRemoved, entriesRemoved);
+        pendingReadsManager.invalidateLedger(ledgerId);
     }
 
     @Override
@@ -235,6 +246,7 @@ public class RangeEntryCacheImpl implements EntryCache {
                         }
                     }, ml.getExecutor().chooseThread(ml.getName())).exceptionally(exception -> {
                         ml.invalidateLedgerHandle(lh);
+                        pendingReadsManager.invalidateLedger(lh.getId());
                         callback.readEntryFailed(createManagedLedgerException(exception), ctx);
                         return null;
             });
@@ -257,7 +269,7 @@ public class RangeEntryCacheImpl implements EntryCache {
     }
 
     @SuppressWarnings({ "unchecked", "rawtypes" })
-    private void asyncReadEntry0(ReadHandle lh, long firstEntry, long lastEntry, boolean shouldCacheEntry,
+    void asyncReadEntry0(ReadHandle lh, long firstEntry, long lastEntry, boolean shouldCacheEntry,
             final ReadEntriesCallback callback, Object ctx) {
         final long ledgerId = lh.getId();
         final int entriesToRead = (int) (lastEntry - firstEntry) + 1;
@@ -295,51 +307,71 @@ public class RangeEntryCacheImpl implements EntryCache {
             }
 
             // Read all the entries from bookkeeper
-            lh.readAsync(firstEntry, lastEntry).thenAcceptAsync(
-                    ledgerEntries -> {
-                        requireNonNull(ml.getName());
-                        requireNonNull(ml.getExecutor());
+            pendingReadsManager.readEntries(lh, firstEntry, lastEntry,
+                    shouldCacheEntry, callback, ctx);
 
-                        try {
-                            // We got the entries, we need to transform them to a List<> type
-                            long totalSize = 0;
-                            final List<EntryImpl> entriesToReturn = Lists.newArrayListWithExpectedSize(entriesToRead);
-                            for (LedgerEntry e : ledgerEntries) {
-                                EntryImpl entry = RangeEntryCacheManagerImpl.create(e, interceptor);
-                                entriesToReturn.add(entry);
-                                totalSize += entry.getLength();
-                                if (shouldCacheEntry) {
-                                    EntryImpl cacheEntry = EntryImpl.create(entry);
-                                    insert(cacheEntry);
-                                    cacheEntry.release();
+        }
+    }
+
+    /**
+     * Reads the entries from Storage.
+     * @param lh the handle
+     * @param firstEntry the first entry
+     * @param lastEntry the last entry
+     * @param shouldCacheEntry if we should put the entry into the cache
+     * @return a handle to the operation
+     */
+    CompletableFuture<List<EntryImpl>> readFromStorage(ReadHandle lh,
+                                                       long firstEntry, long lastEntry, boolean shouldCacheEntry) {
+        final int entriesToRead = (int) (lastEntry - firstEntry) + 1;
+        CompletableFuture<List<EntryImpl>> readResult = lh.readAsync(firstEntry, lastEntry)
+                .thenApply(
+                        ledgerEntries -> {
+                            requireNonNull(ml.getName());
+                            requireNonNull(ml.getExecutor());
+
+                            try {
+                                // We got the entries, we need to transform them to a List<> type
+                                long totalSize = 0;
+                                final List<EntryImpl> entriesToReturn =
+                                        Lists.newArrayListWithExpectedSize(entriesToRead);
+                                for (LedgerEntry e : ledgerEntries) {
+                                    EntryImpl entry = RangeEntryCacheManagerImpl.create(e, interceptor);
+                                    entriesToReturn.add(entry);
+                                    totalSize += entry.getLength();
+                                    if (shouldCacheEntry) {
+                                        EntryImpl cacheEntry = EntryImpl.create(entry);
+                                        insert(cacheEntry);
+                                        cacheEntry.release();
+                                    }
                                 }
-                            }
 
-                            manager.mlFactoryMBean.recordCacheMiss(entriesToReturn.size(), totalSize);
-                            ml.getMbean().addReadEntriesSample(entriesToReturn.size(), totalSize);
+                                manager.mlFactoryMBean.recordCacheMiss(entriesToReturn.size(), totalSize);
+                                ml.getMbean().addReadEntriesSample(entriesToReturn.size(), totalSize);
 
-                            callback.readEntriesComplete((List) entriesToReturn, ctx);
-                        } finally {
-                            ledgerEntries.close();
-                        }
-                    }, ml.getExecutor().chooseThread(ml.getName())).exceptionally(exception -> {
-                        if (exception instanceof BKException
-                                && ((BKException) exception).getCode() == BKException.Code.TooManyRequestsException) {
-                            callback.readEntriesFailed(createManagedLedgerException(exception), ctx);
-                        } else {
-                            ml.invalidateLedgerHandle(lh);
-                            ManagedLedgerException mlException = createManagedLedgerException(exception);
-                            callback.readEntriesFailed(mlException, ctx);
-                        }
-                        return null;
-            });
-        }
+                                return entriesToReturn;
+                            } finally {
+                                ledgerEntries.close();
+                            }
+                        });
+        // handle LH invalidation
+        readResult.exceptionally(exception -> {
+            if (exception instanceof BKException
+                    && ((BKException) exception).getCode() == BKException.Code.TooManyRequestsException) {
+            } else {
+                ml.invalidateLedgerHandle(lh);
+                pendingReadsManager.invalidateLedger(lh.getId());
+            }
+            return null;
+        });
+        return readResult;
     }
 
     @Override
     public void clear() {
         Pair<Integer, Long> removedPair = entries.clear();
         manager.entriesRemoved(removedPair.getRight(), removedPair.getLeft());
+        pendingReadsManager.clear();
     }
 
     @Override
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/cache/PendingReadsManagerTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/cache/PendingReadsManagerTest.java
new file mode 100644
index 00000000000..f6d3ac88156
--- /dev/null
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/cache/PendingReadsManagerTest.java
@@ -0,0 +1,462 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.mledger.impl.cache;
+
+import lombok.Data;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.client.api.ReadHandle;
+import org.apache.bookkeeper.common.util.OrderedExecutor;
+import org.apache.bookkeeper.mledger.AsyncCallbacks;
+import org.apache.bookkeeper.mledger.Entry;
+import org.apache.bookkeeper.mledger.ManagedLedgerException;
+import org.apache.bookkeeper.mledger.Position;
+import org.apache.bookkeeper.mledger.impl.EntryImpl;
+import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
+
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyBoolean;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+import static org.testng.Assert.assertEquals;
+import static org.testng.AssertJUnit.assertNotSame;
+import static org.testng.AssertJUnit.assertSame;
+
+@Slf4j
+public class PendingReadsManagerTest  {
+
+    static final Object CTX = "foo";
+    static final Object CTX2 = "far";
+    static final long ledgerId = 123414L;
+    OrderedExecutor orderedExecutor;
+
+    PendingReadsManagerTest() {
+        // intentionally empty: the fixtures are created in before() and setupMocks()
+    }
+
+    /** Builds the executor that is shared by all test methods of this class. */
+    @BeforeClass(alwaysRun = true)
+    void before() {
+        this.orderedExecutor = OrderedExecutor.newBuilder().build();
+    }
+
+    /** Shuts the shared executor down, if one was created, and drops the reference to it. */
+    @AfterClass(alwaysRun = true)
+    void after() {
+        if (orderedExecutor == null) {
+            return;
+        }
+        orderedExecutor.shutdown();
+        orderedExecutor = null;
+    }
+
+
+    RangeEntryCacheImpl rangeEntryCache;
+    PendingReadsManager pendingReadsManager;
+    ReadHandle lh;
+    ManagedLedgerImpl ml;
+
+    /**
+     * Rebuilds the mocks and the manager under test before every test method.
+     *
+     * <p>{@code rangeEntryCache.asyncReadEntry0(...)} is stubbed to call back into the
+     * {@code pendingReadsManager} field; {@code rangeEntryCache.readFromStorage(...)} is stubbed
+     * per test through {@code prepareReadFromStorage(...)}.
+     */
+    @BeforeMethod(alwaysRun = true)
+    void setupMocks() {
+        rangeEntryCache = mock(RangeEntryCacheImpl.class);
+        pendingReadsManager = new PendingReadsManager(rangeEntryCache);
+        doAnswer(new Answer<Void>() {
+            @Override
+            public Void answer(InvocationOnMock invocationOnMock) throws Throwable {
+                log.info("rangeEntryCache asyncReadEntry0 {}", invocationOnMock);
+                ReadHandle rh = invocationOnMock.getArgument(0);
+                long startEntry = invocationOnMock.getArgument(1);
+                long endEntry = invocationOnMock.getArgument(2);
+                boolean shouldCacheEntry = invocationOnMock.getArgument(3);
+                AsyncCallbacks.ReadEntriesCallback callback = invocationOnMock.getArgument(4);
+                Object ctx = invocationOnMock.getArgument(5);
+                // forward the handle this call was made with rather than the fixture field
+                pendingReadsManager.readEntries(rh, startEntry, endEntry, shouldCacheEntry, callback, ctx);
+                return null;
+            }
+        }).when(rangeEntryCache).asyncReadEntry0(any(), anyLong(), anyLong(),
+                anyBoolean(), any(), any());
+
+        lh = mock(ReadHandle.class);
+        ml = mock(ManagedLedgerImpl.class);
+        when(ml.getExecutor()).thenReturn(orderedExecutor);
+        when(rangeEntryCache.getManagedLedger()).thenReturn(ml);
+    }
+
+
+    /**
+     * Read callback that is also a future. It records the positions of the entries it receives
+     * (or the failure) together with the context object, then completes itself so that a test
+     * can block on {@code get()}.
+     */
+    @Data
+    private static class CapturingReadEntriesCallback extends CompletableFuture<Void>
+            implements AsyncCallbacks.ReadEntriesCallback  {
+        List<Position> entries;
+        Object ctx;
+        Throwable error;
+
+        /** Stores the position of every entry and the context, then completes this future normally. */
+        @Override
+        public synchronized void readEntriesComplete(List<Entry> entries, Object ctx) {
+            List<Position> positions = entries.stream()
+                    .map(Entry::getPosition)
+                    .collect(Collectors.toList());
+            this.entries = positions;
+            this.error = null;
+            this.ctx = ctx;
+            complete(null);
+        }
+
+        /** Stores the failure and the context, then completes this future exceptionally. */
+        @Override
+        public synchronized void readEntriesFailed(ManagedLedgerException exception, Object ctx) {
+            this.error = exception;
+            this.entries = null;
+            this.ctx = ctx;
+            completeExceptionally(exception);
+        }
+
+    }
+
+    /**
+     * Creates one entry per id in {@code [start, end]} (both inclusive) for {@code ledgerId},
+     * each carrying its own copy of the bytes of {@code "data"}.
+     */
+    private static List<EntryImpl> buildList(long start, long end) {
+        List<EntryImpl> built = new ArrayList<>();
+        for (long id = start; id <= end; id++) {
+            built.add(EntryImpl.create(ledgerId, id, "data".getBytes(StandardCharsets.UTF_8)));
+        }
+        return built;
+    }
+
+
+    /**
+     * Asserts that {@code entries} holds exactly the positions for the entry ids
+     * {@code firstEntry..endEntry} (both inclusive), in ascending order.
+     *
+     * @param entries    positions captured by a callback
+     * @param firstEntry first expected entry id
+     * @param endEntry   last expected entry id
+     */
+    private void verifyRange(List<Position> entries, long firstEntry, long endEntry) {
+        log.info("verifyRange numEntries {}", entries.size());
+        // check the size as well, otherwise surplus entries after the range would go unnoticed
+        long expectedCount = endEntry - firstEntry + 1;
+        assertEquals((long) entries.size(), expectedCount);
+        int pos = 0;
+        for (long entry = firstEntry; entry <= endEntry; entry++) {
+            assertEquals(entries.get(pos++).getEntryId(), entry);
+        }
+    }
+
+    /**
+     * Future standing in for a storage read of {@code [firstEntry, endEntry]}; it stays pending
+     * until the test calls {@link #storageReadCompleted()}.
+     */
+    private static class PreparedReadFromStorage extends CompletableFuture<List<EntryImpl>> {
+        final long firstEntry;
+        final long endEntry;
+        final boolean shouldCacheEntry;
+
+        public PreparedReadFromStorage(long firstEntry, long endEntry, boolean shouldCacheEntry) {
+            this.shouldCacheEntry = shouldCacheEntry;
+            this.endEntry = endEntry;
+            this.firstEntry = firstEntry;
+        }
+
+        @Override
+        public String toString() {
+            return new StringBuilder("PreparedReadFromStorage(")
+                    .append(firstEntry).append(',')
+                    .append(endEntry).append(',')
+                    .append(shouldCacheEntry).append(')')
+                    .toString();
+        }
+
+        /** Completes this future with freshly built entries for the configured range. */
+        public void storageReadCompleted() {
+            List<EntryImpl> entriesRead = buildList(firstEntry, endEntry);
+            complete(entriesRead);
+        }
+    }
+
+    /**
+     * Stubs {@code rangeEntryCache.readFromStorage(...)} for exactly the given handle, range and
+     * caching flag, so that it returns a future the test completes by hand.
+     *
+     * @return the pending future that the stub hands out
+     */
+    private PreparedReadFromStorage prepareReadFromStorage(ReadHandle lh, RangeEntryCacheImpl rangeEntryCache,
+                                                                      long firstEntry, long endEntry, boolean shouldCacheEntry) {
+        log.info("prepareReadFromStorage from {} to {} shouldCacheEntry {}", firstEntry, endEntry, shouldCacheEntry);
+        PreparedReadFromStorage prepared = new PreparedReadFromStorage(firstEntry, endEntry, shouldCacheEntry);
+        when(rangeEntryCache.readFromStorage(eq(lh), eq(firstEntry), eq(endEntry), eq(shouldCacheEntry)))
+                .thenAnswer(invocation -> {
+                    log.info("readFromStorage from {} to {} shouldCacheEntry {}", firstEntry, endEntry, shouldCacheEntry);
+                    return prepared;
+                });
+        return prepared;
+    }
+
+    /** A single read: completing the storage read completes the callback with the whole range. */
+    @Test
+    public void simpleRead() throws Exception {
+
+        final long firstEntry = 100;
+        final long endEntry = 199;
+        final boolean shouldCacheEntry = false;
+
+        PreparedReadFromStorage storageRead =
+                prepareReadFromStorage(lh, rangeEntryCache, firstEntry, endEntry, shouldCacheEntry);
+
+        CapturingReadEntriesCallback captured = new CapturingReadEntriesCallback();
+        pendingReadsManager.readEntries(lh, firstEntry, endEntry, shouldCacheEntry, captured, CTX);
+
+        // let the storage read finish, then block until the callback has fired
+        storageRead.storageReadCompleted();
+        captured.get();
+
+        // the callback must see its own context and the requested range
+        assertSame(captured.getCtx(), CTX);
+        verifyRange(captured.entries, firstEntry, endEntry);
+    }
+
+
+    /**
+     * Two reads for exactly the same range. Only one storage read is prepared; completing it is
+     * expected to complete both callbacks, each with its own context and its own
+     * {@link Position} instances.
+     */
+    @Test
+    public void simpleConcurrentReadPerfectMatch() throws Exception {
+
+        long firstEntry = 100;
+        long endEntry = 199;
+        boolean shouldCacheEntry = false;
+
+        PreparedReadFromStorage read1 =
+                prepareReadFromStorage(lh, rangeEntryCache, firstEntry, endEntry, shouldCacheEntry);
+
+        // use the manager built in setupMocks(): it is the instance the asyncReadEntry0 stub calls back into
+        CapturingReadEntriesCallback callback = new CapturingReadEntriesCallback();
+        pendingReadsManager.readEntries(lh, firstEntry, endEntry, shouldCacheEntry, callback, CTX);
+
+        CapturingReadEntriesCallback callback2 = new CapturingReadEntriesCallback();
+        pendingReadsManager.readEntries(lh, firstEntry, endEntry, shouldCacheEntry, callback2, CTX2);
+
+        // complete the read from BK
+        // only one read completes 2 callbacks
+        read1.storageReadCompleted();
+
+        callback.get();
+        callback2.get();
+
+        assertSame(callback.getCtx(), CTX);
+        assertSame(callback2.getCtx(), CTX2);
+
+        verifyRange(callback.entries, firstEntry, endEntry);
+        verifyRange(callback2.entries, firstEntry, endEntry);
+
+        // same entry ids, but the two callbacks must not share Position instances
+        int pos = 0;
+        for (long entry = firstEntry; entry <= endEntry; entry++) {
+            assertNotSame(callback.entries.get(pos), callback2.entries.get(pos));
+            assertEquals(callback.entries.get(pos).getEntryId(), callback2.entries.get(pos).getEntryId());
+            pos++;
+        }
+
+    }
+
+    /**
+     * The second read asks for a sub-range of the first one. Only the storage read for the outer
+     * range is prepared; completing it is expected to complete both callbacks, the second one
+     * with just its own sub-range.
+     */
+    @Test
+    public void simpleConcurrentReadIncluding() throws Exception {
+
+        long firstEntry = 100;
+        long endEntry = 199;
+
+        long firstEntrySecondRead = firstEntry + 10;
+        long endEntrySecondRead = endEntry - 10;
+
+        boolean shouldCacheEntry = false;
+
+        PreparedReadFromStorage read1 =
+                prepareReadFromStorage(lh, rangeEntryCache, firstEntry, endEntry, shouldCacheEntry);
+
+        // use the manager built in setupMocks(): it is the instance the asyncReadEntry0 stub calls back into
+        CapturingReadEntriesCallback callback = new CapturingReadEntriesCallback();
+        pendingReadsManager.readEntries(lh, firstEntry, endEntry, shouldCacheEntry, callback, CTX);
+
+        CapturingReadEntriesCallback callback2 = new CapturingReadEntriesCallback();
+        pendingReadsManager.readEntries(lh, firstEntrySecondRead, endEntrySecondRead, shouldCacheEntry,
+                callback2, CTX2);
+
+        // complete the read from BK
+        // only one read completes 2 callbacks
+        read1.storageReadCompleted();
+
+        callback.get();
+        callback2.get();
+
+        assertSame(callback.getCtx(), CTX);
+        assertSame(callback2.getCtx(), CTX2);
+
+        verifyRange(callback.entries, firstEntry, endEntry);
+        verifyRange(callback2.entries, firstEntrySecondRead, endEntrySecondRead);
+
+        // inside the shared sub-range: same entry ids, but distinct Position instances
+        int pos = 0;
+        for (long entry = firstEntry; entry <= endEntry; entry++) {
+            if (entry >= firstEntrySecondRead && entry <= endEntrySecondRead) {
+                int posInSecondList = (int) (pos - (firstEntrySecondRead - firstEntry));
+                assertNotSame(callback.entries.get(pos), callback2.entries.get(posInSecondList));
+                assertEquals(callback.entries.get(pos).getEntryId(),
+                        callback2.entries.get(posInSecondList).getEntryId());
+            }
+            pos++;
+        }
+
+    }
+
+    /**
+     * The second read starts 10 entries before the first one and ends on the same entry.
+     * A separate storage read is prepared for the missing left part; the second callback is
+     * expected to complete with the full range once both storage reads are done.
+     */
+    @Test
+    public void simpleConcurrentReadMissingLeft() throws Exception {
+
+        long firstEntry = 100;
+        long endEntry = 199;
+
+        long firstEntrySecondRead = firstEntry - 10;
+        long endEntrySecondRead = endEntry;
+
+        boolean shouldCacheEntry = false;
+
+        PreparedReadFromStorage read1 =
+                prepareReadFromStorage(lh, rangeEntryCache, firstEntry, endEntry, shouldCacheEntry);
+
+        PreparedReadFromStorage readForLeft =
+                prepareReadFromStorage(lh, rangeEntryCache, firstEntrySecondRead, firstEntry - 1, shouldCacheEntry);
+
+        // use the manager built in setupMocks(): the asyncReadEntry0 stub calls back into that
+        // instance, so follow-up reads are served by the same manager that is under test
+        CapturingReadEntriesCallback callback = new CapturingReadEntriesCallback();
+        pendingReadsManager.readEntries(lh, firstEntry, endEntry, shouldCacheEntry, callback, CTX);
+
+        CapturingReadEntriesCallback callback2 = new CapturingReadEntriesCallback();
+        pendingReadsManager.readEntries(lh, firstEntrySecondRead, endEntrySecondRead, shouldCacheEntry,
+                callback2, CTX2);
+
+        // complete the read from BK
+        read1.storageReadCompleted();
+        // the first read can move forward
+        callback.get();
+
+        readForLeft.storageReadCompleted();
+        callback2.get();
+
+        assertSame(callback.getCtx(), CTX);
+        assertSame(callback2.getCtx(), CTX2);
+
+        verifyRange(callback.entries, firstEntry, endEntry);
+        verifyRange(callback2.entries, firstEntrySecondRead, endEntrySecondRead);
+
+    }
+
+    /**
+     * The second read starts on the same entry as the first one and ends 10 entries later.
+     * A separate storage read is prepared for the missing right part; the second callback is
+     * expected to complete with the full range once both storage reads are done.
+     */
+    @Test
+    public void simpleConcurrentReadMissingRight() throws Exception {
+
+        long firstEntry = 100;
+        long endEntry = 199;
+
+        long firstEntrySecondRead = firstEntry;
+        long endEntrySecondRead = endEntry + 10;
+
+        boolean shouldCacheEntry = false;
+
+        PreparedReadFromStorage read1 =
+                prepareReadFromStorage(lh, rangeEntryCache, firstEntry, endEntry, shouldCacheEntry);
+
+        PreparedReadFromStorage readForRight =
+                prepareReadFromStorage(lh, rangeEntryCache, endEntry + 1, endEntrySecondRead, shouldCacheEntry);
+
+        // use the manager built in setupMocks(): the asyncReadEntry0 stub calls back into that
+        // instance, so follow-up reads are served by the same manager that is under test
+        CapturingReadEntriesCallback callback = new CapturingReadEntriesCallback();
+        pendingReadsManager.readEntries(lh, firstEntry, endEntry, shouldCacheEntry, callback, CTX);
+
+        CapturingReadEntriesCallback callback2 = new CapturingReadEntriesCallback();
+        pendingReadsManager.readEntries(lh, firstEntrySecondRead, endEntrySecondRead, shouldCacheEntry,
+                callback2, CTX2);
+
+        // complete the read from BK
+        read1.storageReadCompleted();
+        // the first read can move forward
+        callback.get();
+
+        readForRight.storageReadCompleted();
+        callback2.get();
+
+        assertSame(callback.getCtx(), CTX);
+        assertSame(callback2.getCtx(), CTX2);
+
+        verifyRange(callback.entries, firstEntry, endEntry);
+        verifyRange(callback2.entries, firstEntrySecondRead, endEntrySecondRead);
+
+    }
+
+    /**
+     * The second read extends the first one by 10 entries on each side. Separate storage reads
+     * are prepared for the missing left and right parts; the second callback is expected to
+     * complete with the full range once all three storage reads are done.
+     */
+    @Test
+    public void simpleConcurrentReadMissingBoth() throws Exception {
+
+        long firstEntry = 100;
+        long endEntry = 199;
+
+        long firstEntrySecondRead = firstEntry - 10;
+        long endEntrySecondRead = endEntry + 10;
+
+        boolean shouldCacheEntry = false;
+
+        PreparedReadFromStorage read1 =
+                prepareReadFromStorage(lh, rangeEntryCache, firstEntry, endEntry, shouldCacheEntry);
+
+        PreparedReadFromStorage readForLeft =
+                prepareReadFromStorage(lh, rangeEntryCache, firstEntrySecondRead, firstEntry - 1, shouldCacheEntry);
+
+        PreparedReadFromStorage readForRight =
+                prepareReadFromStorage(lh, rangeEntryCache, endEntry + 1, endEntrySecondRead, shouldCacheEntry);
+
+        // use the manager built in setupMocks(): the asyncReadEntry0 stub calls back into that
+        // instance, so follow-up reads are served by the same manager that is under test
+        CapturingReadEntriesCallback callback = new CapturingReadEntriesCallback();
+        pendingReadsManager.readEntries(lh, firstEntry, endEntry, shouldCacheEntry, callback, CTX);
+
+        CapturingReadEntriesCallback callback2 = new CapturingReadEntriesCallback();
+        pendingReadsManager.readEntries(lh, firstEntrySecondRead, endEntrySecondRead, shouldCacheEntry,
+                callback2, CTX2);
+
+        // complete the read from BK
+        read1.storageReadCompleted();
+        // the first read can move forward
+        callback.get();
+
+        readForLeft.storageReadCompleted();
+        readForRight.storageReadCompleted();
+        callback2.get();
+
+        assertSame(callback.getCtx(), CTX);
+        assertSame(callback2.getCtx(), CTX2);
+
+        verifyRange(callback.entries, firstEntry, endEntry);
+        verifyRange(callback2.entries, firstEntrySecondRead, endEntrySecondRead);
+
+    }
+
+
+    /**
+     * Two reads over disjoint ranges: each one has its own prepared storage read and each
+     * callback is expected to complete with exactly its own range.
+     */
+    @Test
+    public void simpleConcurrentReadNoMatch() throws Exception {
+        long firstEntry = 100;
+        long endEntry = 199;
+
+        long firstEntrySecondRead = 1000;
+        long endEntrySecondRead = 1099;
+
+        boolean shouldCacheEntry = false;
+
+        PreparedReadFromStorage read1 =
+                prepareReadFromStorage(lh, rangeEntryCache, firstEntry, endEntry, shouldCacheEntry);
+
+        PreparedReadFromStorage read2 =
+                prepareReadFromStorage(lh, rangeEntryCache, firstEntrySecondRead, endEntrySecondRead, shouldCacheEntry);
+
+        // use the manager built in setupMocks() instead of a shadowing local, as simpleRead() does
+        CapturingReadEntriesCallback callback = new CapturingReadEntriesCallback();
+        pendingReadsManager.readEntries(lh, firstEntry, endEntry, shouldCacheEntry, callback, CTX);
+
+        CapturingReadEntriesCallback callback2 = new CapturingReadEntriesCallback();
+        pendingReadsManager.readEntries(lh, firstEntrySecondRead, endEntrySecondRead, shouldCacheEntry,
+                callback2, CTX2);
+
+        read1.storageReadCompleted();
+        callback.get();
+
+        read2.storageReadCompleted();
+        callback2.get();
+
+        assertSame(callback.getCtx(), CTX);
+        assertSame(callback2.getCtx(), CTX2);
+
+        verifyRange(callback.entries, firstEntry, endEntry);
+        verifyRange(callback2.entries, firstEntrySecondRead, endEntrySecondRead);
+
+    }
+
+}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MessageDispatchThrottlingTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MessageDispatchThrottlingTest.java
index 9ef28842dfe..1d0b1be3659 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MessageDispatchThrottlingTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MessageDispatchThrottlingTest.java
@@ -45,6 +45,7 @@ import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
 import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
+import org.apache.bookkeeper.mledger.impl.cache.PendingReadsManager;
 import org.apache.bookkeeper.mledger.impl.cache.RangeEntryCacheImpl;
 import org.apache.pulsar.broker.service.BrokerService;
 import org.apache.pulsar.broker.service.persistent.DispatchRateLimiter;
@@ -1236,6 +1237,13 @@ public class MessageDispatchThrottlingTest extends ProducerConsumerBase {
         RangeEntryCacheImpl entryCache = spy((RangeEntryCacheImpl) cacheField.get(ledger));
         cacheField.set(ledger, entryCache);
 
+        Field pendingReadsManagerField = RangeEntryCacheImpl.class.getDeclaredField("pendingReadsManager");
+        pendingReadsManagerField.setAccessible(true);
+        PendingReadsManager pendingReadsManager = (PendingReadsManager) pendingReadsManagerField.get(entryCache);
+        Field cacheFieldInManager = PendingReadsManager.class.getDeclaredField("rangeEntryCache");
+        cacheFieldInManager.setAccessible(true);
+        cacheFieldInManager.set(pendingReadsManager, entryCache);
+
         // 2. Produce messages
         for (int i = 0; i < totalMessages; i++) {
             String message = "my-message-" + i;