You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by xy...@apache.org on 2022/07/29 08:53:19 UTC

[pulsar] branch bewaremypower/2.8-pick-16399 created (now 06885db8161)

This is an automated email from the ASF dual-hosted git repository.

xyz pushed a change to branch bewaremypower/2.8-pick-16399
in repository https://gitbox.apache.org/repos/asf/pulsar.git


      at 06885db8161 [improve][broker] Recycle OpReadEntry in some corner cases (#16399)

This branch includes the following new commits:

     new 06885db8161 [improve][broker] Recycle OpReadEntry in some corner cases (#16399)

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[pulsar] 01/01: [improve][broker] Recycle OpReadEntry in some corner cases (#16399)

Posted by xy...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

xyz pushed a commit to branch bewaremypower/2.8-pick-16399
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 06885db8161994efb5719dde64a20a5e2e5bd186
Author: Yunze Xu <xy...@163.com>
AuthorDate: Thu Jul 7 21:27:13 2022 +0800

    [improve][broker] Recycle OpReadEntry in some corner cases (#16399)
    
    (cherry picked from commit 6cec62e89629c258f9458a726ff8ba3b644788b7)
    
    In addition to #16399, this PR adds the `mockito-inline` test dependency,
    which the new test's `Mockito.mockStatic` call requires, to fix the test failure.
---
 .../bookkeeper/mledger/impl/ManagedCursorImpl.java |  7 ++-
 .../bookkeeper/mledger/impl/ManagedCursorTest.java | 62 ++++++++++++++++++++++
 pom.xml                                            | 12 +++++
 3 files changed, 80 insertions(+), 1 deletion(-)

diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
index c653e0090e8..bcb84de0ada 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
@@ -796,6 +796,7 @@ public class ManagedCursorImpl implements ManagedCursor {
                     ctx, maxPosition);
 
             if (!WAITING_READ_OP_UPDATER.compareAndSet(this, null, op)) {
+                op.recycle();
                 callback.readEntriesFailed(new ManagedLedgerException("We can only have a single waiting callback"),
                         ctx);
                 return;
@@ -877,7 +878,11 @@ public class ManagedCursorImpl implements ManagedCursor {
         if (log.isDebugEnabled()) {
             log.debug("[{}] [{}] Cancel pending read request", ledger.getName(), name);
         }
-        return WAITING_READ_OP_UPDATER.getAndSet(this, null) != null;
+        final OpReadEntry op = WAITING_READ_OP_UPDATER.getAndSet(this, null);
+        if (op != null) {
+            op.recycle();
+        }
+        return op != null;
     }
 
     public boolean hasPendingReadRequest() {
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
index 12ec9efd572..b9ee4044f85 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
@@ -18,6 +18,7 @@
  */
 package org.apache.bookkeeper.mledger.impl;
 
+import static org.mockito.ArgumentMatchers.anyInt;
 import static org.mockito.Mockito.any;
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.eq;
@@ -43,10 +44,12 @@ import java.util.Arrays;
 import java.util.BitSet;
 import java.util.Collections;
 import java.util.List;
+import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.Callable;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.CyclicBarrier;
@@ -58,6 +61,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Supplier;
 import java.util.stream.Collectors;
 
 import io.netty.buffer.ByteBuf;
@@ -94,6 +98,8 @@ import org.apache.pulsar.metadata.api.MetadataStoreException;
 import org.apache.pulsar.metadata.api.Stat;
 import org.apache.pulsar.common.api.proto.IntRange;
 import org.awaitility.Awaitility;
+import org.mockito.MockedStatic;
+import org.mockito.Mockito;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 import org.slf4j.Logger;
@@ -3679,5 +3685,61 @@ public class ManagedCursorTest extends MockedBookKeeperTestCase {
         Assert.assertEquals(managedCursor.getNumberOfEntriesInBacklog(false), 4);
     }
 
+    @Test
+    public void testOpReadEntryRecycle() throws Exception {
+        final Map<OpReadEntry, AtomicInteger> opReadEntryToRecycleCount = new ConcurrentHashMap<>();
+        final Supplier<OpReadEntry> createOpReadEntry = () -> {
+            final OpReadEntry mockedOpReadEntry = mock(OpReadEntry.class);
+            doAnswer(__ -> opReadEntryToRecycleCount.computeIfAbsent(mockedOpReadEntry,
+                    ignored -> new AtomicInteger(0)).getAndIncrement()
+            ).when(mockedOpReadEntry).recycle();
+            return mockedOpReadEntry;
+        };
+
+        @Cleanup final MockedStatic<OpReadEntry> mockedStaticOpReadEntry = Mockito.mockStatic(OpReadEntry.class);
+        mockedStaticOpReadEntry.when(() -> OpReadEntry.create(any(), any(), anyInt(), any(), any(), any()))
+                .thenAnswer(__ -> createOpReadEntry.get());
+
+        final ManagedLedgerConfig ledgerConfig = new ManagedLedgerConfig();
+        ledgerConfig.setNewEntriesCheckDelayInMillis(10);
+        final ManagedLedgerImpl ledger = (ManagedLedgerImpl) factory.open("my_test_ledger", ledgerConfig);
+        final ManagedCursorImpl cursor = (ManagedCursorImpl) ledger.openCursor("my_cursor");
+        final List<ManagedLedgerException> exceptions = new ArrayList<>();
+        final AtomicBoolean readEntriesSuccess = new AtomicBoolean(false);
+        final ReadEntriesCallback callback = new ReadEntriesCallback() {
+            @Override
+            public void readEntriesComplete(List<Entry> entries, Object ctx) {
+                readEntriesSuccess.set(true);
+            }
+
+            @Override
+            public void readEntriesFailed(ManagedLedgerException exception, Object ctx) {
+                exceptions.add(exception);
+            }
+        };
+
+        final int numReadRequests = 3;
+        for (int i = 0; i < numReadRequests; i++) {
+            cursor.asyncReadEntriesOrWait(1, callback, null, new PositionImpl(0, 0));
+        }
+        Awaitility.await().atMost(Duration.ofSeconds(1))
+                .untilAsserted(() -> assertEquals(ledger.waitingCursors.size(), 1));
+        assertTrue(cursor.cancelPendingReadRequest());
+
+        ledger.addEntry(new byte[1]);
+        Awaitility.await().atMost(Duration.ofSeconds(1))
+                .untilAsserted(() -> assertTrue(ledger.waitingCursors.isEmpty()));
+        assertFalse(readEntriesSuccess.get());
+
+        assertEquals(exceptions.size(), numReadRequests - 1);
+        exceptions.forEach(e -> assertEquals(e.getMessage(), "We can only have a single waiting callback"));
+        assertEquals(opReadEntryToRecycleCount.size(), 3);
+        assertEquals(opReadEntryToRecycleCount.entrySet().stream()
+                        .map(Map.Entry::getValue)
+                        .map(AtomicInteger::get)
+                        .collect(Collectors.toList()),
+                Arrays.asList(1, 1, 1));
+    }
+
     private static final Logger log = LoggerFactory.getLogger(ManagedCursorTest.class);
 }
diff --git a/pom.xml b/pom.xml
index 7e99d93a452..e675940a145 100644
--- a/pom.xml
+++ b/pom.xml
@@ -298,6 +298,12 @@ flexible messaging model and an intuitive client API.</description>
         <version>${mockito.version}</version>
       </dependency>
 
+      <dependency>
+        <groupId>org.mockito</groupId>
+        <artifactId>mockito-inline</artifactId>
+        <version>${mockito.version}</version>
+      </dependency>
+
       <dependency>
         <groupId>org.powermock</groupId>
         <artifactId>powermock-api-mockito2</artifactId>
@@ -1185,6 +1191,12 @@ flexible messaging model and an intuitive client API.</description>
       <scope>test</scope>
     </dependency>
 
+    <dependency>
+      <groupId>org.mockito</groupId>
+      <artifactId>mockito-inline</artifactId>
+      <scope>test</scope>
+    </dependency>
+
     <dependency>
       <groupId>org.powermock</groupId>
       <artifactId>powermock-api-mockito2</artifactId>