You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2022/05/07 10:52:19 UTC

[pulsar] branch master updated: [broker]Fix subscribeRateLimiter not close (#15457)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 2a6b0ce57cd [broker]Fix subscribeRateLimiter not close (#15457)
2a6b0ce57cd is described below

commit 2a6b0ce57cde5825fbc74c496722fca7e394cb08
Author: Xiaoyu Hou <An...@gmail.com>
AuthorDate: Sat May 7 18:52:13 2022 +0800

    [broker]Fix subscribeRateLimiter not close (#15457)
---
 .../broker/service/persistent/PersistentTopic.java | 27 +++++----
 .../service/persistent/SubscribeRateLimiter.java   |  3 +
 .../pulsar/broker/service/SubscribeRateTest.java   | 65 +++++++++++++++++++---
 3 files changed, 77 insertions(+), 18 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index a515c883569..27e3f1b0345 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -382,11 +382,7 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
                 this.dispatchRateLimiter = Optional.of(new DispatchRateLimiter(this, Type.TOPIC));
             }
 
-            if (SubscribeRateLimiter.isSubscribeRateEnabled(getSubscribeRate())) {
-                this.subscribeRateLimiter = Optional.of(subscribeRateLimiter.orElse(new SubscribeRateLimiter(this)));
-            } else {
-                this.subscribeRateLimiter = Optional.empty();
-            }
+            updateSubscribeRateLimiter();
 
             // dispatch rate limiter for each subscription
             subscriptions.forEach((name, subscription) -> {
@@ -446,13 +442,21 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
     }
 
     public void updateSubscribeRateLimiter() {
-        SubscribeRate subscribeRate = this.getSubscribeRate();
-        if (isSubscribeRateEnabled(subscribeRate)) {
-            subscribeRateLimiter = Optional.of(subscribeRateLimiter.orElse(new SubscribeRateLimiter(this)));
-        } else {
-            subscribeRateLimiter = Optional.empty();
+        SubscribeRate subscribeRate = getSubscribeRate();
+        synchronized (subscribeRateLimiter) {
+            if (isSubscribeRateEnabled(subscribeRate)) {
+                if (subscribeRateLimiter.isPresent()) {
+                    this.subscribeRateLimiter.get().onSubscribeRateUpdate(subscribeRate);
+                } else {
+                    this.subscribeRateLimiter = Optional.of(new SubscribeRateLimiter(this));
+                }
+            } else {
+                if (subscribeRateLimiter.isPresent()) {
+                    subscribeRateLimiter.get().close();
+                    subscribeRateLimiter = Optional.empty();
+                }
+            }
         }
-        subscribeRateLimiter.ifPresent(limiter -> limiter.onSubscribeRateUpdate(subscribeRate));
     }
 
     private void asyncAddEntry(ByteBuf headersAndPayload, PublishContext publishContext) {
@@ -3090,6 +3094,7 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
                 this.subscribeRateLimiter = Optional.of(new SubscribeRateLimiter(this));
             } else if (!policies.get().isSubscribeRateSet()
                     || policies.get().getSubscribeRate().subscribeThrottlingRatePerConsumer <= 0) {
+                subscribeRateLimiter.ifPresent(SubscribeRateLimiter::close);
                 this.subscribeRateLimiter = Optional.empty();
             }
         }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/SubscribeRateLimiter.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/SubscribeRateLimiter.java
index 771e34a4fd6..89af6f6be88 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/SubscribeRateLimiter.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/SubscribeRateLimiter.java
@@ -140,6 +140,9 @@ public class SubscribeRateLimiter {
     }
 
     public void onSubscribeRateUpdate(SubscribeRate subscribeRate) {
+        if (this.subscribeRate.equals(subscribeRate)) {
+            return;
+        }
         this.subscribeRate = subscribeRate;
         stopResetTask();
         for (ConsumerIdentifier consumerIdentifier : this.subscribeRateLimiter.keySet()) {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SubscribeRateTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SubscribeRateTest.java
index 76399f32f7d..547fbe354f1 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SubscribeRateTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SubscribeRateTest.java
@@ -18,31 +18,39 @@
  */
 package org.apache.pulsar.broker.service;
 
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.when;
+import java.util.Optional;
+import lombok.Cleanup;
+import org.apache.pulsar.broker.service.persistent.PersistentTopic;
 import org.apache.pulsar.broker.service.persistent.SubscribeRateLimiter;
 import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.common.policies.data.SubscribeRate;
 import org.awaitility.Awaitility;
 import org.testng.Assert;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
 @Test(groups = "broker")
 public class SubscribeRateTest extends BrokerTestBase {
-
+    @BeforeMethod
     @Override
     protected void setup() throws Exception {
-        //No-op
+        super.baseSetup();
+        conf.setTopicLevelPoliciesEnabled(true);
+        conf.setSystemTopicEnabled(true);
+        conf.setMaxPendingPublishRequestsPerConnection(0);
     }
 
+    @AfterMethod(alwaysRun = true)
     @Override
     protected void cleanup() throws Exception {
-        //No-op
+        super.internalCleanup();
     }
 
     @Test
     public void testBrokerLevelSubscribeRateDynamicUpdate() throws Exception {
-        conf.setTopicLevelPoliciesEnabled(true);
-        conf.setSystemTopicEnabled(true);
-        conf.setMaxPendingPublishRequestsPerConnection(0);
-        super.baseSetup();
         final String topic = "persistent://prop/ns-abc/testBrokerLevelSubscribeRateDynamicUpdate";
         Producer<byte[]> producer = pulsarClient.newProducer()
             .topic(topic)
@@ -77,4 +85,47 @@ public class SubscribeRateTest extends BrokerTestBase {
 
         producer.close();
     }
+
+    @Test
+    public void testUpdateSubscribeRateLimiter() throws Exception {
+
+        final String topic = "persistent://prop/ns-abc/testUpdateSubscribeRateLimiter";
+
+        @Cleanup
+        Producer<byte[]> producer = pulsarClient.newProducer()
+            .topic(topic)
+            .producerName("producer-name")
+            .create();
+
+        Topic topicRef = pulsar.getBrokerService().getTopicReference(topic).get();
+        Assert.assertNotNull(topicRef);
+        Assert.assertTrue(topicRef instanceof PersistentTopic);
+        Assert.assertFalse(topicRef.getSubscribeRateLimiter().isPresent());
+
+        // init
+        PersistentTopic persistentTopic = spy(((PersistentTopic) topicRef));
+        when(persistentTopic.getSubscribeRate()).thenReturn(new SubscribeRate(10, 60));
+        persistentTopic.updateSubscribeRateLimiter();
+
+        Optional<SubscribeRateLimiter> limiter1 = persistentTopic.getSubscribeRateLimiter();
+        Assert.assertTrue(limiter1.isPresent());
+        Assert.assertEquals(limiter1.get().getSubscribeRate(), new SubscribeRate(10, 60));
+
+        // update
+        when(persistentTopic.getSubscribeRate()).thenReturn(new SubscribeRate(20, 120));
+        persistentTopic.updateSubscribeRateLimiter();
+
+        Optional<SubscribeRateLimiter> limiter2 = persistentTopic.getSubscribeRateLimiter();
+        Assert.assertTrue(limiter2.isPresent());
+        Assert.assertEquals(limiter2.get().getSubscribeRate(), new SubscribeRate(20, 120));
+
+        Assert.assertSame(limiter1, limiter2);
+
+        // disable
+        when(persistentTopic.getSubscribeRate()).thenReturn(new SubscribeRate(0, 0));
+        persistentTopic.updateSubscribeRateLimiter();
+
+        Optional<SubscribeRateLimiter> limiter3 = persistentTopic.getSubscribeRateLimiter();
+        Assert.assertFalse(limiter3.isPresent());
+    }
 }