You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by ch...@apache.org on 2021/08/25 04:47:32 UTC

[pulsar] 03/11: [Broker] Remove subscription when closing Reader on non-persistent topics (#11731)

This is an automated email from the ASF dual-hosted git repository.

chenhang pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 2fb2e59cde34b00b28a946cab6625be8e93fe729
Author: ran <ga...@126.com>
AuthorDate: Sat Aug 21 02:51:41 2021 +0800

    [Broker] Remove subscription when closing Reader on non-persistent topics (#11731)
    
    * Remove the subscription from the topic when closing Reader subscription.
    
    * remove useless code
    
    (cherry picked from commit 5ac38f1434dd9822daa4bcd1ed2f3c5e4580463b)
---
 .../nonpersistent/NonPersistentSubscription.java   |  9 +++-
 .../service/nonpersistent/NonPersistentTopic.java  |  4 +-
 .../org/apache/pulsar/client/impl/ReaderTest.java  | 48 ++++++++++++++++++++++
 3 files changed, 58 insertions(+), 3 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java
index c605f23..a400fae 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java
@@ -72,13 +72,17 @@ public class NonPersistentSubscription implements Subscription {
     private final LongAdder bytesOutFromRemovedConsumers = new LongAdder();
     private final LongAdder msgOutFromRemovedConsumer = new LongAdder();
 
-    public NonPersistentSubscription(NonPersistentTopic topic, String subscriptionName) {
+    // If isDurable is false (e.g. a Reader), remove the subscription from the topic when closing this subscription.
+    private final boolean isDurable;
+
+    public NonPersistentSubscription(NonPersistentTopic topic, String subscriptionName, boolean isDurable) {
         this.topic = topic;
         this.topicName = topic.getName();
         this.subName = subscriptionName;
         this.fullName = MoreObjects.toStringHelper(this).add("topic", topicName).add("name", subName).toString();
         IS_FENCED_UPDATER.set(this, FALSE);
         this.lastActive = System.currentTimeMillis();
+        this.isDurable = isDurable;
     }
 
     @Override
@@ -200,6 +204,9 @@ public class NonPersistentSubscription implements Subscription {
         ConsumerStatsImpl stats = consumer.getStats();
         bytesOutFromRemovedConsumers.add(stats.bytesOutCounter);
         msgOutFromRemovedConsumer.add(stats.msgOutCounter);
+        if (!isDurable) {
+            topic.unsubscribe(subName);
+        }
 
         // invalid consumer remove will throw an exception
         // decrement usage is triggered only for valid consumer close
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
index 37680d4..7ae87f3 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
@@ -270,7 +270,7 @@ public class NonPersistentTopic extends AbstractTopic implements Topic {
         }
 
         NonPersistentSubscription subscription = subscriptions.computeIfAbsent(subscriptionName,
-                name -> new NonPersistentSubscription(this, subscriptionName));
+                name -> new NonPersistentSubscription(this, subscriptionName, isDurable));
         Consumer consumer = new Consumer(subscription, subType, topic, consumerId, priorityLevel, consumerName, 0,
                 cnx, cnx.getAuthRole(), metadata, readCompacted, initialPosition, keySharedMeta, MessageId.latest);
         addConsumerToSubscription(subscription, consumer).thenRun(() -> {
@@ -319,7 +319,7 @@ public class NonPersistentTopic extends AbstractTopic implements Topic {
     @Override
     public CompletableFuture<Subscription> createSubscription(String subscriptionName, InitialPosition initialPosition,
             boolean replicateSubscriptionState) {
-        return CompletableFuture.completedFuture(new NonPersistentSubscription(this, subscriptionName));
+        return CompletableFuture.completedFuture(new NonPersistentSubscription(this, subscriptionName, true));
     }
 
     @Override
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ReaderTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ReaderTest.java
index 4a2466f..2052646 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ReaderTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ReaderTest.java
@@ -42,6 +42,7 @@ import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
 import org.apache.pulsar.broker.service.StickyKeyConsumerSelector;
 import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.MessageRoutingMode;
@@ -56,6 +57,7 @@ import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.ClusterData;
 import org.apache.pulsar.common.policies.data.RetentionPolicies;
 import org.apache.pulsar.common.policies.data.TenantInfoImpl;
+import org.apache.pulsar.common.policies.data.TopicStats;
 import org.apache.pulsar.common.util.Murmur3_32Hash;
 import org.apache.pulsar.schema.Schemas;
 import org.awaitility.Awaitility;
@@ -554,4 +556,50 @@ public class ReaderTest extends MockedPulsarServiceBaseTest {
         latch.await();
         Assert.assertEquals(received.size(), 1);
     }
+
+    @Test(timeOut = 1000 * 10)
+    public void removeNonPersistentTopicReaderTest() throws Exception {
+        final String topic = "non-persistent://my-property/my-ns/non-topic";
+
+        Reader<byte[]> reader = pulsarClient.newReader()
+                .topic(topic)
+                .startMessageId(MessageId.earliest)
+                .create();
+        Reader<byte[]> reader2 = pulsarClient.newReader()
+                .topic(topic)
+                .startMessageId(MessageId.earliest)
+                .create();
+
+        Awaitility.await()
+                .pollDelay(3, TimeUnit.SECONDS)
+                .until(() -> {
+                    TopicStats topicStats = admin.topics().getStats(topic);
+                    System.out.println("subscriptions size: " + topicStats.getSubscriptions().size());
+                    return topicStats.getSubscriptions().size() == 2;
+                });
+
+        reader.close();
+        reader2.close();
+
+        Awaitility.await().until(() -> {
+            TopicStats topicStats = admin.topics().getStats(topic);
+            System.out.println("subscriptions size: " + topicStats.getSubscriptions().size());
+            return topicStats.getSubscriptions().size() == 0;
+        });
+
+        Consumer<byte[]> consumer = pulsarClient.newConsumer()
+                .topic(topic)
+                .subscriptionName("sub")
+                .subscribe();
+        consumer.close();
+
+        Awaitility.await()
+                .pollDelay(3, TimeUnit.SECONDS)
+                .until(() -> {
+            TopicStats topicStats = admin.topics().getStats(topic);
+            System.out.println("subscriptions size: " + topicStats.getSubscriptions().size());
+            return topicStats.getSubscriptions().size() == 1;
+        });
+    }
+
 }