You are viewing a plain-text version of this content; the link to its canonical version was not preserved in this copy.
Posted to commits@pulsar.apache.org by te...@apache.org on 2023/05/07 06:21:58 UTC

[pulsar] branch master updated: [fix] [broker] Fix infinite ack of Replicator after topic is closed (#20232)

This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 98413642995 [fix] [broker] Fix infinite ack of Replicator after topic is closed (#20232)
98413642995 is described below

commit 98413642995eb6e562f6a591dcf56e20ac0cc7ef
Author: fengyubiao <yu...@streamnative.io>
AuthorDate: Sun May 7 14:21:51 2023 +0800

    [fix] [broker] Fix infinite ack of Replicator after topic is closed (#20232)
---
 .../service/persistent/PersistentReplicator.java   |  7 ++++
 .../pulsar/broker/service/ReplicatorTest.java      | 45 +++++++++++++++-------
 2 files changed, 38 insertions(+), 14 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java
index a556237f434..d882cbf56b2 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java
@@ -545,6 +545,13 @@ public abstract class PersistentReplicator extends AbstractReplicator
     public void deleteFailed(ManagedLedgerException exception, Object ctx) {
         log.error("[{}] Failed to delete message at {}: {}", replicatorId, ctx,
                 exception.getMessage(), exception);
+        if (exception instanceof CursorAlreadyClosedException) {
+            log.error("[{}] Asynchronous ack failure because replicator is already deleted and cursor is already"
+                            + " closed {}, ({})", replicatorId, ctx, exception.getMessage(), exception);
+            // replicator is already deleted and cursor is already closed so, producer should also be stopped
+            closeProducerAsync();
+            return;
+        }
         if (ctx instanceof PositionImpl) {
             PositionImpl deletedEntry = (PositionImpl) ctx;
             if (deletedEntry.compareTo((PositionImpl) cursor.getMarkDeletedPosition()) > 0) {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
index 901451c022b..176eab0e94b 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
@@ -21,6 +21,7 @@ package org.apache.pulsar.broker.service;
 import static org.apache.pulsar.broker.BrokerTestUtil.newUniqueName;
 import static org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest.retryStrategically;
 import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.spy;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertNotEquals;
@@ -51,8 +52,10 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.stream.Collectors;
 import lombok.Cleanup;
+import org.apache.bookkeeper.mledger.AsyncCallbacks;
 import org.apache.bookkeeper.mledger.AsyncCallbacks.DeleteCursorCallback;
 import org.apache.bookkeeper.mledger.Entry;
 import org.apache.bookkeeper.mledger.ManagedCursor;
@@ -60,7 +63,6 @@ import org.apache.bookkeeper.mledger.ManagedLedgerException;
 import org.apache.bookkeeper.mledger.ManagedLedgerException.CursorAlreadyClosedException;
 import org.apache.bookkeeper.mledger.Position;
 import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
-import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl.State;
 import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl;
 import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
 import org.apache.bookkeeper.mledger.impl.PositionImpl;
@@ -1645,20 +1647,41 @@ public class ReplicatorTest extends ReplicatorTestBase {
         log.info("--- Starting ReplicatorTest::testReplication ---");
 
         String namespace = "pulsar/global/ns2";
-        admin1.namespaces().createNamespace(namespace);
-        admin1.namespaces().setNamespaceReplicationClusters(namespace, Sets.newHashSet("r1", "r2"));
+        admin1.namespaces().createNamespace(namespace, Sets.newHashSet("r1"));
         final TopicName dest = TopicName
                 .get(BrokerTestUtil.newUniqueName("persistent://" + namespace + "/ackFailedTopic"));
 
         @Cleanup
         MessageProducer producer1 = new MessageProducer(url1, dest);
-        log.info("--- Starting producer --- " + url1);
 
+        PersistentTopic topic = (PersistentTopic) pulsar1.getBrokerService().getTopic(dest.toString(), false)
+                .getNow(null).get();
+        final ManagedLedgerImpl managedLedger = (ManagedLedgerImpl) topic.getManagedLedger();
+        final ManagedCursorImpl cursor = (ManagedCursorImpl) managedLedger.openCursor("pulsar.repl.r2");
+        final ManagedCursorImpl spyCursor = spy(cursor);
+        managedLedger.getCursors().removeCursor(cursor.getName());
+        managedLedger.getCursors().add(spyCursor, PositionImpl.EARLIEST);
+        AtomicBoolean isMakeAckFail = new AtomicBoolean(false);
+        doAnswer(invocation -> {
+            Position pos = (Position) invocation.getArguments()[0];
+            AsyncCallbacks.DeleteCallback cb = (AsyncCallbacks.DeleteCallback) invocation.getArguments()[1];
+            Object ctx = invocation.getArguments()[2];
+            if (isMakeAckFail.get()) {
+                log.info("async-delete {} will be failed", pos);
+                cb.deleteFailed(new ManagedLedgerException("mocked error"), ctx);
+            } else {
+                log.info("async-delete {} will success", pos);
+                cursor.asyncDelete(pos, cb, ctx);
+            }
+            return null;
+        }).when(spyCursor).asyncDelete(Mockito.any(Position.class), Mockito.any(AsyncCallbacks.DeleteCallback.class),
+                Mockito.any());
+
+        log.info("--- Starting producer --- " + url1);
+        admin1.namespaces().setNamespaceReplicationClusters(namespace, Sets.newHashSet("r1", "r2"));
         // Produce from cluster1 and consume from the rest
         producer1.produce(2);
 
-        PersistentTopic topic = (PersistentTopic) pulsar1.getBrokerService().getTopic(dest.toString(), false)
-                .getNow(null).get();
         MessageIdImpl lastMessageId = (MessageIdImpl) topic.getLastMessageId().get();
         Position lastPosition = PositionImpl.get(lastMessageId.getLedgerId(), lastMessageId.getEntryId());
         ConcurrentOpenHashMap<String, Replicator> replicators = topic.getReplicators();
@@ -1667,25 +1690,19 @@ public class ReplicatorTest extends ReplicatorTestBase {
         Awaitility.await().pollInterval(1, TimeUnit.SECONDS).timeout(30, TimeUnit.SECONDS)
                 .untilAsserted(() -> assertEquals(org.apache.pulsar.broker.service.AbstractReplicator.State.Started,
                         replicator.getState()));
-
         assertEquals(replicator.getState(), org.apache.pulsar.broker.service.AbstractReplicator.State.Started);
-        ManagedCursorImpl cursor = (ManagedCursorImpl) replicator.getCursor();
 
         // Make sure all the data has replicated to the remote cluster before close the cursor.
         Awaitility.await().untilAsserted(() -> assertEquals(cursor.getMarkDeletedPosition(), lastPosition));
 
-        cursor.setState(State.Closed);
-
-        Field field = ManagedCursorImpl.class.getDeclaredField("state");
-        field.setAccessible(true);
-        field.set(cursor, State.Closed);
+        isMakeAckFail.set(true);
 
         producer1.produce(10);
 
         // The cursor is closed, so the mark delete position will not move forward.
         assertEquals(cursor.getMarkDeletedPosition(), lastPosition);
 
-        field.set(cursor, State.Open);
+        isMakeAckFail.set(false);
 
         Awaitility.await().timeout(30, TimeUnit.SECONDS).until(
                 () -> {