You are viewing a plain-text version of this content. The canonical link to the original message was a hyperlink that is not preserved in this plain-text copy.
Posted to commits@pulsar.apache.org by te...@apache.org on 2023/05/07 06:21:58 UTC
[pulsar] branch master updated: [fix] [broker] Fix infinite ack of Replicator after topic is closed (#20232)
This is an automated email from the ASF dual-hosted git repository.
technoboy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 98413642995 [fix] [broker] Fix infinite ack of Replicator after topic is closed (#20232)
98413642995 is described below
commit 98413642995eb6e562f6a591dcf56e20ac0cc7ef
Author: fengyubiao <yu...@streamnative.io>
AuthorDate: Sun May 7 14:21:51 2023 +0800
[fix] [broker] Fix infinite ack of Replicator after topic is closed (#20232)
---
.../service/persistent/PersistentReplicator.java | 7 ++++
.../pulsar/broker/service/ReplicatorTest.java | 45 +++++++++++++++-------
2 files changed, 38 insertions(+), 14 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java
index a556237f434..d882cbf56b2 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java
@@ -545,6 +545,13 @@ public abstract class PersistentReplicator extends AbstractReplicator
public void deleteFailed(ManagedLedgerException exception, Object ctx) {
log.error("[{}] Failed to delete message at {}: {}", replicatorId, ctx,
exception.getMessage(), exception);
+ if (exception instanceof CursorAlreadyClosedException) {
+ log.error("[{}] Asynchronous ack failure because replicator is already deleted and cursor is already"
+ + " closed {}, ({})", replicatorId, ctx, exception.getMessage(), exception);
+ // replicator is already deleted and cursor is already closed so, producer should also be stopped
+ closeProducerAsync();
+ return;
+ }
if (ctx instanceof PositionImpl) {
PositionImpl deletedEntry = (PositionImpl) ctx;
if (deletedEntry.compareTo((PositionImpl) cursor.getMarkDeletedPosition()) > 0) {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
index 901451c022b..176eab0e94b 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
@@ -21,6 +21,7 @@ package org.apache.pulsar.broker.service;
import static org.apache.pulsar.broker.BrokerTestUtil.newUniqueName;
import static org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest.retryStrategically;
import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.spy;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotEquals;
@@ -51,8 +52,10 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import lombok.Cleanup;
+import org.apache.bookkeeper.mledger.AsyncCallbacks;
import org.apache.bookkeeper.mledger.AsyncCallbacks.DeleteCursorCallback;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.ManagedCursor;
@@ -60,7 +63,6 @@ import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.bookkeeper.mledger.ManagedLedgerException.CursorAlreadyClosedException;
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
-import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl.State;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
@@ -1645,20 +1647,41 @@ public class ReplicatorTest extends ReplicatorTestBase {
log.info("--- Starting ReplicatorTest::testReplication ---");
String namespace = "pulsar/global/ns2";
- admin1.namespaces().createNamespace(namespace);
- admin1.namespaces().setNamespaceReplicationClusters(namespace, Sets.newHashSet("r1", "r2"));
+ admin1.namespaces().createNamespace(namespace, Sets.newHashSet("r1"));
final TopicName dest = TopicName
.get(BrokerTestUtil.newUniqueName("persistent://" + namespace + "/ackFailedTopic"));
@Cleanup
MessageProducer producer1 = new MessageProducer(url1, dest);
- log.info("--- Starting producer --- " + url1);
+ PersistentTopic topic = (PersistentTopic) pulsar1.getBrokerService().getTopic(dest.toString(), false)
+ .getNow(null).get();
+ final ManagedLedgerImpl managedLedger = (ManagedLedgerImpl) topic.getManagedLedger();
+ final ManagedCursorImpl cursor = (ManagedCursorImpl) managedLedger.openCursor("pulsar.repl.r2");
+ final ManagedCursorImpl spyCursor = spy(cursor);
+ managedLedger.getCursors().removeCursor(cursor.getName());
+ managedLedger.getCursors().add(spyCursor, PositionImpl.EARLIEST);
+ AtomicBoolean isMakeAckFail = new AtomicBoolean(false);
+ doAnswer(invocation -> {
+ Position pos = (Position) invocation.getArguments()[0];
+ AsyncCallbacks.DeleteCallback cb = (AsyncCallbacks.DeleteCallback) invocation.getArguments()[1];
+ Object ctx = invocation.getArguments()[2];
+ if (isMakeAckFail.get()) {
+ log.info("async-delete {} will be failed", pos);
+ cb.deleteFailed(new ManagedLedgerException("mocked error"), ctx);
+ } else {
+ log.info("async-delete {} will success", pos);
+ cursor.asyncDelete(pos, cb, ctx);
+ }
+ return null;
+ }).when(spyCursor).asyncDelete(Mockito.any(Position.class), Mockito.any(AsyncCallbacks.DeleteCallback.class),
+ Mockito.any());
+
+ log.info("--- Starting producer --- " + url1);
+ admin1.namespaces().setNamespaceReplicationClusters(namespace, Sets.newHashSet("r1", "r2"));
// Produce from cluster1 and consume from the rest
producer1.produce(2);
- PersistentTopic topic = (PersistentTopic) pulsar1.getBrokerService().getTopic(dest.toString(), false)
- .getNow(null).get();
MessageIdImpl lastMessageId = (MessageIdImpl) topic.getLastMessageId().get();
Position lastPosition = PositionImpl.get(lastMessageId.getLedgerId(), lastMessageId.getEntryId());
ConcurrentOpenHashMap<String, Replicator> replicators = topic.getReplicators();
@@ -1667,25 +1690,19 @@ public class ReplicatorTest extends ReplicatorTestBase {
Awaitility.await().pollInterval(1, TimeUnit.SECONDS).timeout(30, TimeUnit.SECONDS)
.untilAsserted(() -> assertEquals(org.apache.pulsar.broker.service.AbstractReplicator.State.Started,
replicator.getState()));
-
assertEquals(replicator.getState(), org.apache.pulsar.broker.service.AbstractReplicator.State.Started);
- ManagedCursorImpl cursor = (ManagedCursorImpl) replicator.getCursor();
// Make sure all the data has replicated to the remote cluster before close the cursor.
Awaitility.await().untilAsserted(() -> assertEquals(cursor.getMarkDeletedPosition(), lastPosition));
- cursor.setState(State.Closed);
-
- Field field = ManagedCursorImpl.class.getDeclaredField("state");
- field.setAccessible(true);
- field.set(cursor, State.Closed);
+ isMakeAckFail.set(true);
producer1.produce(10);
// The cursor is closed, so the mark delete position will not move forward.
assertEquals(cursor.getMarkDeletedPosition(), lastPosition);
- field.set(cursor, State.Open);
+ isMakeAckFail.set(false);
Awaitility.await().timeout(30, TimeUnit.SECONDS).until(
() -> {