You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by xy...@apache.org on 2023/04/04 12:02:51 UTC
[pulsar] branch branch-2.10 updated: [fix][broker] Ignore and remove the replicator cursor when the remote cluster is absent (#19972)
This is an automated email from the ASF dual-hosted git repository.
xyz pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.10 by this push:
new b93176c4d25 [fix][broker] Ignore and remove the replicator cursor when the remote cluster is absent (#19972)
b93176c4d25 is described below
commit b93176c4d25286c5fcd490fc0ff0a7c4fcfd40a5
Author: Yunze Xu <xy...@163.com>
AuthorDate: Tue Apr 4 10:31:33 2023 +0800
[fix][broker] Ignore and remove the replicator cursor when the remote cluster is absent (#19972)
(cherry picked from commit d1fc7323cbf61a6d2955486fc123fdde5253e72c)
---
.../broker/service/persistent/PersistentTopic.java | 30 ++++++++--
.../service/persistent/PersistentTopicTest.java | 66 ++++++++++++++++++++++
2 files changed, 90 insertions(+), 6 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 246998872b8..fde4cb7d07f 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -1538,14 +1538,32 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
return future;
}
+ private CompletableFuture<Boolean> checkReplicationCluster(String remoteCluster) {
+ return brokerService.getPulsar().getPulsarResources().getNamespaceResources()
+ .getPoliciesAsync(TopicName.get(topic).getNamespaceObject())
+ .thenApply(optPolicies -> optPolicies.map(policies -> policies.replication_clusters)
+ .orElse(Collections.emptySet()).contains(remoteCluster)
+ || topicPolicies.getReplicationClusters().get().contains(remoteCluster));
+ }
+
protected CompletableFuture<Void> addReplicationCluster(String remoteCluster, ManagedCursor cursor,
String localCluster) {
return AbstractReplicator.validatePartitionedTopicAsync(PersistentTopic.this.getName(), brokerService)
- .thenCompose(__ -> brokerService.pulsar().getPulsarResources().getClusterResources()
- .getClusterAsync(remoteCluster)
- .thenApply(clusterData ->
- brokerService.getReplicationClient(remoteCluster, clusterData)))
+ .thenCompose(__ -> checkReplicationCluster(remoteCluster))
+ .thenCompose(clusterExists -> {
+ if (!clusterExists) {
+ log.warn("Remove the replicator because the cluster '{}' does not exist", remoteCluster);
+ return removeReplicator(remoteCluster).thenApply(__ -> null);
+ }
+ return brokerService.pulsar().getPulsarResources().getClusterResources()
+ .getClusterAsync(remoteCluster)
+ .thenApply(clusterData ->
+ brokerService.getReplicationClient(remoteCluster, clusterData));
+ })
.thenAccept(replicationClient -> {
+ if (replicationClient == null) {
+ return;
+ }
Replicator replicator = replicators.computeIfAbsent(remoteCluster, r -> {
try {
return new PersistentReplicator(PersistentTopic.this, cursor, localCluster,
@@ -1569,8 +1587,8 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
String name = PersistentReplicator.getReplicatorName(replicatorPrefix, remoteCluster);
- replicators.get(remoteCluster).disconnect().thenRun(() -> {
-
+ Optional.ofNullable(replicators.get(remoteCluster)).map(Replicator::disconnect)
+ .orElse(CompletableFuture.completedFuture(null)).thenRun(() -> {
ledger.asyncDeleteCursor(name, new DeleteCursorCallback() {
@Override
public void deleteCursorComplete(Object ctx) {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java
index 50c92a74430..34e2642c9a4 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java
@@ -35,19 +35,25 @@ import static org.testng.Assert.assertNull;
import com.google.common.collect.Sets;
import java.lang.reflect.Field;
import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
import java.util.List;
+import java.util.Set;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import lombok.Data;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Supplier;
import org.apache.bookkeeper.client.LedgerHandle;
+import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.bookkeeper.mledger.ManagedLedger;
import org.apache.pulsar.broker.service.BrokerService;
import org.apache.pulsar.broker.service.BrokerTestBase;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.MessageRoutingMode;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClientException;
@@ -55,12 +61,15 @@ import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.common.naming.NamespaceBundle;
import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.Policies;
+import org.apache.pulsar.common.policies.data.TenantInfo;
import org.apache.pulsar.common.policies.data.TopicStats;
import org.awaitility.Awaitility;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
@Test(groups = "broker")
@@ -69,6 +78,8 @@ public class PersistentTopicTest extends BrokerTestBase {
@BeforeMethod(alwaysRun = true)
@Override
protected void setup() throws Exception {
+ conf.setSystemTopicEnabled(true);
+ conf.setTopicLevelPoliciesEnabled(true);
super.baseSetup();
}
@@ -392,4 +403,59 @@ public class PersistentTopicTest extends BrokerTestBase {
makeDeletedFailed.set(false);
persistentTopic.delete().get();
}
+
+ @DataProvider(name = "topicLevelPolicy")
+ public static Object[][] topicLevelPolicy() {
+ return new Object[][] { { true }, { false } };
+ }
+
+ @Test(dataProvider = "topicLevelPolicy")
+ public void testCreateTopicWithZombieReplicatorCursor(boolean topicLevelPolicy) throws Exception {
+ final String namespace = "prop/ns-abc";
+ final String topicName = "persistent://" + namespace
+ + "/testCreateTopicWithZombieReplicatorCursor" + topicLevelPolicy;
+ final String remoteCluster = "remote";
+ admin.topics().createNonPartitionedTopic(topicName);
+ admin.topics().createSubscription(topicName, conf.getReplicatorPrefix() + "." + remoteCluster,
+ MessageId.earliest, true);
+
+ admin.clusters().createCluster(remoteCluster, ClusterData.builder()
+ .serviceUrl("http://localhost:11112")
+ .brokerServiceUrl("pulsar://localhost:11111")
+ .build());
+ TenantInfo tenantInfo = admin.tenants().getTenantInfo("prop");
+ tenantInfo.getAllowedClusters().add(remoteCluster);
+ admin.tenants().updateTenant("prop", tenantInfo);
+
+ if (topicLevelPolicy) {
+ admin.topics().setReplicationClusters(topicName, Collections.singletonList(remoteCluster));
+ } else {
+ admin.namespaces().setNamespaceReplicationClustersAsync(
+ namespace, Collections.singleton(remoteCluster)).get();
+ }
+
+ final PersistentTopic topic = (PersistentTopic) pulsar.getBrokerService().getTopic(topicName, false)
+ .get(3, TimeUnit.SECONDS).orElse(null);
+ assertNotNull(topic);
+
+ final Supplier<Set<String>> getCursors = () -> {
+ final Set<String> cursors = new HashSet<>();
+ final Iterable<ManagedCursor> iterable = topic.getManagedLedger().getCursors();
+ iterable.forEach(c -> cursors.add(c.getName()));
+ return cursors;
+ };
+ assertEquals(getCursors.get(), Collections.singleton(conf.getReplicatorPrefix() + "." + remoteCluster));
+
+ if (topicLevelPolicy) {
+ admin.topics().setReplicationClusters(topicName, Collections.emptyList());
+ } else {
+ admin.namespaces().setNamespaceReplicationClustersAsync(namespace, Collections.emptySet()).get();
+ }
+ admin.clusters().deleteCluster(remoteCluster);
+ // Now the cluster and its related policy has been removed but the replicator cursor still exists
+
+ topic.initialize().get(3, TimeUnit.SECONDS);
+ Awaitility.await().atMost(3, TimeUnit.SECONDS)
+ .until(() -> !topic.getManagedLedger().getCursors().iterator().hasNext());
+ }
}