You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by rd...@apache.org on 2022/07/20 16:48:51 UTC

[pulsar] branch master updated: [pulsar-broker] Support caching to drain backlog consumers (#12258)

This is an automated email from the ASF dual-hosted git repository.

rdhabalia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 586979152bf [pulsar-broker] Support caching to drain backlog consumers (#12258)
586979152bf is described below

commit 586979152bf22de5a1dff3e955dfcdf540257a08
Author: Rajan Dhabalia <rd...@apache.org>
AuthorDate: Wed Jul 20 09:48:41 2022 -0700

    [pulsar-broker] Support caching to drain backlog consumers (#12258)
---
 conf/standalone.conf                               | 10 +++
 .../apache/bookkeeper/mledger/ManagedLedger.java   |  5 ++
 .../bookkeeper/mledger/ManagedLedgerConfig.java    | 59 ++++++++++++++
 .../bookkeeper/mledger/impl/ManagedCursorImpl.java |  9 +++
 .../bookkeeper/mledger/impl/ManagedLedgerImpl.java | 59 ++++++++++++--
 .../apache/bookkeeper/mledger/impl/OpAddEntry.java |  7 +-
 .../bookkeeper/mledger/impl/cache/EntryCache.java  |  6 +-
 .../mledger/impl/cache/RangeEntryCacheImpl.java    | 16 ++--
 .../apache/bookkeeper/mledger/util/RangeCache.java |  4 +
 .../bookkeeper/mledger/impl/ManagedLedgerTest.java |  2 +-
 .../apache/pulsar/broker/ServiceConfiguration.java | 24 +++++-
 .../pulsar/broker/service/BrokerService.java       |  6 ++
 .../apache/pulsar/broker/service/PulsarStats.java  |  3 +-
 .../org/apache/pulsar/broker/service/Topic.java    |  2 +
 .../service/nonpersistent/NonPersistentTopic.java  |  5 ++
 .../broker/service/persistent/PersistentTopic.java |  9 +++
 .../client/api/MessageDispatchThrottlingTest.java  | 92 ++++++++++++++++++++++
 .../client/api/SimpleProducerConsumerTest.java     |  2 +-
 .../offload/jcloud/impl/MockManagedLedger.java     |  5 ++
 19 files changed, 306 insertions(+), 19 deletions(-)

diff --git a/conf/standalone.conf b/conf/standalone.conf
index f6bf8fe91c5..b3281c02737 100644
--- a/conf/standalone.conf
+++ b/conf/standalone.conf
@@ -752,6 +752,16 @@ managedLedgerAddEntryTimeoutSeconds=0
 # Of course, use a smaller value may degrade consumption throughput. Default is 10ms.
 managedLedgerNewEntriesCheckDelayInMillis=10
 
+# Minimum cursors that must be in backlog state to cache and reuse the read entries.
+# (Default = 0, which disables the backlog read cache)
+managedLedgerMinimumBacklogCursorsForCaching=0
+
+# Minimum backlog entries for any cursor before start caching reads.
+managedLedgerMinimumBacklogEntriesForCaching=100
+
+# Maximum backlog entry difference to prevent caching entries that can't be reused.
+managedLedgerMaxBacklogBetweenCursorsForCaching=10000
+
 # Use Open Range-Set to cache unacked messages
 managedLedgerUnackedRangesOpenCacheSetEnabled=true
 
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java
index 7196a3b4c03..0ebbd514a52 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java
@@ -667,4 +667,9 @@ public interface ManagedLedger {
      * roll over that ledger if inactive.
      */
     void checkInactiveLedgerAndRollOver();
+
+    /**
+     * Check if managed ledger should cache backlog reads.
+     */
+    void checkCursorsToCacheEntries();
 }
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java
index e628a253563..788732e763a 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java
@@ -81,6 +81,9 @@ public class ManagedLedgerConfig {
     @Getter
     @Setter
     private boolean cacheEvictionByMarkDeletedPosition = false;
+    private int minimumBacklogCursorsForCaching = 0;
+    private int minimumBacklogEntriesForCaching = 1000;
+    private int maxBacklogBetweenCursorsForCaching = 1000;
 
     public boolean isCreateIfMissing() {
         return createIfMissing;
@@ -683,4 +686,60 @@ public class ManagedLedgerConfig {
         this.inactiveLedgerRollOverTimeMs = (int) unit.toMillis(inactiveLedgerRollOverTimeMs);
     }
 
+    /**
+     * Minimum cursors with backlog after which broker is allowed to cache read entries to reuse them for other cursors'
+     * backlog reads. (Default = 0, broker will not cache backlog reads)
+     *
+     * @return the configured minimum number of backlogged cursors
+     */
+    public int getMinimumBacklogCursorsForCaching() {
+        return minimumBacklogCursorsForCaching;
+    }
+
+    /**
+     * Set Minimum cursors with backlog after which broker is allowed to cache read entries to reuse them for other
+     * cursors' backlog reads.
+     *
+     * @param minimumBacklogCursorsForCaching the new minimum number of backlogged cursors
+     */
+    public void setMinimumBacklogCursorsForCaching(int minimumBacklogCursorsForCaching) {
+        this.minimumBacklogCursorsForCaching = minimumBacklogCursorsForCaching;
+    }
+
+    /**
+     * Minimum backlog that should exist to leverage caching for backlog reads.
+     *
+     * @return the configured minimum backlog, in entries
+     */
+    public int getMinimumBacklogEntriesForCaching() {
+        return minimumBacklogEntriesForCaching;
+    }
+
+    /**
+     * Set the minimum backlog after which the broker will start caching backlog reads.
+     *
+     * @param minimumBacklogEntriesForCaching the new minimum backlog, in entries
+     */
+    public void setMinimumBacklogEntriesForCaching(int minimumBacklogEntriesForCaching) {
+        this.minimumBacklogEntriesForCaching = minimumBacklogEntriesForCaching;
+    }
+
+    /**
+     * Max backlog gap between backlogged cursors while caching to avoid caching entry which can be
+     * invalidated before other backlog cursor can reuse it from cache.
+     *
+     * @return the configured maximum backlog gap between cursors
+     */
+    public int getMaxBacklogBetweenCursorsForCaching() {
+        return maxBacklogBetweenCursorsForCaching;
+    }
+
+    /**
+     * Set the maximum backlog distance between backlogged cursors to avoid caching unused entries.
+     *
+     * @param maxBacklogBetweenCursorsForCaching the new maximum backlog gap between cursors
+     */
+    public void setMaxBacklogBetweenCursorsForCaching(int maxBacklogBetweenCursorsForCaching) {
+        this.maxBacklogBetweenCursorsForCaching = maxBacklogBetweenCursorsForCaching;
+    }
 }
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
index c52cd6db933..e7ec241291d 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
@@ -201,6 +201,7 @@ public class ManagedCursorImpl implements ManagedCursor {
     private long entriesReadSize;
     private int individualDeletedMessagesSerializedSize;
     private static final String COMPACTION_CURSOR_NAME = "__compaction";
+    private volatile boolean cacheReadEntry = false;
 
     class MarkDeleteEntry {
         final PositionImpl newPosition;
@@ -3276,6 +3277,14 @@ public class ManagedCursorImpl implements ManagedCursor {
         this.state = state;
     }
 
+    public void setCacheReadEntry(boolean cacheReadEntry) {
+        this.cacheReadEntry = cacheReadEntry;
+    }
+
+    public boolean isCacheReadEntry() {
+        return cacheReadEntry;
+    }
+
     private static final Logger log = LoggerFactory.getLogger(ManagedCursorImpl.class);
 
     public ManagedLedgerConfig getConfig() {
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index a5612bc0eb9..a557ebba335 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -223,6 +223,10 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
     private long lastOffloadSuccessTimestamp = 0;
     private long lastOffloadFailureTimestamp = 0;
 
+    private int minBacklogCursorsForCaching = 0;
+    private int minBacklogEntriesForCaching = 1000;
+    private int maxBacklogBetweenCursorsForCaching = 1000;
+
     private static final Random random = new Random(System.currentTimeMillis());
     private long maximumRolloverTimeMs;
     protected final Supplier<Boolean> mlOwnershipChecker;
@@ -340,6 +344,12 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
         this.mlOwnershipChecker = mlOwnershipChecker;
         this.propertiesMap = Maps.newHashMap();
         this.inactiveLedgerRollOverTimeMs = config.getInactiveLedgerRollOverTimeMs();
+        if (config.getManagedLedgerInterceptor() != null) {
+            this.managedLedgerInterceptor = config.getManagedLedgerInterceptor();
+        }
+        this.minBacklogCursorsForCaching = config.getMinimumBacklogCursorsForCaching();
+        this.minBacklogEntriesForCaching = config.getMinimumBacklogEntriesForCaching();
+        this.maxBacklogBetweenCursorsForCaching = config.getMaxBacklogBetweenCursorsForCaching();
     }
 
     synchronized void initialize(final ManagedLedgerInitializeLedgerCallback callback, final Object ctx) {
@@ -1972,7 +1982,7 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
             log.debug("[{}] Reading entries from ledger {} - first={} last={}", name, ledger.getId(), firstEntry,
                     lastEntry);
         }
-        asyncReadEntry(ledger, firstEntry, lastEntry, false, opReadEntry, opReadEntry.ctx);
+        asyncReadEntry(ledger, firstEntry, lastEntry, opReadEntry, opReadEntry.ctx);
     }
 
     protected void asyncReadEntry(ReadHandle ledger, PositionImpl position, ReadEntryCallback callback, Object ctx) {
@@ -1989,8 +1999,8 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
         }
     }
 
-    protected void asyncReadEntry(ReadHandle ledger, long firstEntry, long lastEntry, boolean isSlowestReader,
-            OpReadEntry opReadEntry, Object ctx) {
+    protected void asyncReadEntry(ReadHandle ledger, long firstEntry, long lastEntry, OpReadEntry opReadEntry,
+            Object ctx) {
         if (config.getReadEntryTimeoutSeconds() > 0) {
             // set readOpCount to uniquely validate if ReadEntryCallbackWrapper is already recycled
             long readOpCount = READ_OP_COUNT_UPDATER.incrementAndGet(this);
@@ -1998,9 +2008,11 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
             ReadEntryCallbackWrapper readCallback = ReadEntryCallbackWrapper.create(name, ledger.getId(), firstEntry,
                     opReadEntry, readOpCount, createdTime, ctx);
             lastReadCallback = readCallback;
-            entryCache.asyncReadEntry(ledger, firstEntry, lastEntry, isSlowestReader, readCallback, readOpCount);
+            entryCache.asyncReadEntry(ledger, firstEntry, lastEntry, opReadEntry.cursor.isCacheReadEntry(),
+                    readCallback, readOpCount);
         } else {
-            entryCache.asyncReadEntry(ledger, firstEntry, lastEntry, isSlowestReader, opReadEntry, ctx);
+            entryCache.asyncReadEntry(ledger, firstEntry, lastEntry, opReadEntry.cursor.isCacheReadEntry(), opReadEntry,
+                    ctx);
         }
     }
 
@@ -4144,4 +4156,41 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
         }
     }
 
+
+    /** Marks each durable cursor to cache its backlog reads when enough other cursors have a similar backlog. */
+    @Override
+    public void checkCursorsToCacheEntries() {
+        if (minBacklogCursorsForCaching < 1) {
+            return; // backlog read caching is disabled
+        }
+        // Snapshot the backlog of each durable cursor once so every comparison below sees the same values.
+        Map<ManagedCursorImpl, Long> cursorBacklogMap = new HashMap<>();
+        Iterator<ManagedCursor> it = cursors.iterator();
+        while (it.hasNext()) {
+            ManagedCursorImpl cursor = (ManagedCursorImpl) it.next();
+            if (cursor.isDurable()) {
+                cursorBacklogMap.put(cursor, cursor.getNumberOfEntries());
+            }
+        }
+        for (Map.Entry<ManagedCursorImpl, Long> cursor : cursorBacklogMap.entrySet()) {
+            long backlog = cursor.getValue();
+            int cursorsInSameBacklogRange = 0;
+            // A cursor whose own backlog is below minBacklogEntriesForCaching never caches its reads.
+            if (backlog >= minBacklogEntriesForCaching) {
+                for (Map.Entry<ManagedCursorImpl, Long> other : cursorBacklogMap.entrySet()) {
+                    // If the backlog gap exceeds maxBacklogBetweenCursorsForCaching, the cached entry might be
+                    // invalidated before the other cursor reuses it, so only count cursors within that gap.
+                    if (other.getKey() != cursor.getKey()
+                            && Math.abs(backlog - other.getValue()) <= maxBacklogBetweenCursorsForCaching) {
+                        cursorsInSameBacklogRange++;
+                    }
+                }
+            }
+            boolean cacheReadEntry = cursorsInSameBacklogRange >= minBacklogCursorsForCaching;
+            cursor.getKey().setCacheReadEntry(cacheReadEntry);
+            if (log.isDebugEnabled()) {
+                log.debug("{} Enabling cache read = {} for {}", name, cacheReadEntry, cursor.getKey().getName());
+            }
+        }
+    }
 }
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java
index c25fa4f4d51..689385ee571 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java
@@ -213,8 +213,11 @@ public class OpAddEntry extends SafeRunnable implements AddCallback, CloseCallba
             EntryImpl entry = EntryImpl.create(ledger.getId(), entryId, data);
             // EntryCache.insert: duplicates entry by allocating new entry and data. so, recycle entry after calling
             // insert
-            ml.entryCache.insert(entry);
-            entry.release();
+            // The cache stores its own EntryImpl, or nothing when the position is already cached (insert then
+            // returns false, e.g. after a backlog read), so this entry is always owned here: release it
+            // unconditionally. Releasing only on a successful insert would leak the entry.
+            ml.entryCache.insert(entry);
+            entry.release();
         }
 
         PositionImpl lastEntry = PositionImpl.get(ledger.getId(), entryId);
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/EntryCache.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/EntryCache.java
index 8f5b3e9b19e..81c89b37d55 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/EntryCache.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/EntryCache.java
@@ -91,14 +91,14 @@ public interface EntryCache extends Comparable<EntryCache> {
      *            the first entry to read (inclusive)
      * @param lastEntry
      *            the last entry to read (inclusive)
-     * @param isSlowestReader
-     *            whether the reader cursor is the most far behind in the stream
+     * @param shouldCacheEntry
+     *            whether the read entry should be cached
      * @param callback
      *            the callback object that will be notified when read is done
      * @param ctx
      *            the context object
      */
-    void asyncReadEntry(ReadHandle lh, long firstEntry, long lastEntry, boolean isSlowestReader,
+    void asyncReadEntry(ReadHandle lh, long firstEntry, long lastEntry, boolean shouldCacheEntry,
             ReadEntriesCallback callback, Object ctx);
 
     /**
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/RangeEntryCacheImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/RangeEntryCacheImpl.java
index d37676b8d65..d4831b3a0fc 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/RangeEntryCacheImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/RangeEntryCacheImpl.java
@@ -98,6 +98,11 @@ public class RangeEntryCacheImpl implements EntryCache {
                     entry.getLength());
         }
 
+        PositionImpl position = entry.getPosition();
+        if (entries.exists(position)) {
+            return false;
+        }
+
         ByteBuf cachedData;
         if (copyEntries) {
             cachedData = copyEntry(entry);
@@ -109,7 +114,6 @@ public class RangeEntryCacheImpl implements EntryCache {
             cachedData = entry.getDataBuffer().retain();
         }
 
-        PositionImpl position = entry.getPosition();
         EntryImpl cacheEntry = EntryImpl.create(position, cachedData);
         cachedData.release();
         if (entries.put(position, cacheEntry)) {
@@ -239,10 +243,10 @@ public class RangeEntryCacheImpl implements EntryCache {
     }
 
     @Override
-    public void asyncReadEntry(ReadHandle lh, long firstEntry, long lastEntry, boolean isSlowestReader,
+    public void asyncReadEntry(ReadHandle lh, long firstEntry, long lastEntry, boolean shouldCacheEntry,
             final ReadEntriesCallback callback, Object ctx) {
         try {
-            asyncReadEntry0(lh, firstEntry, lastEntry, isSlowestReader, callback, ctx);
+            asyncReadEntry0(lh, firstEntry, lastEntry, shouldCacheEntry, callback, ctx);
         } catch (Throwable t) {
             log.warn("failed to read entries for {}--{}-{}", lh.getId(), firstEntry, lastEntry, t);
             // invalidate all entries related to ledger from the cache (it might happen if entry gets corrupt
@@ -254,7 +258,7 @@ public class RangeEntryCacheImpl implements EntryCache {
     }
 
     @SuppressWarnings({ "unchecked", "rawtypes" })
-    private void asyncReadEntry0(ReadHandle lh, long firstEntry, long lastEntry, boolean isSlowestReader,
+    private void asyncReadEntry0(ReadHandle lh, long firstEntry, long lastEntry, boolean shouldCacheEntry,
             final ReadEntriesCallback callback, Object ctx) {
         final long ledgerId = lh.getId();
         final int entriesToRead = (int) (lastEntry - firstEntry) + 1;
@@ -303,9 +307,11 @@ public class RangeEntryCacheImpl implements EntryCache {
                             final List<EntryImpl> entriesToReturn = Lists.newArrayListWithExpectedSize(entriesToRead);
                             for (LedgerEntry e : ledgerEntries) {
                                 EntryImpl entry = RangeEntryCacheManagerImpl.create(e, interceptor);
-
                                 entriesToReturn.add(entry);
                                 totalSize += entry.getLength();
+                                if (shouldCacheEntry) {
+                                    insert(entry);
+                                }
                             }
 
                             manager.mlFactoryMBean.recordCacheMiss(entriesToReturn.size(), totalSize);
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/util/RangeCache.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/util/RangeCache.java
index 1de9429d7c0..3735a0658f0 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/util/RangeCache.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/util/RangeCache.java
@@ -83,6 +83,10 @@ public class RangeCache<Key extends Comparable<Key>, Value extends ReferenceCoun
         return flag.booleanValue();
     }
 
+    public boolean exists(Key key) {
+        return key == null || entries.containsKey(key); // a null key is reported as existing
+    }
+
     public Value get(Key key) {
         Value value = entries.get(key);
         if (value == null) {
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
index 0baaa47fe08..291ddc64572 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
@@ -3112,7 +3112,7 @@ public class ManagedLedgerTest extends MockedBookKeeperTestCase {
 
         }, null, PositionImpl.LATEST);
         ledger.asyncReadEntry(ledgerHandle, PositionImpl.EARLIEST.getEntryId(), PositionImpl.EARLIEST.getEntryId(),
-                false, opReadEntry, ctxStr);
+                opReadEntry, ctxStr);
         retryStrategically((test) -> {
             return responseException2.get() != null;
         }, 5, 1000);
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index 0e7eca59828..d2e68fb437c 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -1972,13 +1972,35 @@ public class ServiceConfiguration implements PulsarConfiguration {
                     + "If value is invalid or NONE, then save the ManagedLedgerInfo bytes data directly.")
     private String managedLedgerInfoCompressionType = "NONE";
 
+
     @FieldContext(category = CATEGORY_STORAGE_ML,
             doc = "ManagedCursorInfo compression type, option values (NONE, LZ4, ZLIB, ZSTD, SNAPPY). \n"
                     + "If value is NONE, then save the ManagedCursorInfo bytes data directly.")
     private String managedCursorInfoCompressionType = "NONE";
 
-    /*** --- Load balancer. --- ****/
     @FieldContext(
+            dynamic = true,
+            category = CATEGORY_STORAGE_ML,
+            doc = "Minimum cursors that must be in backlog state to cache and reuse the read entries. "
+                    + "(Default = 0 disables the backlog read cache)"
+    )
+    private int managedLedgerMinimumBacklogCursorsForCaching = 0;
+
+    @FieldContext(
+            dynamic = true,
+            category = CATEGORY_STORAGE_ML,
+            doc = "Minimum backlog entries for any cursor before start caching reads"
+    )
+    private int managedLedgerMinimumBacklogEntriesForCaching = 1000;
+    @FieldContext(
+            dynamic = true,
+            category = CATEGORY_STORAGE_ML,
+            doc = "Maximum backlog entry difference to prevent caching entries that can't be reused"
+    )
+    private int managedLedgerMaxBacklogBetweenCursorsForCaching = 1000;
+
+    /*** --- Load balancer. --- ****/
+     @FieldContext(
             category = CATEGORY_LOAD_BALANCER,
             doc = "Enable load balancer"
     )
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index 1cadb20ca1e..e80d603a141 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -1602,6 +1602,12 @@ public class BrokerService implements Closeable {
                             serviceConfig.getManagedLedgerInactiveLedgerRolloverTimeSeconds(), TimeUnit.SECONDS);
                     managedLedgerConfig.setCacheEvictionByMarkDeletedPosition(
                             serviceConfig.isCacheEvictionByMarkDeletedPosition());
+                    managedLedgerConfig.setMinimumBacklogCursorsForCaching(
+                            serviceConfig.getManagedLedgerMinimumBacklogCursorsForCaching());
+                    managedLedgerConfig.setMinimumBacklogEntriesForCaching(
+                            serviceConfig.getManagedLedgerMinimumBacklogEntriesForCaching());
+                    managedLedgerConfig.setMaxBacklogBetweenCursorsForCaching(
+                            serviceConfig.getManagedLedgerMaxBacklogBetweenCursorsForCaching());
 
                     OffloadPoliciesImpl nsLevelOffloadPolicies =
                             (OffloadPoliciesImpl) policies.map(p -> p.offload_policies).orElse(null);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarStats.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarStats.java
index 817432f1915..ff74cf839ae 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarStats.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarStats.java
@@ -137,6 +137,7 @@ public class PulsarStats implements Closeable {
                                 // this task: helps to activate inactive-backlog-cursors which have caught up and
                                 // connected, also deactivate active-backlog-cursors which has backlog
                                 topic.checkBackloggedCursors();
+                                topic.checkCursorsToCacheEntries();
                                 // check if topic is inactive and require ledger rollover
                                 ((PersistentTopic) topic).checkInactiveLedgers();
                             } else if (topic instanceof NonPersistentTopic) {
@@ -253,4 +254,4 @@ public class PulsarStats implements Closeable {
     public void recordConnectionCreateFail() {
         brokerOperabilityMetrics.recordConnectionCreateFail();
     }
-}
+}
\ No newline at end of file
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java
index e2ffb41390a..038f209fc1f 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java
@@ -191,6 +191,8 @@ public interface Topic {
      */
     void checkBackloggedCursors();
 
+    void checkCursorsToCacheEntries();
+
     void checkDeduplicationSnapshot();
 
     void checkMessageExpiry();
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
index a9deda64340..7473fdaf786 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
@@ -991,6 +991,11 @@ public class NonPersistentTopic extends AbstractTopic implements Topic, TopicPol
         // no-op
     }
 
+    @Override
+    public void checkCursorsToCacheEntries() {
+        // no-op
+    }
+
     @Override
     public void checkDeduplicationSnapshot() {
         // no-op
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 9d5e6847b95..335355b112b 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -2344,6 +2344,15 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
         ledger.checkInactiveLedgerAndRollOver();
     }
 
+    @Override
+    public void checkCursorsToCacheEntries() {
+        try {
+            ledger.checkCursorsToCacheEntries();
+        } catch (Exception e) {
+            log.warn("Failed to check cursors to cache entries", e);
+        }
+    }
+
     @Override
     public void checkDeduplicationSnapshot() {
         messageDeduplication.takeSnapshot();
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MessageDispatchThrottlingTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MessageDispatchThrottlingTest.java
index dfa5a2c70a9..9ef28842dfe 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MessageDispatchThrottlingTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MessageDispatchThrottlingTest.java
@@ -18,11 +18,19 @@
  */
 package org.apache.pulsar.client.api;
 
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.atLeastOnce;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.verify;
 import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.fail;
 
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 
+import lombok.Cleanup;
+
 import java.lang.reflect.Field;
 import java.util.Arrays;
 import java.util.HashMap;
@@ -35,7 +43,9 @@ import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
 import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
+import org.apache.bookkeeper.mledger.impl.cache.RangeEntryCacheImpl;
 import org.apache.pulsar.broker.service.BrokerService;
 import org.apache.pulsar.broker.service.persistent.DispatchRateLimiter;
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
@@ -1180,4 +1190,86 @@ public class MessageDispatchThrottlingTest extends ProducerConsumerBase {
         producer.close();
         log.info("-- Exiting {} test --", methodName);
     }
+
+    /**
+     * Validates that reads made by backlog consumers are cached and reused by other backlog consumers while
+     * draining the backlog.
+     * 
+     * @throws Exception
+     */
+    @Test
+    public void testBacklogConsumerCacheReads() throws Exception {
+        log.info("-- Starting {} test --", methodName);
+
+        conf.setManagedLedgerMinimumBacklogCursorsForCaching(2);
+        conf.setManagedLedgerMinimumBacklogEntriesForCaching(10);
+        conf.setManagedLedgerCacheEvictionTimeThresholdMillis(60 * 1000);
+        conf.setStreamingDispatch(false);
+        restartBroker();
+        final long totalMessages = 200;
+        final int receiverSize = 10;
+        final String topicName = "cache-read";
+        final String sub1 = "sub";
+        int totalSub = 10;
+        Consumer<byte[]>[] consumers = new Consumer[totalSub];
+
+        for (int i = 0; i < totalSub; i++) {
+            consumers[i] = pulsarClient.newConsumer().topic("persistent://my-property/my-ns/" + topicName)
+                    .subscriptionName(sub1 + "-" + i).subscriptionType(SubscriptionType.Shared)
+                    .receiverQueueSize(receiverSize).subscribe();
+        }
+        for (int i = 0; i < totalSub; i++) {
+            consumers[i].close();
+        }
+
+        final String topic = "persistent://my-property/my-ns/" + topicName;
+        ProducerBuilder<byte[]> producerBuilder = pulsarClient.newProducer().topic(topic);
+
+        producerBuilder.enableBatching(false);
+        @Cleanup
+        Producer<byte[]> producer = producerBuilder.create();
+
+        PersistentTopic topicRef = (PersistentTopic) pulsar.getBrokerService().getTopicReference(topic).get();
+        ManagedLedgerImpl ledger = (ManagedLedgerImpl) topicRef.getManagedLedger();
+        Field cacheField = ManagedLedgerImpl.class.getDeclaredField("entryCache");
+        cacheField.setAccessible(true);
+        RangeEntryCacheImpl entryCache = spy((RangeEntryCacheImpl) cacheField.get(ledger));
+        cacheField.set(ledger, entryCache);
+
+        // 2. Produce messages
+        for (int i = 0; i < totalMessages; i++) {
+            String message = "my-message-" + i;
+            producer.send(message.getBytes());
+        }
+        ledger.checkCursorsToCacheEntries();
+
+        ledger.getCursors().forEach(cursor -> {
+            assertTrue(((ManagedCursorImpl) cursor).isCacheReadEntry());
+        });
+
+        // 3. Consume messages
+        CountDownLatch latch = new CountDownLatch((int) (totalSub * totalMessages));
+        for (int i = 0; i < totalSub; i++) {
+            consumers[i] = (Consumer<byte[]>) pulsarClient.newConsumer()
+                    .topic("persistent://my-property/my-ns/" + topicName).subscriptionName(sub1 + "-" + i)
+                    .subscriptionType(SubscriptionType.Shared).receiverQueueSize(receiverSize)
+                    .messageListener((c, m) -> {
+                        latch.countDown();
+                        try {
+                            c.acknowledge(m);
+                        } catch (PulsarClientException e) {
+                            fail("failed to ack message");
+                        }
+                    }).subscribe();
+        }
+
+        latch.await();
+
+        // Verify: at least one entry was inserted into the EntryCache
+        verify(entryCache, atLeastOnce()).insert(any());
+
+        for (int i = 0; i < totalSub; i++) {
+            consumers[i].close();
+        }
+    }
 }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
index d4c124d607a..9af19cc672d 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
@@ -4466,4 +4466,4 @@ public class SimpleProducerConsumerTest extends ProducerConsumerBase {
             assertEquals(values.get(i), "msg-" + i);
         }
     }
-}
+}
\ No newline at end of file
diff --git a/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/MockManagedLedger.java b/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/MockManagedLedger.java
index 3eaf276c3c5..8ababead630 100644
--- a/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/MockManagedLedger.java
+++ b/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/MockManagedLedger.java
@@ -368,4 +368,9 @@ public class MockManagedLedger implements ManagedLedger {
     public void checkInactiveLedgerAndRollOver() {
 
     }
+
+    @Override
+    public void checkCursorsToCacheEntries() {
+        // no-op
+    }
 }