You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by eo...@apache.org on 2022/09/02 13:09:43 UTC
[pulsar] branch master updated: [managed-ledger] Do not send duplicate reads to BK/offloaders (#17241)
This is an automated email from the ASF dual-hosted git repository.
eolivelli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 3a3a993b093 [managed-ledger] Do not send duplicate reads to BK/offloaders (#17241)
3a3a993b093 is described below
commit 3a3a993b093291f2721a20eb5a981a3b7db557d9
Author: Enrico Olivelli <eo...@apache.org>
AuthorDate: Fri Sep 2 15:09:34 2022 +0200
[managed-ledger] Do not send duplicate reads to BK/offloaders (#17241)
---
.../mledger/impl/cache/PendingReadsManager.java | 447 ++++++++++++++++++++
.../mledger/impl/cache/RangeEntryCacheImpl.java | 106 +++--
.../impl/cache/PendingReadsManagerTest.java | 462 +++++++++++++++++++++
.../client/api/MessageDispatchThrottlingTest.java | 8 +
4 files changed, 986 insertions(+), 37 deletions(-)
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/PendingReadsManager.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/PendingReadsManager.java
new file mode 100644
index 00000000000..4c374d8ace6
--- /dev/null
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/PendingReadsManager.java
@@ -0,0 +1,447 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.mledger.impl.cache;
+
+import static org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.createManagedLedgerException;
+import io.prometheus.client.Counter;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+import lombok.AllArgsConstructor;
+import lombok.Value;
+import org.apache.bookkeeper.client.api.ReadHandle;
+import org.apache.bookkeeper.mledger.AsyncCallbacks;
+import org.apache.bookkeeper.mledger.Entry;
+import org.apache.bookkeeper.mledger.ManagedLedgerException;
+import org.apache.bookkeeper.mledger.impl.EntryImpl;
+
+/**
+ * PendingReadsManager tries to prevent sending duplicate reads to BK.
+ */
+public class PendingReadsManager {
+
+ // All counters below are incremented by a number of ENTRIES (the size of a range),
+ // not by one per read request.
+
+ // Bumped by the full range size when no reusable pending read exists and a new one is created.
+ private static final Counter COUNT_ENTRIES_READ_FROM_BK = Counter
+ .build()
+ .name("pulsar_ml_cache_pendingreads_entries_read")
+ .help("Total number of entries read from BK")
+ .register();
+
+ // Bumped by the number of entries that are served by an already pending read.
+ private static final Counter COUNT_ENTRIES_NOTREAD_FROM_BK = Counter
+ .build()
+ .name("pulsar_ml_cache_pendingreads_entries_notread")
+ .help("Total number of entries not read from BK")
+ .register();
+
+ // Bumped when a pending read with exactly the same range is found.
+ private static final Counter COUNT_PENDING_READS_MATCHED = Counter
+ .build()
+ .name("pulsar_ml_cache_pendingreads_matched")
+ .help("Pending reads reused with perfect range match")
+ .register();
+ // Bumped when a pending read whose range includes the requested range is found.
+ private static final Counter COUNT_PENDING_READS_MATCHED_INCLUDED = Counter
+ .build()
+ .name("pulsar_ml_cache_pendingreads_matched_included")
+ .help("Pending reads reused by attaching to a read with a larger range")
+ .register();
+ // Bumped when nothing could be reused and a brand new pending read is registered.
+ private static final Counter COUNT_PENDING_READS_MISSED = Counter
+ .build()
+ .name("pulsar_ml_cache_pendingreads_missed")
+ .help("Pending reads that didn't find a match")
+ .register();
+
+ // Bumped by the overlapping part when only the left side of the request is missing.
+ private static final Counter COUNT_PENDING_READS_MATCHED_OVERLAPPING_MISS_LEFT = Counter
+ .build()
+ .name("pulsar_ml_cache_pendingreads_matched_overlapping_miss_left")
+ .help("Pending reads that didn't find a match but they partially overlap with another read")
+ .register();
+
+ // Bumped by the overlapping part when only the right side of the request is missing.
+ private static final Counter COUNT_PENDING_READS_MATCHED_BUT_OVERLAPPING_MISS_RIGHT = Counter
+ .build()
+ .name("pulsar_ml_cache_pendingreads_matched_overlapping_miss_right")
+ .help("Pending reads that didn't find a match but they partially overlap with another read")
+ .register();
+
+ // Bumped by the overlapping part when both the left and the right side of the request are missing.
+ private static final Counter COUNT_PENDING_READS_MATCHED_BUT_OVERLAPPING_MISS_BOTH = Counter
+ .build()
+ .name("pulsar_ml_cache_pendingreads_matched_overlapping_miss_both")
+ .help("Pending reads that didn't find a match but they partially overlap with another read")
+ .register();
+
+ private final RangeEntryCacheImpl rangeEntryCache;
+ private final ConcurrentHashMap<Long, ConcurrentHashMap<PendingReadKey, PendingRead>> cachedPendingReads =
+ new ConcurrentHashMap<>();
+
+ public PendingReadsManager(RangeEntryCacheImpl rangeEntryCache) {
+ this.rangeEntryCache = rangeEntryCache;
+ }
+
+    /**
+     * Inclusive range of entry ids {@code [startEntry, endEntry]} that identifies a read.
+     * Instances are used as keys of the per-ledger map of pending reads.
+     */
+    @Value
+    private static class PendingReadKey {
+        private final long startEntry;
+        private final long endEntry;
+
+        /**
+         * @return the number of entries in the range, both bounds included
+         */
+        long size() {
+            return endEntry - startEntry + 1;
+        }
+
+        /**
+         * @return true if {@code other} lies entirely within this range
+         */
+        boolean includes(PendingReadKey other) {
+            return startEntry <= other.startEntry && other.endEntry <= endEntry;
+        }
+
+        /**
+         * Tells whether the two ranges share at least one entry id.
+         *
+         * <p>This is the symmetric interval-intersection test: it also returns true when this range
+         * strictly contains {@code other}, a case that comparing only this range's endpoints
+         * against {@code other} would miss.
+         *
+         * @return true if the ranges intersect
+         */
+        boolean overlaps(PendingReadKey other) {
+            return startEntry <= other.endEntry && other.startEntry <= endEntry;
+        }
+
+        /**
+         * Computes the part of this range that precedes {@code other}.
+         *
+         * @return the range from this start up to the entry before {@code other} starts,
+         *         or null when {@code other} does not start inside this range after its first entry
+         */
+        PendingReadKey reminderOnLeft(PendingReadKey other) {
+            //   S******-----E
+            //          S----------E
+            if (other.startEntry <= endEntry
+                    && other.startEntry > startEntry) {
+                return new PendingReadKey(startEntry, other.startEntry - 1);
+            }
+            return null;
+        }
+
+        /**
+         * Computes the part of this range that follows {@code other}.
+         *
+         * @return the range from the entry after {@code other} ends up to this end,
+         *         or null when {@code other} does not end inside this range before its last entry
+         */
+        PendingReadKey reminderOnRight(PendingReadKey other) {
+            //          S-----*******E
+            //   S-----------E
+            if (startEntry <= other.endEntry
+                    && other.endEntry < endEntry) {
+                return new PendingReadKey(other.endEntry + 1, endEntry);
+            }
+            return null;
+        }
+
+    }
+
+ @AllArgsConstructor
+ private static final class ReadEntriesCallbackWithContext {
+ final AsyncCallbacks.ReadEntriesCallback callback;
+ final Object ctx;
+ final long startEntry;
+ final long endEntry;
+ }
+
+ @AllArgsConstructor
+ private static final class FindPendingReadOutcome {
+ final PendingRead pendingRead;
+ final PendingReadKey missingOnLeft;
+ final PendingReadKey missingOnRight;
+ boolean needsAdditionalReads() {
+ return missingOnLeft != null || missingOnRight != null;
+ }
+ }
+
+ private FindPendingReadOutcome findPendingRead(PendingReadKey key, Map<PendingReadKey,
+ PendingRead> ledgerCache, AtomicBoolean created) {
+ synchronized (ledgerCache) {
+ PendingRead existing = ledgerCache.get(key);
+ if (existing != null) {
+ COUNT_PENDING_READS_MATCHED.inc(key.size());
+ COUNT_ENTRIES_NOTREAD_FROM_BK.inc(key.size());
+ return new FindPendingReadOutcome(existing, null, null);
+ }
+ FindPendingReadOutcome foundButMissingSomethingOnLeft = null;
+ FindPendingReadOutcome foundButMissingSomethingOnRight = null;
+ FindPendingReadOutcome foundButMissingSomethingOnBoth = null;
+
+ for (Map.Entry<PendingReadKey, PendingRead> entry : ledgerCache.entrySet()) {
+ PendingReadKey entryKey = entry.getKey();
+ if (entryKey.includes(key)) {
+ COUNT_PENDING_READS_MATCHED_INCLUDED.inc(key.size());
+ COUNT_ENTRIES_NOTREAD_FROM_BK.inc(key.size());
+ return new FindPendingReadOutcome(entry.getValue(), null, null);
+ }
+ if (entryKey.overlaps(key)) {
+ PendingReadKey reminderOnLeft = key.reminderOnLeft(entryKey);
+ PendingReadKey reminderOnRight = key.reminderOnRight(entryKey);
+ if (reminderOnLeft != null && reminderOnRight != null) {
+ foundButMissingSomethingOnBoth = new FindPendingReadOutcome(entry.getValue(),
+ reminderOnLeft, reminderOnRight);
+ } else if (reminderOnRight != null && reminderOnLeft == null) {
+ foundButMissingSomethingOnRight = new FindPendingReadOutcome(entry.getValue(),
+ null, reminderOnRight);
+ } else if (reminderOnLeft != null && reminderOnRight == null) {
+ foundButMissingSomethingOnLeft = new FindPendingReadOutcome(entry.getValue(),
+ reminderOnLeft, null);
+ }
+ }
+ }
+
+ if (foundButMissingSomethingOnRight != null) {
+ long delta = key.size()
+ - foundButMissingSomethingOnRight.missingOnRight.size();
+ COUNT_PENDING_READS_MATCHED_BUT_OVERLAPPING_MISS_RIGHT.inc(delta);
+ COUNT_ENTRIES_NOTREAD_FROM_BK.inc(delta);
+ return foundButMissingSomethingOnRight;
+ } else if (foundButMissingSomethingOnLeft != null) {
+ long delta = key.size()
+ - foundButMissingSomethingOnLeft.missingOnLeft.size();
+ COUNT_PENDING_READS_MATCHED_OVERLAPPING_MISS_LEFT.inc(delta);
+ COUNT_ENTRIES_NOTREAD_FROM_BK.inc(delta);
+ return foundButMissingSomethingOnLeft;
+ } else if (foundButMissingSomethingOnBoth != null) {
+ long delta = key.size()
+ - foundButMissingSomethingOnBoth.missingOnRight.size()
+ - foundButMissingSomethingOnBoth.missingOnLeft.size();
+ COUNT_ENTRIES_NOTREAD_FROM_BK.inc(delta);
+ COUNT_PENDING_READS_MATCHED_BUT_OVERLAPPING_MISS_BOTH.inc(delta);
+ return foundButMissingSomethingOnBoth;
+ }
+
+ created.set(true);
+ PendingRead newRead = new PendingRead(key, ledgerCache);
+ ledgerCache.put(key, newRead);
+ long delta = key.size();
+ COUNT_PENDING_READS_MISSED.inc(delta);
+ COUNT_ENTRIES_READ_FROM_BK.inc(delta);
+ return new FindPendingReadOutcome(newRead, null, null);
+ }
+ }
+
+    /**
+     * One in-flight read of the range {@code key}, shared by every caller registered through
+     * {@link #addListener}. Once the underlying future completes, the instance marks itself
+     * completed, removes itself from {@code ledgerCache} and refuses new listeners.
+     */
+    private class PendingRead {
+        final PendingReadKey key;
+        final Map<PendingReadKey, PendingRead> ledgerCache;
+        final List<ReadEntriesCallbackWithContext> callbacks = new ArrayList<>(1);
+        boolean completed = false;
+
+        public PendingRead(PendingReadKey key,
+                           Map<PendingReadKey, PendingRead> ledgerCache) {
+            this.key = key;
+            this.ledgerCache = ledgerCache;
+        }
+
+        /**
+         * Keeps the entries whose id is within {@code [startEntry, endEntry]} and releases the others.
+         *
+         * @param list the entries returned by the read
+         * @param startEntry first entry id to keep (inclusive)
+         * @param endEntry last entry id to keep (inclusive)
+         * @return a new list with the retained entries, in the original order
+         */
+        private List<EntryImpl> keepEntries(List<EntryImpl> list, long startEntry, long endEntry) {
+            // the range is inclusive on both ends, so it can hold (endEntry - startEntry + 1) entries
+            List<EntryImpl> result = new ArrayList<>((int) (endEntry - startEntry + 1));
+            for (EntryImpl entry : list) {
+                long entryId = entry.getEntryId();
+                if (startEntry <= entryId && entryId <= endEntry) {
+                    result.add(entry);
+                } else {
+                    entry.release();
+                }
+            }
+            return result;
+        }
+
+        /**
+         * Binds this pending read to the future of the actual read and dispatches its result.
+         *
+         * @param handle the future that completes with the entries of the range {@code key}
+         */
+        public void attach(CompletableFuture<List<EntryImpl>> handle) {
+            // when the future is done remove this from the map
+            // new reads will go to a new instance
+            // this is required because we are going to do refcount management
+            // on the results of the callback
+            handle.whenComplete((___, error) -> {
+                synchronized (PendingRead.this) {
+                    completed = true;
+                    synchronized (ledgerCache) {
+                        ledgerCache.remove(key, this);
+                    }
+                }
+            });
+
+            // NOTE(review): the callbacks below are invoked while holding the monitor of this
+            // PendingRead; confirm that no callback can block or re-enter in a harmful way.
+            handle.thenAcceptAsync(entriesToReturn -> {
+                synchronized (PendingRead.this) {
+                    if (callbacks.size() == 1) {
+                        ReadEntriesCallbackWithContext first = callbacks.get(0);
+                        if (first.startEntry == key.startEntry
+                                && first.endEntry == key.endEntry) {
+                            // perfect match, no copy, this is the most common case
+                            first.callback.readEntriesComplete((List) entriesToReturn,
+                                    first.ctx);
+                        } else {
+                            // single listener on a narrower range: hand over the entries
+                            // it asked for and release the rest
+                            first.callback.readEntriesComplete(
+                                    (List) keepEntries(entriesToReturn, first.startEntry, first.endEntry),
+                                    first.ctx);
+                        }
+                    } else {
+                        // several listeners: each one receives its own copies of the entries
+                        // in its range, then the originals are released
+                        for (ReadEntriesCallbackWithContext callback : callbacks) {
+                            long callbackStartEntry = callback.startEntry;
+                            long callbackEndEntry = callback.endEntry;
+                            List<EntryImpl> copy = new ArrayList<>((int) (callbackEndEntry - callbackStartEntry + 1));
+                            for (EntryImpl entry : entriesToReturn) {
+                                long entryId = entry.getEntryId();
+                                if (callbackStartEntry <= entryId && entryId <= callbackEndEntry) {
+                                    EntryImpl entryCopy = EntryImpl.create(entry);
+                                    copy.add(entryCopy);
+                                }
+                            }
+                            callback.callback.readEntriesComplete((List) copy, callback.ctx);
+                        }
+                        for (EntryImpl entry : entriesToReturn) {
+                            entry.release();
+                        }
+                    }
+                }
+            }, rangeEntryCache.getManagedLedger().getExecutor()
+                    .chooseThread(rangeEntryCache.getManagedLedger().getName())).exceptionally(exception -> {
+                // NOTE(review): this stage is chained on the dispatch stage above, so it runs both
+                // when the read itself fails and when a callback throws during dispatch; in the
+                // latter case listeners that were already completed are notified of a failure too.
+                synchronized (PendingRead.this) {
+                    for (ReadEntriesCallbackWithContext callback : callbacks) {
+                        ManagedLedgerException mlException = createManagedLedgerException(exception);
+                        callback.callback.readEntriesFailed(mlException, callback.ctx);
+                    }
+                }
+                return null;
+            });
+        }
+
+        /**
+         * Registers a caller interested in {@code [startEntry, endEntry]} of this read.
+         *
+         * @return false if the read already completed (the caller must look for another
+         *         pending read), true if the listener was registered
+         */
+        synchronized boolean addListener(AsyncCallbacks.ReadEntriesCallback callback,
+                                         Object ctx, long startEntry, long endEntry) {
+            if (completed) {
+                return false;
+            }
+            callbacks.add(new ReadEntriesCallbackWithContext(callback, ctx, startEntry, endEntry));
+            return true;
+        }
+    }
+
+
+ void readEntries(ReadHandle lh, long firstEntry, long lastEntry, boolean shouldCacheEntry,
+ final AsyncCallbacks.ReadEntriesCallback callback, Object ctx) {
+
+
+ final PendingReadKey key = new PendingReadKey(firstEntry, lastEntry);
+
+ Map<PendingReadKey, PendingRead> pendingReadsForLedger =
+ cachedPendingReads.computeIfAbsent(lh.getId(), (l) -> new ConcurrentHashMap<>());
+
+ boolean listenerAdded = false;
+ while (!listenerAdded) {
+ AtomicBoolean createdByThisThread = new AtomicBoolean();
+ FindPendingReadOutcome findBestCandidateOutcome = findPendingRead(key,
+ pendingReadsForLedger, createdByThisThread);
+ PendingRead pendingRead = findBestCandidateOutcome.pendingRead;
+ if (findBestCandidateOutcome.needsAdditionalReads()) {
+ AsyncCallbacks.ReadEntriesCallback wrappedCallback = new AsyncCallbacks.ReadEntriesCallback() {
+ @Override
+ public void readEntriesComplete(List<Entry> entries, Object ctx) {
+ PendingReadKey missingOnLeft = findBestCandidateOutcome.missingOnLeft;
+ PendingReadKey missingOnRight = findBestCandidateOutcome.missingOnRight;
+ if (missingOnRight != null && missingOnLeft != null) {
+ AsyncCallbacks.ReadEntriesCallback readFromLeftCallback =
+ new AsyncCallbacks.ReadEntriesCallback() {
+ @Override
+ public void readEntriesComplete(List<Entry> entriesFromLeft, Object dummyCtx1) {
+ AsyncCallbacks.ReadEntriesCallback readFromRightCallback =
+ new AsyncCallbacks.ReadEntriesCallback() {
+ @Override
+ public void readEntriesComplete(List<Entry> entriesFromRight,
+ Object dummyCtx2) {
+ List<Entry> finalResult =
+ new ArrayList<>(entriesFromLeft.size()
+ + entries.size() + entriesFromRight.size());
+ finalResult.addAll(entriesFromLeft);
+ finalResult.addAll(entries);
+ finalResult.addAll(entriesFromRight);
+ callback.readEntriesComplete(finalResult, ctx);
+ }
+
+ @Override
+ public void readEntriesFailed(ManagedLedgerException exception,
+ Object dummyCtx3) {
+ callback.readEntriesFailed(exception, ctx);
+ }
+ };
+ rangeEntryCache.asyncReadEntry0(lh,
+ missingOnRight.startEntry, missingOnRight.endEntry,
+ shouldCacheEntry, readFromRightCallback, null);
+ }
+
+ @Override
+ public void readEntriesFailed(ManagedLedgerException exception, Object dummyCtx4) {
+ callback.readEntriesFailed(exception, ctx);
+ }
+ };
+ rangeEntryCache.asyncReadEntry0(lh, missingOnLeft.startEntry, missingOnLeft.endEntry,
+ shouldCacheEntry, readFromLeftCallback, null);
+ } else if (missingOnLeft != null) {
+ AsyncCallbacks.ReadEntriesCallback readFromLeftCallback =
+ new AsyncCallbacks.ReadEntriesCallback() {
+
+ @Override
+ public void readEntriesComplete(List<Entry> entriesFromLeft,
+ Object dummyCtx5) {
+ List<Entry> finalResult =
+ new ArrayList<>(entriesFromLeft.size() + entries.size());
+ finalResult.addAll(entriesFromLeft);
+ finalResult.addAll(entries);
+ callback.readEntriesComplete(finalResult, ctx);
+ }
+
+ @Override
+ public void readEntriesFailed(ManagedLedgerException exception,
+ Object dummyCtx6) {
+ callback.readEntriesFailed(exception, ctx);
+ }
+ };
+ rangeEntryCache.asyncReadEntry0(lh, missingOnLeft.startEntry, missingOnLeft.endEntry,
+ shouldCacheEntry, readFromLeftCallback, null);
+ } else if (missingOnRight != null) {
+ AsyncCallbacks.ReadEntriesCallback readFromRightCallback =
+ new AsyncCallbacks.ReadEntriesCallback() {
+
+ @Override
+ public void readEntriesComplete(List<Entry> entriesFromRight,
+ Object dummyCtx7) {
+ List<Entry> finalResult =
+ new ArrayList<>(entriesFromRight.size() + entries.size());
+ finalResult.addAll(entries);
+ finalResult.addAll(entriesFromRight);
+ callback.readEntriesComplete(finalResult, ctx);
+ }
+
+ @Override
+ public void readEntriesFailed(ManagedLedgerException exception,
+ Object dummyCtx8) {
+ callback.readEntriesFailed(exception, ctx);
+ }
+ };
+ rangeEntryCache.asyncReadEntry0(lh, missingOnRight.startEntry, missingOnRight.endEntry,
+ shouldCacheEntry, readFromRightCallback, null);
+ }
+ }
+
+ @Override
+ public void readEntriesFailed(ManagedLedgerException exception, Object ctx) {
+ callback.readEntriesFailed(exception, ctx);
+ }
+ };
+ listenerAdded = pendingRead.addListener(wrappedCallback, ctx, key.startEntry, key.endEntry);
+ } else {
+ listenerAdded = pendingRead.addListener(callback, ctx, key.startEntry, key.endEntry);
+ }
+
+
+ if (createdByThisThread.get()) {
+ CompletableFuture<List<EntryImpl>> readResult = rangeEntryCache.readFromStorage(lh, firstEntry,
+ lastEntry, shouldCacheEntry);
+ pendingRead.attach(readResult);
+ }
+ }
+ }
+
+ /**
+ * Forgets the pending reads of every ledger by emptying {@code cachedPendingReads}.
+ * Nothing in this method notifies or cancels the reads that are still in flight.
+ */
+ void clear() {
+ cachedPendingReads.clear();
+ }
+
+ /**
+ * Forgets the pending reads tracked for one ledger.
+ * Nothing in this method notifies or cancels the reads that are still in flight.
+ *
+ * @param id the id of the ledger whose entry is removed from {@code cachedPendingReads}
+ */
+ void invalidateLedger(long id) {
+ cachedPendingReads.remove(id);
+ }
+}
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/RangeEntryCacheImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/RangeEntryCacheImpl.java
index f8f5c328cd9..163728e0f04 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/RangeEntryCacheImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/RangeEntryCacheImpl.java
@@ -21,12 +21,14 @@ package org.apache.bookkeeper.mledger.impl.cache;
import static com.google.common.base.Preconditions.checkArgument;
import static java.util.Objects.requireNonNull;
import static org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.createManagedLedgerException;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Lists;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.PooledByteBufAllocator;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
+import java.util.concurrent.CompletableFuture;
import org.apache.bookkeeper.client.api.BKException;
import org.apache.bookkeeper.client.api.LedgerEntry;
import org.apache.bookkeeper.client.api.ReadHandle;
@@ -48,15 +50,18 @@ import org.slf4j.LoggerFactory;
public class RangeEntryCacheImpl implements EntryCache {
private final RangeEntryCacheManagerImpl manager;
- private final ManagedLedgerImpl ml;
+ final ManagedLedgerImpl ml;
private ManagedLedgerInterceptor interceptor;
private final RangeCache<PositionImpl, EntryImpl> entries;
private final boolean copyEntries;
+ private final PendingReadsManager pendingReadsManager;
+
private static final double MB = 1024 * 1024;
public RangeEntryCacheImpl(RangeEntryCacheManagerImpl manager, ManagedLedgerImpl ml, boolean copyEntries) {
this.manager = manager;
+ this.pendingReadsManager = new PendingReadsManager(this);
this.ml = ml;
this.interceptor = ml.getManagedLedgerInterceptor();
this.entries = new RangeCache<>(EntryImpl::getLength, EntryImpl::getTimestamp);
@@ -67,6 +72,11 @@ public class RangeEntryCacheImpl implements EntryCache {
}
}
+ @VisibleForTesting
+ ManagedLedgerImpl getManagedLedger() {
+ return ml;
+ }
+
@Override
public String getName() {
return ml.getName();
@@ -185,6 +195,7 @@ public class RangeEntryCacheImpl implements EntryCache {
}
manager.entriesRemoved(sizeRemoved, entriesRemoved);
+ pendingReadsManager.invalidateLedger(ledgerId);
}
@Override
@@ -235,6 +246,7 @@ public class RangeEntryCacheImpl implements EntryCache {
}
}, ml.getExecutor().chooseThread(ml.getName())).exceptionally(exception -> {
ml.invalidateLedgerHandle(lh);
+ pendingReadsManager.invalidateLedger(lh.getId());
callback.readEntryFailed(createManagedLedgerException(exception), ctx);
return null;
});
@@ -257,7 +269,7 @@ public class RangeEntryCacheImpl implements EntryCache {
}
@SuppressWarnings({ "unchecked", "rawtypes" })
- private void asyncReadEntry0(ReadHandle lh, long firstEntry, long lastEntry, boolean shouldCacheEntry,
+ void asyncReadEntry0(ReadHandle lh, long firstEntry, long lastEntry, boolean shouldCacheEntry,
final ReadEntriesCallback callback, Object ctx) {
final long ledgerId = lh.getId();
final int entriesToRead = (int) (lastEntry - firstEntry) + 1;
@@ -295,51 +307,71 @@ public class RangeEntryCacheImpl implements EntryCache {
}
// Read all the entries from bookkeeper
- lh.readAsync(firstEntry, lastEntry).thenAcceptAsync(
- ledgerEntries -> {
- requireNonNull(ml.getName());
- requireNonNull(ml.getExecutor());
+ pendingReadsManager.readEntries(lh, firstEntry, lastEntry,
+ shouldCacheEntry, callback, ctx);
- try {
- // We got the entries, we need to transform them to a List<> type
- long totalSize = 0;
- final List<EntryImpl> entriesToReturn = Lists.newArrayListWithExpectedSize(entriesToRead);
- for (LedgerEntry e : ledgerEntries) {
- EntryImpl entry = RangeEntryCacheManagerImpl.create(e, interceptor);
- entriesToReturn.add(entry);
- totalSize += entry.getLength();
- if (shouldCacheEntry) {
- EntryImpl cacheEntry = EntryImpl.create(entry);
- insert(cacheEntry);
- cacheEntry.release();
+ }
+ }
+
+ /**
+ * Reads the entries from Storage.
+ * @param lh the handle
+ * @param firstEntry the first entry
+ * @param lastEntry the last entry
+ * @param shouldCacheEntry if we should put the entry into the cache
+ * @return a handle to the operation
+ */
+ CompletableFuture<List<EntryImpl>> readFromStorage(ReadHandle lh,
+ long firstEntry, long lastEntry, boolean shouldCacheEntry) {
+ final int entriesToRead = (int) (lastEntry - firstEntry) + 1;
+ CompletableFuture<List<EntryImpl>> readResult = lh.readAsync(firstEntry, lastEntry)
+ .thenApply(
+ ledgerEntries -> {
+ requireNonNull(ml.getName());
+ requireNonNull(ml.getExecutor());
+
+ try {
+ // We got the entries, we need to transform them to a List<> type
+ long totalSize = 0;
+ final List<EntryImpl> entriesToReturn =
+ Lists.newArrayListWithExpectedSize(entriesToRead);
+ for (LedgerEntry e : ledgerEntries) {
+ EntryImpl entry = RangeEntryCacheManagerImpl.create(e, interceptor);
+ entriesToReturn.add(entry);
+ totalSize += entry.getLength();
+ if (shouldCacheEntry) {
+ EntryImpl cacheEntry = EntryImpl.create(entry);
+ insert(cacheEntry);
+ cacheEntry.release();
+ }
}
- }
- manager.mlFactoryMBean.recordCacheMiss(entriesToReturn.size(), totalSize);
- ml.getMbean().addReadEntriesSample(entriesToReturn.size(), totalSize);
+ manager.mlFactoryMBean.recordCacheMiss(entriesToReturn.size(), totalSize);
+ ml.getMbean().addReadEntriesSample(entriesToReturn.size(), totalSize);
- callback.readEntriesComplete((List) entriesToReturn, ctx);
- } finally {
- ledgerEntries.close();
- }
- }, ml.getExecutor().chooseThread(ml.getName())).exceptionally(exception -> {
- if (exception instanceof BKException
- && ((BKException) exception).getCode() == BKException.Code.TooManyRequestsException) {
- callback.readEntriesFailed(createManagedLedgerException(exception), ctx);
- } else {
- ml.invalidateLedgerHandle(lh);
- ManagedLedgerException mlException = createManagedLedgerException(exception);
- callback.readEntriesFailed(mlException, ctx);
- }
- return null;
- });
- }
+ return entriesToReturn;
+ } finally {
+ ledgerEntries.close();
+ }
+ });
+ // handle LH invalidation
+ readResult.exceptionally(exception -> {
+ if (exception instanceof BKException
+ && ((BKException) exception).getCode() == BKException.Code.TooManyRequestsException) {
+ } else {
+ ml.invalidateLedgerHandle(lh);
+ pendingReadsManager.invalidateLedger(lh.getId());
+ }
+ return null;
+ });
+ return readResult;
}
@Override
public void clear() {
Pair<Integer, Long> removedPair = entries.clear();
manager.entriesRemoved(removedPair.getRight(), removedPair.getLeft());
+ pendingReadsManager.clear();
}
@Override
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/cache/PendingReadsManagerTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/cache/PendingReadsManagerTest.java
new file mode 100644
index 00000000000..f6d3ac88156
--- /dev/null
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/cache/PendingReadsManagerTest.java
@@ -0,0 +1,462 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.mledger.impl.cache;
+
+import lombok.Data;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.client.api.ReadHandle;
+import org.apache.bookkeeper.common.util.OrderedExecutor;
+import org.apache.bookkeeper.mledger.AsyncCallbacks;
+import org.apache.bookkeeper.mledger.Entry;
+import org.apache.bookkeeper.mledger.ManagedLedgerException;
+import org.apache.bookkeeper.mledger.Position;
+import org.apache.bookkeeper.mledger.impl.EntryImpl;
+import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
+
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyBoolean;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+import static org.testng.Assert.assertEquals;
+import static org.testng.AssertJUnit.assertNotSame;
+import static org.testng.AssertJUnit.assertSame;
+
+@Slf4j
+public class PendingReadsManagerTest {
+
+ static final Object CTX = "foo";
+ static final Object CTX2 = "far";
+ static final long ledgerId = 123414L;
+ OrderedExecutor orderedExecutor;
+
+ PendingReadsManagerTest() {
+ }
+
+ @BeforeClass(alwaysRun = true)
+ void before() {
+ orderedExecutor = OrderedExecutor.newBuilder().build();
+ }
+
+ @AfterClass(alwaysRun = true)
+ void after() {
+ if (orderedExecutor != null) {
+ orderedExecutor.shutdown();
+ orderedExecutor = null;
+ }
+ }
+
+
+ RangeEntryCacheImpl rangeEntryCache;
+ PendingReadsManager pendingReadsManager;
+ ReadHandle lh;
+ ManagedLedgerImpl ml;
+
+ @BeforeMethod(alwaysRun = true)
+ void setupMocks() {
+ rangeEntryCache = mock(RangeEntryCacheImpl.class);
+ pendingReadsManager = new PendingReadsManager(rangeEntryCache);
+ doAnswer(new Answer() {
+ @Override
+ public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
+ log.info("rangeEntryCache asyncReadEntry0 {}", invocationOnMock);
+ ReadHandle rh = invocationOnMock.getArgument(0);
+ long startEntry = invocationOnMock.getArgument(1);
+ long endEntry = invocationOnMock.getArgument(2);
+ boolean shouldCacheEntry = invocationOnMock.getArgument(3);
+ AsyncCallbacks.ReadEntriesCallback callback = invocationOnMock.getArgument(4);
+ Object ctx = invocationOnMock.getArgument(5);
+ pendingReadsManager.readEntries(lh, startEntry, endEntry, shouldCacheEntry, callback, ctx);
+ return null;
+ }
+ }).when(rangeEntryCache).asyncReadEntry0(any(), anyLong(), anyLong(),
+ anyBoolean(), any(), any());
+
+ lh = mock(ReadHandle.class);
+ ml = mock(ManagedLedgerImpl.class);
+ when(ml.getExecutor()).thenReturn(orderedExecutor);
+ when(rangeEntryCache.getManagedLedger()).thenReturn(ml);
+ }
+
+
+ @Data
+ private static class CapturingReadEntriesCallback extends CompletableFuture<Void>
+ implements AsyncCallbacks.ReadEntriesCallback {
+ List<Position> entries;
+ Object ctx;
+ Throwable error;
+
+ @Override
+ public synchronized void readEntriesComplete(List<Entry> entries, Object ctx) {
+ this.entries = entries.stream().map(Entry::getPosition).collect(Collectors.toList());
+ this.ctx = ctx;
+ this.error = null;
+ this.complete(null);
+ }
+
+ @Override
+ public synchronized void readEntriesFailed(ManagedLedgerException exception, Object ctx) {
+ this.entries = null;
+ this.ctx = ctx;
+ this.error = exception;
+ this.completeExceptionally(exception);
+ }
+
+ }
+
+    /**
+     * Builds one entry per id in {@code [start, end]} (both inclusive) for the test ledger,
+     * each carrying the payload "data".
+     */
+    private static List<EntryImpl> buildList(long start, long end) {
+        List<EntryImpl> built = new ArrayList<>();
+        long nextId = start;
+        while (nextId <= end) {
+            built.add(EntryImpl.create(ledgerId, nextId, "data".getBytes(StandardCharsets.UTF_8)));
+            nextId++;
+        }
+        return built;
+    }
+
+
+    /**
+     * Asserts that {@code entries} holds exactly the positions of the entry ids
+     * {@code firstEntry..endEntry} (both inclusive), in ascending order.
+     *
+     * @param entries the positions received by a callback
+     * @param firstEntry first expected entry id
+     * @param endEntry last expected entry id
+     */
+    private void verifyRange(List<Position> entries, long firstEntry, long endEntry) {
+        log.info("verifyRange numEntries {}", entries.size());
+        // check the size first: surplus entries would otherwise go unnoticed and missing
+        // ones would surface as an IndexOutOfBoundsException instead of an assertion failure
+        assertEquals(entries.size(), (int) (endEntry - firstEntry + 1));
+        int pos = 0;
+        for (long entry = firstEntry; entry <= endEntry; entry++) {
+            assertEquals(entries.get(pos++).getEntryId(), entry);
+        }
+    }
+
+ private static class PreparedReadFromStorage extends CompletableFuture<List<EntryImpl>> {
+ final long firstEntry;
+ final long endEntry;
+ final boolean shouldCacheEntry;
+
+ public PreparedReadFromStorage(long firstEntry, long endEntry, boolean shouldCacheEntry) {
+ this.firstEntry = firstEntry;
+ this.endEntry = endEntry;
+ this.shouldCacheEntry = shouldCacheEntry;
+ }
+
+ @Override
+ public String toString() {
+ return "PreparedReadFromStorage("+firstEntry+","+endEntry+","+shouldCacheEntry+")";
+ }
+
+ public void storageReadCompleted() {
+ this.complete(buildList(firstEntry, endEntry));
+ }
+ }
+
+ private PreparedReadFromStorage prepareReadFromStorage(ReadHandle lh, RangeEntryCacheImpl rangeEntryCache,
+ long firstEntry, long endEntry, boolean shouldCacheEntry) {
+ PreparedReadFromStorage read = new PreparedReadFromStorage(firstEntry, endEntry, shouldCacheEntry);
+ log.info("prepareReadFromStorage from {} to {} shouldCacheEntry {}", firstEntry, endEntry, shouldCacheEntry);
+ when(rangeEntryCache.readFromStorage(eq(lh), eq(firstEntry), eq(endEntry), eq(shouldCacheEntry))).thenAnswer(
+ (invocationOnMock -> {
+ log.info("readFromStorage from {} to {} shouldCacheEntry {}", firstEntry, endEntry, shouldCacheEntry);
+ return read;
+ })
+ );
+ return read;
+ }
+
+ @Test
+ public void simpleRead() throws Exception {
+
+ long firstEntry = 100;
+ long endEntry = 199;
+ boolean shouldCacheEntry = false;
+
+ PreparedReadFromStorage read1
+ = prepareReadFromStorage(lh, rangeEntryCache, firstEntry, endEntry, shouldCacheEntry);
+
+ CapturingReadEntriesCallback callback = new CapturingReadEntriesCallback();
+ pendingReadsManager.readEntries(lh, firstEntry, endEntry, shouldCacheEntry, callback, CTX);
+
+ // complete the read
+ read1.storageReadCompleted();
+
+ // wait for the callback to complete
+ callback.get();
+ assertSame(callback.getCtx(), CTX);
+
+ // verify
+ verifyRange(callback.entries, firstEntry, endEntry);
+ }
+
+
+ /**
+  * Two reads for exactly the same range are issued before the storage read completes.
+  * Only one storage read is stubbed, and completing it must satisfy both callbacks, each
+  * with its own context and its own entry instances.
+  */
+ @Test
+ public void simpleConcurrentReadPerfectMatch() throws Exception {
+
+     long firstEntry = 100;
+     long endEntry = 199;
+     boolean shouldCacheEntry = false;
+
+     PreparedReadFromStorage read1 =
+             prepareReadFromStorage(lh, rangeEntryCache, firstEntry, endEntry, shouldCacheEntry);
+
+     // named "manager" so it does not shadow the pendingReadsManager member used by simpleRead
+     PendingReadsManager manager = new PendingReadsManager(rangeEntryCache);
+     CapturingReadEntriesCallback callback = new CapturingReadEntriesCallback();
+     manager.readEntries(lh, firstEntry, endEntry, shouldCacheEntry, callback, CTX);
+
+     CapturingReadEntriesCallback callback2 = new CapturingReadEntriesCallback();
+     manager.readEntries(lh, firstEntry, endEntry, shouldCacheEntry, callback2, CTX2);
+
+     // complete the read from BK
+     // only one read completes 2 callbacks
+     read1.storageReadCompleted();
+
+     callback.get();
+     callback2.get();
+
+     assertSame(callback.getCtx(), CTX);
+     assertSame(callback2.getCtx(), CTX2);
+
+     verifyRange(callback.entries, firstEntry, endEntry);
+     verifyRange(callback2.entries, firstEntry, endEntry);
+
+     // position by position, both callbacks must hold the same entry id
+     // but never the very same entry object
+     int expectedEntries = (int) (endEntry - firstEntry + 1);
+     for (int pos = 0; pos < expectedEntries; pos++) {
+         assertNotSame(callback.entries.get(pos), callback2.entries.get(pos));
+         assertEquals(callback.entries.get(pos).getEntryId(), callback2.entries.get(pos).getEntryId());
+     }
+ }
+
+ /**
+  * The second read asks for a sub-range (10 entries trimmed on each side) of a read that is
+  * still pending. Only the storage read for the wider range is stubbed, and completing it
+  * must satisfy both callbacks.
+  */
+ @Test
+ public void simpleConcurrentReadIncluding() throws Exception {
+
+     long firstEntry = 100;
+     long endEntry = 199;
+
+     long firstEntrySecondRead = firstEntry + 10;
+     long endEntrySecondRead = endEntry - 10;
+
+     boolean shouldCacheEntry = false;
+
+     PreparedReadFromStorage read1 =
+             prepareReadFromStorage(lh, rangeEntryCache, firstEntry, endEntry, shouldCacheEntry);
+
+     // named "manager" so it does not shadow the pendingReadsManager member used by simpleRead
+     PendingReadsManager manager = new PendingReadsManager(rangeEntryCache);
+     CapturingReadEntriesCallback callback = new CapturingReadEntriesCallback();
+     manager.readEntries(lh, firstEntry, endEntry, shouldCacheEntry, callback, CTX);
+
+     CapturingReadEntriesCallback callback2 = new CapturingReadEntriesCallback();
+     manager.readEntries(lh, firstEntrySecondRead, endEntrySecondRead, shouldCacheEntry, callback2, CTX2);
+
+     // complete the read from BK
+     // only one read completes 2 callbacks
+     read1.storageReadCompleted();
+
+     callback.get();
+     callback2.get();
+
+     assertSame(callback.getCtx(), CTX);
+     assertSame(callback2.getCtx(), CTX2);
+
+     verifyRange(callback.entries, firstEntry, endEntry);
+     verifyRange(callback2.entries, firstEntrySecondRead, endEntrySecondRead);
+
+     // walk the second (inner) list and compare each entry with the entry of the first list
+     // located "offset" positions further: same entry id, distinct object
+     int offset = (int) (firstEntrySecondRead - firstEntry);
+     int secondReadSize = (int) (endEntrySecondRead - firstEntrySecondRead + 1);
+     for (int posInSecondList = 0; posInSecondList < secondReadSize; posInSecondList++) {
+         int pos = posInSecondList + offset;
+         assertNotSame(callback.entries.get(pos), callback2.entries.get(posInSecondList));
+         assertEquals(callback.entries.get(pos).getEntryId(),
+                 callback2.entries.get(posInSecondList).getEntryId());
+     }
+ }
+
+ /**
+  * The second read starts 10 entries before a read that is still pending and ends on the
+  * same entry. Besides the pending read, the test only stubs a storage read for the
+  * entries on the left that the first read does not cover.
+  */
+ @Test
+ public void simpleConcurrentReadMissingLeft() throws Exception {
+     boolean shouldCacheEntry = false;
+
+     long firstEntry = 100;
+     long endEntry = 199;
+
+     long secondFirstEntry = firstEntry - 10;
+     long secondEndEntry = endEntry;
+
+     PreparedReadFromStorage mainRead =
+             prepareReadFromStorage(lh, rangeEntryCache, firstEntry, endEntry, shouldCacheEntry);
+
+     // covers only the entries preceding the first read
+     PreparedReadFromStorage leftRead =
+             prepareReadFromStorage(lh, rangeEntryCache, secondFirstEntry, firstEntry - 1, shouldCacheEntry);
+
+     PendingReadsManager manager = new PendingReadsManager(rangeEntryCache);
+
+     CapturingReadEntriesCallback firstCallback = new CapturingReadEntriesCallback();
+     manager.readEntries(lh, firstEntry, endEntry, shouldCacheEntry, firstCallback, CTX);
+
+     CapturingReadEntriesCallback secondCallback = new CapturingReadEntriesCallback();
+     manager.readEntries(lh, secondFirstEntry, secondEndEntry, shouldCacheEntry, secondCallback, CTX2);
+
+     // completing the read from BK lets the first read move forward
+     mainRead.storageReadCompleted();
+     firstCallback.get();
+
+     // the second read also needs the left part
+     leftRead.storageReadCompleted();
+     secondCallback.get();
+
+     assertSame(firstCallback.getCtx(), CTX);
+     assertSame(secondCallback.getCtx(), CTX2);
+
+     verifyRange(firstCallback.entries, firstEntry, endEntry);
+     verifyRange(secondCallback.entries, secondFirstEntry, secondEndEntry);
+ }
+
+ /**
+  * The second read starts on the same entry as a read that is still pending and ends
+  * 10 entries later. Besides the pending read, the test only stubs a storage read for the
+  * entries on the right that the first read does not cover.
+  */
+ @Test
+ public void simpleConcurrentReadMissingRight() throws Exception {
+     boolean shouldCacheEntry = false;
+
+     long firstEntry = 100;
+     long endEntry = 199;
+
+     long secondFirstEntry = firstEntry;
+     long secondEndEntry = endEntry + 10;
+
+     PreparedReadFromStorage mainRead =
+             prepareReadFromStorage(lh, rangeEntryCache, firstEntry, endEntry, shouldCacheEntry);
+
+     // covers only the entries following the first read
+     PreparedReadFromStorage rightRead =
+             prepareReadFromStorage(lh, rangeEntryCache, endEntry + 1, secondEndEntry, shouldCacheEntry);
+
+     PendingReadsManager manager = new PendingReadsManager(rangeEntryCache);
+
+     CapturingReadEntriesCallback firstCallback = new CapturingReadEntriesCallback();
+     manager.readEntries(lh, firstEntry, endEntry, shouldCacheEntry, firstCallback, CTX);
+
+     CapturingReadEntriesCallback secondCallback = new CapturingReadEntriesCallback();
+     manager.readEntries(lh, secondFirstEntry, secondEndEntry, shouldCacheEntry, secondCallback, CTX2);
+
+     // completing the read from BK lets the first read move forward
+     mainRead.storageReadCompleted();
+     firstCallback.get();
+
+     // the second read also needs the right part
+     rightRead.storageReadCompleted();
+     secondCallback.get();
+
+     assertSame(firstCallback.getCtx(), CTX);
+     assertSame(secondCallback.getCtx(), CTX2);
+
+     verifyRange(firstCallback.entries, firstEntry, endEntry);
+     verifyRange(secondCallback.entries, secondFirstEntry, secondEndEntry);
+ }
+
+ /**
+  * The second read extends a still-pending read by 10 entries on each side. Besides the
+  * pending read, the test stubs one storage read for the uncovered entries on the left
+  * and one for the uncovered entries on the right.
+  */
+ @Test
+ public void simpleConcurrentReadMissingBoth() throws Exception {
+     boolean shouldCacheEntry = false;
+
+     long firstEntry = 100;
+     long endEntry = 199;
+
+     long secondFirstEntry = firstEntry - 10;
+     long secondEndEntry = endEntry + 10;
+
+     PreparedReadFromStorage mainRead =
+             prepareReadFromStorage(lh, rangeEntryCache, firstEntry, endEntry, shouldCacheEntry);
+
+     // entries preceding the first read
+     PreparedReadFromStorage leftRead =
+             prepareReadFromStorage(lh, rangeEntryCache, secondFirstEntry, firstEntry - 1, shouldCacheEntry);
+
+     // entries following the first read
+     PreparedReadFromStorage rightRead =
+             prepareReadFromStorage(lh, rangeEntryCache, endEntry + 1, secondEndEntry, shouldCacheEntry);
+
+     PendingReadsManager manager = new PendingReadsManager(rangeEntryCache);
+
+     CapturingReadEntriesCallback firstCallback = new CapturingReadEntriesCallback();
+     manager.readEntries(lh, firstEntry, endEntry, shouldCacheEntry, firstCallback, CTX);
+
+     CapturingReadEntriesCallback secondCallback = new CapturingReadEntriesCallback();
+     manager.readEntries(lh, secondFirstEntry, secondEndEntry, shouldCacheEntry, secondCallback, CTX2);
+
+     // completing the read from BK lets the first read move forward
+     mainRead.storageReadCompleted();
+     firstCallback.get();
+
+     // the second read needs both side ranges as well
+     leftRead.storageReadCompleted();
+     rightRead.storageReadCompleted();
+     secondCallback.get();
+
+     assertSame(firstCallback.getCtx(), CTX);
+     assertSame(secondCallback.getCtx(), CTX2);
+
+     verifyRange(firstCallback.entries, firstEntry, endEntry);
+     verifyRange(secondCallback.entries, secondFirstEntry, secondEndEntry);
+ }
+
+
+ /**
+  * Two reads over disjoint ranges (100-199 and 1000-1099): each one has its own stubbed
+  * storage read and its callback is awaited after that read completes.
+  */
+ @Test
+ public void simpleConcurrentReadNoMatch() throws Exception {
+     boolean shouldCacheEntry = false;
+
+     long firstEntry = 100;
+     long endEntry = 199;
+
+     long secondFirstEntry = 1000;
+     long secondEndEntry = 1099;
+
+     PreparedReadFromStorage firstRead =
+             prepareReadFromStorage(lh, rangeEntryCache, firstEntry, endEntry, shouldCacheEntry);
+
+     PreparedReadFromStorage secondRead =
+             prepareReadFromStorage(lh, rangeEntryCache, secondFirstEntry, secondEndEntry, shouldCacheEntry);
+
+     PendingReadsManager manager = new PendingReadsManager(rangeEntryCache);
+
+     CapturingReadEntriesCallback firstCallback = new CapturingReadEntriesCallback();
+     manager.readEntries(lh, firstEntry, endEntry, shouldCacheEntry, firstCallback, CTX);
+
+     CapturingReadEntriesCallback secondCallback = new CapturingReadEntriesCallback();
+     manager.readEntries(lh, secondFirstEntry, secondEndEntry, shouldCacheEntry, secondCallback, CTX2);
+
+     // complete and await the first read, then the second one
+     firstRead.storageReadCompleted();
+     firstCallback.get();
+
+     secondRead.storageReadCompleted();
+     secondCallback.get();
+
+     assertSame(firstCallback.getCtx(), CTX);
+     assertSame(secondCallback.getCtx(), CTX2);
+
+     verifyRange(firstCallback.entries, firstEntry, endEntry);
+     verifyRange(secondCallback.entries, secondFirstEntry, secondEndEntry);
+ }
+
+}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MessageDispatchThrottlingTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MessageDispatchThrottlingTest.java
index 9ef28842dfe..1d0b1be3659 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MessageDispatchThrottlingTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MessageDispatchThrottlingTest.java
@@ -45,6 +45,7 @@ import java.util.concurrent.atomic.AtomicInteger;
import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
+import org.apache.bookkeeper.mledger.impl.cache.PendingReadsManager;
import org.apache.bookkeeper.mledger.impl.cache.RangeEntryCacheImpl;
import org.apache.pulsar.broker.service.BrokerService;
import org.apache.pulsar.broker.service.persistent.DispatchRateLimiter;
@@ -1236,6 +1237,13 @@ public class MessageDispatchThrottlingTest extends ProducerConsumerBase {
RangeEntryCacheImpl entryCache = spy((RangeEntryCacheImpl) cacheField.get(ledger));
cacheField.set(ledger, entryCache);
+ Field pendingReadsManagerField = RangeEntryCacheImpl.class.getDeclaredField("pendingReadsManager");
+ pendingReadsManagerField.setAccessible(true);
+ PendingReadsManager pendingReadsManager = (PendingReadsManager) pendingReadsManagerField.get(entryCache);
+ Field cacheFieldInManager = PendingReadsManager.class.getDeclaredField("rangeEntryCache");
+ cacheFieldInManager.setAccessible(true);
+ cacheFieldInManager.set(pendingReadsManager, entryCache);
+
// 2. Produce messages
for (int i = 0; i < totalMessages; i++) {
String message = "my-message-" + i;