You are viewing a plain-text version of this content. The canonical link to the original was not preserved in this copy.
Posted to commits@pulsar.apache.org by xy...@apache.org on 2022/08/02 07:56:09 UTC

[pulsar] branch branch-2.8 updated: [branch-2.8][improve][broker] Recycle OpReadEntry in some corner cases (#16399) (#16869)

This is an automated email from the ASF dual-hosted git repository.

xyz pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.8 by this push:
     new 123175f5be3 [branch-2.8][improve][broker] Recycle OpReadEntry in some corner cases (#16399) (#16869)
123175f5be3 is described below

commit 123175f5be364110de1fea01047df77db830a7c2
Author: Yunze Xu <xy...@163.com>
AuthorDate: Tue Aug 2 15:56:02 2022 +0800

    [branch-2.8][improve][broker] Recycle OpReadEntry in some corner cases (#16399) (#16869)
    
    * [improve][broker] Recycle OpReadEntry in some corner cases (#16399)
    
    (cherry picked from commit 6cec62e89629c258f9458a726ff8ba3b644788b7)
    
    In addition to #16399, this PR imports the `mockito-inline` dependency
    to fix the test failure.
    
    * Use PowerMock instead of mockito-inline
---
 .../bookkeeper/mledger/impl/ManagedCursorImpl.java |  7 +-
 .../bookkeeper/mledger/impl/ManagedCursorTest.java | 75 ++++++++++++++++++++++
 2 files changed, 81 insertions(+), 1 deletion(-)

diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
index a6036cca066..4eb44a4bab6 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
@@ -796,6 +796,7 @@ public class ManagedCursorImpl implements ManagedCursor {
                     ctx, maxPosition);
 
             if (!WAITING_READ_OP_UPDATER.compareAndSet(this, null, op)) {
+                op.recycle();
                 callback.readEntriesFailed(new ManagedLedgerException("We can only have a single waiting callback"),
                         ctx);
                 return;
@@ -877,7 +878,11 @@ public class ManagedCursorImpl implements ManagedCursor {
         if (log.isDebugEnabled()) {
             log.debug("[{}] [{}] Cancel pending read request", ledger.getName(), name);
         }
-        return WAITING_READ_OP_UPDATER.getAndSet(this, null) != null;
+        final OpReadEntry op = WAITING_READ_OP_UPDATER.getAndSet(this, null);
+        if (op != null) {
+            op.recycle();
+        }
+        return op != null;
     }
 
     public boolean hasPendingReadRequest() {
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
index 12ec9efd572..864b34d19e8 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
@@ -18,6 +18,7 @@
  */
 package org.apache.bookkeeper.mledger.impl;
 
+import static org.mockito.ArgumentMatchers.anyInt;
 import static org.mockito.Mockito.any;
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.eq;
@@ -43,10 +44,12 @@ import java.util.Arrays;
 import java.util.BitSet;
 import java.util.Collections;
 import java.util.List;
+import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.Callable;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.CyclicBarrier;
@@ -58,6 +61,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Supplier;
 import java.util.stream.Collectors;
 
 import io.netty.buffer.ByteBuf;
@@ -94,18 +98,33 @@ import org.apache.pulsar.metadata.api.MetadataStoreException;
 import org.apache.pulsar.metadata.api.Stat;
 import org.apache.pulsar.common.api.proto.IntRange;
 import org.awaitility.Awaitility;
+import org.mockito.MockedStatic;
+import org.mockito.Mockito;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
+import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.PowerMockIgnore;
+import org.powermock.core.classloader.annotations.PrepareForTest;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.testng.Assert;
+import org.testng.IObjectFactory;
 import org.testng.annotations.DataProvider;
+import org.testng.annotations.ObjectFactory;
 import org.testng.annotations.Test;
 
+@PrepareForTest(OpReadEntry.class)
+@PowerMockIgnore({"org.apache.logging.log4j.*"})
 public class ManagedCursorTest extends MockedBookKeeperTestCase {
 
     private static final Charset Encoding = Charsets.UTF_8;
 
+    @ObjectFactory
+    public IObjectFactory getObjectFactory() {
+        return new org.powermock.modules.testng.PowerMockObjectFactory();
+    }
+
+
     @DataProvider(name = "useOpenRangeSet")
     public static Object[][] useOpenRangeSet() {
         return new Object[][] { { Boolean.TRUE }, { Boolean.FALSE } };
@@ -3679,5 +3698,61 @@ public class ManagedCursorTest extends MockedBookKeeperTestCase {
         Assert.assertEquals(managedCursor.getNumberOfEntriesInBacklog(false), 4);
     }
 
+    @Test
+    public void testOpReadEntryRecycle() throws Exception {
+        final Map<OpReadEntry, AtomicInteger> opReadEntryToRecycleCount = new ConcurrentHashMap<>();
+        final Supplier<OpReadEntry> createOpReadEntry = () -> {
+            final OpReadEntry mockedOpReadEntry = mock(OpReadEntry.class);
+            doAnswer(__ -> opReadEntryToRecycleCount.computeIfAbsent(mockedOpReadEntry,
+                    ignored -> new AtomicInteger(0)).getAndIncrement()
+            ).when(mockedOpReadEntry).recycle();
+            return mockedOpReadEntry;
+        };
+
+        PowerMockito.mockStatic(OpReadEntry.class);
+        PowerMockito.when(OpReadEntry.create(any(), any(), anyInt(), any(), any(), any()))
+                .thenAnswer(__ -> createOpReadEntry.get());
+
+        final ManagedLedgerConfig ledgerConfig = new ManagedLedgerConfig();
+        ledgerConfig.setNewEntriesCheckDelayInMillis(10);
+        final ManagedLedgerImpl ledger = (ManagedLedgerImpl) factory.open("my_test_ledger", ledgerConfig);
+        final ManagedCursorImpl cursor = (ManagedCursorImpl) ledger.openCursor("my_cursor");
+        final List<ManagedLedgerException> exceptions = new ArrayList<>();
+        final AtomicBoolean readEntriesSuccess = new AtomicBoolean(false);
+        final ReadEntriesCallback callback = new ReadEntriesCallback() {
+            @Override
+            public void readEntriesComplete(List<Entry> entries, Object ctx) {
+                readEntriesSuccess.set(true);
+            }
+
+            @Override
+            public void readEntriesFailed(ManagedLedgerException exception, Object ctx) {
+                exceptions.add(exception);
+            }
+        };
+
+        final int numReadRequests = 3;
+        for (int i = 0; i < numReadRequests; i++) {
+            cursor.asyncReadEntriesOrWait(1, callback, null, new PositionImpl(0, 0));
+        }
+        Awaitility.await().atMost(Duration.ofSeconds(1))
+                .untilAsserted(() -> assertEquals(ledger.waitingCursors.size(), 1));
+        assertTrue(cursor.cancelPendingReadRequest());
+
+        ledger.addEntry(new byte[1]);
+        Awaitility.await().atMost(Duration.ofSeconds(1))
+                .untilAsserted(() -> assertTrue(ledger.waitingCursors.isEmpty()));
+        assertFalse(readEntriesSuccess.get());
+
+        assertEquals(exceptions.size(), numReadRequests - 1);
+        exceptions.forEach(e -> assertEquals(e.getMessage(), "We can only have a single waiting callback"));
+        assertEquals(opReadEntryToRecycleCount.size(), 3);
+        assertEquals(opReadEntryToRecycleCount.entrySet().stream()
+                        .map(Map.Entry::getValue)
+                        .map(AtomicInteger::get)
+                        .collect(Collectors.toList()),
+                Arrays.asList(1, 1, 1));
+    }
+
     private static final Logger log = LoggerFactory.getLogger(ManagedCursorTest.class);
 }