You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by zh...@apache.org on 2019/12/08 12:12:10 UTC
[pulsar] branch master updated: PIP-52: [pulsar-sever] Add support
of dispatch throttling relative to publish-rate (#5797)
This is an automated email from the ASF dual-hosted git repository.
zhaijia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 02bf9a0 PIP-52: [pulsar-sever] Add support of dispatch throttling relative to publish-rate (#5797)
02bf9a0 is described below
commit 02bf9a0b770e53f5a6f3810e9602ccc9a4c05050
Author: Rajan Dhabalia <rd...@apache.org>
AuthorDate: Sun Dec 8 04:12:04 2019 -0800
PIP-52: [pulsar-sever] Add support of dispatch throttling relative to publish-rate (#5797)
### Motivation
With [PIP-3](https://github.com/apache/pulsar/wiki/PIP-3:-Message-dispatch-throttling) , Pulsar broker already supports to configure dispatch rate-limiting for a given topic. Dispatch-throttling feature allows user to configure absolute dispatch rate based on current publish-rate for a given topic or subscriber, and broker will make sure to dispatch only configured number of messages to the consumers regardless current publish-rate or backlog on that topic.
Current dispatch-rate limiting doesn't consider change in publish-rate so, increasing publish-rate on the topic might be larger than configured dispatch-rate which will cause backlog on the topic and consumers will never be able to catch up the backlog unless user again reconfigured the dispatch-rate based on current publish-rate. Reconfiguring dispatch-rate based on publish-rate requires human interaction and monitoring. Therefore, we need a mechanism to configure dispatch rate relat [...]
### Modification
`set-dispatch-rate` cli have a flag `--relative-to-publish-rate` to enable relative dispatch throttling.
```
pulsar-admin namespaces <property/cluster/namespace> set-dispatch-rate --msg-dispatch-rate 1000 --relative-to-publish-rate
```
### Note:
I will add broker-level configuration and documentation into separate PR.
---
.../service/persistent/DispatchRateLimiter.java | 33 ++++++++--
.../broker/service/persistent/PersistentTopic.java | 19 +++++-
.../service/persistent/SubscribeRateLimiter.java | 12 +---
.../client/api/MessageDispatchThrottlingTest.java | 74 ++++++++++++++++++++++
.../org/apache/pulsar/admin/cli/CmdNamespaces.java | 12 +++-
.../pulsar/common/policies/data/DispatchRate.java | 7 ++
.../org/apache/pulsar/common/util/RateLimiter.java | 18 +++++-
.../apache/pulsar/common/util/RateLimiterTest.java | 14 +++-
.../instance/stats/FunctionStatsManager.java | 4 +-
.../functions/instance/stats/SinkStatsManager.java | 4 +-
.../instance/stats/SourceStatsManager.java | 4 +-
11 files changed, 175 insertions(+), 26 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/DispatchRateLimiter.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/DispatchRateLimiter.java
index d7d0ade..550ad38 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/DispatchRateLimiter.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/DispatchRateLimiter.java
@@ -23,6 +23,7 @@ import static org.apache.pulsar.broker.web.PulsarWebResource.path;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
+import java.util.function.Supplier;
import static org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES;
@@ -44,17 +45,23 @@ public class DispatchRateLimiter {
REPLICATOR
}
+ private final PersistentTopic topic;
private final String topicName;
private final Type type;
private final BrokerService brokerService;
private RateLimiter dispatchRateLimiterOnMessage;
private RateLimiter dispatchRateLimiterOnByte;
+ private long subscriptionRelativeRatelimiterOnMessage;
+ private long subscriptionRelativeRatelimiterOnByte;
public DispatchRateLimiter(PersistentTopic topic, Type type) {
+ this.topic = topic;
this.topicName = topic.getName();
this.brokerService = topic.getBrokerService();
this.type = type;
+ this.subscriptionRelativeRatelimiterOnMessage = -1;
+ this.subscriptionRelativeRatelimiterOnByte = -1;
updateDispatchRate();
}
@@ -272,14 +279,17 @@ public class DispatchRateLimiter {
long byteRate = dispatchRate.dispatchThrottlingRateInByte;
long ratePeriod = dispatchRate.ratePeriodInSecond;
+ Supplier<Long> permitUpdaterMsg = dispatchRate.relativeToPublishRate
+ ? () -> getRelativeDispatchRateInMsg(dispatchRate)
+ : null;
// update msg-rateLimiter
if (msgRate > 0) {
if (this.dispatchRateLimiterOnMessage == null) {
this.dispatchRateLimiterOnMessage = new RateLimiter(brokerService.pulsar().getExecutor(), msgRate,
- ratePeriod, TimeUnit.SECONDS);
+ ratePeriod, TimeUnit.SECONDS, permitUpdaterMsg);
} else {
this.dispatchRateLimiterOnMessage.setRate(msgRate, dispatchRate.ratePeriodInSecond,
- TimeUnit.SECONDS);
+ TimeUnit.SECONDS, permitUpdaterMsg);
}
} else {
// message-rate should be disable and close
@@ -289,14 +299,17 @@ public class DispatchRateLimiter {
}
}
+ Supplier<Long> permitUpdaterByte = dispatchRate.relativeToPublishRate
+ ? () -> getRelativeDispatchRateInByte(dispatchRate)
+ : null;
// update byte-rateLimiter
if (byteRate > 0) {
if (this.dispatchRateLimiterOnByte == null) {
this.dispatchRateLimiterOnByte = new RateLimiter(brokerService.pulsar().getExecutor(), byteRate,
- ratePeriod, TimeUnit.SECONDS);
+ ratePeriod, TimeUnit.SECONDS, permitUpdaterByte);
} else {
this.dispatchRateLimiterOnByte.setRate(byteRate, dispatchRate.ratePeriodInSecond,
- TimeUnit.SECONDS);
+ TimeUnit.SECONDS, permitUpdaterByte);
}
} else {
// message-rate should be disable and close
@@ -307,6 +320,18 @@ public class DispatchRateLimiter {
}
}
+ private long getRelativeDispatchRateInMsg(DispatchRate dispatchRate) {
+ return (topic != null && dispatchRate != null)
+ ? (long) topic.getLastUpdatedAvgPublishRateInMsg() + dispatchRate.dispatchThrottlingRateInMsg
+ : 0;
+ }
+
+ private long getRelativeDispatchRateInByte(DispatchRate dispatchRate) {
+ return (topic != null && dispatchRate != null)
+ ? (long) topic.getLastUpdatedAvgPublishRateInByte() + dispatchRate.dispatchThrottlingRateInByte
+ : 0;
+ }
+
/**
* Get configured msg dispatch-throttling rate. Returns -1 if not configured
*
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index f45bb96..efbc4c0 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -174,6 +174,8 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
};
private final AtomicLong pendingWriteOps = new AtomicLong(0);
+ private volatile double lastUpdatedAvgPublishRateInMsg = 0;
+ private volatile double lastUpdatedAvgPublishRateInByte = 0;
private static class TopicStatsHelper {
public double averageMsgSize;
@@ -1283,7 +1285,14 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
}
});
topicStatsStream.endList();
-
+ // if publish-rate increases (eg: 0 to 1K) then pick max publish-rate and if publish-rate decreases then keep
+ // average rate.
+ lastUpdatedAvgPublishRateInMsg = topicStatsHelper.aggMsgRateIn > lastUpdatedAvgPublishRateInMsg
+ ? topicStatsHelper.aggMsgRateIn
+ : (topicStatsHelper.aggMsgRateIn + lastUpdatedAvgPublishRateInMsg) / 2;
+ lastUpdatedAvgPublishRateInByte = topicStatsHelper.aggMsgThroughputIn > lastUpdatedAvgPublishRateInByte
+ ? topicStatsHelper.aggMsgThroughputIn
+ : (topicStatsHelper.aggMsgThroughputIn + lastUpdatedAvgPublishRateInByte) / 2;
// Start replicator stats
topicStatsStream.startObject("replication");
nsStats.replicatorCount += topicStatsHelper.remotePublishersStats.size();
@@ -1447,6 +1456,14 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
this.addEntryLatencyStatsUsec.reset();
}
+ public double getLastUpdatedAvgPublishRateInMsg() {
+ return lastUpdatedAvgPublishRateInMsg;
+ }
+
+ public double getLastUpdatedAvgPublishRateInByte() {
+ return lastUpdatedAvgPublishRateInByte;
+ }
+
public TopicStats getStats() {
TopicStats stats = new TopicStats();
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/SubscribeRateLimiter.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/SubscribeRateLimiter.java
index 3a05eac..ee07250 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/SubscribeRateLimiter.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/SubscribeRateLimiter.java
@@ -23,8 +23,6 @@ import com.google.common.base.MoreObjects;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.service.BrokerService;
-import org.apache.pulsar.common.naming.NamespaceName;
-import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.Policies;
import org.apache.pulsar.common.policies.data.SubscribeRate;
import org.apache.pulsar.common.util.RateLimiter;
@@ -38,10 +36,6 @@ import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
-import static java.util.concurrent.TimeUnit.SECONDS;
-import static org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES;
-import static org.apache.pulsar.broker.web.PulsarWebResource.path;
-
public class SubscribeRateLimiter {
private final String topicName;
@@ -121,7 +115,6 @@ public class SubscribeRateLimiter {
* @param subscribeRate
*/
private synchronized void updateSubscribeRate(ConsumerIdentifier consumerIdentifier, SubscribeRate subscribeRate) {
-
long ratePerConsumer = subscribeRate.subscribeThrottlingRatePerConsumer;
long ratePeriod = subscribeRate.ratePeriodInSecond;
@@ -129,9 +122,10 @@ public class SubscribeRateLimiter {
if (ratePerConsumer > 0) {
if (this.subscribeRateLimiter.get(consumerIdentifier) == null) {
this.subscribeRateLimiter.put(consumerIdentifier, new RateLimiter(brokerService.pulsar().getExecutor(), ratePerConsumer,
- ratePeriod, TimeUnit.SECONDS));
+ ratePeriod, TimeUnit.SECONDS, null));
} else {
- this.subscribeRateLimiter.get(consumerIdentifier).setRate(ratePerConsumer, ratePeriod, TimeUnit.SECONDS);
+ this.subscribeRateLimiter.get(consumerIdentifier).setRate(ratePerConsumer, ratePeriod, TimeUnit.SECONDS,
+ null);
}
} else {
// subscribe-rate should be disable and close
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MessageDispatchThrottlingTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MessageDispatchThrottlingTest.java
index 6d139f3..b85e088 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MessageDispatchThrottlingTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MessageDispatchThrottlingTest.java
@@ -20,12 +20,15 @@ package org.apache.pulsar.client.api;
import com.google.common.collect.Sets;
+import static org.testng.Assert.assertNotNull;
+
import java.lang.reflect.Field;
import java.util.Arrays;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
@@ -904,4 +907,75 @@ public class MessageDispatchThrottlingTest extends ProducerConsumerBase {
});
}
+ /**
+ * It verifies that relative throttling at least dispatch messages as publish-rate.
+ *
+ * @param subscription
+ * @throws Exception
+ */
+ @Test(dataProvider = "subscriptions")
+ public void testRelativeMessageRateLimitingThrottling(SubscriptionType subscription) throws Exception {
+ log.info("-- Starting {} test --", methodName);
+
+ final String namespace = "my-property/relative_throttling_ns";
+ final String topicName = "persistent://" + namespace + "/relative-throttle";
+
+ final int messageRate = 1;
+ DispatchRate dispatchRate = new DispatchRate(messageRate, -1, 1, true);
+ admin.namespaces().createNamespace(namespace, Sets.newHashSet("test"));
+ admin.namespaces().setDispatchRate(namespace, dispatchRate);
+ // create producer and topic
+ Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName).enableBatching(false).create();
+ PersistentTopic topic = (PersistentTopic) pulsar.getBrokerService().getOrCreateTopic(topicName).get();
+ boolean isMessageRateUpdate = false;
+ int retry = 10;
+ for (int i = 0; i < retry; i++) {
+ if (topic.getDispatchRateLimiter().get().getDispatchRateOnMsg() > 0) {
+ isMessageRateUpdate = true;
+ break;
+ } else {
+ if (i != retry - 1) {
+ Thread.sleep(100);
+ }
+ }
+ }
+ Assert.assertTrue(isMessageRateUpdate);
+ Assert.assertEquals(admin.namespaces().getDispatchRate(namespace), dispatchRate);
+ Thread.sleep(2000);
+
+ final int numProducedMessages = 1000;
+
+ Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName).subscriptionName("my-subscriber-name")
+ .subscriptionType(subscription).subscribe();
+ // deactive cursors
+ deactiveCursors((ManagedLedgerImpl) topic.getManagedLedger());
+
+ // send a message, which will make dispatcher-ratelimiter initialize and schedule renew task
+ producer.send("test".getBytes());
+ assertNotNull(consumer.receive(100, TimeUnit.MILLISECONDS));
+
+ Field lastUpdatedMsgRateIn = PersistentTopic.class.getDeclaredField("lastUpdatedAvgPublishRateInMsg");
+ lastUpdatedMsgRateIn.setAccessible(true);
+ lastUpdatedMsgRateIn.set(topic, numProducedMessages);
+
+ for (int i = 0; i < numProducedMessages; i++) {
+ final String message = "my-message-" + i;
+ producer.send(message.getBytes());
+ }
+
+ int totalReceived = 0;
+ // Relative throttling will let it drain immediately because it allows to dispatch = (publish-rate +
+ // dispatch-rate)
+ for (int i = 0; i < numProducedMessages; i++) {
+ Message<byte[]> msg = consumer.receive(100, TimeUnit.MILLISECONDS);
+ totalReceived++;
+ assertNotNull(msg);
+ }
+
+ Assert.assertEquals(totalReceived, numProducedMessages);
+
+ consumer.close();
+ producer.close();
+ log.info("-- Exiting {} test --", methodName);
+ }
}
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java
index 9dc8699..237468f 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java
@@ -536,11 +536,15 @@ public class CmdNamespaces extends CmdBase {
"-dt" }, description = "dispatch-rate-period in second type (default 1 second will be overwrite if not passed)\n", required = false)
private int dispatchRatePeriodSec = 1;
+ @Parameter(names = { "--relative-to-publish-rate",
+ "-rp" }, description = "dispatch rate relative to publish-rate (if publish-relative flag is enabled then broker will apply throttling value to (publish-rate + dispatch rate))\n", required = false)
+ private boolean relativeToPublishRate = false;
+
@Override
void run() throws PulsarAdminException {
String namespace = validateNamespace(params);
admin.namespaces().setDispatchRate(namespace,
- new DispatchRate(msgDispatchRate, byteDispatchRate, dispatchRatePeriodSec));
+ new DispatchRate(msgDispatchRate, byteDispatchRate, dispatchRatePeriodSec, relativeToPublishRate));
}
}
@@ -608,11 +612,15 @@ public class CmdNamespaces extends CmdBase {
"-dt" }, description = "dispatch-rate-period in second type (default 1 second will be overwrite if not passed)\n", required = false)
private int dispatchRatePeriodSec = 1;
+ @Parameter(names = { "--relative-to-publish-rate",
+ "-rp" }, description = "dispatch rate relative to publish-rate (if publish-relative flag is enabled then broker will apply throttling value to (publish-rate + dispatch rate))\n", required = false)
+ private boolean relativeToPublishRate = false;
+
@Override
void run() throws PulsarAdminException {
String namespace = validateNamespace(params);
admin.namespaces().setSubscriptionDispatchRate(namespace,
- new DispatchRate(msgDispatchRate, byteDispatchRate, dispatchRatePeriodSec));
+ new DispatchRate(msgDispatchRate, byteDispatchRate, dispatchRatePeriodSec, relativeToPublishRate));
}
}
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/DispatchRate.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/DispatchRate.java
index c93a713..002cdac 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/DispatchRate.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/DispatchRate.java
@@ -28,6 +28,7 @@ public class DispatchRate {
public int dispatchThrottlingRateInMsg = -1;
public long dispatchThrottlingRateInByte = -1;
+ public boolean relativeToPublishRate = false; /* throttles dispatch relatively publish-rate */
public int ratePeriodInSecond = 1; /* by default dispatch-rate will be calculate per 1 second */
public DispatchRate() {
@@ -45,6 +46,12 @@ public class DispatchRate {
this.ratePeriodInSecond = ratePeriodInSecond;
}
+ public DispatchRate(int dispatchThrottlingRateInMsg, long dispatchThrottlingRateInByte,
+ int ratePeriodInSecond, boolean relativeToPublishRate) {
+ this(dispatchThrottlingRateInMsg, dispatchThrottlingRateInByte, ratePeriodInSecond);
+ this.relativeToPublishRate = relativeToPublishRate;
+ }
+
@Override
public int hashCode() {
return Objects.hash(dispatchThrottlingRateInMsg, dispatchThrottlingRateInByte,
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/RateLimiter.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/RateLimiter.java
index 006cddd..45ae8e2 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/RateLimiter.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/RateLimiter.java
@@ -25,6 +25,7 @@ import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
+import java.util.function.Supplier;
/**
* A Rate Limiter that distributes permits at a configurable rate. Each {@link #acquire()} blocks if necessary until a
@@ -57,19 +58,22 @@ public class RateLimiter implements AutoCloseable{
private long permits;
private long acquiredPermits;
private boolean isClosed;
+ // permitUpdate helps to update permit-rate at runtime
+ private Supplier<Long> permitUpdater;
public RateLimiter(final long permits, final long rateTime, final TimeUnit timeUnit) {
- this(null, permits, rateTime, timeUnit);
+ this(null, permits, rateTime, timeUnit, null);
}
public RateLimiter(final ScheduledExecutorService service, final long permits, final long rateTime,
- final TimeUnit timeUnit) {
+ final TimeUnit timeUnit, Supplier<Long> permitUpdater) {
checkArgument(permits > 0, "rate must be > 0");
checkArgument(rateTime > 0, "Renew permit time must be > 0");
this.rateTime = rateTime;
this.timeUnit = timeUnit;
this.permits = permits;
+ this.permitUpdater = permitUpdater;
if (service != null) {
this.executorService = service;
@@ -198,14 +202,16 @@ public class RateLimiter implements AutoCloseable{
* @param permits
* @param rateTime
* @param timeUnit
+ * @param permitUpdaterByte
*/
- public synchronized void setRate(long permits, long rateTime, TimeUnit timeUnit) {
+ public synchronized void setRate(long permits, long rateTime, TimeUnit timeUnit, Supplier<Long> permitUpdaterByte) {
if (renewTask != null) {
renewTask.cancel(false);
}
this.permits = permits;
this.rateTime = rateTime;
this.timeUnit = timeUnit;
+ this.permitUpdater = permitUpdaterByte;
this.renewTask = createTask();
}
@@ -232,6 +238,12 @@ public class RateLimiter implements AutoCloseable{
synchronized void renew() {
acquiredPermits = 0;
+ if (permitUpdater != null) {
+ long newPermitRate = permitUpdater.get();
+ if (newPermitRate > 0) {
+ setRate(newPermitRate);
+ }
+ }
notifyAll();
}
diff --git a/pulsar-common/src/test/java/org/apache/pulsar/common/util/RateLimiterTest.java b/pulsar-common/src/test/java/org/apache/pulsar/common/util/RateLimiterTest.java
index 71f7fa3..c20f5d1 100644
--- a/pulsar-common/src/test/java/org/apache/pulsar/common/util/RateLimiterTest.java
+++ b/pulsar-common/src/test/java/org/apache/pulsar/common/util/RateLimiterTest.java
@@ -24,6 +24,7 @@ import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertEquals;
import java.util.concurrent.TimeUnit;
+import java.util.function.Supplier;
import org.testng.annotations.Test;
@@ -152,7 +153,7 @@ public class RateLimiterTest {
assertEquals(rate.getAvailablePermits(), permits);
// change rate-time from 1sec to 5sec
- rate.setRate(permits, 5 * rateTimeMSec, TimeUnit.MILLISECONDS);
+ rate.setRate(permits, 5 * rateTimeMSec, TimeUnit.MILLISECONDS, null);
assertEquals(rate.getAvailablePermits(), 100);
assertEquals(rate.tryAcquire(permits), true);
assertEquals(rate.getAvailablePermits(), 0);
@@ -163,4 +164,15 @@ public class RateLimiterTest {
rate.close();
}
+ @Test
+ public void testRateLimiterWithPermitUpdater() throws Exception{
+ long permits = 10;
+ long rateTime = 1;
+ long newUpdatedRateLimit = 100L;
+ Supplier<Long> permitUpdater = () -> newUpdatedRateLimit;
+ RateLimiter limiter = new RateLimiter(null, permits , 1, TimeUnit.SECONDS, permitUpdater);
+ limiter.acquire();
+ Thread.sleep(rateTime*3*1000);
+ assertEquals(limiter.getAvailablePermits(), newUpdatedRateLimit);
+ }
}
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/FunctionStatsManager.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/FunctionStatsManager.java
index 7943d5a..fdedb74 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/FunctionStatsManager.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/FunctionStatsManager.java
@@ -232,8 +232,8 @@ public class FunctionStatsManager extends ComponentStatsManager{
.help("Exception from sink.")
.register(collectorRegistry);
- userExceptionRateLimiter = new RateLimiter(scheduledExecutorService, 5, 1, TimeUnit.MINUTES);
- sysExceptionRateLimiter = new RateLimiter(scheduledExecutorService, 5, 1, TimeUnit.MINUTES);
+ userExceptionRateLimiter = new RateLimiter(scheduledExecutorService, 5, 1, TimeUnit.MINUTES, null);
+ sysExceptionRateLimiter = new RateLimiter(scheduledExecutorService, 5, 1, TimeUnit.MINUTES, null);
}
public void addUserException(Throwable ex) {
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/SinkStatsManager.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/SinkStatsManager.java
index 3ed2aa1..c913225 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/SinkStatsManager.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/SinkStatsManager.java
@@ -174,8 +174,8 @@ public class SinkStatsManager extends ComponentStatsManager {
.help("Exception from sink.")
.register(collectorRegistry);
- sysExceptionRateLimiter = new RateLimiter(scheduledExecutorService, 5, 1, TimeUnit.MINUTES);
- sinkExceptionRateLimiter = new RateLimiter(scheduledExecutorService, 5, 1, TimeUnit.MINUTES);
+ sysExceptionRateLimiter = new RateLimiter(scheduledExecutorService, 5, 1, TimeUnit.MINUTES, null);
+ sinkExceptionRateLimiter = new RateLimiter(scheduledExecutorService, 5, 1, TimeUnit.MINUTES, null);
}
@Override
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/SourceStatsManager.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/SourceStatsManager.java
index f496709..0ec7352 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/SourceStatsManager.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/SourceStatsManager.java
@@ -173,8 +173,8 @@ public class SourceStatsManager extends ComponentStatsManager {
.help("Exception from source.")
.register(collectorRegistry);
- sysExceptionRateLimiter = new RateLimiter(scheduledExecutorService, 5, 1, TimeUnit.MINUTES);
- sourceExceptionRateLimiter = new RateLimiter(scheduledExecutorService, 5, 1, TimeUnit.MINUTES);
+ sysExceptionRateLimiter = new RateLimiter(scheduledExecutorService, 5, 1, TimeUnit.MINUTES, null);
+ sourceExceptionRateLimiter = new RateLimiter(scheduledExecutorService, 5, 1, TimeUnit.MINUTES, null);
}
@Override