You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2022/05/07 10:52:19 UTC
[pulsar] branch master updated: [broker]Fix subscribeRateLimiter not close (#15457)
This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 2a6b0ce57cd [broker]Fix subscribeRateLimiter not close (#15457)
2a6b0ce57cd is described below
commit 2a6b0ce57cde5825fbc74c496722fca7e394cb08
Author: Xiaoyu Hou <An...@gmail.com>
AuthorDate: Sat May 7 18:52:13 2022 +0800
[broker]Fix subscribeRateLimiter not close (#15457)
---
.../broker/service/persistent/PersistentTopic.java | 27 +++++----
.../service/persistent/SubscribeRateLimiter.java | 3 +
.../pulsar/broker/service/SubscribeRateTest.java | 65 +++++++++++++++++++---
3 files changed, 77 insertions(+), 18 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index a515c883569..27e3f1b0345 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -382,11 +382,7 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
this.dispatchRateLimiter = Optional.of(new DispatchRateLimiter(this, Type.TOPIC));
}
- if (SubscribeRateLimiter.isSubscribeRateEnabled(getSubscribeRate())) {
- this.subscribeRateLimiter = Optional.of(subscribeRateLimiter.orElse(new SubscribeRateLimiter(this)));
- } else {
- this.subscribeRateLimiter = Optional.empty();
- }
+ updateSubscribeRateLimiter();
// dispatch rate limiter for each subscription
subscriptions.forEach((name, subscription) -> {
@@ -446,13 +442,21 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
}
public void updateSubscribeRateLimiter() {
- SubscribeRate subscribeRate = this.getSubscribeRate();
- if (isSubscribeRateEnabled(subscribeRate)) {
- subscribeRateLimiter = Optional.of(subscribeRateLimiter.orElse(new SubscribeRateLimiter(this)));
- } else {
- subscribeRateLimiter = Optional.empty();
+ SubscribeRate subscribeRate = getSubscribeRate();
+ synchronized (subscribeRateLimiter) {
+ if (isSubscribeRateEnabled(subscribeRate)) {
+ if (subscribeRateLimiter.isPresent()) {
+ this.subscribeRateLimiter.get().onSubscribeRateUpdate(subscribeRate);
+ } else {
+ this.subscribeRateLimiter = Optional.of(new SubscribeRateLimiter(this));
+ }
+ } else {
+ if (subscribeRateLimiter.isPresent()) {
+ subscribeRateLimiter.get().close();
+ subscribeRateLimiter = Optional.empty();
+ }
+ }
}
- subscribeRateLimiter.ifPresent(limiter -> limiter.onSubscribeRateUpdate(subscribeRate));
}
private void asyncAddEntry(ByteBuf headersAndPayload, PublishContext publishContext) {
@@ -3090,6 +3094,7 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
this.subscribeRateLimiter = Optional.of(new SubscribeRateLimiter(this));
} else if (!policies.get().isSubscribeRateSet()
|| policies.get().getSubscribeRate().subscribeThrottlingRatePerConsumer <= 0) {
+ subscribeRateLimiter.ifPresent(SubscribeRateLimiter::close);
this.subscribeRateLimiter = Optional.empty();
}
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/SubscribeRateLimiter.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/SubscribeRateLimiter.java
index 771e34a4fd6..89af6f6be88 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/SubscribeRateLimiter.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/SubscribeRateLimiter.java
@@ -140,6 +140,9 @@ public class SubscribeRateLimiter {
}
public void onSubscribeRateUpdate(SubscribeRate subscribeRate) {
+ if (this.subscribeRate.equals(subscribeRate)) {
+ return;
+ }
this.subscribeRate = subscribeRate;
stopResetTask();
for (ConsumerIdentifier consumerIdentifier : this.subscribeRateLimiter.keySet()) {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SubscribeRateTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SubscribeRateTest.java
index 76399f32f7d..547fbe354f1 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SubscribeRateTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SubscribeRateTest.java
@@ -18,31 +18,39 @@
*/
package org.apache.pulsar.broker.service;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.when;
+import java.util.Optional;
+import lombok.Cleanup;
+import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.broker.service.persistent.SubscribeRateLimiter;
import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.common.policies.data.SubscribeRate;
import org.awaitility.Awaitility;
import org.testng.Assert;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
@Test(groups = "broker")
public class SubscribeRateTest extends BrokerTestBase {
-
+ @BeforeMethod
@Override
protected void setup() throws Exception {
- //No-op
+ super.baseSetup();
+ conf.setTopicLevelPoliciesEnabled(true);
+ conf.setSystemTopicEnabled(true);
+ conf.setMaxPendingPublishRequestsPerConnection(0);
}
+ @AfterMethod(alwaysRun = true)
@Override
protected void cleanup() throws Exception {
- //No-op
+ super.internalCleanup();
}
@Test
public void testBrokerLevelSubscribeRateDynamicUpdate() throws Exception {
- conf.setTopicLevelPoliciesEnabled(true);
- conf.setSystemTopicEnabled(true);
- conf.setMaxPendingPublishRequestsPerConnection(0);
- super.baseSetup();
final String topic = "persistent://prop/ns-abc/testBrokerLevelSubscribeRateDynamicUpdate";
Producer<byte[]> producer = pulsarClient.newProducer()
.topic(topic)
@@ -77,4 +85,47 @@ public class SubscribeRateTest extends BrokerTestBase {
producer.close();
}
+
+ @Test
+ public void testUpdateSubscribeRateLimiter() throws Exception {
+
+ final String topic = "persistent://prop/ns-abc/testUpdateSubscribeRateLimiter";
+
+ @Cleanup
+ Producer<byte[]> producer = pulsarClient.newProducer()
+ .topic(topic)
+ .producerName("producer-name")
+ .create();
+
+ Topic topicRef = pulsar.getBrokerService().getTopicReference(topic).get();
+ Assert.assertNotNull(topicRef);
+ Assert.assertTrue(topicRef instanceof PersistentTopic);
+ Assert.assertFalse(topicRef.getSubscribeRateLimiter().isPresent());
+
+ // init
+ PersistentTopic persistentTopic = spy(((PersistentTopic) topicRef));
+ when(persistentTopic.getSubscribeRate()).thenReturn(new SubscribeRate(10, 60));
+ persistentTopic.updateSubscribeRateLimiter();
+
+ Optional<SubscribeRateLimiter> limiter1 = persistentTopic.getSubscribeRateLimiter();
+ Assert.assertTrue(limiter1.isPresent());
+ Assert.assertEquals(limiter1.get().getSubscribeRate(), new SubscribeRate(10, 60));
+
+ // update
+ when(persistentTopic.getSubscribeRate()).thenReturn(new SubscribeRate(20, 120));
+ persistentTopic.updateSubscribeRateLimiter();
+
+ Optional<SubscribeRateLimiter> limiter2 = persistentTopic.getSubscribeRateLimiter();
+ Assert.assertTrue(limiter2.isPresent());
+ Assert.assertEquals(limiter2.get().getSubscribeRate(), new SubscribeRate(20, 120));
+
+ Assert.assertSame(limiter1, limiter2);
+
+ // disable
+ when(persistentTopic.getSubscribeRate()).thenReturn(new SubscribeRate(0, 0));
+ persistentTopic.updateSubscribeRateLimiter();
+
+ Optional<SubscribeRateLimiter> limiter3 = persistentTopic.getSubscribeRateLimiter();
+ Assert.assertFalse(limiter3.isPresent());
+ }
}