You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by ch...@apache.org on 2021/08/25 04:47:32 UTC
[pulsar] 03/11: [Broker] Remove subscription when closing Reader on
non-persistent topics (#11731)
This is an automated email from the ASF dual-hosted git repository.
chenhang pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 2fb2e59cde34b00b28a946cab6625be8e93fe729
Author: ran <ga...@126.com>
AuthorDate: Sat Aug 21 02:51:41 2021 +0800
[Broker] Remove subscription when closing Reader on non-persistent topics (#11731)
* Remove the subscription from the topic when closing Reader subscription.
* remove useless code
(cherry picked from commit 5ac38f1434dd9822daa4bcd1ed2f3c5e4580463b)
---
.../nonpersistent/NonPersistentSubscription.java | 9 +++-
.../service/nonpersistent/NonPersistentTopic.java | 4 +-
.../org/apache/pulsar/client/impl/ReaderTest.java | 48 ++++++++++++++++++++++
3 files changed, 58 insertions(+), 3 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java
index c605f23..a400fae 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java
@@ -72,13 +72,17 @@ public class NonPersistentSubscription implements Subscription {
private final LongAdder bytesOutFromRemovedConsumers = new LongAdder();
private final LongAdder msgOutFromRemovedConsumer = new LongAdder();
- public NonPersistentSubscription(NonPersistentTopic topic, String subscriptionName) {
+ // If isDurable is false(such as a Reader), remove subscription from topic when closing this subscription.
+ private final boolean isDurable;
+
+ public NonPersistentSubscription(NonPersistentTopic topic, String subscriptionName, boolean isDurable) {
this.topic = topic;
this.topicName = topic.getName();
this.subName = subscriptionName;
this.fullName = MoreObjects.toStringHelper(this).add("topic", topicName).add("name", subName).toString();
IS_FENCED_UPDATER.set(this, FALSE);
this.lastActive = System.currentTimeMillis();
+ this.isDurable = isDurable;
}
@Override
@@ -200,6 +204,9 @@ public class NonPersistentSubscription implements Subscription {
ConsumerStatsImpl stats = consumer.getStats();
bytesOutFromRemovedConsumers.add(stats.bytesOutCounter);
msgOutFromRemovedConsumer.add(stats.msgOutCounter);
+ if (!isDurable) {
+ topic.unsubscribe(subName);
+ }
// invalid consumer remove will throw an exception
// decrement usage is triggered only for valid consumer close
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
index 37680d4..7ae87f3 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
@@ -270,7 +270,7 @@ public class NonPersistentTopic extends AbstractTopic implements Topic {
}
NonPersistentSubscription subscription = subscriptions.computeIfAbsent(subscriptionName,
- name -> new NonPersistentSubscription(this, subscriptionName));
+ name -> new NonPersistentSubscription(this, subscriptionName, isDurable));
Consumer consumer = new Consumer(subscription, subType, topic, consumerId, priorityLevel, consumerName, 0,
cnx, cnx.getAuthRole(), metadata, readCompacted, initialPosition, keySharedMeta, MessageId.latest);
addConsumerToSubscription(subscription, consumer).thenRun(() -> {
@@ -319,7 +319,7 @@ public class NonPersistentTopic extends AbstractTopic implements Topic {
@Override
public CompletableFuture<Subscription> createSubscription(String subscriptionName, InitialPosition initialPosition,
boolean replicateSubscriptionState) {
- return CompletableFuture.completedFuture(new NonPersistentSubscription(this, subscriptionName));
+ return CompletableFuture.completedFuture(new NonPersistentSubscription(this, subscriptionName, true));
}
@Override
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ReaderTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ReaderTest.java
index 4a2466f..2052646 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ReaderTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ReaderTest.java
@@ -42,6 +42,7 @@ import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import org.apache.pulsar.broker.service.StickyKeyConsumerSelector;
import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.MessageRoutingMode;
@@ -56,6 +57,7 @@ import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.RetentionPolicies;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
+import org.apache.pulsar.common.policies.data.TopicStats;
import org.apache.pulsar.common.util.Murmur3_32Hash;
import org.apache.pulsar.schema.Schemas;
import org.awaitility.Awaitility;
@@ -554,4 +556,50 @@ public class ReaderTest extends MockedPulsarServiceBaseTest {
latch.await();
Assert.assertEquals(received.size(), 1);
}
+
+ @Test(timeOut = 1000 * 10)
+ public void removeNonPersistentTopicReaderTest() throws Exception {
+ final String topic = "non-persistent://my-property/my-ns/non-topic";
+
+ Reader<byte[]> reader = pulsarClient.newReader()
+ .topic(topic)
+ .startMessageId(MessageId.earliest)
+ .create();
+ Reader<byte[]> reader2 = pulsarClient.newReader()
+ .topic(topic)
+ .startMessageId(MessageId.earliest)
+ .create();
+
+ Awaitility.await()
+ .pollDelay(3, TimeUnit.SECONDS)
+ .until(() -> {
+ TopicStats topicStats = admin.topics().getStats(topic);
+ System.out.println("subscriptions size: " + topicStats.getSubscriptions().size());
+ return topicStats.getSubscriptions().size() == 2;
+ });
+
+ reader.close();
+ reader2.close();
+
+ Awaitility.await().until(() -> {
+ TopicStats topicStats = admin.topics().getStats(topic);
+ System.out.println("subscriptions size: " + topicStats.getSubscriptions().size());
+ return topicStats.getSubscriptions().size() == 0;
+ });
+
+ Consumer<byte[]> consumer = pulsarClient.newConsumer()
+ .topic(topic)
+ .subscriptionName("sub")
+ .subscribe();
+ consumer.close();
+
+ Awaitility.await()
+ .pollDelay(3, TimeUnit.SECONDS)
+ .until(() -> {
+ TopicStats topicStats = admin.topics().getStats(topic);
+ System.out.println("subscriptions size: " + topicStats.getSubscriptions().size());
+ return topicStats.getSubscriptions().size() == 1;
+ });
+ }
+
}