Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2022/03/19 16:48:19 UTC

[GitHub] [pulsar] Jason918 opened a new pull request #14759: [enh][broker] Support dynamic update between non-zero values of brokerPublisherThrottlingTickTimeMillis

Jason918 opened a new pull request #14759:
URL: https://github.com/apache/pulsar/pull/14759


   
   
   ### Motivation
   
   Currently, the `brokerPublisherThrottlingTickTimeMillis` config is marked as dynamic, but it only supports updates between zero and non-zero values. It does not support, for example, increasing the value from 1 to 1000.
   
   ### Modifications
   
   1. Add a class `PublishRateLimiterMonitor` to support tick interval updates.
   2. Remove `synchronized` from `setupBrokerPublishRateLimiterMonitor`; synchronizing on the large `BrokerService` object is unnecessary and risky.
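
The start-or-update behavior described in the modifications above can be sketched roughly as follows. This is a minimal, simplified illustration, not the merged code: the class name `TickMonitorSketch`, the `startCount()` helper, and the use of `shutdownNow()` in `stop()` are assumptions made for the example, and the separate refresh task and thread naming from the PR are left out.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/** Hypothetical stand-in for the PR's PublishRateLimiterMonitor (illustration only). */
class TickMonitorSketch {
    private ScheduledExecutorService scheduler; // null while the monitor is stopped
    private long tickTimeMs;                    // interval of the running check task
    private int starts;                         // assumption: counter added only for this example

    /** Starts the check task, or restarts it only when the tick interval has changed. */
    synchronized void startOrUpdate(long newTickTimeMs, Runnable checkTask) {
        if (newTickTimeMs <= 0) {
            throw new IllegalArgumentException("tick time must be positive");
        }
        if (scheduler != null) {
            if (tickTimeMs == newTickTimeMs) {
                return; // same interval: keep the running scheduler untouched
            }
            stop(); // different interval: tear down the old scheduler first
        }
        scheduler = Executors.newSingleThreadScheduledExecutor();
        scheduler.scheduleAtFixedRate(checkTask, newTickTimeMs, newTickTimeMs, TimeUnit.MILLISECONDS);
        tickTimeMs = newTickTimeMs;
        starts++;
    }

    /** Stops the monitor; safe to call when it is not running. */
    synchronized void stop() {
        if (scheduler != null) {
            scheduler.shutdownNow(); // assumption: the real stop() may shut down differently
            scheduler = null;
            tickTimeMs = 0;
        }
    }

    synchronized boolean isRunning() {
        return scheduler != null;
    }

    synchronized long currentTickTimeMs() {
        return tickTimeMs;
    }

    synchronized int startCount() {
        return starts;
    }
}
```

In this sketch the lock is held on the small monitor object rather than on the enclosing service, which matches the intent of the second modification. Calling `startOrUpdate` twice with the same interval leaves the scheduler alone, while a call with a different non-zero interval replaces it.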
   
   ### Verifying this change
   
   - [ ] Make sure that the change passes the CI checks.
   
   This change added tests and can be verified as follows:
     - testPublishRateLimiterMonitor
   
   ### Does this pull request potentially affect one of the following parts:
   
   *If `yes` was chosen, please highlight the changes*
   
     - Dependencies (does it add or upgrade a dependency): (no)
     - The public API: (no)
     - The schema: (no)
     - The default values of configurations: (no)
     - The wire protocol: (no)
     - The rest endpoints: (no)
     - The admin cli options: (no)
     - Anything that affects deployment: (no)
   
   ### Documentation
   
   Check the box below and label this PR (if you have committer privilege).
   
   Need to update docs? 
     
   - [x] `no-need-doc` 
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] codelipenghui merged pull request #14759: [enh][broker] Support dynamic update between non-zero values of brokerPublisherThrottlingTickTimeMillis

Posted by GitBox <gi...@apache.org>.
codelipenghui merged pull request #14759:
URL: https://github.com/apache/pulsar/pull/14759


   





[GitHub] [pulsar] Jason918 commented on pull request #14759: [enh][broker] Support dynamic update between non-zero values of brokerPublisherThrottlingTickTimeMillis

Posted by GitBox <gi...@apache.org>.
Jason918 commented on pull request #14759:
URL: https://github.com/apache/pulsar/pull/14759#issuecomment-1073214409


   /pulsarbot run-failure-checks





[GitHub] [pulsar] Jason918 commented on pull request #14759: [enh][broker] Support dynamic update between non-zero values of brokerPublisherThrottlingTickTimeMillis

Posted by GitBox <gi...@apache.org>.
Jason918 commented on pull request #14759:
URL: https://github.com/apache/pulsar/pull/14759#issuecomment-1073780570


   @codelipenghui PTAL





[GitHub] [pulsar] Jason918 commented on a change in pull request #14759: [enh][broker] Support dynamic update between non-zero values of brokerPublisherThrottlingTickTimeMillis

Posted by GitBox <gi...@apache.org>.
Jason918 commented on a change in pull request #14759:
URL: https://github.com/apache/pulsar/pull/14759#discussion_r830566204



##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
##########
@@ -638,40 +638,66 @@ public synchronized void setupTopicPublishRateLimiterMonitor() {
      * Schedules and monitors publish-throttling for broker that has publish-throttling configured. It also
      * disables and shutdowns publish-rate-limiter monitor for broker task if broker disables it.
      */
-    public synchronized void setupBrokerPublishRateLimiterMonitor() {
+    public void setupBrokerPublishRateLimiterMonitor() {
         // set broker PublishRateLimiterMonitor
         long brokerTickTimeMs = pulsar().getConfiguration().getBrokerPublisherThrottlingTickTimeMillis();
         if (brokerTickTimeMs > 0) {
-            if (this.brokerPublishRateLimiterMonitor == null) {
-                this.brokerPublishRateLimiterMonitor = Executors.newSingleThreadScheduledExecutor(
-                    new DefaultThreadFactory("pulsar-broker-publish-rate-limiter-monitor"));
-                // schedule task that sums up publish-rate across all cnx on a topic,
-                // and check the rate limit exceeded or not.
-                brokerPublishRateLimiterMonitor.scheduleAtFixedRate(
-                    safeRun(() -> checkBrokerPublishThrottlingRate()),
-                    brokerTickTimeMs,
-                    brokerTickTimeMs,
-                    TimeUnit.MILLISECONDS);
-                // schedule task that refreshes rate-limiting bucket
-                brokerPublishRateLimiterMonitor.scheduleAtFixedRate(
-                    safeRun(() -> refreshBrokerPublishRate()),
-                    1,
-                    1,
-                    TimeUnit.SECONDS);
-            }
+            brokerPublishRateLimiterMonitor.startOrUpdate(brokerTickTimeMs,
+                    this::checkBrokerPublishThrottlingRate, this::refreshBrokerPublishRate);
         } else {
             // disable publish-throttling for broker.
-            if (this.brokerPublishRateLimiterMonitor != null) {
+            brokerPublishRateLimiterMonitor.stop();
+        }
+    }
+
+    protected static class PublishRateLimiterMonitor {
+        private final String name;
+        private ScheduledExecutorService scheduler = null;
+        private long tickTimeMs = 0;
+        private Runnable refreshTask;
+
+        public PublishRateLimiterMonitor(String name) {
+            this.name = name;
+        }
+
+        synchronized void startOrUpdate(long tickTimeMs, Runnable checkTask, Runnable refreshTask) {
+            if (this.scheduler != null) {
+                // we have old task running.
+                if (this.tickTimeMs == tickTimeMs) {
+                    // tick time not changed.
+                    return;
+                }
+                stop();

Review comment:
       This is not equivalent: when the tick time has not changed, the early return avoids restarting the scheduler.







[GitHub] [pulsar] Technoboy- commented on a change in pull request #14759: [enh][broker] Support dynamic update between non-zero values of brokerPublisherThrottlingTickTimeMillis

Posted by GitBox <gi...@apache.org>.
Technoboy- commented on a change in pull request #14759:
URL: https://github.com/apache/pulsar/pull/14759#discussion_r830549189



##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
##########
@@ -638,40 +638,66 @@ public synchronized void setupTopicPublishRateLimiterMonitor() {
      * Schedules and monitors publish-throttling for broker that has publish-throttling configured. It also
      * disables and shutdowns publish-rate-limiter monitor for broker task if broker disables it.
      */
-    public synchronized void setupBrokerPublishRateLimiterMonitor() {
+    public void setupBrokerPublishRateLimiterMonitor() {
         // set broker PublishRateLimiterMonitor
         long brokerTickTimeMs = pulsar().getConfiguration().getBrokerPublisherThrottlingTickTimeMillis();
         if (brokerTickTimeMs > 0) {
-            if (this.brokerPublishRateLimiterMonitor == null) {
-                this.brokerPublishRateLimiterMonitor = Executors.newSingleThreadScheduledExecutor(
-                    new DefaultThreadFactory("pulsar-broker-publish-rate-limiter-monitor"));
-                // schedule task that sums up publish-rate across all cnx on a topic,
-                // and check the rate limit exceeded or not.
-                brokerPublishRateLimiterMonitor.scheduleAtFixedRate(
-                    safeRun(() -> checkBrokerPublishThrottlingRate()),
-                    brokerTickTimeMs,
-                    brokerTickTimeMs,
-                    TimeUnit.MILLISECONDS);
-                // schedule task that refreshes rate-limiting bucket
-                brokerPublishRateLimiterMonitor.scheduleAtFixedRate(
-                    safeRun(() -> refreshBrokerPublishRate()),
-                    1,
-                    1,
-                    TimeUnit.SECONDS);
-            }
+            brokerPublishRateLimiterMonitor.startOrUpdate(brokerTickTimeMs,
+                    this::checkBrokerPublishThrottlingRate, this::refreshBrokerPublishRate);
         } else {
             // disable publish-throttling for broker.
-            if (this.brokerPublishRateLimiterMonitor != null) {
+            brokerPublishRateLimiterMonitor.stop();
+        }
+    }
+
+    protected static class PublishRateLimiterMonitor {
+        private final String name;
+        private ScheduledExecutorService scheduler = null;
+        private long tickTimeMs = 0;
+        private Runnable refreshTask;
+
+        public PublishRateLimiterMonitor(String name) {
+            this.name = name;
+        }
+
+        synchronized void startOrUpdate(long tickTimeMs, Runnable checkTask, Runnable refreshTask) {
+            if (this.scheduler != null) {
+                // we have old task running.
+                if (this.tickTimeMs == tickTimeMs) {
+                    // tick time not changed.
+                    return;
+                }
+                stop();

Review comment:
       if (this.scheduler != null && this.tickTimeMs != tickTimeMs) {
             stop();
   }
   







[GitHub] [pulsar] Jason918 commented on pull request #14759: [enh][broker] Support dynamic update between non-zero values of brokerPublisherThrottlingTickTimeMillis

Posted by GitBox <gi...@apache.org>.
Jason918 commented on pull request #14759:
URL: https://github.com/apache/pulsar/pull/14759#issuecomment-1073424762


   /pulsarbot run-failure-checks





[GitHub] [pulsar] Jason918 commented on pull request #14759: [enh][broker] Support dynamic update between non-zero values of brokerPublisherThrottlingTickTimeMillis

Posted by GitBox <gi...@apache.org>.
Jason918 commented on pull request #14759:
URL: https://github.com/apache/pulsar/pull/14759#issuecomment-1073757674


   /pulsarbot run-failure-checks





[GitHub] [pulsar] Jason918 commented on a change in pull request #14759: [enh][broker] Support dynamic update between non-zero values of brokerPublisherThrottlingTickTimeMillis

Posted by GitBox <gi...@apache.org>.
Jason918 commented on a change in pull request #14759:
URL: https://github.com/apache/pulsar/pull/14759#discussion_r830733804



##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
##########
@@ -638,40 +639,62 @@ public synchronized void setupTopicPublishRateLimiterMonitor() {
      * Schedules and monitors publish-throttling for broker that has publish-throttling configured. It also
      * disables and shutdowns publish-rate-limiter monitor for broker task if broker disables it.
      */
-    public synchronized void setupBrokerPublishRateLimiterMonitor() {
+    public void setupBrokerPublishRateLimiterMonitor() {
         // set broker PublishRateLimiterMonitor
         long brokerTickTimeMs = pulsar().getConfiguration().getBrokerPublisherThrottlingTickTimeMillis();
         if (brokerTickTimeMs > 0) {
-            if (this.brokerPublishRateLimiterMonitor == null) {
-                this.brokerPublishRateLimiterMonitor = Executors.newSingleThreadScheduledExecutor(
-                    new DefaultThreadFactory("pulsar-broker-publish-rate-limiter-monitor"));
-                // schedule task that sums up publish-rate across all cnx on a topic,
-                // and check the rate limit exceeded or not.
-                brokerPublishRateLimiterMonitor.scheduleAtFixedRate(
-                    safeRun(() -> checkBrokerPublishThrottlingRate()),
-                    brokerTickTimeMs,
-                    brokerTickTimeMs,
-                    TimeUnit.MILLISECONDS);
-                // schedule task that refreshes rate-limiting bucket
-                brokerPublishRateLimiterMonitor.scheduleAtFixedRate(
-                    safeRun(() -> refreshBrokerPublishRate()),
-                    1,
-                    1,
-                    TimeUnit.SECONDS);
-            }
+            brokerPublishRateLimiterMonitor.startOrUpdate(brokerTickTimeMs,
+                    this::checkBrokerPublishThrottlingRate, this::refreshBrokerPublishRate);
         } else {
             // disable publish-throttling for broker.
-            if (this.brokerPublishRateLimiterMonitor != null) {
-                try {
-                    this.brokerPublishRateLimiterMonitor.awaitTermination(30, TimeUnit.SECONDS);
-                } catch (InterruptedException e) {
-                    log.warn("failed to shutdown brokerPublishRateLimiterMonitor", e);
+            brokerPublishRateLimiterMonitor.stop();
+        }
+    }
+
+    protected static class PublishRateLimiterMonitor {
+        private final String name;
+        private ScheduledExecutorService scheduler = null;
+        private long tickTimeMs = 0;
+        private Runnable refreshTask;
+
+        public PublishRateLimiterMonitor(String name) {
+            this.name = name;
+        }
+
+        synchronized void startOrUpdate(long tickTimeMs, Runnable checkTask, Runnable refreshTask) {
+            if (this.scheduler != null) {
+                // we have old task running.
+                if (this.tickTimeMs == tickTimeMs) {
+                    // tick time not changed.
+                    return;
                 }
+                stop();
+            }
+            //start monitor.
+            scheduler = Executors.newSingleThreadScheduledExecutor(new DefaultThreadFactory(name));
+            // schedule task that sums up publish-rate across all cnx on a topic ,
+            // and check the rate limit exceeded or not.
+            scheduler.scheduleAtFixedRate(safeRun(checkTask), tickTimeMs, tickTimeMs, TimeUnit.MILLISECONDS);
+            // schedule task that refreshes rate-limiting bucket
+            scheduler.scheduleAtFixedRate(safeRun(refreshTask), 1, 1, TimeUnit.SECONDS);
+            this.tickTimeMs = tickTimeMs;
+            this.refreshTask = refreshTask;
+        }
+
+        synchronized void stop() {

Review comment:
       Yes, this keeps the previous logic: the inner scheduler will be shut down, see Line 837.







[GitHub] [pulsar] Jason918 commented on pull request #14759: [enh][broker] Support dynamic update between non-zero values of brokerPublisherThrottlingTickTimeMillis

Posted by GitBox <gi...@apache.org>.
Jason918 commented on pull request #14759:
URL: https://github.com/apache/pulsar/pull/14759#issuecomment-1073178046


   > Overall looks good to me. Currently, the test only covers `PublishRateLimiterMonitor`. Could we add a test that dynamically updates the value of `brokerPublisherThrottlingTickTimeMillis` through the BrokerService, to make sure the BrokerService works fine?
   
   @RobertIndie 
   Added the unit test `org.apache.pulsar.broker.service.BrokerServiceTest#testDynamicBrokerPublisherThrottlingTickTimeMillis`. PTAL.





[GitHub] [pulsar] codelipenghui commented on a change in pull request #14759: [enh][broker] Support dynamic update between non-zero values of brokerPublisherThrottlingTickTimeMillis

Posted by GitBox <gi...@apache.org>.
codelipenghui commented on a change in pull request #14759:
URL: https://github.com/apache/pulsar/pull/14759#discussion_r830730682



##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
##########
@@ -638,40 +639,62 @@ public synchronized void setupTopicPublishRateLimiterMonitor() {
      * Schedules and monitors publish-throttling for broker that has publish-throttling configured. It also
      * disables and shutdowns publish-rate-limiter monitor for broker task if broker disables it.
      */
-    public synchronized void setupBrokerPublishRateLimiterMonitor() {
+    public void setupBrokerPublishRateLimiterMonitor() {
         // set broker PublishRateLimiterMonitor
         long brokerTickTimeMs = pulsar().getConfiguration().getBrokerPublisherThrottlingTickTimeMillis();
         if (brokerTickTimeMs > 0) {
-            if (this.brokerPublishRateLimiterMonitor == null) {
-                this.brokerPublishRateLimiterMonitor = Executors.newSingleThreadScheduledExecutor(
-                    new DefaultThreadFactory("pulsar-broker-publish-rate-limiter-monitor"));
-                // schedule task that sums up publish-rate across all cnx on a topic,
-                // and check the rate limit exceeded or not.
-                brokerPublishRateLimiterMonitor.scheduleAtFixedRate(
-                    safeRun(() -> checkBrokerPublishThrottlingRate()),
-                    brokerTickTimeMs,
-                    brokerTickTimeMs,
-                    TimeUnit.MILLISECONDS);
-                // schedule task that refreshes rate-limiting bucket
-                brokerPublishRateLimiterMonitor.scheduleAtFixedRate(
-                    safeRun(() -> refreshBrokerPublishRate()),
-                    1,
-                    1,
-                    TimeUnit.SECONDS);
-            }
+            brokerPublishRateLimiterMonitor.startOrUpdate(brokerTickTimeMs,
+                    this::checkBrokerPublishThrottlingRate, this::refreshBrokerPublishRate);
         } else {
             // disable publish-throttling for broker.
-            if (this.brokerPublishRateLimiterMonitor != null) {
-                try {
-                    this.brokerPublishRateLimiterMonitor.awaitTermination(30, TimeUnit.SECONDS);
-                } catch (InterruptedException e) {
-                    log.warn("failed to shutdown brokerPublishRateLimiterMonitor", e);
+            brokerPublishRateLimiterMonitor.stop();
+        }
+    }
+
+    protected static class PublishRateLimiterMonitor {
+        private final String name;
+        private ScheduledExecutorService scheduler = null;
+        private long tickTimeMs = 0;
+        private Runnable refreshTask;
+
+        public PublishRateLimiterMonitor(String name) {
+            this.name = name;
+        }
+
+        synchronized void startOrUpdate(long tickTimeMs, Runnable checkTask, Runnable refreshTask) {
+            if (this.scheduler != null) {
+                // we have old task running.
+                if (this.tickTimeMs == tickTimeMs) {
+                    // tick time not changed.
+                    return;
                 }
+                stop();
+            }
+            //start monitor.
+            scheduler = Executors.newSingleThreadScheduledExecutor(new DefaultThreadFactory(name));
+            // schedule task that sums up publish-rate across all cnx on a topic ,
+            // and check the rate limit exceeded or not.
+            scheduler.scheduleAtFixedRate(safeRun(checkTask), tickTimeMs, tickTimeMs, TimeUnit.MILLISECONDS);
+            // schedule task that refreshes rate-limiting bucket
+            scheduler.scheduleAtFixedRate(safeRun(refreshTask), 1, 1, TimeUnit.SECONDS);
+            this.tickTimeMs = tickTimeMs;
+            this.refreshTask = refreshTask;
+        }
+
+        synchronized void stop() {

Review comment:
       Shutting down the broker also needs to stop the monitor.



