You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by te...@apache.org on 2024/04/08 10:41:22 UTC
(pulsar) branch branch-3.2 updated: [admin][broker] Fix force delete subscription not working (#22423)
This is an automated email from the ASF dual-hosted git repository.
technoboy pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-3.2 by this push:
new ed58028e0bd [admin][broker] Fix force delete subscription not working (#22423)
ed58028e0bd is described below
commit ed58028e0bd0939b616d29ea31558fedcbc563d7
Author: 道君 <da...@apache.org>
AuthorDate: Thu Apr 4 23:08:45 2024 +0800
[admin][broker] Fix force delete subscription not working (#22423)
---
.../broker/admin/impl/PersistentTopicsBase.java | 5 ++--
.../pulsar/broker/admin/PersistentTopicsTest.java | 30 ++++++++++++++++++++++
2 files changed, 32 insertions(+), 3 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index 3eddddf8773..318e2bc2cde 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -1566,7 +1566,7 @@ public class PersistentTopicsBase extends AdminResource {
for (int i = 0; i < partitionMetadata.partitions; i++) {
TopicName topicNamePartition = topicName.getPartition(i);
futures.add(adminClient.topics()
- .deleteSubscriptionAsync(topicNamePartition.toString(), subName, false));
+ .deleteSubscriptionAsync(topicNamePartition.toString(), subName, force));
}
return FutureUtil.waitForAll(futures).handle((result, exception) -> {
@@ -1585,8 +1585,7 @@ public class PersistentTopicsBase extends AdminResource {
return null;
});
}
- return internalDeleteSubscriptionForNonPartitionedTopicAsync(subName, authoritative,
- force);
+ return internalDeleteSubscriptionForNonPartitionedTopicAsync(subName, authoritative, force);
});
}
});
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
index 6641a5805c0..8e1375303ce 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
@@ -79,11 +79,13 @@ import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.admin.Topics;
import org.apache.pulsar.client.admin.internal.TopicsImpl;
import org.apache.pulsar.client.api.CompressionType;
+import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.Reader;
import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.api.interceptor.ProducerInterceptor;
import org.apache.pulsar.client.impl.BatchMessageIdImpl;
import org.apache.pulsar.client.impl.MessageIdImpl;
@@ -1793,6 +1795,34 @@ public class PersistentTopicsTest extends MockedPulsarServiceBaseTest {
assertThrows(PulsarAdminException.NotFoundException.class, () -> admin.topics().createMissedPartitions(topicName));
}
+ @Test
+ public void testForceDeleteSubscription() throws Exception { // Force-deletes a subscription while two Shared consumers are subscribed to it.
+ try {
+ pulsar.getConfiguration().setAllowAutoSubscriptionCreation(false); // switched off for this test only; the finally block sets it back to true
+ String topicName = "persistent://" + testTenant + "/" + testNamespaceLocal + "/testForceDeleteSubscription";
+ String subName = "sub1";
+ admin.topics().createNonPartitionedTopic(topicName); // NOTE(review): non-partitioned topic, so the per-partition delete loop is presumably not exercised here - confirm
+ admin.topics().createSubscription(topicName, subName, MessageId.latest); // created explicitly because auto-creation was disabled above
+
+ @Cleanup
+ Consumer<String> c0 = pulsarClient.newConsumer(Schema.STRING) // first of two consumers attached to the same subscription
+ .topic(topicName)
+ .subscriptionName(subName)
+ .subscriptionType(SubscriptionType.Shared)
+ .subscribe();
+ @Cleanup
+ Consumer<String> c1 = pulsarClient.newConsumer(Schema.STRING) // second consumer, same topic, subscription name and Shared type
+ .topic(topicName)
+ .subscriptionName(subName)
+ .subscriptionType(SubscriptionType.Shared)
+ .subscribe();
+
+ admin.topics().deleteSubscription(topicName, subName, true); // third argument is presumably the force flag; no assertion follows, so the test passes if this call does not throw
+ } finally {
+ pulsar.getConfiguration().setAllowAutoSubscriptionCreation(true); // always set to true, not to whatever value was configured before the test
+ }
+ }
+
@Test
public void testUpdatePropertiesOnNonDurableSub() throws Exception {
String topic = "persistent://" + testTenant + "/" + testNamespaceLocal + "/testUpdatePropertiesOnNonDurableSub";