You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2022/07/10 06:52:56 UTC
[pulsar] 17/19: [improve][broker] Recycle OpReadEntry in some corner cases (#16399)
This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 7c37e5692a78e2aec9fc3d436f4f7adf5dd488a0
Author: Yunze Xu <xy...@163.com>
AuthorDate: Thu Jul 7 21:27:13 2022 +0800
[improve][broker] Recycle OpReadEntry in some corner cases (#16399)
### Motivation
`ManagedCursorImpl` maintains a field `waitingReadOp` as the cache of
the `OpReadEntry` created in `asyncReadEntriesOrWait` when there are no
more entries to read. However, there are two cases in which the created
`OpReadEntry` is not recycled:
1. `asyncReadEntriesOrWait` is called repeatedly when `waitingReadOp` is
not null and there are no more entries. The newly created `OpReadEntry`
fails the CAS check, but it is not recycled.
2. `cancelPendingReadRequest` is called. `waitingReadOp` is simply set
to null, and the previously referenced `OpReadEntry` is not recycled.
### Modifications
For the two cases described above, recycle the `OpReadEntry` objects.
### Verifying this change
Add `testOpReadEntryRecycle` to reproduce the corner cases and verify the
count of `recycle()` calls.
(cherry picked from commit 6cec62e89629c258f9458a726ff8ba3b644788b7)
---
.../bookkeeper/mledger/impl/ManagedCursorImpl.java | 7 ++-
.../bookkeeper/mledger/impl/ManagedCursorTest.java | 62 ++++++++++++++++++++++
2 files changed, 68 insertions(+), 1 deletion(-)
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
index 93ec27f2dc8..87850624751 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
@@ -858,6 +858,7 @@ public class ManagedCursorImpl implements ManagedCursor {
ctx, maxPosition);
if (!WAITING_READ_OP_UPDATER.compareAndSet(this, null, op)) {
+ op.recycle();
callback.readEntriesFailed(new ManagedLedgerException("We can only have a single waiting callback"),
ctx);
return;
@@ -940,7 +941,11 @@ public class ManagedCursorImpl implements ManagedCursor {
if (log.isDebugEnabled()) {
log.debug("[{}] [{}] Cancel pending read request", ledger.getName(), name);
}
- return WAITING_READ_OP_UPDATER.getAndSet(this, null) != null;
+ final OpReadEntry op = WAITING_READ_OP_UPDATER.getAndSet(this, null);
+ if (op != null) {
+ op.recycle();
+ }
+ return op != null;
}
public boolean hasPendingReadRequest() {
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
index d64cac6bbd3..8944ba57a56 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
@@ -18,6 +18,7 @@
*/
package org.apache.bookkeeper.mledger.impl;
+import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.eq;
@@ -42,10 +43,12 @@ import java.util.Arrays;
import java.util.BitSet;
import java.util.Collections;
import java.util.List;
+import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
@@ -57,6 +60,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Supplier;
import java.util.stream.Collectors;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
@@ -93,6 +97,8 @@ import org.apache.pulsar.metadata.api.MetadataStoreException;
import org.apache.pulsar.metadata.api.Stat;
import org.apache.pulsar.common.api.proto.IntRange;
import org.awaitility.Awaitility;
+import org.mockito.MockedStatic;
+import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.slf4j.Logger;
@@ -3752,5 +3758,61 @@ public class ManagedCursorTest extends MockedBookKeeperTestCase {
Awaitility.await().untilAsserted(() -> assertTrue(flag.get()));
}
+ @Test
+ public void testOpReadEntryRecycle() throws Exception {
+ final Map<OpReadEntry, AtomicInteger> opReadEntryToRecycleCount = new ConcurrentHashMap<>();
+ final Supplier<OpReadEntry> createOpReadEntry = () -> {
+ final OpReadEntry mockedOpReadEntry = mock(OpReadEntry.class);
+ doAnswer(__ -> opReadEntryToRecycleCount.computeIfAbsent(mockedOpReadEntry,
+ ignored -> new AtomicInteger(0)).getAndIncrement()
+ ).when(mockedOpReadEntry).recycle();
+ return mockedOpReadEntry;
+ };
+
+ @Cleanup final MockedStatic<OpReadEntry> mockedStaticOpReadEntry = Mockito.mockStatic(OpReadEntry.class);
+ mockedStaticOpReadEntry.when(() -> OpReadEntry.create(any(), any(), anyInt(), any(), any(), any()))
+ .thenAnswer(__ -> createOpReadEntry.get());
+
+ final ManagedLedgerConfig ledgerConfig = new ManagedLedgerConfig();
+ ledgerConfig.setNewEntriesCheckDelayInMillis(10);
+ final ManagedLedgerImpl ledger = (ManagedLedgerImpl) factory.open("my_test_ledger", ledgerConfig);
+ final ManagedCursorImpl cursor = (ManagedCursorImpl) ledger.openCursor("my_cursor");
+ final List<ManagedLedgerException> exceptions = new ArrayList<>();
+ final AtomicBoolean readEntriesSuccess = new AtomicBoolean(false);
+ final ReadEntriesCallback callback = new ReadEntriesCallback() {
+ @Override
+ public void readEntriesComplete(List<Entry> entries, Object ctx) {
+ readEntriesSuccess.set(true);
+ }
+
+ @Override
+ public void readEntriesFailed(ManagedLedgerException exception, Object ctx) {
+ exceptions.add(exception);
+ }
+ };
+
+ final int numReadRequests = 3;
+ for (int i = 0; i < numReadRequests; i++) {
+ cursor.asyncReadEntriesOrWait(1, callback, null, new PositionImpl(0, 0));
+ }
+ Awaitility.await().atMost(Duration.ofSeconds(1))
+ .untilAsserted(() -> assertEquals(ledger.waitingCursors.size(), 1));
+ assertTrue(cursor.cancelPendingReadRequest());
+
+ ledger.addEntry(new byte[1]);
+ Awaitility.await().atMost(Duration.ofSeconds(1))
+ .untilAsserted(() -> assertTrue(ledger.waitingCursors.isEmpty()));
+ assertFalse(readEntriesSuccess.get());
+
+ assertEquals(exceptions.size(), numReadRequests - 1);
+ exceptions.forEach(e -> assertEquals(e.getMessage(), "We can only have a single waiting callback"));
+ assertEquals(opReadEntryToRecycleCount.size(), 3);
+ assertEquals(opReadEntryToRecycleCount.entrySet().stream()
+ .map(Map.Entry::getValue)
+ .map(AtomicInteger::get)
+ .collect(Collectors.toList()),
+ Arrays.asList(1, 1, 1));
+ }
+
private static final Logger log = LoggerFactory.getLogger(ManagedCursorTest.class);
}