You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by ch...@apache.org on 2021/08/12 03:29:37 UTC

[pulsar] 01/20: [Broker] Fix set-publish-rate when using preciseTopicPublishRateLimiterEnable=true (#10384)

This is an automated email from the ASF dual-hosted git repository.

chenhang pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 41ad6245f67be7830549d9f6e90c364eab0041cd
Author: Lari Hotari <lh...@users.noreply.github.com>
AuthorDate: Tue Aug 3 19:14:26 2021 +0300

    [Broker] Fix set-publish-rate when using preciseTopicPublishRateLimiterEnable=true (#10384)
    
    ### Motivation
    
    When using the `preciseTopicPublishRateLimiterEnable=true` setting (introduced by #7078) for rate limiting, there are various issues:
    - updating the limits doesn't reset a boundary (message rate or byte rate) when that limit is changed from bounded to unbounded.
    - each topic creates a scheduler thread for each limiter instance
    - each topic never releases the scheduler thread when the topic gets unloaded / closed
    - updating the limits doesn't close the scheduler thread related to the replaced limiter instance
    
    ### Modifications
    
    - Fix updating of the limits by cleaning up the previous limiter instances before creating new limiter instances
    - Use `brokerService.pulsar().getExecutor()` as the scheduler for the rate limiter instances
    - Add resource cleanup hooks for topic closing (unload)
    
    ### Open issue
    
    The existing code is inconsistent in how it passes the `rateLimitFunction`:
    https://github.com/apache/pulsar/blob/69a173a82c89893f54dbe5b6f422249f66ea5418/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PrecisPublishLimiter.java#L80-L86
    It's passed to `topicPublishRateLimiterOnMessage`, but not to `topicPublishRateLimiterOnByte`. It is unclear whether this is intentional.
    The `rateLimitFunction` is `() -> this.enableCnxAutoRead()`
    https://github.com/apache/pulsar/blob/69a173a82c89893f54dbe5b6f422249f66ea5418/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java#L913
    (This also raises the question of whether rate limiting works consistently when multiple topics share the same connection.)
    
    (cherry picked from commit ded806fd52f6e2f182fa02052cbd82c2a6755098)
---
 .../pulsar/broker/service/AbstractTopic.java       |   2 +-
 .../broker/service/PrecisPublishLimiter.java       | 113 +++++++++++++++------
 .../pulsar/broker/service/PublishRateLimiter.java  |   2 +-
 .../broker/service/PublishRateLimiterDisable.java  |   4 +
 .../broker/service/PublishRateLimiterImpl.java     |   5 +
 .../service/nonpersistent/NonPersistentTopic.java  |   7 ++
 .../broker/service/persistent/PersistentTopic.java |   7 ++
 .../service/persistent/SubscribeRateLimiter.java   |   2 +-
 .../broker/service/PrecisPublishLimiterTest.java   |  57 +++++++++++
 .../org/apache/pulsar/common/util/RateLimiter.java |  29 +++++-
 .../instance/stats/FunctionStatsManager.java       |  14 +--
 .../functions/instance/stats/SinkStatsManager.java |   4 +-
 .../instance/stats/SourceStatsManager.java         |   4 +-
 13 files changed, 201 insertions(+), 49 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
index 54f9e8d..39b0650 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
@@ -883,7 +883,7 @@ public abstract class AbstractTopic implements Topic {
                 // create new rateLimiter if rate-limiter is disabled
                 if (preciseTopicPublishRateLimitingEnable) {
                     this.topicPublishRateLimiter = new PrecisPublishLimiter(publishRate,
-                            () -> this.enableCnxAutoRead());
+                            () -> this.enableCnxAutoRead(), brokerService.pulsar().getExecutor());
                 } else {
                     this.topicPublishRateLimiter = new PublishRateLimiterImpl(publishRate);
                 }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PrecisPublishLimiter.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PrecisPublishLimiter.java
index 60fbcf0..e61597e 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PrecisPublishLimiter.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PrecisPublishLimiter.java
@@ -18,7 +18,7 @@
  */
 package org.apache.pulsar.broker.service;
 
-import java.util.concurrent.TimeUnit;
+import java.util.concurrent.ScheduledExecutorService;
 import org.apache.pulsar.common.policies.data.Policies;
 import org.apache.pulsar.common.policies.data.PublishRate;
 import org.apache.pulsar.common.util.RateLimitFunction;
@@ -27,30 +27,37 @@ import org.apache.pulsar.common.util.RateLimiter;
 public class PrecisPublishLimiter implements PublishRateLimiter {
     protected volatile int publishMaxMessageRate = 0;
     protected volatile long publishMaxByteRate = 0;
-    protected volatile boolean publishThrottlingEnabled = false;
     // precise mode for publish rate limiter
-    private RateLimiter topicPublishRateLimiterOnMessage;
-    private RateLimiter topicPublishRateLimiterOnByte;
+    private volatile RateLimiter topicPublishRateLimiterOnMessage;
+    private volatile RateLimiter topicPublishRateLimiterOnByte;
     private final RateLimitFunction rateLimitFunction;
+    private final ScheduledExecutorService scheduledExecutorService;
 
     public PrecisPublishLimiter(Policies policies, String clusterName, RateLimitFunction rateLimitFunction) {
         this.rateLimitFunction = rateLimitFunction;
         update(policies, clusterName);
+        this.scheduledExecutorService = null;
     }
 
     public PrecisPublishLimiter(PublishRate publishRate, RateLimitFunction rateLimitFunction) {
+        this(publishRate, rateLimitFunction, null);
+    }
+
+    public PrecisPublishLimiter(PublishRate publishRate, RateLimitFunction rateLimitFunction,
+                                ScheduledExecutorService scheduledExecutorService) {
         this.rateLimitFunction = rateLimitFunction;
         update(publishRate);
+        this.scheduledExecutorService = scheduledExecutorService;
     }
 
     @Override
     public void checkPublishRate() {
-       // No-op
+        // No-op
     }
 
     @Override
     public void incrementPublishCount(int numOfMessages, long msgSizeInBytes) {
-       // No-op
+        // No-op
     }
 
     @Override
@@ -62,10 +69,15 @@ public class PrecisPublishLimiter implements PublishRateLimiter {
     public boolean isPublishRateExceeded() {
         return false;
     }
+
     // If all rate limiters are not exceeded, re-enable auto read from socket.
     private void tryReleaseConnectionThrottle() {
-        if ((topicPublishRateLimiterOnMessage != null && topicPublishRateLimiterOnMessage.getAvailablePermits() <= 0)
-        || (topicPublishRateLimiterOnByte != null && topicPublishRateLimiterOnByte.getAvailablePermits() <= 0)) {
+        RateLimiter currentTopicPublishRateLimiterOnMessage = topicPublishRateLimiterOnMessage;
+        RateLimiter currentTopicPublishRateLimiterOnByte = topicPublishRateLimiterOnByte;
+        if ((currentTopicPublishRateLimiterOnMessage != null
+                && currentTopicPublishRateLimiterOnMessage.getAvailablePermits() <= 0)
+                || (currentTopicPublishRateLimiterOnByte != null
+                && currentTopicPublishRateLimiterOnByte.getAvailablePermits() <= 0)) {
             return;
         }
         this.rateLimitFunction.apply();
@@ -78,34 +90,73 @@ public class PrecisPublishLimiter implements PublishRateLimiter {
                 : null;
         this.update(maxPublishRate);
     }
+
     public void update(PublishRate maxPublishRate) {
-        if (maxPublishRate != null
-                && (maxPublishRate.publishThrottlingRateInMsg > 0 || maxPublishRate.publishThrottlingRateInByte > 0)) {
-            this.publishThrottlingEnabled = true;
-            this.publishMaxMessageRate = Math.max(maxPublishRate.publishThrottlingRateInMsg, 0);
-            this.publishMaxByteRate = Math.max(maxPublishRate.publishThrottlingRateInByte, 0);
-            if (this.publishMaxMessageRate > 0) {
-                topicPublishRateLimiterOnMessage =
-                        new RateLimiter(publishMaxMessageRate, 1, TimeUnit.SECONDS,
-                                this::tryReleaseConnectionThrottle, true);
+        replaceLimiters(() -> {
+            if (maxPublishRate != null
+                    && (maxPublishRate.publishThrottlingRateInMsg > 0
+                    || maxPublishRate.publishThrottlingRateInByte > 0)) {
+                this.publishMaxMessageRate = Math.max(maxPublishRate.publishThrottlingRateInMsg, 0);
+                this.publishMaxByteRate = Math.max(maxPublishRate.publishThrottlingRateInByte, 0);
+                if (this.publishMaxMessageRate > 0) {
+                    topicPublishRateLimiterOnMessage =
+                            RateLimiter.builder()
+                                    .scheduledExecutorService(scheduledExecutorService)
+                                    .permits(publishMaxMessageRate)
+                                    .rateLimitFunction(this::tryReleaseConnectionThrottle)
+                                    .isDispatchOrPrecisePublishRateLimiter(true)
+                                    .build();
+                }
+                if (this.publishMaxByteRate > 0) {
+                    topicPublishRateLimiterOnByte = RateLimiter.builder()
+                            .scheduledExecutorService(scheduledExecutorService)
+                            .permits(publishMaxByteRate)
+                            .rateLimitFunction(this::tryReleaseConnectionThrottle)
+                            .isDispatchOrPrecisePublishRateLimiter(true)
+                            .build();
+                }
+            } else {
+                this.publishMaxMessageRate = 0;
+                this.publishMaxByteRate = 0;
             }
-            if (this.publishMaxByteRate > 0) {
-                topicPublishRateLimiterOnByte =
-                        new RateLimiter(publishMaxByteRate, 1, TimeUnit.SECONDS,
-                                this::tryReleaseConnectionThrottle, true);
-            }
-        } else {
-            this.publishMaxMessageRate = 0;
-            this.publishMaxByteRate = 0;
-            this.publishThrottlingEnabled = false;
-            topicPublishRateLimiterOnMessage = null;
-            topicPublishRateLimiterOnByte = null;
-        }
+        });
     }
 
     @Override
     public boolean tryAcquire(int numbers, long bytes) {
-        return (topicPublishRateLimiterOnMessage == null || topicPublishRateLimiterOnMessage.tryAcquire(numbers))
-                && (topicPublishRateLimiterOnByte == null || topicPublishRateLimiterOnByte.tryAcquire(bytes));
+        RateLimiter currentTopicPublishRateLimiterOnMessage = topicPublishRateLimiterOnMessage;
+        RateLimiter currentTopicPublishRateLimiterOnByte = topicPublishRateLimiterOnByte;
+        return (currentTopicPublishRateLimiterOnMessage == null
+                || currentTopicPublishRateLimiterOnMessage.tryAcquire(numbers))
+                && (currentTopicPublishRateLimiterOnByte == null
+                || currentTopicPublishRateLimiterOnByte.tryAcquire(bytes));
+    }
+
+    @Override
+    public void close() throws Exception {
+        rateLimitFunction.apply();
+        replaceLimiters(null);
+    }
+
+    private void replaceLimiters(Runnable updater) {
+        RateLimiter previousTopicPublishRateLimiterOnMessage = topicPublishRateLimiterOnMessage;
+        topicPublishRateLimiterOnMessage = null;
+        RateLimiter previousTopicPublishRateLimiterOnByte = topicPublishRateLimiterOnByte;
+        topicPublishRateLimiterOnByte = null;
+        try {
+            if (updater != null) {
+                updater.run();
+            }
+        } finally {
+            // Close previous limiters to prevent resource leakages.
+            // Delay closing of previous limiters after new ones are in place so that updating the limiter
+            // doesn't cause unavailability.
+            if (previousTopicPublishRateLimiterOnMessage != null) {
+                previousTopicPublishRateLimiterOnMessage.close();
+            }
+            if (previousTopicPublishRateLimiterOnByte != null) {
+                previousTopicPublishRateLimiterOnByte.close();
+            }
+        }
     }
 }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PublishRateLimiter.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PublishRateLimiter.java
index ec23b26..3978879 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PublishRateLimiter.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PublishRateLimiter.java
@@ -21,7 +21,7 @@ package org.apache.pulsar.broker.service;
 import org.apache.pulsar.common.policies.data.Policies;
 import org.apache.pulsar.common.policies.data.PublishRate;
 
-public interface PublishRateLimiter {
+public interface PublishRateLimiter extends AutoCloseable {
 
     PublishRateLimiter DISABLED_RATE_LIMITER = PublishRateLimiterDisable.DISABLED_RATE_LIMITER;
 
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PublishRateLimiterDisable.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PublishRateLimiterDisable.java
index c72f6ba..cf18192 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PublishRateLimiterDisable.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PublishRateLimiterDisable.java
@@ -62,4 +62,8 @@ public class PublishRateLimiterDisable implements PublishRateLimiter {
         return true;
     }
 
+    @Override
+    public void close() throws Exception {
+        // No-op
+    }
 }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PublishRateLimiterImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PublishRateLimiterImpl.java
index c1458cf..0e1200e 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PublishRateLimiterImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PublishRateLimiterImpl.java
@@ -108,4 +108,9 @@ public class PublishRateLimiterImpl implements PublishRateLimiter {
     public boolean tryAcquire(int numbers, long bytes) {
         return false;
     }
+
+    @Override
+    public void close() throws Exception {
+        // no-op
+    }
 }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
index 7abe710..5224eaa 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
@@ -445,6 +445,13 @@ public class NonPersistentTopic extends AbstractTopic implements Topic {
 
         replicators.forEach((cluster, replicator) -> futures.add(replicator.disconnect()));
         producers.values().forEach(producer -> futures.add(producer.disconnect()));
+        if (topicPublishRateLimiter != null) {
+            try {
+                topicPublishRateLimiter.close();
+            } catch (Exception e) {
+                log.warn("Error closing topicPublishRateLimiter for topic {}", topic, e);
+            }
+        }
         subscriptions.forEach((s, sub) -> futures.add(sub.disconnect()));
 
         CompletableFuture<Void> clientCloseFuture =
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 40ed2a1..ca22511 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -1157,6 +1157,13 @@ public class PersistentTopic extends AbstractTopic
         futures.add(transactionBuffer.closeAsync());
         replicators.forEach((cluster, replicator) -> futures.add(replicator.disconnect()));
         producers.values().forEach(producer -> futures.add(producer.disconnect()));
+        if (topicPublishRateLimiter != null) {
+            try {
+                topicPublishRateLimiter.close();
+            } catch (Exception e) {
+                log.warn("Error closing topicPublishRateLimiter for topic {}", topic, e);
+            }
+        }
         subscriptions.forEach((s, sub) -> futures.add(sub.disconnect()));
 
         CompletableFuture<Void> clientCloseFuture = closeWithoutWaitingClientDisconnect
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/SubscribeRateLimiter.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/SubscribeRateLimiter.java
index f014717..a13328c 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/SubscribeRateLimiter.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/SubscribeRateLimiter.java
@@ -140,7 +140,7 @@ public class SubscribeRateLimiter {
             if (this.subscribeRateLimiter.get(consumerIdentifier) == null) {
                 this.subscribeRateLimiter.put(consumerIdentifier,
                         new RateLimiter(brokerService.pulsar().getExecutor(), ratePerConsumer,
-                                ratePeriod, TimeUnit.SECONDS, null));
+                                ratePeriod, TimeUnit.SECONDS));
             } else {
                 this.subscribeRateLimiter.get(consumerIdentifier)
                         .setRate(ratePerConsumer, ratePeriod, TimeUnit.SECONDS,
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PrecisPublishLimiterTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PrecisPublishLimiterTest.java
new file mode 100644
index 0000000..61804e7
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PrecisPublishLimiterTest.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service;
+
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertTrue;
+import org.apache.pulsar.common.policies.data.PublishRate;
+import org.testng.annotations.Test;
+
+public class PrecisPublishLimiterTest {
+
+    @Test
+    void shouldResetMsgLimitAfterUpdate() {
+        PrecisPublishLimiter precisPublishLimiter = new PrecisPublishLimiter(new PublishRate(), () -> {
+        });
+        precisPublishLimiter.update(new PublishRate(1, 1));
+        assertFalse(precisPublishLimiter.tryAcquire(99, 99));
+        precisPublishLimiter.update(new PublishRate(-1, 100));
+        assertTrue(precisPublishLimiter.tryAcquire(99, 99));
+    }
+
+    @Test
+    void shouldResetBytesLimitAfterUpdate() {
+        PrecisPublishLimiter precisPublishLimiter = new PrecisPublishLimiter(new PublishRate(), () -> {
+        });
+        precisPublishLimiter.update(new PublishRate(1, 1));
+        assertFalse(precisPublishLimiter.tryAcquire(99, 99));
+        precisPublishLimiter.update(new PublishRate(100, -1));
+        assertTrue(precisPublishLimiter.tryAcquire(99, 99));
+    }
+
+    @Test
+    void shouldCloseResources() throws Exception {
+        for (int i = 0; i < 20000; i++) {
+            PrecisPublishLimiter precisPublishLimiter = new PrecisPublishLimiter(new PublishRate(100, 100), () -> {
+            });
+            precisPublishLimiter.tryAcquire(99, 99);
+            precisPublishLimiter.close();
+        }
+    }
+}
\ No newline at end of file
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/RateLimiter.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/RateLimiter.java
index cb88a95..1bb2fcd 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/RateLimiter.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/RateLimiter.java
@@ -25,6 +25,7 @@ import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Supplier;
+import lombok.Builder;
 
 /**
  * A Rate Limiter that distributes permits at a configurable rate. Each {@link #acquire()} blocks if necessary until a
@@ -48,7 +49,6 @@ import java.util.function.Supplier;
  * </ul>
  */
 public class RateLimiter implements AutoCloseable{
-
     private final ScheduledExecutorService executorService;
     private long rateTime;
     private TimeUnit timeUnit;
@@ -63,7 +63,7 @@ public class RateLimiter implements AutoCloseable{
     private boolean isDispatchOrPrecisePublishRateLimiter;
 
     public RateLimiter(final long permits, final long rateTime, final TimeUnit timeUnit) {
-        this(null, permits, rateTime, timeUnit, null);
+        this(null, permits, rateTime, timeUnit);
     }
 
     public RateLimiter(final long permits, final long rateTime, final TimeUnit timeUnit, boolean isDispatchOrPrecisePublishRateLimiter) {
@@ -83,12 +83,25 @@ public class RateLimiter implements AutoCloseable{
     }
 
     public RateLimiter(final ScheduledExecutorService service, final long permits, final long rateTime,
+                       final TimeUnit timeUnit) {
+        this(service, permits, rateTime, timeUnit, (Supplier<Long>) null);
+    }
+
+    public RateLimiter(final ScheduledExecutorService service, final long permits, final long rateTime,
                        final TimeUnit timeUnit, Supplier<Long> permitUpdater) {
         this(service, permits, rateTime, timeUnit, permitUpdater, false);
     }
 
     public RateLimiter(final ScheduledExecutorService service, final long permits, final long rateTime,
             final TimeUnit timeUnit, Supplier<Long> permitUpdater, boolean isDispatchOrPrecisePublishRateLimiter) {
+        this(service, permits, rateTime, timeUnit, permitUpdater, isDispatchOrPrecisePublishRateLimiter,
+                null);
+    }
+
+    @Builder
+    RateLimiter(final ScheduledExecutorService scheduledExecutorService, final long permits, final long rateTime,
+            final TimeUnit timeUnit, Supplier<Long> permitUpdater, boolean isDispatchOrPrecisePublishRateLimiter,
+                       RateLimitFunction rateLimitFunction) {
         checkArgument(permits > 0, "rate must be > 0");
         checkArgument(rateTime > 0, "Renew permit time must be > 0");
 
@@ -98,8 +111,8 @@ public class RateLimiter implements AutoCloseable{
         this.permitUpdater = permitUpdater;
         this.isDispatchOrPrecisePublishRateLimiter = isDispatchOrPrecisePublishRateLimiter;
 
-        if (service != null) {
-            this.executorService = service;
+        if (scheduledExecutorService != null) {
+            this.executorService = scheduledExecutorService;
             this.externalExecutor = true;
         } else {
             final ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1);
@@ -109,6 +122,14 @@ public class RateLimiter implements AutoCloseable{
             this.externalExecutor = false;
         }
 
+        this.rateLimitFunction = rateLimitFunction;
+
+    }
+
+    // default values for Lombok generated builder class
+    public static class RateLimiterBuilder {
+        private long rateTime = 1;
+        private TimeUnit timeUnit = TimeUnit.SECONDS;
     }
 
     @Override
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/FunctionStatsManager.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/FunctionStatsManager.java
index 08ea9ea..b008371 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/FunctionStatsManager.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/FunctionStatsManager.java
@@ -68,13 +68,13 @@ public class FunctionStatsManager extends ComponentStatsManager{
     final Counter statTotalSysExceptions;
 
     final Counter statTotalUserExceptions;
-    
+
     final Summary statProcessLatency;
 
     final Gauge statlastInvocation;
 
     final Counter statTotalRecordsReceived;
-    
+
     // windowed metrics
 
     final Counter statTotalProcessedSuccessfully1min;
@@ -82,7 +82,7 @@ public class FunctionStatsManager extends ComponentStatsManager{
     final Counter statTotalSysExceptions1min;
 
     final Counter statTotalUserExceptions1min;
-    
+
     final Summary statProcessLatency1min;
 
     final Counter statTotalRecordsReceived1min;
@@ -262,8 +262,8 @@ public class FunctionStatsManager extends ComponentStatsManager{
                 .help("Exception from sink.")
                 .create());
 
-        userExceptionRateLimiter = new RateLimiter(scheduledExecutorService, 5, 1, TimeUnit.MINUTES, null);
-        sysExceptionRateLimiter = new RateLimiter(scheduledExecutorService, 5, 1, TimeUnit.MINUTES, null);
+        userExceptionRateLimiter = new RateLimiter(scheduledExecutorService, 5, 1, TimeUnit.MINUTES);
+        sysExceptionRateLimiter = new RateLimiter(scheduledExecutorService, 5, 1, TimeUnit.MINUTES);
     }
 
     public void addUserException(Throwable ex) {
@@ -371,7 +371,7 @@ public class FunctionStatsManager extends ComponentStatsManager{
     public double getTotalUserExceptions() {
         return _statTotalUserExceptions.get();
     }
-    
+
     @Override
     public double getLastInvocation() {
         return _statlastInvocation.get();
@@ -417,7 +417,7 @@ public class FunctionStatsManager extends ComponentStatsManager{
     public double getTotalUserExceptions1min() {
         return _statTotalUserExceptions1min.get();
     }
-    
+
     @Override
     public double getAvgProcessLatency1min() {
         return _statProcessLatency1min.get().count <= 0.0
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/SinkStatsManager.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/SinkStatsManager.java
index 255ad62..536f55a 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/SinkStatsManager.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/SinkStatsManager.java
@@ -196,8 +196,8 @@ public class SinkStatsManager extends ComponentStatsManager {
                 .help("Exception from sink.")
                 .create());
 
-        sysExceptionRateLimiter = new RateLimiter(scheduledExecutorService, 5, 1, TimeUnit.MINUTES, null);
-        sinkExceptionRateLimiter = new RateLimiter(scheduledExecutorService, 5, 1, TimeUnit.MINUTES, null);
+        sysExceptionRateLimiter = new RateLimiter(scheduledExecutorService, 5, 1, TimeUnit.MINUTES);
+        sinkExceptionRateLimiter = new RateLimiter(scheduledExecutorService, 5, 1, TimeUnit.MINUTES);
     }
 
     @Override
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/SourceStatsManager.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/SourceStatsManager.java
index f4e1da0..451a8ad 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/SourceStatsManager.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/SourceStatsManager.java
@@ -195,8 +195,8 @@ public class SourceStatsManager extends ComponentStatsManager {
                 .help("Exception from source.")
                 .create());
 
-        sysExceptionRateLimiter = new RateLimiter(scheduledExecutorService, 5, 1, TimeUnit.MINUTES, null);
-        sourceExceptionRateLimiter = new RateLimiter(scheduledExecutorService, 5, 1, TimeUnit.MINUTES, null);
+        sysExceptionRateLimiter = new RateLimiter(scheduledExecutorService, 5, 1, TimeUnit.MINUTES);
+        sourceExceptionRateLimiter = new RateLimiter(scheduledExecutorService, 5, 1, TimeUnit.MINUTES);
     }
 
     @Override