You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by rd...@apache.org on 2022/07/20 16:48:51 UTC
[pulsar] branch master updated: [pulsar-broker] Support caching to drain backlog consumers (#12258)
This is an automated email from the ASF dual-hosted git repository.
rdhabalia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 586979152bf [pulsar-broker] Support caching to drain backlog consumers (#12258)
586979152bf is described below
commit 586979152bf22de5a1dff3e955dfcdf540257a08
Author: Rajan Dhabalia <rd...@apache.org>
AuthorDate: Wed Jul 20 09:48:41 2022 -0700
[pulsar-broker] Support caching to drain backlog consumers (#12258)
---
conf/standalone.conf | 10 +++
.../apache/bookkeeper/mledger/ManagedLedger.java | 5 ++
.../bookkeeper/mledger/ManagedLedgerConfig.java | 59 ++++++++++++++
.../bookkeeper/mledger/impl/ManagedCursorImpl.java | 9 +++
.../bookkeeper/mledger/impl/ManagedLedgerImpl.java | 59 ++++++++++++--
.../apache/bookkeeper/mledger/impl/OpAddEntry.java | 7 +-
.../bookkeeper/mledger/impl/cache/EntryCache.java | 6 +-
.../mledger/impl/cache/RangeEntryCacheImpl.java | 16 ++--
.../apache/bookkeeper/mledger/util/RangeCache.java | 4 +
.../bookkeeper/mledger/impl/ManagedLedgerTest.java | 2 +-
.../apache/pulsar/broker/ServiceConfiguration.java | 24 +++++-
.../pulsar/broker/service/BrokerService.java | 6 ++
.../apache/pulsar/broker/service/PulsarStats.java | 3 +-
.../org/apache/pulsar/broker/service/Topic.java | 2 +
.../service/nonpersistent/NonPersistentTopic.java | 5 ++
.../broker/service/persistent/PersistentTopic.java | 9 +++
.../client/api/MessageDispatchThrottlingTest.java | 92 ++++++++++++++++++++++
.../client/api/SimpleProducerConsumerTest.java | 2 +-
.../offload/jcloud/impl/MockManagedLedger.java | 5 ++
19 files changed, 306 insertions(+), 19 deletions(-)
diff --git a/conf/standalone.conf b/conf/standalone.conf
index f6bf8fe91c5..b3281c02737 100644
--- a/conf/standalone.conf
+++ b/conf/standalone.conf
@@ -752,6 +752,16 @@ managedLedgerAddEntryTimeoutSeconds=0
# Of course, use a smaller value may degrade consumption throughput. Default is 10ms.
managedLedgerNewEntriesCheckDelayInMillis=10
+# Minimum cursors that must be in backlog state to cache and reuse the read entries.
+# (Default =0 to disable backlog reach cache)
+managedLedgerMinimumBacklogCursorsForCaching=0
+
+# Minimum backlog entries for any cursor before start caching reads.
+managedLedgerMinimumBacklogEntriesForCaching=100
+
+# Maximum backlog entry difference to prevent caching entries that can't be reused.
+managedLedgerMaxBacklogBetweenCursorsForCaching=10000
+
# Use Open Range-Set to cache unacked messages
managedLedgerUnackedRangesOpenCacheSetEnabled=true
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java
index 7196a3b4c03..0ebbd514a52 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java
@@ -667,4 +667,9 @@ public interface ManagedLedger {
* roll over that ledger if inactive.
*/
void checkInactiveLedgerAndRollOver();
+
+ /**
+ * Check if managed ledger should cache backlog reads.
+ */
+ void checkCursorsToCacheEntries();
}
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java
index e628a253563..788732e763a 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java
@@ -81,6 +81,9 @@ public class ManagedLedgerConfig {
@Getter
@Setter
private boolean cacheEvictionByMarkDeletedPosition = false;
+ private int minimumBacklogCursorsForCaching = 0;
+ private int minimumBacklogEntriesForCaching = 1000;
+ private int maxBacklogBetweenCursorsForCaching = 1000;
public boolean isCreateIfMissing() {
return createIfMissing;
@@ -683,4 +686,60 @@ public class ManagedLedgerConfig {
this.inactiveLedgerRollOverTimeMs = (int) unit.toMillis(inactiveLedgerRollOverTimeMs);
}
+ /**
+ * Minimum cursors with backlog after which broker is allowed to cache read entries to reuse them for other cursors'
+ * backlog reads. (Default = 0, broker will not cache backlog reads)
+ *
+ * @return
+ */
+ public int getMinimumBacklogCursorsForCaching() {
+ return minimumBacklogCursorsForCaching;
+ }
+
+ /**
+ * Set Minimum cursors with backlog after which broker is allowed to cache read entries to reuse them for other
+ * cursors' backlog reads.
+ *
+ * @param minimumBacklogCursorsForCaching
+ */
+ public void setMinimumBacklogCursorsForCaching(int minimumBacklogCursorsForCaching) {
+ this.minimumBacklogCursorsForCaching = minimumBacklogCursorsForCaching;
+ }
+
+ /**
+ * Minimum backlog should exist to leverage caching for backlog reads.
+ *
+ * @return
+ */
+ public int getMinimumBacklogEntriesForCaching() {
+ return minimumBacklogEntriesForCaching;
+ }
+
+ /**
+ * Set Minimum backlog after that broker will start caching backlog reads.
+ *
+ * @param minimumBacklogEntriesForCaching
+ */
+ public void setMinimumBacklogEntriesForCaching(int minimumBacklogEntriesForCaching) {
+ this.minimumBacklogEntriesForCaching = minimumBacklogEntriesForCaching;
+ }
+
+ /**
+ * Max backlog gap between backlogged cursors while caching to avoid caching entry which can be
+ * invalidated before other backlog cursor can reuse it from cache.
+ *
+ * @return
+ */
+ public int getMaxBacklogBetweenCursorsForCaching() {
+ return maxBacklogBetweenCursorsForCaching;
+ }
+
+ /**
+ * Set maximum backlog distance between backlogged curosr to avoid caching unused entry.
+ *
+ * @param maxBacklogBetweenCursorsForCaching
+ */
+ public void setMaxBacklogBetweenCursorsForCaching(int maxBacklogBetweenCursorsForCaching) {
+ this.maxBacklogBetweenCursorsForCaching = maxBacklogBetweenCursorsForCaching;
+ }
}
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
index c52cd6db933..e7ec241291d 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
@@ -201,6 +201,7 @@ public class ManagedCursorImpl implements ManagedCursor {
private long entriesReadSize;
private int individualDeletedMessagesSerializedSize;
private static final String COMPACTION_CURSOR_NAME = "__compaction";
+ private volatile boolean cacheReadEntry = false;
class MarkDeleteEntry {
final PositionImpl newPosition;
@@ -3276,6 +3277,14 @@ public class ManagedCursorImpl implements ManagedCursor {
this.state = state;
}
+ public void setCacheReadEntry(boolean cacheReadEntry) {
+ this.cacheReadEntry = cacheReadEntry;
+ }
+
+ public boolean isCacheReadEntry() {
+ return cacheReadEntry;
+ }
+
private static final Logger log = LoggerFactory.getLogger(ManagedCursorImpl.class);
public ManagedLedgerConfig getConfig() {
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index a5612bc0eb9..a557ebba335 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -223,6 +223,10 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
private long lastOffloadSuccessTimestamp = 0;
private long lastOffloadFailureTimestamp = 0;
+ private int minBacklogCursorsForCaching = 0;
+ private int minBacklogEntriesForCaching = 1000;
+ private int maxBacklogBetweenCursorsForCaching = 1000;
+
private static final Random random = new Random(System.currentTimeMillis());
private long maximumRolloverTimeMs;
protected final Supplier<Boolean> mlOwnershipChecker;
@@ -340,6 +344,12 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
this.mlOwnershipChecker = mlOwnershipChecker;
this.propertiesMap = Maps.newHashMap();
this.inactiveLedgerRollOverTimeMs = config.getInactiveLedgerRollOverTimeMs();
+ if (config.getManagedLedgerInterceptor() != null) {
+ this.managedLedgerInterceptor = config.getManagedLedgerInterceptor();
+ }
+ this.minBacklogCursorsForCaching = config.getMinimumBacklogCursorsForCaching();
+ this.minBacklogEntriesForCaching = config.getMinimumBacklogEntriesForCaching();
+ this.maxBacklogBetweenCursorsForCaching = config.getMaxBacklogBetweenCursorsForCaching();
}
synchronized void initialize(final ManagedLedgerInitializeLedgerCallback callback, final Object ctx) {
@@ -1972,7 +1982,7 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
log.debug("[{}] Reading entries from ledger {} - first={} last={}", name, ledger.getId(), firstEntry,
lastEntry);
}
- asyncReadEntry(ledger, firstEntry, lastEntry, false, opReadEntry, opReadEntry.ctx);
+ asyncReadEntry(ledger, firstEntry, lastEntry, opReadEntry, opReadEntry.ctx);
}
protected void asyncReadEntry(ReadHandle ledger, PositionImpl position, ReadEntryCallback callback, Object ctx) {
@@ -1989,8 +1999,8 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
}
}
- protected void asyncReadEntry(ReadHandle ledger, long firstEntry, long lastEntry, boolean isSlowestReader,
- OpReadEntry opReadEntry, Object ctx) {
+ protected void asyncReadEntry(ReadHandle ledger, long firstEntry, long lastEntry, OpReadEntry opReadEntry,
+ Object ctx) {
if (config.getReadEntryTimeoutSeconds() > 0) {
// set readOpCount to uniquely validate if ReadEntryCallbackWrapper is already recycled
long readOpCount = READ_OP_COUNT_UPDATER.incrementAndGet(this);
@@ -1998,9 +2008,11 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
ReadEntryCallbackWrapper readCallback = ReadEntryCallbackWrapper.create(name, ledger.getId(), firstEntry,
opReadEntry, readOpCount, createdTime, ctx);
lastReadCallback = readCallback;
- entryCache.asyncReadEntry(ledger, firstEntry, lastEntry, isSlowestReader, readCallback, readOpCount);
+ entryCache.asyncReadEntry(ledger, firstEntry, lastEntry, opReadEntry.cursor.isCacheReadEntry(),
+ readCallback, readOpCount);
} else {
- entryCache.asyncReadEntry(ledger, firstEntry, lastEntry, isSlowestReader, opReadEntry, ctx);
+ entryCache.asyncReadEntry(ledger, firstEntry, lastEntry, opReadEntry.cursor.isCacheReadEntry(), opReadEntry,
+ ctx);
}
}
@@ -4144,4 +4156,41 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
}
}
+
+ public void checkCursorsToCacheEntries() {
+ if (minBacklogCursorsForCaching < 1) {
+ return;
+ }
+ Iterator<ManagedCursor> it = cursors.iterator();
+ Map<ManagedCursorImpl, Long> cursorBacklogMap = new HashMap<>();
+ while (it.hasNext()) {
+ ManagedCursorImpl cursor = (ManagedCursorImpl) it.next();
+ if (cursor.isDurable()) {
+ cursorBacklogMap.put(cursor, cursor.getNumberOfEntries());
+ }
+ }
+ int cursorsInSameBacklogRange = 0;
+ for (java.util.Map.Entry<ManagedCursorImpl, Long> cursor : cursorBacklogMap.entrySet()) {
+ cursorsInSameBacklogRange = 0;
+ for (java.util.Map.Entry<ManagedCursorImpl, Long> other : cursorBacklogMap.entrySet()) {
+ if (cursor.equals(other)) {
+ continue;
+ }
+ long backlog = cursor.getValue();
+ // if backlog difference is > maxBacklogBetweenCursorsForCaching (eg: 10000) then cached entry might be
+ // invalidated by the time so, skip caching such long range messages.
+ if (backlog < minBacklogEntriesForCaching) {
+ continue;
+ }
+ if (Math.abs(backlog - other.getValue()) <= maxBacklogBetweenCursorsForCaching) {
+ cursorsInSameBacklogRange++;
+ }
+ }
+ cursor.getKey().setCacheReadEntry(cursorsInSameBacklogRange >= minBacklogCursorsForCaching);
+ if (log.isDebugEnabled()) {
+ log.info("{} Enabling cache read = {} for {}", name,
+ cursorsInSameBacklogRange >= minBacklogCursorsForCaching, cursor.getKey().getName());
+ }
+ }
+ }
}
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java
index c25fa4f4d51..689385ee571 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java
@@ -213,8 +213,11 @@ public class OpAddEntry extends SafeRunnable implements AddCallback, CloseCallba
EntryImpl entry = EntryImpl.create(ledger.getId(), entryId, data);
// EntryCache.insert: duplicates entry by allocating new entry and data. so, recycle entry after calling
// insert
- ml.entryCache.insert(entry);
- entry.release();
+ // Entry cache doesn't copy the data if entry already exist into the cache.
+ // Backlog read tries to add entry into cache which can try to add duplicate entry into cache.
+ if (ml.entryCache.insert(entry)) {
+ entry.release();
+ }
}
PositionImpl lastEntry = PositionImpl.get(ledger.getId(), entryId);
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/EntryCache.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/EntryCache.java
index 8f5b3e9b19e..81c89b37d55 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/EntryCache.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/EntryCache.java
@@ -91,14 +91,14 @@ public interface EntryCache extends Comparable<EntryCache> {
* the first entry to read (inclusive)
* @param lastEntry
* the last entry to read (inclusive)
- * @param isSlowestReader
- * whether the reader cursor is the most far behind in the stream
+ * @param shouldCacheEntry
+ * whether the read entry should be cached
* @param callback
* the callback object that will be notified when read is done
* @param ctx
* the context object
*/
- void asyncReadEntry(ReadHandle lh, long firstEntry, long lastEntry, boolean isSlowestReader,
+ void asyncReadEntry(ReadHandle lh, long firstEntry, long lastEntry, boolean shouldCacheEntry,
ReadEntriesCallback callback, Object ctx);
/**
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/RangeEntryCacheImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/RangeEntryCacheImpl.java
index d37676b8d65..d4831b3a0fc 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/RangeEntryCacheImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/RangeEntryCacheImpl.java
@@ -98,6 +98,11 @@ public class RangeEntryCacheImpl implements EntryCache {
entry.getLength());
}
+ PositionImpl position = entry.getPosition();
+ if (entries.exists(position)) {
+ return false;
+ }
+
ByteBuf cachedData;
if (copyEntries) {
cachedData = copyEntry(entry);
@@ -109,7 +114,6 @@ public class RangeEntryCacheImpl implements EntryCache {
cachedData = entry.getDataBuffer().retain();
}
- PositionImpl position = entry.getPosition();
EntryImpl cacheEntry = EntryImpl.create(position, cachedData);
cachedData.release();
if (entries.put(position, cacheEntry)) {
@@ -239,10 +243,10 @@ public class RangeEntryCacheImpl implements EntryCache {
}
@Override
- public void asyncReadEntry(ReadHandle lh, long firstEntry, long lastEntry, boolean isSlowestReader,
+ public void asyncReadEntry(ReadHandle lh, long firstEntry, long lastEntry, boolean shouldCacheEntry,
final ReadEntriesCallback callback, Object ctx) {
try {
- asyncReadEntry0(lh, firstEntry, lastEntry, isSlowestReader, callback, ctx);
+ asyncReadEntry0(lh, firstEntry, lastEntry, shouldCacheEntry, callback, ctx);
} catch (Throwable t) {
log.warn("failed to read entries for {}--{}-{}", lh.getId(), firstEntry, lastEntry, t);
// invalidate all entries related to ledger from the cache (it might happen if entry gets corrupt
@@ -254,7 +258,7 @@ public class RangeEntryCacheImpl implements EntryCache {
}
@SuppressWarnings({ "unchecked", "rawtypes" })
- private void asyncReadEntry0(ReadHandle lh, long firstEntry, long lastEntry, boolean isSlowestReader,
+ private void asyncReadEntry0(ReadHandle lh, long firstEntry, long lastEntry, boolean shouldCacheEntry,
final ReadEntriesCallback callback, Object ctx) {
final long ledgerId = lh.getId();
final int entriesToRead = (int) (lastEntry - firstEntry) + 1;
@@ -303,9 +307,11 @@ public class RangeEntryCacheImpl implements EntryCache {
final List<EntryImpl> entriesToReturn = Lists.newArrayListWithExpectedSize(entriesToRead);
for (LedgerEntry e : ledgerEntries) {
EntryImpl entry = RangeEntryCacheManagerImpl.create(e, interceptor);
-
entriesToReturn.add(entry);
totalSize += entry.getLength();
+ if (shouldCacheEntry) {
+ insert(entry);
+ }
}
manager.mlFactoryMBean.recordCacheMiss(entriesToReturn.size(), totalSize);
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/util/RangeCache.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/util/RangeCache.java
index 1de9429d7c0..3735a0658f0 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/util/RangeCache.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/util/RangeCache.java
@@ -83,6 +83,10 @@ public class RangeCache<Key extends Comparable<Key>, Value extends ReferenceCoun
return flag.booleanValue();
}
+ public boolean exists(Key key) {
+ return key != null ? entries.containsKey(key) : true;
+ }
+
public Value get(Key key) {
Value value = entries.get(key);
if (value == null) {
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
index 0baaa47fe08..291ddc64572 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
@@ -3112,7 +3112,7 @@ public class ManagedLedgerTest extends MockedBookKeeperTestCase {
}, null, PositionImpl.LATEST);
ledger.asyncReadEntry(ledgerHandle, PositionImpl.EARLIEST.getEntryId(), PositionImpl.EARLIEST.getEntryId(),
- false, opReadEntry, ctxStr);
+ opReadEntry, ctxStr);
retryStrategically((test) -> {
return responseException2.get() != null;
}, 5, 1000);
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index 0e7eca59828..d2e68fb437c 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -1972,13 +1972,35 @@ public class ServiceConfiguration implements PulsarConfiguration {
+ "If value is invalid or NONE, then save the ManagedLedgerInfo bytes data directly.")
private String managedLedgerInfoCompressionType = "NONE";
+
@FieldContext(category = CATEGORY_STORAGE_ML,
doc = "ManagedCursorInfo compression type, option values (NONE, LZ4, ZLIB, ZSTD, SNAPPY). \n"
+ "If value is NONE, then save the ManagedCursorInfo bytes data directly.")
private String managedCursorInfoCompressionType = "NONE";
- /*** --- Load balancer. --- ****/
@FieldContext(
+ dynamic = true,
+ category = CATEGORY_STORAGE_ML,
+ doc = "Minimum cursors that must be in backlog state to cache and reuse the read entries."
+ + "(Default =0 to disable backlog reach cache)"
+ )
+ private int managedLedgerMinimumBacklogCursorsForCaching = 0;
+
+ @FieldContext(
+ dynamic = true,
+ category = CATEGORY_STORAGE_ML,
+ doc = "Minimum backlog entries for any cursor before start caching reads"
+ )
+ private int managedLedgerMinimumBacklogEntriesForCaching = 1000;
+ @FieldContext(
+ dynamic = true,
+ category = CATEGORY_STORAGE_ML,
+ doc = "Maximum backlog entry difference to prevent caching entries that can't be reused"
+ )
+ private int managedLedgerMaxBacklogBetweenCursorsForCaching = 1000;
+
+ /*** --- Load balancer. --- ****/
+ @FieldContext(
category = CATEGORY_LOAD_BALANCER,
doc = "Enable load balancer"
)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index 1cadb20ca1e..e80d603a141 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -1602,6 +1602,12 @@ public class BrokerService implements Closeable {
serviceConfig.getManagedLedgerInactiveLedgerRolloverTimeSeconds(), TimeUnit.SECONDS);
managedLedgerConfig.setCacheEvictionByMarkDeletedPosition(
serviceConfig.isCacheEvictionByMarkDeletedPosition());
+ managedLedgerConfig.setMinimumBacklogCursorsForCaching(
+ serviceConfig.getManagedLedgerMinimumBacklogCursorsForCaching());
+ managedLedgerConfig.setMinimumBacklogEntriesForCaching(
+ serviceConfig.getManagedLedgerMinimumBacklogEntriesForCaching());
+ managedLedgerConfig.setMaxBacklogBetweenCursorsForCaching(
+ serviceConfig.getManagedLedgerMaxBacklogBetweenCursorsForCaching());
OffloadPoliciesImpl nsLevelOffloadPolicies =
(OffloadPoliciesImpl) policies.map(p -> p.offload_policies).orElse(null);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarStats.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarStats.java
index 817432f1915..ff74cf839ae 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarStats.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarStats.java
@@ -137,6 +137,7 @@ public class PulsarStats implements Closeable {
// this task: helps to activate inactive-backlog-cursors which have caught up and
// connected, also deactivate active-backlog-cursors which has backlog
topic.checkBackloggedCursors();
+ topic.checkCursorsToCacheEntries();
// check if topic is inactive and require ledger rollover
((PersistentTopic) topic).checkInactiveLedgers();
} else if (topic instanceof NonPersistentTopic) {
@@ -253,4 +254,4 @@ public class PulsarStats implements Closeable {
public void recordConnectionCreateFail() {
brokerOperabilityMetrics.recordConnectionCreateFail();
}
-}
+}
\ No newline at end of file
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java
index e2ffb41390a..038f209fc1f 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java
@@ -191,6 +191,8 @@ public interface Topic {
*/
void checkBackloggedCursors();
+ void checkCursorsToCacheEntries();
+
void checkDeduplicationSnapshot();
void checkMessageExpiry();
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
index a9deda64340..7473fdaf786 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
@@ -991,6 +991,11 @@ public class NonPersistentTopic extends AbstractTopic implements Topic, TopicPol
// no-op
}
+ @Override
+ public void checkCursorsToCacheEntries() {
+ // no-op
+ }
+
@Override
public void checkDeduplicationSnapshot() {
// no-op
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 9d5e6847b95..335355b112b 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -2344,6 +2344,15 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
ledger.checkInactiveLedgerAndRollOver();
}
+ @Override
+ public void checkCursorsToCacheEntries() {
+ try {
+ ledger.checkCursorsToCacheEntries();
+ } catch (Exception e) {
+ log.warn("Failed to check cursors to cache entries", e);
+ }
+ }
+
@Override
public void checkDeduplicationSnapshot() {
messageDeduplication.takeSnapshot();
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MessageDispatchThrottlingTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MessageDispatchThrottlingTest.java
index dfa5a2c70a9..9ef28842dfe 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MessageDispatchThrottlingTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MessageDispatchThrottlingTest.java
@@ -18,11 +18,19 @@
*/
package org.apache.pulsar.client.api;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.atLeastOnce;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.verify;
import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.fail;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
+import lombok.Cleanup;
+
import java.lang.reflect.Field;
import java.util.Arrays;
import java.util.HashMap;
@@ -35,7 +43,9 @@ import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
+import org.apache.bookkeeper.mledger.impl.cache.RangeEntryCacheImpl;
import org.apache.pulsar.broker.service.BrokerService;
import org.apache.pulsar.broker.service.persistent.DispatchRateLimiter;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
@@ -1180,4 +1190,86 @@ public class MessageDispatchThrottlingTest extends ProducerConsumerBase {
producer.close();
log.info("-- Exiting {} test --", methodName);
}
+
+ /**
+ * Validates that backlog consumers cache the reads and reused by other backlog consumers while draining the
+ * backlog.
+ *
+ * @throws Exception
+ */
+ @Test
+ public void testBacklogConsumerCacheReads() throws Exception {
+ log.info("-- Starting {} test --", methodName);
+
+ conf.setManagedLedgerMinimumBacklogCursorsForCaching(2);
+ conf.setManagedLedgerMinimumBacklogEntriesForCaching(10);
+ conf.setManagedLedgerCacheEvictionTimeThresholdMillis(60 * 1000);
+ conf.setStreamingDispatch(false);
+ restartBroker();
+ final long totalMessages = 200;
+ final int receiverSize = 10;
+ final String topicName = "cache-read";
+ final String sub1 = "sub";
+ int totalSub = 10;
+ Consumer<byte[]>[] consumers = new Consumer[totalSub];
+
+ for (int i = 0; i < totalSub; i++) {
+ consumers[i] = pulsarClient.newConsumer().topic("persistent://my-property/my-ns/" + topicName)
+ .subscriptionName(sub1 + "-" + i).subscriptionType(SubscriptionType.Shared)
+ .receiverQueueSize(receiverSize).subscribe();
+ }
+ for (int i = 0; i < totalSub; i++) {
+ consumers[i].close();
+ }
+
+ final String topic = "persistent://my-property/my-ns/" + topicName;
+ ProducerBuilder<byte[]> producerBuilder = pulsarClient.newProducer().topic(topic);
+
+ producerBuilder.enableBatching(false);
+ @Cleanup
+ Producer<byte[]> producer = producerBuilder.create();
+
+ PersistentTopic topicRef = (PersistentTopic) pulsar.getBrokerService().getTopicReference(topic).get();
+ ManagedLedgerImpl ledger = (ManagedLedgerImpl) topicRef.getManagedLedger();
+ Field cacheField = ManagedLedgerImpl.class.getDeclaredField("entryCache");
+ cacheField.setAccessible(true);
+ RangeEntryCacheImpl entryCache = spy((RangeEntryCacheImpl) cacheField.get(ledger));
+ cacheField.set(ledger, entryCache);
+
+ // 2. Produce messages
+ for (int i = 0; i < totalMessages; i++) {
+ String message = "my-message-" + i;
+ producer.send(message.getBytes());
+ }
+ ledger.checkCursorsToCacheEntries();
+
+ ledger.getCursors().forEach(cursor -> {
+ assertTrue(((ManagedCursorImpl) cursor).isCacheReadEntry());
+ });
+
+ // 3. Consume messages
+ CountDownLatch latch = new CountDownLatch((int) (totalSub * totalMessages));
+ for (int i = 0; i < totalSub; i++) {
+ consumers[i] = (Consumer<byte[]>) pulsarClient.newConsumer()
+ .topic("persistent://my-property/my-ns/" + topicName).subscriptionName(sub1 + "-" + i)
+ .subscriptionType(SubscriptionType.Shared).receiverQueueSize(receiverSize)
+ .messageListener((c, m) -> {
+ latch.countDown();
+ try {
+ c.acknowledge(m);
+ } catch (PulsarClientException e) {
+ fail("failed to ack message");
+ }
+ }).subscribe();
+ }
+
+ latch.await();
+
+ // Verify: EntryCache has been invalidated
+ verify(entryCache, atLeastOnce()).insert(any());
+
+ for (int i = 0; i < totalSub; i++) {
+ consumers[i].close();
+ }
+ }
}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
index d4c124d607a..9af19cc672d 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
@@ -4466,4 +4466,4 @@ public class SimpleProducerConsumerTest extends ProducerConsumerBase {
assertEquals(values.get(i), "msg-" + i);
}
}
-}
+}
\ No newline at end of file
diff --git a/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/MockManagedLedger.java b/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/MockManagedLedger.java
index 3eaf276c3c5..8ababead630 100644
--- a/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/MockManagedLedger.java
+++ b/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/MockManagedLedger.java
@@ -368,4 +368,9 @@ public class MockManagedLedger implements ManagedLedger {
public void checkInactiveLedgerAndRollOver() {
}
+
+ @Override
+ public void checkCursorsToCacheEntries() {
+ // no-op
+ }
}