You are viewing a plain text version of this content. The canonical link for it was not preserved in this plain-text copy.
Posted to commits@pulsar.apache.org by te...@apache.org on 2022/08/10 05:48:03 UTC

[pulsar] branch branch-2.10 updated: [branch-2.10][cherry-pick] Fix topic dispatch rate limiter not init on broker-level #16084 (#17000)

This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.10 by this push:
     new f77696ddf32 [branch-2.10][cherry-pick] Fix topic dispatch rate limiter not init on broker-level #16084 (#17000)
f77696ddf32 is described below

commit f77696ddf32605f2c2e5398e1156099bcd1d3e70
Author: Xiaoyu Hou <An...@gmail.com>
AuthorDate: Wed Aug 10 13:47:56 2022 +0800

    [branch-2.10][cherry-pick] Fix topic dispatch rate limiter not init on broker-level #16084 (#17000)
---
 .../pulsar/broker/service/AbstractTopic.java       |   3 +
 .../pulsar/broker/service/BrokerService.java       |   4 +-
 .../broker/service/persistent/PersistentTopic.java |  28 ++--
 .../service/TopicDispatchRateLimiterTest.java      | 145 +++++++++++++++++++++
 4 files changed, 157 insertions(+), 23 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
index f9efa8ea186..ebd1ed2c79d 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
@@ -845,6 +845,9 @@ public abstract class AbstractTopic implements Topic, TopicPolicyListener<TopicP
         }
     }
 
+    public void updateDispatchRateLimiter() {
+    }
+
     @Override
     public void resetBrokerPublishCountAndEnableReadIfRequired(boolean doneBrokerReset) {
         // topic rate not exceeded, and completed broker limiter reset.
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index dab1c349aed..5457b464488 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -2321,9 +2321,7 @@ public class BrokerService implements Closeable {
             forEachTopic(topic -> {
                 if (topic instanceof AbstractTopic) {
                     ((AbstractTopic) topic).updateBrokerDispatchRate();
-                }
-                if (topic.getDispatchRateLimiter().isPresent()) {
-                    topic.getDispatchRateLimiter().get().updateDispatchRate();
+                    ((AbstractTopic) topic).updateDispatchRateLimiter();
                 }
             });
         });
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 51c27bfd8f9..085617214b0 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -2367,6 +2367,12 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
         return retentionTime < 0 || (System.nanoTime() - lastActive) < retentionTime;
     }
 
+    @Override
+    public void updateDispatchRateLimiter() {
+        initializeDispatchRateLimiterIfNeeded();
+        dispatchRateLimiter.ifPresent(DispatchRateLimiter::updateDispatchRate);
+    }
+
     @Override
     public CompletableFuture<Void> onPoliciesUpdate(Policies data) {
         if (log.isDebugEnabled()) {
@@ -2386,7 +2392,7 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
 
         schemaValidationEnforced = data.schema_validation_enforced;
 
-        initializeDispatchRateLimiterIfNeeded();
+        updateDispatchRateLimiter();
 
         updateSubscribeRateLimiter();
 
@@ -2405,8 +2411,6 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
                 CompletableFuture<Void> replicationFuture = checkReplicationAndRetryOnFailure();
                 CompletableFuture<Void> dedupFuture = checkDeduplicationStatus();
                 CompletableFuture<Void> persistentPoliciesFuture = checkPersistencePolicies();
-                // update rate-limiter if policies updated
-                dispatchRateLimiter.ifPresent(DispatchRateLimiter::updateDispatchRate);
                 if (this.subscribeRateLimiter.isPresent()) {
                     subscribeRateLimiter.get().onSubscribeRateUpdate(getSubscribeRate());
                 }
@@ -3026,11 +3030,7 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
         }
         updateTopicPolicy(policies);
 
-        Optional<Policies> namespacePolicies = getNamespacePolicies();
-        initializeTopicDispatchRateLimiterIfNeeded(policies);
-
-        dispatchRateLimiter.ifPresent(DispatchRateLimiter::updateDispatchRate);
-
+        updateDispatchRateLimiter();
         updateSubscriptionsDispatcherRateLimiter().thenRun(() -> {
             updatePublishDispatcher();
             initializeTopicSubscribeRateLimiterIfNeeded(Optional.ofNullable(policies));
@@ -3057,10 +3057,6 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
         });
     }
 
-    private Optional<Policies> getNamespacePolicies() {
-        return DispatchRateLimiter.getPolicies(brokerService, topic);
-    }
-
     private CompletableFuture<Void> updateSubscriptionsDispatcherRateLimiter() {
         List<CompletableFuture<Void>> subscriptionCheckFutures = new ArrayList<>((int) subscriptions.size());
         subscriptions.forEach((subName, sub) -> {
@@ -3076,14 +3072,6 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
         return FutureUtil.waitForAll(subscriptionCheckFutures);
     }
 
-    private void initializeTopicDispatchRateLimiterIfNeeded(TopicPolicies policies) {
-        synchronized (dispatchRateLimiterLock) {
-            if (!dispatchRateLimiter.isPresent() && policies.getDispatchRate() != null) {
-                this.dispatchRateLimiter = Optional.of(new DispatchRateLimiter(this, Type.TOPIC));
-            }
-        }
-    }
-
     private void initializeTopicSubscribeRateLimiterIfNeeded(Optional<TopicPolicies> policies) {
         if (!policies.isPresent()) {
             return;
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicDispatchRateLimiterTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicDispatchRateLimiterTest.java
new file mode 100644
index 00000000000..d70a21cb550
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicDispatchRateLimiterTest.java
@@ -0,0 +1,145 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertTrue;
+import lombok.Cleanup;
+import org.apache.pulsar.broker.service.persistent.PersistentTopic;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.common.policies.data.DispatchRate;
+import org.awaitility.Awaitility;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+@Test(groups = "broker")
+public class TopicDispatchRateLimiterTest extends BrokerTestBase {
+    @BeforeMethod
+    @Override
+    protected void setup() throws Exception {
+        conf.setDispatchThrottlingRatePerTopicInMsg(0);
+        conf.setDispatchThrottlingRatePerTopicInByte(0L);
+        conf.setSystemTopicEnabled(true);
+        conf.setTopicLevelPoliciesEnabled(true);
+        super.baseSetup();
+    }
+
+    @AfterMethod(alwaysRun = true)
+    @Override
+    protected void cleanup() throws Exception {
+        super.internalCleanup();
+    }
+
+    @Test
+    public void testTopicDispatchRateLimiterPerTopicInMsgOnlyBrokerLevel() throws Exception {
+        final String topicName = "persistent://" + newTopicName();
+
+        @Cleanup
+        Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName).create();
+        PersistentTopic topic = (PersistentTopic) pulsar.getBrokerService().getOrCreateTopic(topicName).get();
+        assertNotNull(topic);
+        assertFalse(topic.getDispatchRateLimiter().isPresent());
+
+        admin.brokers().updateDynamicConfiguration("dispatchThrottlingRatePerTopicInMsg", "100");
+        Awaitility.await().untilAsserted(() ->
+            assertEquals(pulsar.getConfig().getDispatchThrottlingRatePerTopicInMsg(), 100));
+
+        Awaitility.await().untilAsserted(() ->
+            assertTrue(topic.getDispatchRateLimiter().isPresent()));
+        assertEquals(topic.getDispatchRateLimiter().get().getAvailableDispatchRateLimitOnMsg(), 100L);
+    }
+
+    @Test
+    public void testTopicDispatchRateLimiterPerTopicInByteOnlyBrokerLevel() throws Exception {
+        final String topicName = "persistent://" + newTopicName();
+
+        @Cleanup
+        Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName).create();
+        PersistentTopic topic = (PersistentTopic) pulsar.getBrokerService().getOrCreateTopic(topicName).get();
+        assertNotNull(topic);
+        assertFalse(topic.getDispatchRateLimiter().isPresent());
+
+        admin.brokers().updateDynamicConfiguration("dispatchThrottlingRatePerTopicInByte", "1000");
+        Awaitility.await().untilAsserted(() ->
+            assertEquals(pulsar.getConfig().getDispatchThrottlingRatePerTopicInByte(), 1000L));
+
+        Awaitility.await().untilAsserted(() ->
+            assertTrue(topic.getDispatchRateLimiter().isPresent()));
+        assertEquals(topic.getDispatchRateLimiter().get().getAvailableDispatchRateLimitOnByte(), 1000L);
+    }
+
+    @Test
+    public void testTopicDispatchRateLimiterOnlyNamespaceLevel() throws Exception {
+        final String topicName = "persistent://" + newTopicName();
+
+        @Cleanup
+        Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName).create();
+        PersistentTopic topic = (PersistentTopic) pulsar.getBrokerService().getOrCreateTopic(topicName).get();
+        assertNotNull(topic);
+        assertFalse(topic.getDispatchRateLimiter().isPresent());
+
+        DispatchRate dispatchRate = DispatchRate
+            .builder()
+            .dispatchThrottlingRateInMsg(100)
+            .dispatchThrottlingRateInByte(1000L)
+            .build();
+        admin.namespaces().setDispatchRate("prop/ns-abc", dispatchRate);
+
+        Awaitility.await().untilAsserted(() -> {
+            assertNotNull(admin.namespaces().getDispatchRate("prop/ns-abc"));
+            assertEquals(admin.namespaces().getDispatchRate("prop/ns-abc").getDispatchThrottlingRateInMsg(), 100);
+            assertEquals(admin.namespaces().getDispatchRate("prop/ns-abc").getDispatchThrottlingRateInByte(), 1000L);
+        });
+
+        Awaitility.await().untilAsserted(() -> assertTrue(topic.getDispatchRateLimiter().isPresent()));
+        assertEquals(topic.getDispatchRateLimiter().get().getAvailableDispatchRateLimitOnMsg(), 100);
+        assertEquals(topic.getDispatchRateLimiter().get().getAvailableDispatchRateLimitOnByte(), 1000L);
+    }
+
+    @Test
+    public void testTopicDispatchRateLimiterOnlyTopicLevel() throws Exception {
+        final String topicName = "persistent://" + newTopicName();
+
+        @Cleanup
+        Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName).create();
+        PersistentTopic topic = (PersistentTopic) pulsar.getBrokerService().getOrCreateTopic(topicName).get();
+        assertNotNull(topic);
+        assertFalse(topic.getDispatchRateLimiter().isPresent());
+
+        DispatchRate dispatchRate = DispatchRate
+            .builder()
+            .dispatchThrottlingRateInMsg(100)
+            .dispatchThrottlingRateInByte(1000L)
+            .build();
+        admin.topicPolicies().setDispatchRate(topicName, dispatchRate);
+
+        Awaitility.await().untilAsserted(() -> {
+            assertNotNull(admin.topicPolicies().getDispatchRate(topicName));
+            assertEquals(admin.topicPolicies().getDispatchRate(topicName).getDispatchThrottlingRateInMsg(), 100);
+            assertEquals(admin.topicPolicies().getDispatchRate(topicName).getDispatchThrottlingRateInByte(), 1000L);
+        });
+
+        Awaitility.await().untilAsserted(() ->  assertTrue(topic.getDispatchRateLimiter().isPresent()));
+        assertEquals(topic.getDispatchRateLimiter().get().getAvailableDispatchRateLimitOnMsg(), 100);
+        assertEquals(topic.getDispatchRateLimiter().get().getAvailableDispatchRateLimitOnByte(), 1000L);
+    }
+}