You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by xy...@apache.org on 2022/08/02 07:56:09 UTC
[pulsar] branch branch-2.8 updated: [branch-2.8][improve][broker] Recycle OpReadEntry in some corner cases (#16399) (#16869)
This is an automated email from the ASF dual-hosted git repository.
xyz pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.8 by this push:
new 123175f5be3 [branch-2.8][improve][broker] Recycle OpReadEntry in some corner cases (#16399) (#16869)
123175f5be3 is described below
commit 123175f5be364110de1fea01047df77db830a7c2
Author: Yunze Xu <xy...@163.com>
AuthorDate: Tue Aug 2 15:56:02 2022 +0800
[branch-2.8][improve][broker] Recycle OpReadEntry in some corner cases (#16399) (#16869)
* [improve][broker] Recycle OpReadEntry in some corner cases (#16399)
(cherry picked from commit 6cec62e89629c258f9458a726ff8ba3b644788b7)
In addition to #16399, this PR imports the `mockito-inline` dependency
to fix the test failure.
* Use PowerMock instead of mockito-inline
---
.../bookkeeper/mledger/impl/ManagedCursorImpl.java | 7 +-
.../bookkeeper/mledger/impl/ManagedCursorTest.java | 75 ++++++++++++++++++++++
2 files changed, 81 insertions(+), 1 deletion(-)
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
index a6036cca066..4eb44a4bab6 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
@@ -796,6 +796,7 @@ public class ManagedCursorImpl implements ManagedCursor {
ctx, maxPosition);
if (!WAITING_READ_OP_UPDATER.compareAndSet(this, null, op)) {
+ op.recycle();
callback.readEntriesFailed(new ManagedLedgerException("We can only have a single waiting callback"),
ctx);
return;
@@ -877,7 +878,11 @@ public class ManagedCursorImpl implements ManagedCursor {
if (log.isDebugEnabled()) {
log.debug("[{}] [{}] Cancel pending read request", ledger.getName(), name);
}
- return WAITING_READ_OP_UPDATER.getAndSet(this, null) != null;
+ final OpReadEntry op = WAITING_READ_OP_UPDATER.getAndSet(this, null);
+ if (op != null) {
+ op.recycle();
+ }
+ return op != null;
}
public boolean hasPendingReadRequest() {
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
index 12ec9efd572..864b34d19e8 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
@@ -18,6 +18,7 @@
*/
package org.apache.bookkeeper.mledger.impl;
+import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.eq;
@@ -43,10 +44,12 @@ import java.util.Arrays;
import java.util.BitSet;
import java.util.Collections;
import java.util.List;
+import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
@@ -58,6 +61,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Supplier;
import java.util.stream.Collectors;
import io.netty.buffer.ByteBuf;
@@ -94,18 +98,33 @@ import org.apache.pulsar.metadata.api.MetadataStoreException;
import org.apache.pulsar.metadata.api.Stat;
import org.apache.pulsar.common.api.proto.IntRange;
import org.awaitility.Awaitility;
+import org.mockito.MockedStatic;
+import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
+import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.PowerMockIgnore;
+import org.powermock.core.classloader.annotations.PrepareForTest;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
+import org.testng.IObjectFactory;
import org.testng.annotations.DataProvider;
+import org.testng.annotations.ObjectFactory;
import org.testng.annotations.Test;
+@PrepareForTest(OpReadEntry.class)
+@PowerMockIgnore({"org.apache.logging.log4j.*"})
public class ManagedCursorTest extends MockedBookKeeperTestCase {
private static final Charset Encoding = Charsets.UTF_8;
+ @ObjectFactory
+ public IObjectFactory getObjectFactory() { // hands TestNG the PowerMock object factory for this test class
+ return new org.powermock.modules.testng.PowerMockObjectFactory();
+ }
+
+
@DataProvider(name = "useOpenRangeSet")
public static Object[][] useOpenRangeSet() {
return new Object[][] { { Boolean.TRUE }, { Boolean.FALSE } };
@@ -3679,5 +3698,61 @@ public class ManagedCursorTest extends MockedBookKeeperTestCase {
Assert.assertEquals(managedCursor.getNumberOfEntriesInBacklog(false), 4);
}
+ @Test
+ public void testOpReadEntryRecycle() throws Exception { // checks every OpReadEntry created here gets recycled exactly once
+ final Map<OpReadEntry, AtomicInteger> opReadEntryToRecycleCount = new ConcurrentHashMap<>(); // mocked op -> observed recycle() calls
+ final Supplier<OpReadEntry> createOpReadEntry = () -> { // builds a mock whose recycle() bumps its own counter
+ final OpReadEntry mockedOpReadEntry = mock(OpReadEntry.class);
+ doAnswer(__ -> opReadEntryToRecycleCount.computeIfAbsent(mockedOpReadEntry,
+ ignored -> new AtomicInteger(0)).getAndIncrement()
+ ).when(mockedOpReadEntry).recycle();
+ return mockedOpReadEntry;
+ };
+
+ PowerMockito.mockStatic(OpReadEntry.class); // stub the static factory so create(...) yields the counting mocks
+ PowerMockito.when(OpReadEntry.create(any(), any(), anyInt(), any(), any(), any()))
+ .thenAnswer(__ -> createOpReadEntry.get());
+
+ final ManagedLedgerConfig ledgerConfig = new ManagedLedgerConfig();
+ ledgerConfig.setNewEntriesCheckDelayInMillis(10);
+ final ManagedLedgerImpl ledger = (ManagedLedgerImpl) factory.open("my_test_ledger", ledgerConfig);
+ final ManagedCursorImpl cursor = (ManagedCursorImpl) ledger.openCursor("my_cursor");
+ final List<ManagedLedgerException> exceptions = new ArrayList<>();
+ final AtomicBoolean readEntriesSuccess = new AtomicBoolean(false);
+ final ReadEntriesCallback callback = new ReadEntriesCallback() { // records whether any read succeeded and collects failures
+ @Override
+ public void readEntriesComplete(List<Entry> entries, Object ctx) {
+ readEntriesSuccess.set(true);
+ }
+
+ @Override
+ public void readEntriesFailed(ManagedLedgerException exception, Object ctx) {
+ exceptions.add(exception);
+ }
+ };
+
+ final int numReadRequests = 3;
+ for (int i = 0; i < numReadRequests; i++) { // issue the reads back-to-back on the same cursor
+ cursor.asyncReadEntriesOrWait(1, callback, null, new PositionImpl(0, 0));
+ }
+ Awaitility.await().atMost(Duration.ofSeconds(1))
+ .untilAsserted(() -> assertEquals(ledger.waitingCursors.size(), 1));
+ assertTrue(cursor.cancelPendingReadRequest()); // a read is expected to be pending, so cancel reports true
+
+ ledger.addEntry(new byte[1]);
+ Awaitility.await().atMost(Duration.ofSeconds(1))
+ .untilAsserted(() -> assertTrue(ledger.waitingCursors.isEmpty()));
+ assertFalse(readEntriesSuccess.get()); // no read is expected to complete successfully
+
+ assertEquals(exceptions.size(), numReadRequests - 1); // every request but one is expected to fail
+ exceptions.forEach(e -> assertEquals(e.getMessage(), "We can only have a single waiting callback"));
+ assertEquals(opReadEntryToRecycleCount.size(), 3); // three distinct mocks had recycle() invoked
+ assertEquals(opReadEntryToRecycleCount.entrySet().stream()
+ .map(Map.Entry::getValue)
+ .map(AtomicInteger::get)
+ .collect(Collectors.toList()),
+ Arrays.asList(1, 1, 1)); // and each of them exactly once
+ }
+
private static final Logger log = LoggerFactory.getLogger(ManagedCursorTest.class);
}