You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by xy...@apache.org on 2023/04/04 12:02:51 UTC

[pulsar] branch branch-2.10 updated: [fix][broker] Ignore and remove the replicator cursor when the remote cluster is absent (#19972)

This is an automated email from the ASF dual-hosted git repository.

xyz pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.10 by this push:
     new b93176c4d25 [fix][broker] Ignore and remove the replicator cursor when the remote cluster is absent (#19972)
b93176c4d25 is described below

commit b93176c4d25286c5fcd490fc0ff0a7c4fcfd40a5
Author: Yunze Xu <xy...@163.com>
AuthorDate: Tue Apr 4 10:31:33 2023 +0800

    [fix][broker] Ignore and remove the replicator cursor when the remote cluster is absent (#19972)
    
    (cherry picked from commit d1fc7323cbf61a6d2955486fc123fdde5253e72c)
---
 .../broker/service/persistent/PersistentTopic.java | 30 ++++++++--
 .../service/persistent/PersistentTopicTest.java    | 66 ++++++++++++++++++++++
 2 files changed, 90 insertions(+), 6 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 246998872b8..fde4cb7d07f 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -1538,14 +1538,32 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
         return future;
     }
 
+    private CompletableFuture<Boolean> checkReplicationCluster(String remoteCluster) {
+        return brokerService.getPulsar().getPulsarResources().getNamespaceResources()
+                .getPoliciesAsync(TopicName.get(topic).getNamespaceObject())
+                .thenApply(optPolicies -> optPolicies.map(policies -> policies.replication_clusters)
+                        .orElse(Collections.emptySet()).contains(remoteCluster)
+                        || topicPolicies.getReplicationClusters().get().contains(remoteCluster));
+    }
+
     protected CompletableFuture<Void> addReplicationCluster(String remoteCluster, ManagedCursor cursor,
             String localCluster) {
         return AbstractReplicator.validatePartitionedTopicAsync(PersistentTopic.this.getName(), brokerService)
-                .thenCompose(__ -> brokerService.pulsar().getPulsarResources().getClusterResources()
-                        .getClusterAsync(remoteCluster)
-                        .thenApply(clusterData ->
-                                brokerService.getReplicationClient(remoteCluster, clusterData)))
+                .thenCompose(__ -> checkReplicationCluster(remoteCluster))
+                .thenCompose(clusterExists -> {
+                    if (!clusterExists) {
+                        log.warn("Remove the replicator because the cluster '{}' does not exist", remoteCluster);
+                        return removeReplicator(remoteCluster).thenApply(__ -> null);
+                    }
+                    return brokerService.pulsar().getPulsarResources().getClusterResources()
+                            .getClusterAsync(remoteCluster)
+                            .thenApply(clusterData ->
+                                    brokerService.getReplicationClient(remoteCluster, clusterData));
+                })
                 .thenAccept(replicationClient -> {
+                    if (replicationClient == null) {
+                        return;
+                    }
                     Replicator replicator = replicators.computeIfAbsent(remoteCluster, r -> {
                         try {
                             return new PersistentReplicator(PersistentTopic.this, cursor, localCluster,
@@ -1569,8 +1587,8 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
 
         String name = PersistentReplicator.getReplicatorName(replicatorPrefix, remoteCluster);
 
-        replicators.get(remoteCluster).disconnect().thenRun(() -> {
-
+        Optional.ofNullable(replicators.get(remoteCluster)).map(Replicator::disconnect)
+                .orElse(CompletableFuture.completedFuture(null)).thenRun(() -> {
             ledger.asyncDeleteCursor(name, new DeleteCursorCallback() {
                 @Override
                 public void deleteCursorComplete(Object ctx) {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java
index 50c92a74430..34e2642c9a4 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java
@@ -35,19 +35,25 @@ import static org.testng.Assert.assertNull;
 import com.google.common.collect.Sets;
 import java.lang.reflect.Field;
 import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
 import java.util.List;
+import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.TimeUnit;
 import lombok.Data;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Supplier;
 import org.apache.bookkeeper.client.LedgerHandle;
+import org.apache.bookkeeper.mledger.ManagedCursor;
 import org.apache.bookkeeper.mledger.ManagedLedger;
 import org.apache.pulsar.broker.service.BrokerService;
 import org.apache.pulsar.broker.service.BrokerTestBase;
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.MessageRoutingMode;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.PulsarClientException;
@@ -55,12 +61,15 @@ import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.common.naming.NamespaceBundle;
 import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.policies.data.ClusterData;
 import org.apache.pulsar.common.policies.data.Policies;
+import org.apache.pulsar.common.policies.data.TenantInfo;
 import org.apache.pulsar.common.policies.data.TopicStats;
 import org.awaitility.Awaitility;
 import org.testng.Assert;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.DataProvider;
 import org.testng.annotations.Test;
 
 @Test(groups = "broker")
@@ -69,6 +78,8 @@ public class PersistentTopicTest extends BrokerTestBase {
     @BeforeMethod(alwaysRun = true)
     @Override
     protected void setup() throws Exception {
+        conf.setSystemTopicEnabled(true);
+        conf.setTopicLevelPoliciesEnabled(true);
         super.baseSetup();
     }
 
@@ -392,4 +403,59 @@ public class PersistentTopicTest extends BrokerTestBase {
         makeDeletedFailed.set(false);
         persistentTopic.delete().get();
     }
+
+    @DataProvider(name = "topicLevelPolicy")
+    public static Object[][] topicLevelPolicy() {
+        return new Object[][] { { true }, { false } };
+    }
+
+    @Test(dataProvider = "topicLevelPolicy")
+    public void testCreateTopicWithZombieReplicatorCursor(boolean topicLevelPolicy) throws Exception {
+        final String namespace = "prop/ns-abc";
+        final String topicName = "persistent://" + namespace
+                + "/testCreateTopicWithZombieReplicatorCursor" + topicLevelPolicy;
+        final String remoteCluster = "remote";
+        admin.topics().createNonPartitionedTopic(topicName);
+        admin.topics().createSubscription(topicName, conf.getReplicatorPrefix() + "." + remoteCluster,
+                MessageId.earliest, true);
+
+        admin.clusters().createCluster(remoteCluster, ClusterData.builder()
+                .serviceUrl("http://localhost:11112")
+                .brokerServiceUrl("pulsar://localhost:11111")
+                .build());
+        TenantInfo tenantInfo = admin.tenants().getTenantInfo("prop");
+        tenantInfo.getAllowedClusters().add(remoteCluster);
+        admin.tenants().updateTenant("prop", tenantInfo);
+
+        if (topicLevelPolicy) {
+            admin.topics().setReplicationClusters(topicName, Collections.singletonList(remoteCluster));
+        } else {
+            admin.namespaces().setNamespaceReplicationClustersAsync(
+                    namespace, Collections.singleton(remoteCluster)).get();
+        }
+
+        final PersistentTopic topic = (PersistentTopic) pulsar.getBrokerService().getTopic(topicName, false)
+                .get(3, TimeUnit.SECONDS).orElse(null);
+        assertNotNull(topic);
+
+        final Supplier<Set<String>> getCursors = () -> {
+            final Set<String> cursors = new HashSet<>();
+            final Iterable<ManagedCursor> iterable = topic.getManagedLedger().getCursors();
+            iterable.forEach(c -> cursors.add(c.getName()));
+            return cursors;
+        };
+        assertEquals(getCursors.get(), Collections.singleton(conf.getReplicatorPrefix() + "." + remoteCluster));
+
+        if (topicLevelPolicy) {
+            admin.topics().setReplicationClusters(topicName, Collections.emptyList());
+        } else {
+            admin.namespaces().setNamespaceReplicationClustersAsync(namespace, Collections.emptySet()).get();
+        }
+        admin.clusters().deleteCluster(remoteCluster);
+        // Now the cluster and its related policy have been removed, but the replicator cursor still exists
+
+        topic.initialize().get(3, TimeUnit.SECONDS);
+        Awaitility.await().atMost(3, TimeUnit.SECONDS)
+                .until(() -> !topic.getManagedLedger().getCursors().iterator().hasNext());
+    }
 }