You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by te...@apache.org on 2024/04/08 10:41:22 UTC

(pulsar) branch branch-3.2 updated: [admin][broker] Fix force delete subscription not working (#22423)

This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-3.2 by this push:
     new ed58028e0bd [admin][broker] Fix force delete subscription not working (#22423)
ed58028e0bd is described below

commit ed58028e0bd0939b616d29ea31558fedcbc563d7
Author: 道君 <da...@apache.org>
AuthorDate: Thu Apr 4 23:08:45 2024 +0800

    [admin][broker] Fix force delete subscription not working (#22423)
---
 .../broker/admin/impl/PersistentTopicsBase.java    |  5 ++--
 .../pulsar/broker/admin/PersistentTopicsTest.java  | 30 ++++++++++++++++++++++
 2 files changed, 32 insertions(+), 3 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index 3eddddf8773..318e2bc2cde 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -1566,7 +1566,7 @@ public class PersistentTopicsBase extends AdminResource {
                         for (int i = 0; i < partitionMetadata.partitions; i++) {
                             TopicName topicNamePartition = topicName.getPartition(i);
                             futures.add(adminClient.topics()
-                                    .deleteSubscriptionAsync(topicNamePartition.toString(), subName, false));
+                                    .deleteSubscriptionAsync(topicNamePartition.toString(), subName, force));
                         }
 
                         return FutureUtil.waitForAll(futures).handle((result, exception) -> {
@@ -1585,8 +1585,7 @@ public class PersistentTopicsBase extends AdminResource {
                             return null;
                         });
                     }
-                    return internalDeleteSubscriptionForNonPartitionedTopicAsync(subName, authoritative,
-                            force);
+                    return internalDeleteSubscriptionForNonPartitionedTopicAsync(subName, authoritative, force);
                 });
             }
         });
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
index 6641a5805c0..8e1375303ce 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
@@ -79,11 +79,13 @@ import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.client.admin.Topics;
 import org.apache.pulsar.client.admin.internal.TopicsImpl;
 import org.apache.pulsar.client.api.CompressionType;
+import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.Reader;
 import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.client.api.interceptor.ProducerInterceptor;
 import org.apache.pulsar.client.impl.BatchMessageIdImpl;
 import org.apache.pulsar.client.impl.MessageIdImpl;
@@ -1793,6 +1795,34 @@ public class PersistentTopicsTest extends MockedPulsarServiceBaseTest {
         assertThrows(PulsarAdminException.NotFoundException.class, () -> admin.topics().createMissedPartitions(topicName));
     }
 
+    @Test
+    public void testForceDeleteSubscription() throws Exception {
+        try {
+            pulsar.getConfiguration().setAllowAutoSubscriptionCreation(false);
+            String topicName = "persistent://" + testTenant + "/" + testNamespaceLocal + "/testForceDeleteSubscription";
+            String subName = "sub1";
+            admin.topics().createNonPartitionedTopic(topicName);
+            admin.topics().createSubscription(topicName, subName, MessageId.latest);
+
+            @Cleanup
+            Consumer<String> c0 = pulsarClient.newConsumer(Schema.STRING)
+                    .topic(topicName)
+                    .subscriptionName(subName)
+                    .subscriptionType(SubscriptionType.Shared)
+                    .subscribe();
+            @Cleanup
+            Consumer<String> c1 = pulsarClient.newConsumer(Schema.STRING)
+                    .topic(topicName)
+                    .subscriptionName(subName)
+                    .subscriptionType(SubscriptionType.Shared)
+                    .subscribe();
+
+            admin.topics().deleteSubscription(topicName, subName, true);
+        } finally {
+            pulsar.getConfiguration().setAllowAutoSubscriptionCreation(true);
+        }
+    }
+
     @Test
     public void testUpdatePropertiesOnNonDurableSub() throws Exception {
         String topic = "persistent://" + testTenant + "/" + testNamespaceLocal + "/testUpdatePropertiesOnNonDurableSub";