You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by zh...@apache.org on 2019/12/08 12:12:10 UTC

[pulsar] branch master updated: PIP-52: [pulsar-sever] Add support of dispatch throttling relative to publish-rate (#5797)

This is an automated email from the ASF dual-hosted git repository.

zhaijia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 02bf9a0  PIP-52: [pulsar-sever] Add support of dispatch throttling relative to publish-rate (#5797)
02bf9a0 is described below

commit 02bf9a0b770e53f5a6f3810e9602ccc9a4c05050
Author: Rajan Dhabalia <rd...@apache.org>
AuthorDate: Sun Dec 8 04:12:04 2019 -0800

    PIP-52: [pulsar-sever] Add support of dispatch throttling relative to publish-rate (#5797)
    
    ### Motivation
    With [PIP-3](https://github.com/apache/pulsar/wiki/PIP-3:-Message-dispatch-throttling), the Pulsar broker already supports configuring dispatch rate-limiting for a given topic. The dispatch-throttling feature allows a user to configure an absolute dispatch rate, based on the current publish-rate, for a given topic or subscriber, and the broker will make sure to dispatch only the configured number of messages to the consumers regardless of the current publish-rate or backlog on that topic.
    
    The current dispatch-rate limiting doesn't consider changes in publish-rate, so an increasing publish-rate on the topic might become larger than the configured dispatch-rate, which will cause a backlog on the topic, and consumers will never be able to catch up with the backlog unless the user reconfigures the dispatch-rate based on the current publish-rate. Reconfiguring the dispatch-rate based on the publish-rate requires human interaction and monitoring. Therefore, we need a mechanism to configure dispatch rate relat [...]
    
    ### Modification
    The `set-dispatch-rate` CLI command has a `--relative-to-publish-rate` flag to enable relative dispatch throttling.
    
    ```
    pulsar-admin namespaces <property/cluster/namespace> set-dispatch-rate --msg-dispatch-rate 1000 --relative-to-publish-rate
    ```
    
    ### Note:
    I will add broker-level configuration and documentation in a separate PR.
---
 .../service/persistent/DispatchRateLimiter.java    | 33 ++++++++--
 .../broker/service/persistent/PersistentTopic.java | 19 +++++-
 .../service/persistent/SubscribeRateLimiter.java   | 12 +---
 .../client/api/MessageDispatchThrottlingTest.java  | 74 ++++++++++++++++++++++
 .../org/apache/pulsar/admin/cli/CmdNamespaces.java | 12 +++-
 .../pulsar/common/policies/data/DispatchRate.java  |  7 ++
 .../org/apache/pulsar/common/util/RateLimiter.java | 18 +++++-
 .../apache/pulsar/common/util/RateLimiterTest.java | 14 +++-
 .../instance/stats/FunctionStatsManager.java       |  4 +-
 .../functions/instance/stats/SinkStatsManager.java |  4 +-
 .../instance/stats/SourceStatsManager.java         |  4 +-
 11 files changed, 175 insertions(+), 26 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/DispatchRateLimiter.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/DispatchRateLimiter.java
index d7d0ade..550ad38 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/DispatchRateLimiter.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/DispatchRateLimiter.java
@@ -23,6 +23,7 @@ import static org.apache.pulsar.broker.web.PulsarWebResource.path;
 
 import java.util.Optional;
 import java.util.concurrent.TimeUnit;
+import java.util.function.Supplier;
 
 import static org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES;
 
@@ -44,17 +45,23 @@ public class DispatchRateLimiter {
         REPLICATOR
     }
 
+    private final PersistentTopic topic;
     private final String topicName;
     private final Type type;
 
     private final BrokerService brokerService;
     private RateLimiter dispatchRateLimiterOnMessage;
     private RateLimiter dispatchRateLimiterOnByte;
+    private long subscriptionRelativeRatelimiterOnMessage;
+    private long subscriptionRelativeRatelimiterOnByte;
 
     public DispatchRateLimiter(PersistentTopic topic, Type type) {
+        this.topic = topic;
         this.topicName = topic.getName();
         this.brokerService = topic.getBrokerService();
         this.type = type;
+        this.subscriptionRelativeRatelimiterOnMessage = -1;
+        this.subscriptionRelativeRatelimiterOnByte = -1;
         updateDispatchRate();
     }
 
@@ -272,14 +279,17 @@ public class DispatchRateLimiter {
         long byteRate = dispatchRate.dispatchThrottlingRateInByte;
         long ratePeriod = dispatchRate.ratePeriodInSecond;
 
+        Supplier<Long> permitUpdaterMsg = dispatchRate.relativeToPublishRate
+                ? () -> getRelativeDispatchRateInMsg(dispatchRate)
+                : null;
         // update msg-rateLimiter
         if (msgRate > 0) {
             if (this.dispatchRateLimiterOnMessage == null) {
                 this.dispatchRateLimiterOnMessage = new RateLimiter(brokerService.pulsar().getExecutor(), msgRate,
-                        ratePeriod, TimeUnit.SECONDS);
+                        ratePeriod, TimeUnit.SECONDS, permitUpdaterMsg);
             } else {
                 this.dispatchRateLimiterOnMessage.setRate(msgRate, dispatchRate.ratePeriodInSecond,
-                        TimeUnit.SECONDS);
+                        TimeUnit.SECONDS, permitUpdaterMsg);
             }
         } else {
             // message-rate should be disable and close
@@ -289,14 +299,17 @@ public class DispatchRateLimiter {
             }
         }
 
+        Supplier<Long> permitUpdaterByte = dispatchRate.relativeToPublishRate
+                ? () -> getRelativeDispatchRateInByte(dispatchRate)
+                : null;
         // update byte-rateLimiter
         if (byteRate > 0) {
             if (this.dispatchRateLimiterOnByte == null) {
                 this.dispatchRateLimiterOnByte = new RateLimiter(brokerService.pulsar().getExecutor(), byteRate,
-                        ratePeriod, TimeUnit.SECONDS);
+                        ratePeriod, TimeUnit.SECONDS, permitUpdaterByte);
             } else {
                 this.dispatchRateLimiterOnByte.setRate(byteRate, dispatchRate.ratePeriodInSecond,
-                        TimeUnit.SECONDS);
+                        TimeUnit.SECONDS, permitUpdaterByte);
             }
         } else {
             // message-rate should be disable and close
@@ -307,6 +320,18 @@ public class DispatchRateLimiter {
         }
     }
 
+    private long getRelativeDispatchRateInMsg(DispatchRate dispatchRate) {
+        return (topic != null && dispatchRate != null)
+                ? (long) topic.getLastUpdatedAvgPublishRateInMsg() + dispatchRate.dispatchThrottlingRateInMsg
+                : 0;
+    }
+
+    private long getRelativeDispatchRateInByte(DispatchRate dispatchRate) {
+        return (topic != null && dispatchRate != null)
+                ? (long) topic.getLastUpdatedAvgPublishRateInByte() + dispatchRate.dispatchThrottlingRateInByte
+                : 0;
+    }
+
     /**
      * Get configured msg dispatch-throttling rate. Returns -1 if not configured
      *
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index f45bb96..efbc4c0 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -174,6 +174,8 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
     };
 
     private final AtomicLong pendingWriteOps = new AtomicLong(0);
+    private volatile double lastUpdatedAvgPublishRateInMsg = 0;
+    private volatile double lastUpdatedAvgPublishRateInByte = 0;
 
     private static class TopicStatsHelper {
         public double averageMsgSize;
@@ -1283,7 +1285,14 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
             }
         });
         topicStatsStream.endList();
-
+        // if publish-rate increases (eg: 0 to 1K) then pick max publish-rate and if publish-rate decreases then keep
+        // average rate.
+        lastUpdatedAvgPublishRateInMsg = topicStatsHelper.aggMsgRateIn > lastUpdatedAvgPublishRateInMsg
+                ? topicStatsHelper.aggMsgRateIn
+                : (topicStatsHelper.aggMsgRateIn + lastUpdatedAvgPublishRateInMsg) / 2;
+        lastUpdatedAvgPublishRateInByte = topicStatsHelper.aggMsgThroughputIn > lastUpdatedAvgPublishRateInByte
+                ? topicStatsHelper.aggMsgThroughputIn
+                : (topicStatsHelper.aggMsgThroughputIn + lastUpdatedAvgPublishRateInByte) / 2;
         // Start replicator stats
         topicStatsStream.startObject("replication");
         nsStats.replicatorCount += topicStatsHelper.remotePublishersStats.size();
@@ -1447,6 +1456,14 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
         this.addEntryLatencyStatsUsec.reset();
     }
 
+    public double getLastUpdatedAvgPublishRateInMsg() {
+        return lastUpdatedAvgPublishRateInMsg;
+    }
+
+    public double getLastUpdatedAvgPublishRateInByte() {
+        return lastUpdatedAvgPublishRateInByte;
+    }
+
     public TopicStats getStats() {
 
         TopicStats stats = new TopicStats();
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/SubscribeRateLimiter.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/SubscribeRateLimiter.java
index 3a05eac..ee07250 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/SubscribeRateLimiter.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/SubscribeRateLimiter.java
@@ -23,8 +23,6 @@ import com.google.common.base.MoreObjects;
 
 import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.service.BrokerService;
-import org.apache.pulsar.common.naming.NamespaceName;
-import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.Policies;
 import org.apache.pulsar.common.policies.data.SubscribeRate;
 import org.apache.pulsar.common.util.RateLimiter;
@@ -38,10 +36,6 @@ import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
 
-import static java.util.concurrent.TimeUnit.SECONDS;
-import static org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES;
-import static org.apache.pulsar.broker.web.PulsarWebResource.path;
-
 public class SubscribeRateLimiter {
 
     private final String topicName;
@@ -121,7 +115,6 @@ public class SubscribeRateLimiter {
      * @param subscribeRate
      */
     private synchronized void updateSubscribeRate(ConsumerIdentifier consumerIdentifier, SubscribeRate subscribeRate) {
-
         long ratePerConsumer = subscribeRate.subscribeThrottlingRatePerConsumer;
         long ratePeriod = subscribeRate.ratePeriodInSecond;
 
@@ -129,9 +122,10 @@ public class SubscribeRateLimiter {
         if (ratePerConsumer > 0) {
             if (this.subscribeRateLimiter.get(consumerIdentifier) == null) {
                 this.subscribeRateLimiter.put(consumerIdentifier, new RateLimiter(brokerService.pulsar().getExecutor(), ratePerConsumer,
-                        ratePeriod, TimeUnit.SECONDS));
+                        ratePeriod, TimeUnit.SECONDS, null));
             } else {
-                this.subscribeRateLimiter.get(consumerIdentifier).setRate(ratePerConsumer, ratePeriod, TimeUnit.SECONDS);
+                this.subscribeRateLimiter.get(consumerIdentifier).setRate(ratePerConsumer, ratePeriod, TimeUnit.SECONDS,
+                        null);
             }
         } else {
             // subscribe-rate should be disable and close
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MessageDispatchThrottlingTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MessageDispatchThrottlingTest.java
index 6d139f3..b85e088 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MessageDispatchThrottlingTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MessageDispatchThrottlingTest.java
@@ -20,12 +20,15 @@ package org.apache.pulsar.client.api;
 
 import com.google.common.collect.Sets;
 
+import static org.testng.Assert.assertNotNull;
+
 import java.lang.reflect.Field;
 import java.util.Arrays;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
@@ -904,4 +907,75 @@ public class MessageDispatchThrottlingTest extends ProducerConsumerBase {
         });
     }
 
+    /**
+     * It verifies that relative throttling at least dispatch messages as publish-rate.
+     * 
+     * @param subscription
+     * @throws Exception
+     */
+    @Test(dataProvider = "subscriptions")
+    public void testRelativeMessageRateLimitingThrottling(SubscriptionType subscription) throws Exception {
+        log.info("-- Starting {} test --", methodName);
+
+        final String namespace = "my-property/relative_throttling_ns";
+        final String topicName = "persistent://" + namespace + "/relative-throttle";
+
+        final int messageRate = 1;
+        DispatchRate dispatchRate = new DispatchRate(messageRate, -1, 1, true);
+        admin.namespaces().createNamespace(namespace, Sets.newHashSet("test"));
+        admin.namespaces().setDispatchRate(namespace, dispatchRate);
+        // create producer and topic
+        Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName).enableBatching(false).create();
+        PersistentTopic topic = (PersistentTopic) pulsar.getBrokerService().getOrCreateTopic(topicName).get();
+        boolean isMessageRateUpdate = false;
+        int retry = 10;
+        for (int i = 0; i < retry; i++) {
+            if (topic.getDispatchRateLimiter().get().getDispatchRateOnMsg() > 0) {
+                isMessageRateUpdate = true;
+                break;
+            } else {
+                if (i != retry - 1) {
+                    Thread.sleep(100);
+                }
+            }
+        }
+        Assert.assertTrue(isMessageRateUpdate);
+        Assert.assertEquals(admin.namespaces().getDispatchRate(namespace), dispatchRate);
+        Thread.sleep(2000);
+
+        final int numProducedMessages = 1000;
+
+        Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName).subscriptionName("my-subscriber-name")
+                .subscriptionType(subscription).subscribe();
+        // deactive cursors
+        deactiveCursors((ManagedLedgerImpl) topic.getManagedLedger());
+
+        // send a message, which will make dispatcher-ratelimiter initialize and schedule renew task
+        producer.send("test".getBytes());
+        assertNotNull(consumer.receive(100, TimeUnit.MILLISECONDS));
+
+        Field lastUpdatedMsgRateIn = PersistentTopic.class.getDeclaredField("lastUpdatedAvgPublishRateInMsg");
+        lastUpdatedMsgRateIn.setAccessible(true);
+        lastUpdatedMsgRateIn.set(topic, numProducedMessages);
+
+        for (int i = 0; i < numProducedMessages; i++) {
+            final String message = "my-message-" + i;
+            producer.send(message.getBytes());
+        }
+
+        int totalReceived = 0;
+        // Relative throttling will let it drain immediately because it allows to dispatch = (publish-rate +
+        // dispatch-rate)
+        for (int i = 0; i < numProducedMessages; i++) {
+            Message<byte[]> msg = consumer.receive(100, TimeUnit.MILLISECONDS);
+            totalReceived++;
+            assertNotNull(msg);
+        }
+
+        Assert.assertEquals(totalReceived, numProducedMessages);
+
+        consumer.close();
+        producer.close();
+        log.info("-- Exiting {} test --", methodName);
+    }
 }
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java
index 9dc8699..237468f 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java
@@ -536,11 +536,15 @@ public class CmdNamespaces extends CmdBase {
                 "-dt" }, description = "dispatch-rate-period in second type (default 1 second will be overwrite if not passed)\n", required = false)
         private int dispatchRatePeriodSec = 1;
 
+        @Parameter(names = { "--relative-to-publish-rate",
+                "-rp" }, description = "dispatch rate relative to publish-rate (if publish-relative flag is enabled then broker will apply throttling value to (publish-rate + dispatch rate))\n", required = false)
+        private boolean relativeToPublishRate = false;
+        
         @Override
         void run() throws PulsarAdminException {
             String namespace = validateNamespace(params);
             admin.namespaces().setDispatchRate(namespace,
-                    new DispatchRate(msgDispatchRate, byteDispatchRate, dispatchRatePeriodSec));
+                    new DispatchRate(msgDispatchRate, byteDispatchRate, dispatchRatePeriodSec, relativeToPublishRate));
         }
     }
 
@@ -608,11 +612,15 @@ public class CmdNamespaces extends CmdBase {
             "-dt" }, description = "dispatch-rate-period in second type (default 1 second will be overwrite if not passed)\n", required = false)
         private int dispatchRatePeriodSec = 1;
 
+        @Parameter(names = { "--relative-to-publish-rate",
+                "-rp" }, description = "dispatch rate relative to publish-rate (if publish-relative flag is enabled then broker will apply throttling value to (publish-rate + dispatch rate))\n", required = false)
+        private boolean relativeToPublishRate = false;
+
         @Override
         void run() throws PulsarAdminException {
             String namespace = validateNamespace(params);
             admin.namespaces().setSubscriptionDispatchRate(namespace,
-                new DispatchRate(msgDispatchRate, byteDispatchRate, dispatchRatePeriodSec));
+                    new DispatchRate(msgDispatchRate, byteDispatchRate, dispatchRatePeriodSec, relativeToPublishRate));
         }
     }
 
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/DispatchRate.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/DispatchRate.java
index c93a713..002cdac 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/DispatchRate.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/DispatchRate.java
@@ -28,6 +28,7 @@ public class DispatchRate {
 
     public int dispatchThrottlingRateInMsg = -1;
     public long dispatchThrottlingRateInByte = -1;
+    public boolean relativeToPublishRate = false; /* throttles dispatch relatively publish-rate */
     public int ratePeriodInSecond = 1; /* by default dispatch-rate will be calculate per 1 second */
 
     public DispatchRate() {
@@ -45,6 +46,12 @@ public class DispatchRate {
         this.ratePeriodInSecond = ratePeriodInSecond;
     }
 
+    public DispatchRate(int dispatchThrottlingRateInMsg, long dispatchThrottlingRateInByte,
+            int ratePeriodInSecond, boolean relativeToPublishRate) {
+        this(dispatchThrottlingRateInMsg, dispatchThrottlingRateInByte, ratePeriodInSecond);
+        this.relativeToPublishRate = relativeToPublishRate;
+    }
+
     @Override
     public int hashCode() {
         return Objects.hash(dispatchThrottlingRateInMsg, dispatchThrottlingRateInByte,
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/RateLimiter.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/RateLimiter.java
index 006cddd..45ae8e2 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/RateLimiter.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/RateLimiter.java
@@ -25,6 +25,7 @@ import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
+import java.util.function.Supplier;
 
 /**
  * A Rate Limiter that distributes permits at a configurable rate. Each {@link #acquire()} blocks if necessary until a
@@ -57,19 +58,22 @@ public class RateLimiter implements AutoCloseable{
     private long permits;
     private long acquiredPermits;
     private boolean isClosed;
+    // permitUpdate helps to update permit-rate at runtime
+    private Supplier<Long> permitUpdater;
 
     public RateLimiter(final long permits, final long rateTime, final TimeUnit timeUnit) {
-        this(null, permits, rateTime, timeUnit);
+        this(null, permits, rateTime, timeUnit, null);
     }
 
     public RateLimiter(final ScheduledExecutorService service, final long permits, final long rateTime,
-            final TimeUnit timeUnit) {
+            final TimeUnit timeUnit, Supplier<Long> permitUpdater) {
         checkArgument(permits > 0, "rate must be > 0");
         checkArgument(rateTime > 0, "Renew permit time must be > 0");
 
         this.rateTime = rateTime;
         this.timeUnit = timeUnit;
         this.permits = permits;
+        this.permitUpdater = permitUpdater;
 
         if (service != null) {
             this.executorService = service;
@@ -198,14 +202,16 @@ public class RateLimiter implements AutoCloseable{
      * @param permits
      * @param rateTime
      * @param timeUnit
+     * @param permitUpdaterByte
      */
-    public synchronized void setRate(long permits, long rateTime, TimeUnit timeUnit) {
+    public synchronized void setRate(long permits, long rateTime, TimeUnit timeUnit, Supplier<Long> permitUpdaterByte) {
         if (renewTask != null) {
             renewTask.cancel(false);
         }
         this.permits = permits;
         this.rateTime = rateTime;
         this.timeUnit = timeUnit;
+        this.permitUpdater = permitUpdaterByte;
         this.renewTask = createTask();
     }
 
@@ -232,6 +238,12 @@ public class RateLimiter implements AutoCloseable{
 
     synchronized void renew() {
         acquiredPermits = 0;
+        if (permitUpdater != null) {
+            long newPermitRate = permitUpdater.get();
+            if (newPermitRate > 0) {
+                setRate(newPermitRate);
+            }
+        }
         notifyAll();
     }
 
diff --git a/pulsar-common/src/test/java/org/apache/pulsar/common/util/RateLimiterTest.java b/pulsar-common/src/test/java/org/apache/pulsar/common/util/RateLimiterTest.java
index 71f7fa3..c20f5d1 100644
--- a/pulsar-common/src/test/java/org/apache/pulsar/common/util/RateLimiterTest.java
+++ b/pulsar-common/src/test/java/org/apache/pulsar/common/util/RateLimiterTest.java
@@ -24,6 +24,7 @@ import static org.testng.Assert.assertFalse;
 import static org.testng.Assert.assertEquals;
 
 import java.util.concurrent.TimeUnit;
+import java.util.function.Supplier;
 
 import org.testng.annotations.Test;
 
@@ -152,7 +153,7 @@ public class RateLimiterTest {
         assertEquals(rate.getAvailablePermits(), permits);
 
         // change rate-time from 1sec to 5sec
-        rate.setRate(permits, 5 * rateTimeMSec, TimeUnit.MILLISECONDS);
+        rate.setRate(permits, 5 * rateTimeMSec, TimeUnit.MILLISECONDS, null);
         assertEquals(rate.getAvailablePermits(), 100);
         assertEquals(rate.tryAcquire(permits), true);
         assertEquals(rate.getAvailablePermits(), 0);
@@ -163,4 +164,15 @@ public class RateLimiterTest {
         rate.close();
     }
 
+    @Test
+    public void testRateLimiterWithPermitUpdater() throws Exception{
+        long permits = 10;
+        long rateTime = 1;
+        long newUpdatedRateLimit = 100L;
+        Supplier<Long> permitUpdater = () -> newUpdatedRateLimit;
+        RateLimiter limiter = new RateLimiter(null, permits , 1, TimeUnit.SECONDS, permitUpdater);
+        limiter.acquire();
+        Thread.sleep(rateTime*3*1000);
+        assertEquals(limiter.getAvailablePermits(), newUpdatedRateLimit);
+    }
 }
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/FunctionStatsManager.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/FunctionStatsManager.java
index 7943d5a..fdedb74 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/FunctionStatsManager.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/FunctionStatsManager.java
@@ -232,8 +232,8 @@ public class FunctionStatsManager extends ComponentStatsManager{
                 .help("Exception from sink.")
                 .register(collectorRegistry);
 
-        userExceptionRateLimiter = new RateLimiter(scheduledExecutorService, 5, 1, TimeUnit.MINUTES);
-        sysExceptionRateLimiter = new RateLimiter(scheduledExecutorService, 5, 1, TimeUnit.MINUTES);
+        userExceptionRateLimiter = new RateLimiter(scheduledExecutorService, 5, 1, TimeUnit.MINUTES, null);
+        sysExceptionRateLimiter = new RateLimiter(scheduledExecutorService, 5, 1, TimeUnit.MINUTES, null);
     }
 
     public void addUserException(Throwable ex) {
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/SinkStatsManager.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/SinkStatsManager.java
index 3ed2aa1..c913225 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/SinkStatsManager.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/SinkStatsManager.java
@@ -174,8 +174,8 @@ public class SinkStatsManager extends ComponentStatsManager {
                 .help("Exception from sink.")
                 .register(collectorRegistry);
 
-        sysExceptionRateLimiter = new RateLimiter(scheduledExecutorService, 5, 1, TimeUnit.MINUTES);
-        sinkExceptionRateLimiter = new RateLimiter(scheduledExecutorService, 5, 1, TimeUnit.MINUTES);
+        sysExceptionRateLimiter = new RateLimiter(scheduledExecutorService, 5, 1, TimeUnit.MINUTES, null);
+        sinkExceptionRateLimiter = new RateLimiter(scheduledExecutorService, 5, 1, TimeUnit.MINUTES, null);
     }
 
     @Override
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/SourceStatsManager.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/SourceStatsManager.java
index f496709..0ec7352 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/SourceStatsManager.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/SourceStatsManager.java
@@ -173,8 +173,8 @@ public class SourceStatsManager extends ComponentStatsManager {
                 .help("Exception from source.")
                 .register(collectorRegistry);
 
-        sysExceptionRateLimiter = new RateLimiter(scheduledExecutorService, 5, 1, TimeUnit.MINUTES);
-        sourceExceptionRateLimiter = new RateLimiter(scheduledExecutorService, 5, 1, TimeUnit.MINUTES);
+        sysExceptionRateLimiter = new RateLimiter(scheduledExecutorService, 5, 1, TimeUnit.MINUTES, null);
+        sourceExceptionRateLimiter = new RateLimiter(scheduledExecutorService, 5, 1, TimeUnit.MINUTES, null);
     }
 
     @Override