You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2022/03/20 06:29:12 UTC

[GitHub] [pulsar] Jason918 commented on a change in pull request #14759: [enh][broker] Support dynamic update between non-zero values of brokerPublisherThrottlingTickTimeMillis

Jason918 commented on a change in pull request #14759:
URL: https://github.com/apache/pulsar/pull/14759#discussion_r830566204



##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
##########
@@ -638,40 +638,66 @@ public synchronized void setupTopicPublishRateLimiterMonitor() {
      * Schedules and monitors publish-throttling for broker that has publish-throttling configured. It also
      * disables and shutdowns publish-rate-limiter monitor for broker task if broker disables it.
      */
-    public synchronized void setupBrokerPublishRateLimiterMonitor() {
+    public void setupBrokerPublishRateLimiterMonitor() {
         // set broker PublishRateLimiterMonitor
         long brokerTickTimeMs = pulsar().getConfiguration().getBrokerPublisherThrottlingTickTimeMillis();
         if (brokerTickTimeMs > 0) {
-            if (this.brokerPublishRateLimiterMonitor == null) {
-                this.brokerPublishRateLimiterMonitor = Executors.newSingleThreadScheduledExecutor(
-                    new DefaultThreadFactory("pulsar-broker-publish-rate-limiter-monitor"));
-                // schedule task that sums up publish-rate across all cnx on a topic,
-                // and check the rate limit exceeded or not.
-                brokerPublishRateLimiterMonitor.scheduleAtFixedRate(
-                    safeRun(() -> checkBrokerPublishThrottlingRate()),
-                    brokerTickTimeMs,
-                    brokerTickTimeMs,
-                    TimeUnit.MILLISECONDS);
-                // schedule task that refreshes rate-limiting bucket
-                brokerPublishRateLimiterMonitor.scheduleAtFixedRate(
-                    safeRun(() -> refreshBrokerPublishRate()),
-                    1,
-                    1,
-                    TimeUnit.SECONDS);
-            }
+            brokerPublishRateLimiterMonitor.startOrUpdate(brokerTickTimeMs,
+                    this::checkBrokerPublishThrottlingRate, this::refreshBrokerPublishRate);
         } else {
             // disable publish-throttling for broker.
-            if (this.brokerPublishRateLimiterMonitor != null) {
+            brokerPublishRateLimiterMonitor.stop();
+        }
+    }
+
+    protected static class PublishRateLimiterMonitor {
+        private final String name;
+        private ScheduledExecutorService scheduler = null;
+        private long tickTimeMs = 0;
+        private Runnable refreshTask;
+
+        public PublishRateLimiterMonitor(String name) {
+            this.name = name;
+        }
+
+        synchronized void startOrUpdate(long tickTimeMs, Runnable checkTask, Runnable refreshTask) {
+            if (this.scheduler != null) {
+                // we have old task running.
+                if (this.tickTimeMs == tickTimeMs) {
+                    // tick time not changed.
+                    return;
+                }
+                stop();

Review comment:
       Not the same, no need to restart if tickTime does not changed.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org