You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by yu...@apache.org on 2024/04/24 17:45:46 UTC
(pulsar) branch master updated: [fix] [broker] Part-2: Replicator can not created successfully due to an orphan replicator in the previous topic owner (#21948)
This is an automated email from the ASF dual-hosted git repository.
yubiao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new b774666331d [fix] [broker] Part-2: Replicator can not created successfully due to an orphan replicator in the previous topic owner (#21948)
b774666331d is described below
commit b774666331db33ea6407174e0fe6e27a73160522
Author: fengyubiao <yu...@streamnative.io>
AuthorDate: Thu Apr 25 01:45:41 2024 +0800
[fix] [broker] Part-2: Replicator can not created successfully due to an orphan replicator in the previous topic owner (#21948)
---
.../pulsar/broker/service/AbstractReplicator.java | 10 +-
.../apache/pulsar/broker/service/Replicator.java | 2 +
.../service/persistent/PersistentReplicator.java | 9 +-
.../broker/service/persistent/PersistentTopic.java | 58 +++++--
.../broker/service/OneWayReplicatorTest.java | 166 +++++++++++++++++++++
.../broker/service/OneWayReplicatorTestBase.java | 14 +-
.../pulsar/broker/service/ReplicatorTest.java | 2 +-
7 files changed, 239 insertions(+), 22 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java
index f34144deb0a..394fad21ae6 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java
@@ -248,7 +248,7 @@ public abstract class AbstractReplicator implements Replicator {
}
startProducer();
}).exceptionally(ex -> {
- log.warn("[{}] [{}] Stop retry to create producer due to unknown error(topic create failed), and"
+ log.error("[{}] [{}] Stop retry to create producer due to unknown error(topic create failed), and"
+ " trigger a terminate. Replicator state: {}",
localTopicName, replicatorId, STATE_UPDATER.get(this), ex);
terminate();
@@ -377,9 +377,13 @@ public abstract class AbstractReplicator implements Replicator {
this.producer = null;
// set the cursor as inactive.
disableReplicatorRead();
+ // release resources.
+ doReleaseResources();
});
}
+ protected void doReleaseResources() {}
+
protected boolean tryChangeStatusToTerminating() {
if (STATE_UPDATER.compareAndSet(this, State.Starting, State.Terminating)){
return true;
@@ -468,4 +472,8 @@ public abstract class AbstractReplicator implements Replicator {
}
return compareSetAndGetState(expect, update);
}
+
+ public boolean isTerminated() {
+ return state == State.Terminating || state == State.Terminated;
+ }
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Replicator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Replicator.java
index 8130b855b4e..5c314397da8 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Replicator.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Replicator.java
@@ -51,4 +51,6 @@ public interface Replicator {
boolean isConnected();
long getNumberOfEntriesInBacklog();
+
+ boolean isTerminated();
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java
index 5e1cc4a936a..367d1965207 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java
@@ -450,7 +450,7 @@ public abstract class PersistentReplicator extends AbstractReplicator
long waitTimeMillis = readFailureBackoff.next();
if (exception instanceof CursorAlreadyClosedException) {
- log.error("[{}] Error reading entries because replicator is"
+ log.warn("[{}] Error reading entries because replicator is"
+ " already deleted and cursor is already closed {}, ({})",
replicatorId, ctx, exception.getMessage(), exception);
// replicator is already deleted and cursor is already closed so, producer should also be disconnected.
@@ -570,7 +570,7 @@ public abstract class PersistentReplicator extends AbstractReplicator
log.error("[{}] Failed to delete message at {}: {}", replicatorId, ctx,
exception.getMessage(), exception);
if (exception instanceof CursorAlreadyClosedException) {
- log.error("[{}] Asynchronous ack failure because replicator is already deleted and cursor is already"
+ log.warn("[{}] Asynchronous ack failure because replicator is already deleted and cursor is already"
+ " closed {}, ({})", replicatorId, ctx, exception.getMessage(), exception);
// replicator is already deleted and cursor is already closed so, producer should also be disconnected.
terminate();
@@ -698,6 +698,11 @@ public abstract class PersistentReplicator extends AbstractReplicator
return producer != null && producer.isConnected();
}
+ @Override
+ protected void doReleaseResources() {
+ dispatchRateLimiter.ifPresent(DispatchRateLimiter::close);
+ }
+
private static final Logger log = LoggerFactory.getLogger(PersistentReplicator.class);
@VisibleForTesting
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 9d6855962ce..c1a75d67e3c 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -1731,6 +1731,7 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
return deleteForcefully();
}
+ removeTerminatedReplicators(replicators);
List<CompletableFuture<Void>> futures = new ArrayList<>();
// Check for missing replicators
@@ -1769,6 +1770,8 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
if (log.isDebugEnabled()) {
log.debug("[{}] Checking shadow replication status, shadowTopics={}", topic, configuredShadowTopics);
}
+
+ removeTerminatedReplicators(shadowReplicators);
List<CompletableFuture<Void>> futures = new ArrayList<>();
// Check for missing replicators
@@ -1919,19 +1922,30 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
if (replicationClient == null) {
return;
}
- Replicator replicator = replicators.computeIfAbsent(remoteCluster, r -> {
- try {
- return new GeoPersistentReplicator(PersistentTopic.this, cursor, localCluster,
- remoteCluster, brokerService, (PulsarClientImpl) replicationClient);
- } catch (PulsarServerException e) {
- log.error("[{}] Replicator startup failed {}", topic, remoteCluster, e);
+ lock.readLock().lock();
+ try {
+ if (isClosingOrDeleting) {
+ // Whether the topic is "transferring" or not, do not create a new replicator.
+ log.info("[{}] Skip to create replicator because this topic is closing."
+ + " remote cluster: {}. State of transferring : {}",
+ topic, remoteCluster, transferring);
+ return;
}
- return null;
- });
-
- // clean up replicator if startup is failed
- if (replicator == null) {
- replicators.removeNullValue(remoteCluster);
+ Replicator replicator = replicators.computeIfAbsent(remoteCluster, r -> {
+ try {
+ return new GeoPersistentReplicator(PersistentTopic.this, cursor, localCluster,
+ remoteCluster, brokerService, (PulsarClientImpl) replicationClient);
+ } catch (PulsarServerException e) {
+ log.error("[{}] Replicator startup failed {}", topic, remoteCluster, e);
+ }
+ return null;
+ });
+ // clean up replicator if startup is failed
+ if (replicator == null) {
+ replicators.removeNullValue(remoteCluster);
+ }
+ } finally {
+ lock.readLock().unlock();
}
});
}
@@ -3881,9 +3895,27 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
}
private void unfenceTopicToResume() {
- subscriptions.values().forEach(sub -> sub.resumeAfterFence());
isFenced = false;
isClosingOrDeleting = false;
+ subscriptions.values().forEach(sub -> sub.resumeAfterFence());
+ unfenceReplicatorsToResume();
+ }
+
+ private void unfenceReplicatorsToResume() {
+ checkReplication();
+ checkShadowReplication();
+ }
+
+ private void removeTerminatedReplicators(ConcurrentOpenHashMap<String, Replicator> replicators) {
+ Map<String, Replicator> terminatedReplicators = new HashMap<>();
+ replicators.forEach((cluster, replicator) -> {
+ if (replicator.isTerminated()) {
+ terminatedReplicators.put(cluster, replicator);
+ }
+ });
+ terminatedReplicators.entrySet().forEach(entry -> {
+ replicators.remove(entry.getKey(), entry.getValue());
+ });
}
@Override
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java
index 35073575f34..9b8b567af08 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java
@@ -20,18 +20,21 @@ package org.apache.pulsar.broker.service;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNotEquals;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
+import com.google.common.collect.Sets;
import io.netty.util.concurrent.FastThreadLocalThread;
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.time.Duration;
import java.util.Arrays;
import java.util.Optional;
+import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
@@ -48,6 +51,7 @@ import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import org.apache.pulsar.broker.BrokerTestUtil;
+import org.apache.pulsar.broker.service.persistent.GeoPersistentReplicator;
import org.apache.pulsar.broker.service.persistent.PersistentReplicator;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.client.api.Consumer;
@@ -486,4 +490,166 @@ public class OneWayReplicatorTest extends OneWayReplicatorTestBase {
admin1.topics().deletePartitionedTopic(topicName);
admin2.topics().deletePartitionedTopic(topicName);
}
+
+ /**
+ * See the description and execution flow: https://github.com/apache/pulsar/pull/21948.
+ * Steps:
+ * 1.Create topic, does not enable replication now.
+ * - The topic will be loaded in the memory.
+ * 2.Enable namespace level replication.
+ * - Broker creates a replicator, and the internal producer of replicator is starting.
+ * - We inject an error to make the internal producer fail to connect; after a few seconds, it will retry to start.
+ * 3.Unload bundle.
+ * - Starting to close the topic.
+ * - The replicator will be closed, but it will not close the internal producer, because the producer has not
+ * been created successfully.
+ * - We inject a sleep into the process of closing the "repl.cursor" to make it stuck. So the topic is still
+ * in the process of being closed now.
+ * 4.Internal producer retry to connect.
+ * - At the next retry, it connects successfully. Since the state of "repl.cursor" is not "Closed", this producer
+ * will not be closed now.
+ * 5.Topic closed.
+ * - Release the stuck close of the "repl.cursor".
+ * - The topic is wholly closed.
+ * 6.Verify: the late-created internal producer will be closed. In other words, no producer is connected
+ * to the remote cluster.
+ */
+ @Test
+ public void testConcurrencyOfUnloadBundleAndRecreateProducer2() throws Exception {
+ final String namespaceName = defaultTenant + "/" + UUID.randomUUID().toString().replaceAll("-", "");
+ final String topicName = BrokerTestUtil.newUniqueName("persistent://" + namespaceName + "/tp_");
+ // 1.Create topic, does not enable replication now.
+ admin1.namespaces().createNamespace(namespaceName);
+ admin2.namespaces().createNamespace(namespaceName);
+ admin1.topics().createNonPartitionedTopic(topicName);
+ PersistentTopic persistentTopic =
+ (PersistentTopic) pulsar1.getBrokerService().getTopic(topicName, false).join().get();
+
+ // We inject an error to make the internal producer fail to connect.
+ // The delay time of next retry to create producer is below:
+ // 0.1s, 0.2, 0.4, 0.8, 1.6s, 3.2s, 6.4s...
+ // If the retry counter is larger than 6, the next creation will be slow enough to close Replicator.
+ final AtomicInteger createProducerCounter = new AtomicInteger();
+ final int failTimes = 6;
+ injectMockReplicatorProducerBuilder((producerCnf, originalProducer) -> {
+ if (topicName.equals(producerCnf.getTopicName())) {
+ // There is a switch to determine create producer successfully or not.
+ if (createProducerCounter.incrementAndGet() > failTimes) {
+ return originalProducer;
+ }
+ log.info("Retry create replicator.producer count: {}", createProducerCounter);
+ // Release producer and fail callback.
+ originalProducer.closeAsync();
+ throw new RuntimeException("mock error");
+ }
+ return originalProducer;
+ });
+
+ // 2.Enable namespace level replication.
+ admin1.namespaces().setNamespaceReplicationClusters(namespaceName, Sets.newHashSet(cluster1, cluster2));
+ AtomicReference<PersistentReplicator> replicator = new AtomicReference<PersistentReplicator>();
+ Awaitility.await().untilAsserted(() -> {
+ assertFalse(persistentTopic.getReplicators().isEmpty());
+ replicator.set(
+ (PersistentReplicator) persistentTopic.getReplicators().values().iterator().next());
+ // Since we inject a producer creation error, the replicator can not start successfully.
+ assertFalse(replicator.get().isConnected());
+ });
+
+ // We inject a sleep into the process of closing the "repl.cursor" to make it stuck, until the internal
+ // producer of the replicator started.
+ SpyCursor spyCursor =
+ spyCursor(persistentTopic, "pulsar.repl." + pulsar2.getConfig().getClusterName());
+ CursorCloseSignal cursorCloseSignal = makeCursorClosingDelay(spyCursor);
+
+ // 3.Unload bundle: call "topic.close(false)".
+ // Block the start of a new producer until the state of the replicator changes to Stopped.
+ // The next attempt of "createProducerSuccessAfterFailTimes" to create the producer will succeed.
+ Awaitility.await().pollInterval(Duration.ofMillis(100)).atMost(Duration.ofSeconds(60)).untilAsserted(() -> {
+ assertTrue(createProducerCounter.get() >= failTimes);
+ });
+ CompletableFuture<Void> topicCloseFuture = persistentTopic.close(true);
+ Awaitility.await().atMost(Duration.ofSeconds(30)).untilAsserted(() -> {
+ String state = String.valueOf(replicator.get().getState());
+ log.error("replicator state: {}", state);
+ assertTrue(state.equals("Disconnected") || state.equals("Terminated"));
+ });
+
+ // 5.Delay closing the cursor until "replicator.producer" is created successfully.
+ // The delay before the next retry to create "replicator.producer" will be 3.2s.
+ Thread.sleep(4 * 1000);
+ log.info("Replicator.state: {}", replicator.get().getState());
+ cursorCloseSignal.startClose();
+ cursorCloseSignal.startCallback();
+ // Wait for topic close successfully.
+ topicCloseFuture.join();
+
+ // 6. Verify there is no orphan producer on the remote cluster.
+ Awaitility.await().pollInterval(Duration.ofSeconds(1)).untilAsserted(() -> {
+ PersistentTopic persistentTopic2 =
+ (PersistentTopic) pulsar2.getBrokerService().getTopic(topicName, false).join().get();
+ assertEquals(persistentTopic2.getProducers().size(), 0);
+ Assert.assertFalse(replicator.get().isConnected());
+ });
+
+ // cleanup.
+ cleanupTopics(namespaceName, () -> {
+ admin1.topics().delete(topicName);
+ admin2.topics().delete(topicName);
+ });
+ admin1.namespaces().setNamespaceReplicationClusters(namespaceName, Sets.newHashSet(cluster1));
+ admin1.namespaces().deleteNamespace(namespaceName);
+ admin2.namespaces().deleteNamespace(namespaceName);
+ }
+
+ @Test
+ public void testUnFenceTopicToReuse() throws Exception {
+ final String topicName = BrokerTestUtil.newUniqueName("persistent://" + replicatedNamespace + "/tp");
+ // Wait for replicator started.
+ Producer<String> producer1 = client1.newProducer(Schema.STRING).topic(topicName).create();
+ waitReplicatorStarted(topicName);
+
+ // Inject an error to make the topic close fail.
+ final String mockProducerName = UUID.randomUUID().toString();
+ final org.apache.pulsar.broker.service.Producer mockProducer =
+ mock(org.apache.pulsar.broker.service.Producer.class);
+ doAnswer(invocation -> CompletableFuture.failedFuture(new RuntimeException("mocked error")))
+ .when(mockProducer).disconnect(any());
+ doAnswer(invocation -> CompletableFuture.failedFuture(new RuntimeException("mocked error")))
+ .when(mockProducer).disconnect();
+ PersistentTopic persistentTopic =
+ (PersistentTopic) pulsar1.getBrokerService().getTopic(topicName, false).join().get();
+ persistentTopic.getProducers().put(mockProducerName, mockProducer);
+
+ // Do close.
+ GeoPersistentReplicator replicator1 =
+ (GeoPersistentReplicator) persistentTopic.getReplicators().values().iterator().next();
+ try {
+ persistentTopic.close(true, false).join();
+ fail("Expected close fails due to a producer close fails");
+ } catch (Exception ex) {
+ log.info("Expected error: {}", ex.getMessage());
+ }
+
+ // Broker will call `topic.unfenceTopicToResume` if closing the clients fails.
+ // Verify: the replicator will be re-created.
+ Awaitility.await().untilAsserted(() -> {
+ assertTrue(producer1.isConnected());
+ GeoPersistentReplicator replicator2 =
+ (GeoPersistentReplicator) persistentTopic.getReplicators().values().iterator().next();
+ assertNotEquals(replicator1, replicator2);
+ assertFalse(replicator1.isConnected());
+ assertFalse(replicator1.producer != null && replicator1.producer.isConnected());
+ assertTrue(replicator2.isConnected());
+ assertTrue(replicator2.producer != null && replicator2.producer.isConnected());
+ });
+
+ // cleanup.
+ persistentTopic.getProducers().remove(mockProducerName, mockProducer);
+ producer1.close();
+ cleanupTopics(() -> {
+ admin1.topics().delete(topicName);
+ admin2.topics().delete(topicName);
+ });
+ }
}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTestBase.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTestBase.java
index 181721e34aa..95f976f965a 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTestBase.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTestBase.java
@@ -150,12 +150,16 @@ public abstract class OneWayReplicatorTestBase extends TestRetrySupport {
}
protected void cleanupTopics(CleanupTopicAction cleanupTopicAction) throws Exception {
- waitChangeEventsInit(replicatedNamespace);
- admin1.namespaces().setNamespaceReplicationClusters(replicatedNamespace, Collections.singleton(cluster1));
- admin1.namespaces().unload(replicatedNamespace);
+ cleanupTopics(replicatedNamespace, cleanupTopicAction);
+ }
+
+ protected void cleanupTopics(String namespace, CleanupTopicAction cleanupTopicAction) throws Exception {
+ waitChangeEventsInit(namespace);
+ admin1.namespaces().setNamespaceReplicationClusters(namespace, Collections.singleton(cluster1));
+ admin1.namespaces().unload(namespace);
cleanupTopicAction.run();
- admin1.namespaces().setNamespaceReplicationClusters(replicatedNamespace, Sets.newHashSet(cluster1, cluster2));
- waitChangeEventsInit(replicatedNamespace);
+ admin1.namespaces().setNamespaceReplicationClusters(namespace, Sets.newHashSet(cluster1, cluster2));
+ waitChangeEventsInit(namespace);
}
protected void waitChangeEventsInit(String namespace) {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
index a05c3468ea1..0bfcdf563d6 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
@@ -152,7 +152,7 @@ public class ReplicatorTest extends ReplicatorTestBase {
return new Object[][] { { Boolean.TRUE }, { Boolean.FALSE } };
}
- @Test
+ @Test(priority = Integer.MAX_VALUE)
public void testConfigChange() throws Exception {
log.info("--- Starting ReplicatorTest::testConfigChange ---");
// This test is to verify that the config change on global namespace is successfully applied in broker during