You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by te...@apache.org on 2022/08/10 05:48:03 UTC
[pulsar] branch branch-2.10 updated: [branch-2.10][cherry-pick] Fix topic dispatch rate limiter not init on broker-level #16084 (#17000)
This is an automated email from the ASF dual-hosted git repository.
technoboy pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.10 by this push:
new f77696ddf32 [branch-2.10][cherry-pick] Fix topic dispatch rate limiter not init on broker-level #16084 (#17000)
f77696ddf32 is described below
commit f77696ddf32605f2c2e5398e1156099bcd1d3e70
Author: Xiaoyu Hou <An...@gmail.com>
AuthorDate: Wed Aug 10 13:47:56 2022 +0800
[branch-2.10][cherry-pick] Fix topic dispatch rate limiter not init on broker-level #16084 (#17000)
---
.../pulsar/broker/service/AbstractTopic.java | 3 +
.../pulsar/broker/service/BrokerService.java | 4 +-
.../broker/service/persistent/PersistentTopic.java | 28 ++--
.../service/TopicDispatchRateLimiterTest.java | 145 +++++++++++++++++++++
4 files changed, 157 insertions(+), 23 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
index f9efa8ea186..ebd1ed2c79d 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
@@ -845,6 +845,9 @@ public abstract class AbstractTopic implements Topic, TopicPolicyListener<TopicP
}
}
+ public void updateDispatchRateLimiter() {
+ }
+
@Override
public void resetBrokerPublishCountAndEnableReadIfRequired(boolean doneBrokerReset) {
// topic rate not exceeded, and completed broker limiter reset.
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index dab1c349aed..5457b464488 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -2321,9 +2321,7 @@ public class BrokerService implements Closeable {
forEachTopic(topic -> {
if (topic instanceof AbstractTopic) {
((AbstractTopic) topic).updateBrokerDispatchRate();
- }
- if (topic.getDispatchRateLimiter().isPresent()) {
- topic.getDispatchRateLimiter().get().updateDispatchRate();
+ ((AbstractTopic) topic).updateDispatchRateLimiter();
}
});
});
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 51c27bfd8f9..085617214b0 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -2367,6 +2367,12 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
return retentionTime < 0 || (System.nanoTime() - lastActive) < retentionTime;
}
+ @Override
+ public void updateDispatchRateLimiter() {
+ initializeDispatchRateLimiterIfNeeded();
+ dispatchRateLimiter.ifPresent(DispatchRateLimiter::updateDispatchRate);
+ }
+
@Override
public CompletableFuture<Void> onPoliciesUpdate(Policies data) {
if (log.isDebugEnabled()) {
@@ -2386,7 +2392,7 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
schemaValidationEnforced = data.schema_validation_enforced;
- initializeDispatchRateLimiterIfNeeded();
+ updateDispatchRateLimiter();
updateSubscribeRateLimiter();
@@ -2405,8 +2411,6 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
CompletableFuture<Void> replicationFuture = checkReplicationAndRetryOnFailure();
CompletableFuture<Void> dedupFuture = checkDeduplicationStatus();
CompletableFuture<Void> persistentPoliciesFuture = checkPersistencePolicies();
- // update rate-limiter if policies updated
- dispatchRateLimiter.ifPresent(DispatchRateLimiter::updateDispatchRate);
if (this.subscribeRateLimiter.isPresent()) {
subscribeRateLimiter.get().onSubscribeRateUpdate(getSubscribeRate());
}
@@ -3026,11 +3030,7 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
}
updateTopicPolicy(policies);
- Optional<Policies> namespacePolicies = getNamespacePolicies();
- initializeTopicDispatchRateLimiterIfNeeded(policies);
-
- dispatchRateLimiter.ifPresent(DispatchRateLimiter::updateDispatchRate);
-
+ updateDispatchRateLimiter();
updateSubscriptionsDispatcherRateLimiter().thenRun(() -> {
updatePublishDispatcher();
initializeTopicSubscribeRateLimiterIfNeeded(Optional.ofNullable(policies));
@@ -3057,10 +3057,6 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
});
}
- private Optional<Policies> getNamespacePolicies() {
- return DispatchRateLimiter.getPolicies(brokerService, topic);
- }
-
private CompletableFuture<Void> updateSubscriptionsDispatcherRateLimiter() {
List<CompletableFuture<Void>> subscriptionCheckFutures = new ArrayList<>((int) subscriptions.size());
subscriptions.forEach((subName, sub) -> {
@@ -3076,14 +3072,6 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
return FutureUtil.waitForAll(subscriptionCheckFutures);
}
- private void initializeTopicDispatchRateLimiterIfNeeded(TopicPolicies policies) {
- synchronized (dispatchRateLimiterLock) {
- if (!dispatchRateLimiter.isPresent() && policies.getDispatchRate() != null) {
- this.dispatchRateLimiter = Optional.of(new DispatchRateLimiter(this, Type.TOPIC));
- }
- }
- }
-
private void initializeTopicSubscribeRateLimiterIfNeeded(Optional<TopicPolicies> policies) {
if (!policies.isPresent()) {
return;
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicDispatchRateLimiterTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicDispatchRateLimiterTest.java
new file mode 100644
index 00000000000..d70a21cb550
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicDispatchRateLimiterTest.java
@@ -0,0 +1,145 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertTrue;
+import lombok.Cleanup;
+import org.apache.pulsar.broker.service.persistent.PersistentTopic;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.common.policies.data.DispatchRate;
+import org.awaitility.Awaitility;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+@Test(groups = "broker")
+public class TopicDispatchRateLimiterTest extends BrokerTestBase {
+ @BeforeMethod
+ @Override
+ protected void setup() throws Exception {
+ conf.setDispatchThrottlingRatePerTopicInMsg(0);
+ conf.setDispatchThrottlingRatePerTopicInByte(0L);
+ conf.setSystemTopicEnabled(true);
+ conf.setTopicLevelPoliciesEnabled(true);
+ super.baseSetup();
+ }
+
+ @AfterMethod(alwaysRun = true)
+ @Override
+ protected void cleanup() throws Exception {
+ super.internalCleanup();
+ }
+
+ @Test
+ public void testTopicDispatchRateLimiterPerTopicInMsgOnlyBrokerLevel() throws Exception {
+ final String topicName = "persistent://" + newTopicName();
+
+ @Cleanup
+ Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName).create();
+ PersistentTopic topic = (PersistentTopic) pulsar.getBrokerService().getOrCreateTopic(topicName).get();
+ assertNotNull(topic);
+ assertFalse(topic.getDispatchRateLimiter().isPresent());
+
+ admin.brokers().updateDynamicConfiguration("dispatchThrottlingRatePerTopicInMsg", "100");
+ Awaitility.await().untilAsserted(() ->
+ assertEquals(pulsar.getConfig().getDispatchThrottlingRatePerTopicInMsg(), 100));
+
+ Awaitility.await().untilAsserted(() ->
+ assertTrue(topic.getDispatchRateLimiter().isPresent()));
+ assertEquals(topic.getDispatchRateLimiter().get().getAvailableDispatchRateLimitOnMsg(), 100L);
+ }
+
+ @Test
+ public void testTopicDispatchRateLimiterPerTopicInByteOnlyBrokerLevel() throws Exception {
+ final String topicName = "persistent://" + newTopicName();
+
+ @Cleanup
+ Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName).create();
+ PersistentTopic topic = (PersistentTopic) pulsar.getBrokerService().getOrCreateTopic(topicName).get();
+ assertNotNull(topic);
+ assertFalse(topic.getDispatchRateLimiter().isPresent());
+
+ admin.brokers().updateDynamicConfiguration("dispatchThrottlingRatePerTopicInByte", "1000");
+ Awaitility.await().untilAsserted(() ->
+ assertEquals(pulsar.getConfig().getDispatchThrottlingRatePerTopicInByte(), 1000L));
+
+ Awaitility.await().untilAsserted(() ->
+ assertTrue(topic.getDispatchRateLimiter().isPresent()));
+ assertEquals(topic.getDispatchRateLimiter().get().getAvailableDispatchRateLimitOnByte(), 1000L);
+ }
+
+ @Test
+ public void testTopicDispatchRateLimiterOnlyNamespaceLevel() throws Exception {
+ final String topicName = "persistent://" + newTopicName();
+
+ @Cleanup
+ Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName).create();
+ PersistentTopic topic = (PersistentTopic) pulsar.getBrokerService().getOrCreateTopic(topicName).get();
+ assertNotNull(topic);
+ assertFalse(topic.getDispatchRateLimiter().isPresent());
+
+ DispatchRate dispatchRate = DispatchRate
+ .builder()
+ .dispatchThrottlingRateInMsg(100)
+ .dispatchThrottlingRateInByte(1000L)
+ .build();
+ admin.namespaces().setDispatchRate("prop/ns-abc", dispatchRate);
+
+ Awaitility.await().untilAsserted(() -> {
+ assertNotNull(admin.namespaces().getDispatchRate("prop/ns-abc"));
+ assertEquals(admin.namespaces().getDispatchRate("prop/ns-abc").getDispatchThrottlingRateInMsg(), 100);
+ assertEquals(admin.namespaces().getDispatchRate("prop/ns-abc").getDispatchThrottlingRateInByte(), 1000L);
+ });
+
+ Awaitility.await().untilAsserted(() -> assertTrue(topic.getDispatchRateLimiter().isPresent()));
+ assertEquals(topic.getDispatchRateLimiter().get().getAvailableDispatchRateLimitOnMsg(), 100);
+ assertEquals(topic.getDispatchRateLimiter().get().getAvailableDispatchRateLimitOnByte(), 1000L);
+ }
+
+ @Test
+ public void testTopicDispatchRateLimiterOnlyTopicLevel() throws Exception {
+ final String topicName = "persistent://" + newTopicName();
+
+ @Cleanup
+ Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName).create();
+ PersistentTopic topic = (PersistentTopic) pulsar.getBrokerService().getOrCreateTopic(topicName).get();
+ assertNotNull(topic);
+ assertFalse(topic.getDispatchRateLimiter().isPresent());
+
+ DispatchRate dispatchRate = DispatchRate
+ .builder()
+ .dispatchThrottlingRateInMsg(100)
+ .dispatchThrottlingRateInByte(1000L)
+ .build();
+ admin.topicPolicies().setDispatchRate(topicName, dispatchRate);
+
+ Awaitility.await().untilAsserted(() -> {
+ assertNotNull(admin.topicPolicies().getDispatchRate(topicName));
+ assertEquals(admin.topicPolicies().getDispatchRate(topicName).getDispatchThrottlingRateInMsg(), 100);
+ assertEquals(admin.topicPolicies().getDispatchRate(topicName).getDispatchThrottlingRateInByte(), 1000L);
+ });
+
+ Awaitility.await().untilAsserted(() -> assertTrue(topic.getDispatchRateLimiter().isPresent()));
+ assertEquals(topic.getDispatchRateLimiter().get().getAvailableDispatchRateLimitOnMsg(), 100);
+ assertEquals(topic.getDispatchRateLimiter().get().getAvailableDispatchRateLimitOnByte(), 1000L);
+ }
+}