Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2022/02/20 14:51:39 UTC

[GitHub] [pulsar] AnonHxy opened a new pull request #14387: [Broker]Dynamic update topic broker-level publish-rate

AnonHxy opened a new pull request #14387:
URL: https://github.com/apache/pulsar/pull/14387


   ### Motivation
   
   Support dynamically updating the broker-level topic publish rate.
   
   ### Modifications
   
   Invoke `org.apache.pulsar.broker.service.BrokerService#registerConfigurationListener` to register listeners
   for `maxPublishRatePerTopicInMessages` and `maxPublishRatePerTopicInBytes`.
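
As a rough illustration of the listener approach described above, the following self-contained sketch shows how a dynamic-configuration registry could push a new broker-level rate to a topic. `DynamicConfigRegistry`, `SketchTopic`, and `DynamicPublishRateDemo` are hypothetical names invented for this example. Only the two configuration keys and the method name `registerConfigurationListener` come from this PR, and the real signature in `BrokerService` may differ.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;

// Hypothetical stand-in for the broker's dynamic-configuration registry (not Pulsar's class).
class DynamicConfigRegistry {
    private final Map<String, Consumer<String>> listeners = new ConcurrentHashMap<>();

    // Register a callback that runs whenever the named setting is updated.
    void registerConfigurationListener(String key, Consumer<String> listener) {
        listeners.put(key, listener);
    }

    // Notify the listener registered for this key, if there is one.
    void update(String key, String newValue) {
        Consumer<String> listener = listeners.get(key);
        if (listener != null) {
            listener.accept(newValue);
        }
    }
}

// Hypothetical topic that caches the broker-level limits; -1 is assumed to mean "not set".
class SketchTopic {
    volatile long brokerMsgRate = -1;
    volatile long brokerByteRate = -1;
}

class DynamicPublishRateDemo {
    // Wire both broker-level settings to the topic so that updates are applied immediately.
    static SketchTopic wire(DynamicConfigRegistry registry) {
        SketchTopic topic = new SketchTopic();
        registry.registerConfigurationListener("maxPublishRatePerTopicInMessages",
                v -> topic.brokerMsgRate = Long.parseLong(v));
        registry.registerConfigurationListener("maxPublishRatePerTopicInBytes",
                v -> topic.brokerByteRate = Long.parseLong(v));
        return topic;
    }

    public static void main(String[] args) {
        DynamicConfigRegistry registry = new DynamicConfigRegistry();
        SketchTopic topic = wire(registry);
        registry.update("maxPublishRatePerTopicInBytes", "30");
        System.out.println(topic.brokerByteRate); // prints 30
    }
}
```

The point of the pattern is that the value is pushed to the topic at update time, so no restart or polling is needed for the new limit to become visible.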
   
   ### Verifying this change
   
   - [x] Make sure that the change passes the CI checks.
   
   ### Does this pull request potentially affect one of the following parts:
   
   *If `yes` was chosen, please highlight the changes*
   
     - Dependencies (does it add or upgrade a dependency): (no)
     - The public API: (no)
     - The schema: (no)
     - The default values of configurations: (no)
     - The wire protocol: (no)
     - The rest endpoints: (no)
     - The admin cli options: (no)
     - Anything that affects deployment: (no)
   
   ### Documentation
     
   - [x] `no-need-doc` 
    
   
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] codelipenghui commented on pull request #14387: [Broker]Dynamic update topic broker-level publish-rate

Posted by GitBox <gi...@apache.org>.
codelipenghui commented on pull request #14387:
URL: https://github.com/apache/pulsar/pull/14387#issuecomment-1048446861


   +1. Please add a test for the new changes.





[GitHub] [pulsar] AnonHxy commented on a change in pull request #14387: [Broker]Dynamic update topic broker-level publish-rate

Posted by GitBox <gi...@apache.org>.
AnonHxy commented on a change in pull request #14387:
URL: https://github.com/apache/pulsar/pull/14387#discussion_r829773714



##########
File path: pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PrecisTopicPublishRateThrottleTest.java
##########
@@ -140,4 +141,64 @@ public void testPrecisTopicPublishRateLimitingProduceRefresh() throws Exception
         Assert.assertNotNull(messageId);
         super.internalCleanup();
     }
+
+    @Test
+    public void testMultiLevelPublishRate() throws Exception{
+        conf.setPreciseTopicPublishRateLimiterEnable(true);
+        conf.setTopicLevelPoliciesEnabled(true);
+        conf.setSystemTopicEnabled(true);
+        conf.setMaxPendingPublishRequestsPerConnection(0);
+        super.baseSetup();
+        org.apache.pulsar.client.api.Producer<byte[]> producer = pulsarClient.newProducer()
+            .topic("persistent://prop/ns-abc/testPrecisTopicPublishRateLimiting")
+            .producerName("producer-name")
+            .create();
+        testPublishRate(producer, 30, 20, 10, 10);
+        Thread.sleep(1000);
+        testPublishRate(producer, 30, 20, -1, 20);
+        Thread.sleep(1000);
+        testPublishRate(producer, 30, -1, -1, 30);
+        Thread.sleep(1000);
+        testPublishRate(producer, -1, -1, 10, 10);
+        super.internalCleanup();
+    }
+
+    private void testPublishRate(org.apache.pulsar.client.api.Producer<byte[]> producer, int brokerRate, int nsRate,
+                                 int topicRate, int expectedRate) throws Exception {
+        PublishRate topicPublishRate = new PublishRate(1, topicRate);
+        PublishRate nsPublishRate = new PublishRate(1, nsRate);
+        final String topic = "persistent://prop/ns-abc/testPrecisTopicPublishRateLimiting";
+        admin.topicPolicies().setPublishRate(topic, topicPublishRate);
+        Awaitility.await()
+            .untilAsserted(() -> Assert.assertEquals(admin.topicPolicies().getPublishRate(topic), topicPublishRate));
+        admin.namespaces().setPublishRate("prop/ns-abc", nsPublishRate);
+        Awaitility.await()
+            .untilAsserted(() -> Assert.assertEquals(admin.namespaces().getPublishRate("prop/ns-abc"), nsPublishRate));
+        admin.brokers().updateDynamicConfiguration("maxPublishRatePerTopicInBytes", "" + brokerRate);
+        Awaitility.await()
+            .untilAsserted(() ->
+                Assert.assertEquals(admin.brokers().getAllDynamicConfigurations().get("maxPublishRatePerTopicInBytes"),
+                    "" + brokerRate));
+
+        Topic topicRef = pulsar.getBrokerService().getTopicReference(topic).get();
+        Assert.assertNotNull(topicRef);
+        MessageId messageId = null;
+        try {
+            // first will be success, and will set auto read to false
+            messageId = producer.sendAsync(new byte[expectedRate]).get(500, TimeUnit.MILLISECONDS);

Review comment:
       I have updated the unit test. PTAL @codelipenghui @Jason918 







[GitHub] [pulsar] codelipenghui merged pull request #14387: [feat][broker]: support dynamic update topic broker-level publish-rate

Posted by GitBox <gi...@apache.org>.
codelipenghui merged pull request #14387:
URL: https://github.com/apache/pulsar/pull/14387


   





[GitHub] [pulsar] AnonHxy commented on pull request #14387: [Broker]Dynamic update topic broker-level publish-rate

Posted by GitBox <gi...@apache.org>.
AnonHxy commented on pull request #14387:
URL: https://github.com/apache/pulsar/pull/14387#issuecomment-1072050634


   /pulsarbot run-failure-checks





[GitHub] [pulsar] Jason918 commented on a change in pull request #14387: [Broker]Dynamic update topic broker-level publish-rate

Posted by GitBox <gi...@apache.org>.
Jason918 commented on a change in pull request #14387:
URL: https://github.com/apache/pulsar/pull/14387#discussion_r829641873



##########
File path: pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PrecisTopicPublishRateThrottleTest.java
##########
@@ -140,4 +141,64 @@ public void testPrecisTopicPublishRateLimitingProduceRefresh() throws Exception
         Assert.assertNotNull(messageId);
         super.internalCleanup();
     }
+
+    @Test
+    public void testMultiLevelPublishRate() throws Exception{
+        conf.setPreciseTopicPublishRateLimiterEnable(true);
+        conf.setTopicLevelPoliciesEnabled(true);
+        conf.setSystemTopicEnabled(true);
+        conf.setMaxPendingPublishRequestsPerConnection(0);
+        super.baseSetup();
+        org.apache.pulsar.client.api.Producer<byte[]> producer = pulsarClient.newProducer()
+            .topic("persistent://prop/ns-abc/testPrecisTopicPublishRateLimiting")
+            .producerName("producer-name")
+            .create();
+        testPublishRate(producer, 30, 20, 10, 10);
+        Thread.sleep(1000);
+        testPublishRate(producer, 30, 20, -1, 20);
+        Thread.sleep(1000);
+        testPublishRate(producer, 30, -1, -1, 30);
+        Thread.sleep(1000);
+        testPublishRate(producer, -1, -1, 10, 10);
+        super.internalCleanup();
+    }
+
+    private void testPublishRate(org.apache.pulsar.client.api.Producer<byte[]> producer, int brokerRate, int nsRate,
+                                 int topicRate, int expectedRate) throws Exception {
+        PublishRate topicPublishRate = new PublishRate(1, topicRate);
+        PublishRate nsPublishRate = new PublishRate(1, nsRate);
+        final String topic = "persistent://prop/ns-abc/testPrecisTopicPublishRateLimiting";
+        admin.topicPolicies().setPublishRate(topic, topicPublishRate);
+        Awaitility.await()
+            .untilAsserted(() -> Assert.assertEquals(admin.topicPolicies().getPublishRate(topic), topicPublishRate));
+        admin.namespaces().setPublishRate("prop/ns-abc", nsPublishRate);
+        Awaitility.await()
+            .untilAsserted(() -> Assert.assertEquals(admin.namespaces().getPublishRate("prop/ns-abc"), nsPublishRate));
+        admin.brokers().updateDynamicConfiguration("maxPublishRatePerTopicInBytes", "" + brokerRate);
+        Awaitility.await()
+            .untilAsserted(() ->
+                Assert.assertEquals(admin.brokers().getAllDynamicConfigurations().get("maxPublishRatePerTopicInBytes"),
+                    "" + brokerRate));
+
+        Topic topicRef = pulsar.getBrokerService().getTopicReference(topic).get();
+        Assert.assertNotNull(topicRef);
+        MessageId messageId = null;
+        try {
+            // first will be success, and will set auto read to false
+            messageId = producer.sendAsync(new byte[expectedRate]).get(500, TimeUnit.MILLISECONDS);

Review comment:
       > I think we only need to check that the dynamic update has been applied; we do not need to check whether the rate limiter works.
   
   +1







[GitHub] [pulsar] wangjialing218 commented on a change in pull request #14387: [Broker]Dynamic update topic broker-level publish-rate

Posted by GitBox <gi...@apache.org>.
wangjialing218 commented on a change in pull request #14387:
URL: https://github.com/apache/pulsar/pull/14387#discussion_r810728537



##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
##########
@@ -1118,4 +1118,9 @@ public void updateBrokerSubscriptionDispatchRate() {
         topicPolicies.getSubscriptionDispatchRate().updateBrokerValue(
             subscriptionDispatchRateInBroker(brokerService.pulsar().getConfiguration()));
     }
+
+    public void updateBrokerPublishRate() {
+        topicPolicies.getPublishRate().updateBrokerValue(
+            publishRateInBroker(brokerService.pulsar().getConfiguration()));

Review comment:
       When the broker-level topic publish rate is updated, every topic's publish rate is updated. 
   Only topics whose topic-level publish rate is null should be updated in this case. 







[GitHub] [pulsar] AnonHxy commented on pull request #14387: [Broker]Dynamic update topic broker-level publish-rate

Posted by GitBox <gi...@apache.org>.
AnonHxy commented on pull request #14387:
URL: https://github.com/apache/pulsar/pull/14387#issuecomment-1058927450


   /pulsarbot run-failure-checks





[GitHub] [pulsar] AnonHxy commented on pull request #14387: [Broker]Dynamic update topic broker-level publish-rate

Posted by GitBox <gi...@apache.org>.
AnonHxy commented on pull request #14387:
URL: https://github.com/apache/pulsar/pull/14387#issuecomment-1046408962


   /pulsarbot run-failure-checks


[GitHub] [pulsar] AnonHxy commented on a change in pull request #14387: [Broker]Dynamic update topic broker-level publish-rate

Posted by GitBox <gi...@apache.org>.
AnonHxy commented on a change in pull request #14387:
URL: https://github.com/apache/pulsar/pull/14387#discussion_r810735645



##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
##########
@@ -1118,4 +1118,9 @@ public void updateBrokerSubscriptionDispatchRate() {
         topicPolicies.getSubscriptionDispatchRate().updateBrokerValue(
             subscriptionDispatchRateInBroker(brokerService.pulsar().getConfiguration()));
     }
+
+    public void updateBrokerPublishRate() {
+        topicPolicies.getPublishRate().updateBrokerValue(
+            publishRateInBroker(brokerService.pulsar().getConfiguration()));

Review comment:
       Yes, but `topicPolicies` is of type `HierarchyTopicPolicies`, which resolves the effective value through the topic-namespace-broker hierarchy. For details, see `org.apache.pulsar.common.policies.data.PolicyHierarchyValue#updateBrokerValue`. @wangjialing218 
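
To make the hierarchy argument concrete, here is a minimal sketch of how a value resolved across topic, namespace, and broker levels can behave. `HierarchyValue` and `HierarchyValueDemo` are hypothetical simplifications written for this thread, not the actual `PolicyHierarchyValue` source, and the sketch assumes that `null` means "not set at this level".

```java
// Hypothetical simplification of a hierarchical policy value (not Pulsar's implementation).
class HierarchyValue<T> {
    private T topicValue;
    private T namespaceValue;
    private T brokerValue;

    void updateTopicValue(T v) { topicValue = v; }
    void updateNamespaceValue(T v) { namespaceValue = v; }
    void updateBrokerValue(T v) { brokerValue = v; }

    // The most specific non-null level wins: topic, then namespace, then broker.
    T get() {
        if (topicValue != null) {
            return topicValue;
        }
        if (namespaceValue != null) {
            return namespaceValue;
        }
        return brokerValue;
    }
}

class HierarchyValueDemo {
    public static void main(String[] args) {
        HierarchyValue<Integer> publishRate = new HierarchyValue<>();
        publishRate.updateBrokerValue(30);
        publishRate.updateNamespaceValue(20);
        publishRate.updateTopicValue(10);
        System.out.println(publishRate.get()); // prints 10: the topic level overrides the others

        publishRate.updateBrokerValue(40);
        System.out.println(publishRate.get()); // prints 10: a broker update does not override the topic level

        publishRate.updateTopicValue(null);
        publishRate.updateNamespaceValue(null);
        System.out.println(publishRate.get()); // prints 40: only the broker level is left
    }
}
```

Under these assumptions, updating the broker value on every topic is harmless: a topic that has its own topic-level or namespace-level setting keeps returning that setting.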







[GitHub] [pulsar] Jason918 commented on a change in pull request #14387: [Broker]Dynamic update topic broker-level publish-rate

Posted by GitBox <gi...@apache.org>.
Jason918 commented on a change in pull request #14387:
URL: https://github.com/apache/pulsar/pull/14387#discussion_r810735507



##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
##########
@@ -1118,4 +1118,9 @@ public void updateBrokerSubscriptionDispatchRate() {
         topicPolicies.getSubscriptionDispatchRate().updateBrokerValue(
             subscriptionDispatchRateInBroker(brokerService.pulsar().getConfiguration()));
     }
+
+    public void updateBrokerPublishRate() {
+        topicPolicies.getPublishRate().updateBrokerValue(
+            publishRateInBroker(brokerService.pulsar().getConfiguration()));

Review comment:
       With `HierarchyTopicPolicies`, we don't need to check the settings at other levels.







[GitHub] [pulsar] codelipenghui commented on a change in pull request #14387: [Broker]Dynamic update topic broker-level publish-rate

Posted by GitBox <gi...@apache.org>.
codelipenghui commented on a change in pull request #14387:
URL: https://github.com/apache/pulsar/pull/14387#discussion_r826788530



##########
File path: pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PrecisTopicPublishRateThrottleTest.java
##########
@@ -140,4 +141,64 @@ public void testPrecisTopicPublishRateLimitingProduceRefresh() throws Exception
         Assert.assertNotNull(messageId);
         super.internalCleanup();
     }
+
+    @Test
+    public void testMultiLevelPublishRate() throws Exception{
+        conf.setPreciseTopicPublishRateLimiterEnable(true);
+        conf.setTopicLevelPoliciesEnabled(true);
+        conf.setSystemTopicEnabled(true);
+        conf.setMaxPendingPublishRequestsPerConnection(0);
+        super.baseSetup();
+        org.apache.pulsar.client.api.Producer<byte[]> producer = pulsarClient.newProducer()
+            .topic("persistent://prop/ns-abc/testPrecisTopicPublishRateLimiting")
+            .producerName("producer-name")
+            .create();
+        testPublishRate(producer, 30, 20, 10, 10);
+        Thread.sleep(1000);
+        testPublishRate(producer, 30, 20, -1, 20);
+        Thread.sleep(1000);
+        testPublishRate(producer, 30, -1, -1, 30);
+        Thread.sleep(1000);
+        testPublishRate(producer, -1, -1, 10, 10);
+        super.internalCleanup();
+    }
+
+    private void testPublishRate(org.apache.pulsar.client.api.Producer<byte[]> producer, int brokerRate, int nsRate,
+                                 int topicRate, int expectedRate) throws Exception {
+        PublishRate topicPublishRate = new PublishRate(1, topicRate);
+        PublishRate nsPublishRate = new PublishRate(1, nsRate);
+        final String topic = "persistent://prop/ns-abc/testPrecisTopicPublishRateLimiting";
+        admin.topicPolicies().setPublishRate(topic, topicPublishRate);
+        Awaitility.await()
+            .untilAsserted(() -> Assert.assertEquals(admin.topicPolicies().getPublishRate(topic), topicPublishRate));
+        admin.namespaces().setPublishRate("prop/ns-abc", nsPublishRate);
+        Awaitility.await()
+            .untilAsserted(() -> Assert.assertEquals(admin.namespaces().getPublishRate("prop/ns-abc"), nsPublishRate));
+        admin.brokers().updateDynamicConfiguration("maxPublishRatePerTopicInBytes", "" + brokerRate);
+        Awaitility.await()
+            .untilAsserted(() ->
+                Assert.assertEquals(admin.brokers().getAllDynamicConfigurations().get("maxPublishRatePerTopicInBytes"),
+                    "" + brokerRate));
+
+        Topic topicRef = pulsar.getBrokerService().getTopicReference(topic).get();
+        Assert.assertNotNull(topicRef);
+        MessageId messageId = null;
+        try {
+            // first will be success, and will set auto read to false
+            messageId = producer.sendAsync(new byte[expectedRate]).get(500, TimeUnit.MILLISECONDS);

Review comment:
       I think we only need to check that the dynamic update has been applied; we do not need to check whether the rate limiter works, because the rate limiter is already covered by other tests.
   
   That way we avoid introducing `Thread.sleep()` in this test, which might make it flaky.
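
One way to read this suggestion is sketched below: the test asserts only that the updated value becomes visible, polling with a deadline instead of sleeping for a fixed interval. `PollingAssert` and `PollingAssertDemo` are hypothetical helpers written for illustration. The quoted test uses Awaitility for the same purpose, and this sketch does not reproduce Awaitility's API.

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BooleanSupplier;

// Hypothetical helper: re-check a condition until it holds or a deadline passes.
class PollingAssert {
    static void awaitTrue(BooleanSupplier condition, long timeoutMillis) {
        long deadline = System.nanoTime() + timeoutMillis * 1_000_000L;
        while (!condition.getAsBoolean()) {
            if (System.nanoTime() - deadline > 0) {
                throw new AssertionError("condition not met within " + timeoutMillis + " ms");
            }
            try {
                Thread.sleep(10); // short poll interval; returns as soon as the condition holds
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new AssertionError("interrupted while waiting", e);
            }
        }
    }
}

class PollingAssertDemo {
    public static void main(String[] args) {
        // Stand-in for the broker-level rate cached by a topic; -1 is assumed to mean "not set".
        AtomicLong appliedRate = new AtomicLong(-1);

        // Simulate an asynchronous configuration update arriving on another thread.
        new Thread(() -> appliedRate.set(30)).start();

        // Assert only that the update was applied, without exercising the rate limiter.
        PollingAssert.awaitTrue(() -> appliedRate.get() == 30, 2000);
        System.out.println("applied rate: " + appliedRate.get());
    }
}
```

Compared with a fixed `Thread.sleep(1000)`, the wait ends as soon as the condition holds and fails with a clear message when it never does.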

##########
File path: pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PrecisTopicPublishRateThrottleTest.java
##########
@@ -140,4 +141,64 @@ public void testPrecisTopicPublishRateLimitingProduceRefresh() throws Exception
         Assert.assertNotNull(messageId);
         super.internalCleanup();
     }
+
+    @Test
+    public void testMultiLevelPublishRate() throws Exception{
+        conf.setPreciseTopicPublishRateLimiterEnable(true);
+        conf.setTopicLevelPoliciesEnabled(true);
+        conf.setSystemTopicEnabled(true);
+        conf.setMaxPendingPublishRequestsPerConnection(0);
+        super.baseSetup();
+        org.apache.pulsar.client.api.Producer<byte[]> producer = pulsarClient.newProducer()

Review comment:
       Close the producer after the test.



