You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by ho...@apache.org on 2022/08/28 08:13:10 UTC

[pulsar] branch master updated: [fix][broker]fail to update partition meta of topic due to ConflictException: subscription already exists for topic (#17251)

This is an automated email from the ASF dual-hosted git repository.

houxiaoyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 3422ab43839 [fix][broker]fail to update partition meta of topic due to ConflictException: subscription already exists for topic (#17251)
3422ab43839 is described below

commit 3422ab438390c252cbafaa3eb944a1b6be154a8e
Author: Qiang Huang <HQ...@users.noreply.github.com>
AuthorDate: Sun Aug 28 16:13:01 2022 +0800

    [fix][broker]fail to update partition meta of topic due to ConflictException: subscription already exists for topic (#17251)
    
    ### Motivation
    #10374 intended to tolerate existing subscriptions when updating the partitions of a topic with the `force` command parameter. However, the partition metadata still could not be updated when the `Subscription already exists for topic` exception occurred, because the metadata update was skipped while that exception was being handled.
---
 .../broker/admin/impl/PersistentTopicsBase.java    | 30 +++++++++++----
 .../pulsar/broker/admin/PersistentTopicsTest.java  | 43 ++++++++++++++++++++++
 2 files changed, 65 insertions(+), 8 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index 6c55ac8c5d3..3db8f3a302f 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -473,8 +473,8 @@ public class PersistentTopicsBase extends AdminResource {
                             })
                             .thenCompose(clusters -> tryCreatePartitionsAsync(numPartitions).thenApply(ignore ->
                                     clusters))
-                            .thenCompose(clusters -> createSubscriptions(topicName, numPartitions).thenApply(ignore ->
-                                    clusters))
+                            .thenCompose(clusters -> createSubscriptions(topicName, numPartitions, force).thenApply(
+                                    ignore -> clusters))
                             .thenCompose(clusters -> {
                                 if (!updateLocalTopicOnly) {
                                     return updatePartitionInOtherCluster(numPartitions, clusters)
@@ -4368,10 +4368,9 @@ public class PersistentTopicsBase extends AdminResource {
         }
     }
 
-
     private CompletableFuture<Void> updatePartitionedTopic(TopicName topicName, int numPartitions, boolean force) {
         CompletableFuture<Void> result = new CompletableFuture<>();
-        createSubscriptions(topicName, numPartitions).thenCompose(__ -> {
+        createSubscriptions(topicName, numPartitions, force).thenCompose(__ -> {
             CompletableFuture<Void> future = namespaceResources().getPartitionedTopicResources()
                     .updatePartitionedTopicAsync(topicName, p ->
                         new PartitionedTopicMetadata(numPartitions, p.properties));
@@ -4409,8 +4408,11 @@ public class PersistentTopicsBase extends AdminResource {
      *
      * @param topicName     : topic-name: persistent://prop/cluster/ns/topic
      * @param numPartitions : number partitions for the topics
+     * @param ignoreConflictException : If true, ignore ConflictException: subscription already exists for topic
+     *
      */
-    private CompletableFuture<Void> createSubscriptions(TopicName topicName, int numPartitions) {
+    private CompletableFuture<Void> createSubscriptions(TopicName topicName, int numPartitions,
+                                                       boolean ignoreConflictException) {
         CompletableFuture<Void> result = new CompletableFuture<>();
         pulsar().getBrokerService().fetchPartitionedTopicMetadataAsync(topicName).thenAccept(partitionMetadata -> {
             if (partitionMetadata.partitions < 1) {
@@ -4445,9 +4447,21 @@ public class PersistentTopicsBase extends AdminResource {
 
                     for (int i = partitionMetadata.partitions; i < numPartitions; i++) {
                         final String topicNamePartition = topicName.getPartition(i).toString();
-
-                        subscriptionFutures.add(admin.topics().createSubscriptionAsync(topicNamePartition,
-                                subscription, MessageId.latest));
+                        CompletableFuture<Void> future = new CompletableFuture<>();
+                        admin.topics().createSubscriptionAsync(topicNamePartition,
+                                subscription, MessageId.latest).whenComplete((__, ex) -> {
+                            if (ex == null) {
+                                future.complete(null);
+                            } else {
+                                if (ignoreConflictException
+                                        && ex.getCause() instanceof PulsarAdminException.ConflictException) {
+                                    future.complete(null);
+                                } else {
+                                    future.completeExceptionally(ex);
+                                }
+                            }
+                        });
+                        subscriptionFutures.add(future);
                     }
                 });
 
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
index a49fb85479a..ae2ed5a59ae 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
@@ -105,6 +105,7 @@ public class PersistentTopicsTest extends MockedPulsarServiceBaseTest {
     private final String testTenant = "my-tenant";
     private final String testLocalCluster = "use";
     private final String testNamespace = "my-namespace";
+    private final String testNamespaceLocal = "my-namespace-local";
     protected Field uriField;
     protected UriInfo uriInfo;
     private NonPersistentTopics nonPersistentTopic;
@@ -156,6 +157,7 @@ public class PersistentTopicsTest extends MockedPulsarServiceBaseTest {
                 new TenantInfoImpl(Set.of("role1", "role2"), Set.of(testLocalCluster, "test")));
         admin.namespaces().createNamespace(testTenant + "/" + testNamespace, Set.of(testLocalCluster, "test"));
         admin.namespaces().createNamespace("pulsar/system", 4);
+        admin.namespaces().createNamespace(testTenant + "/" + testNamespaceLocal);
     }
 
     @Override
@@ -1558,4 +1560,45 @@ public class PersistentTopicsTest extends MockedPulsarServiceBaseTest {
         }
     }
 
+    @Test
+    public void testUpdatePartitionedTopic()
+            throws KeeperException, InterruptedException, PulsarAdminException {
+        String topicName = "testUpdatePartitionedTopic";
+        String groupName = "cg_testUpdatePartitionedTopic";
+        AsyncResponse response = mock(AsyncResponse.class);
+        ArgumentCaptor<Response> responseCaptor = ArgumentCaptor.forClass(Response.class);
+        persistentTopics.createPartitionedTopic(response, testTenant, testNamespaceLocal, topicName, 2, true);
+        verify(response, timeout(5000).times(1)).resume(responseCaptor.capture());
+        Assert.assertEquals(responseCaptor.getValue().getStatus(), Response.Status.NO_CONTENT.getStatusCode());
+
+        response = mock(AsyncResponse.class);
+        persistentTopics.createSubscription(response, testTenant, testNamespaceLocal, topicName, groupName, true,
+                new ResetCursorData(MessageId.latest), false);
+        responseCaptor = ArgumentCaptor.forClass(Response.class);
+        verify(response, timeout(5000).times(1)).resume(responseCaptor.capture());
+        Assert.assertEquals(responseCaptor.getValue().getStatus(), Response.Status.NO_CONTENT.getStatusCode());
+
+        response = mock(AsyncResponse.class);
+        ArgumentCaptor<PartitionedTopicMetadata> metaCaptor = ArgumentCaptor.forClass(PartitionedTopicMetadata.class);
+        persistentTopics.getPartitionedMetadata(response, testTenant, testNamespaceLocal, topicName, true, false);
+        verify(response, timeout(5000).times(1)).resume(metaCaptor.capture());
+        PartitionedTopicMetadata partitionedTopicMetadata = metaCaptor.getValue();
+        Assert.assertEquals(partitionedTopicMetadata.partitions, 2);
+
+        doNothing().when(persistentTopics).validatePartitionedTopicName(any(), any(), any());
+        doReturn(CompletableFuture.completedFuture(null)).when(persistentTopics)
+                .validatePartitionedTopicMetadataAsync();
+        response = mock(AsyncResponse.class);
+        responseCaptor = ArgumentCaptor.forClass(Response.class);
+        persistentTopics.updatePartitionedTopic(response, testTenant, testNamespaceLocal, topicName, false, true,
+                true, 4);
+        verify(response, timeout(5000).times(1)).resume(responseCaptor.capture());
+
+        response = mock(AsyncResponse.class);
+        metaCaptor = ArgumentCaptor.forClass(PartitionedTopicMetadata.class);
+        persistentTopics.getPartitionedMetadata(response, testTenant, testNamespaceLocal, topicName, true, false);
+        verify(response, timeout(5000).times(1)).resume(metaCaptor.capture());
+        partitionedTopicMetadata = metaCaptor.getValue();
+        Assert.assertEquals(partitionedTopicMetadata.partitions, 4);
+    }
 }