You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by ch...@apache.org on 2021/08/12 03:29:37 UTC
[pulsar] 01/20: [Broker] Fix set-publish-rate when using
preciseTopicPublishRateLimiterEnable=true (#10384)
This is an automated email from the ASF dual-hosted git repository.
chenhang pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 41ad6245f67be7830549d9f6e90c364eab0041cd
Author: Lari Hotari <lh...@users.noreply.github.com>
AuthorDate: Tue Aug 3 19:14:26 2021 +0300
[Broker] Fix set-publish-rate when using preciseTopicPublishRateLimiterEnable=true (#10384)
### Motivation
When using `preciseTopicPublishRateLimiterEnable=true` (introduced by #7078) setting for rate limiting, there are various issues:
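- updating the limits doesn't take effect for a boundary (message rate or byte rate) that is changed from bounded to unbounded while the other boundary stays bounded; the previous limiter for that boundary stays in place.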
- each topic creates a scheduler thread for each limiter instance
- a topic never releases its scheduler threads when the topic gets unloaded / closed
- updating the limits doesn't close the scheduler thread related to the replaced limiter instance
### Modifications
- Fix updating of the limits by cleaning up the previous limiter instances before creating new limiter instances
- Use `brokerService.pulsar().getExecutor()` as the scheduler for the rate limiter instances
- Add resource cleanup hooks for topic closing (unload)
### Open issue
The existing code has a difference in passing the `rateLimitFunction`:
https://github.com/apache/pulsar/blob/69a173a82c89893f54dbe5b6f422249f66ea5418/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PrecisPublishLimiter.java#L80-L86
It's passed to `topicPublishRateLimiterOnMessage`, but not to `topicPublishRateLimiterOnByte`. It is unclear whether this is intentional.
The `rateLimitFunction` is `() -> this.enableCnxAutoRead()`
https://github.com/apache/pulsar/blob/69a173a82c89893f54dbe5b6f422249f66ea5418/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java#L913
(This also raises the question of whether rate limiting works consistently when multiple topics share the same connection.)
(cherry picked from commit ded806fd52f6e2f182fa02052cbd82c2a6755098)
---
.../pulsar/broker/service/AbstractTopic.java | 2 +-
.../broker/service/PrecisPublishLimiter.java | 113 +++++++++++++++------
.../pulsar/broker/service/PublishRateLimiter.java | 2 +-
.../broker/service/PublishRateLimiterDisable.java | 4 +
.../broker/service/PublishRateLimiterImpl.java | 5 +
.../service/nonpersistent/NonPersistentTopic.java | 7 ++
.../broker/service/persistent/PersistentTopic.java | 7 ++
.../service/persistent/SubscribeRateLimiter.java | 2 +-
.../broker/service/PrecisPublishLimiterTest.java | 57 +++++++++++
.../org/apache/pulsar/common/util/RateLimiter.java | 29 +++++-
.../instance/stats/FunctionStatsManager.java | 14 +--
.../functions/instance/stats/SinkStatsManager.java | 4 +-
.../instance/stats/SourceStatsManager.java | 4 +-
13 files changed, 201 insertions(+), 49 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
index 54f9e8d..39b0650 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
@@ -883,7 +883,7 @@ public abstract class AbstractTopic implements Topic {
// create new rateLimiter if rate-limiter is disabled
if (preciseTopicPublishRateLimitingEnable) {
this.topicPublishRateLimiter = new PrecisPublishLimiter(publishRate,
- () -> this.enableCnxAutoRead());
+ () -> this.enableCnxAutoRead(), brokerService.pulsar().getExecutor());
} else {
this.topicPublishRateLimiter = new PublishRateLimiterImpl(publishRate);
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PrecisPublishLimiter.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PrecisPublishLimiter.java
index 60fbcf0..e61597e 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PrecisPublishLimiter.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PrecisPublishLimiter.java
@@ -18,7 +18,7 @@
*/
package org.apache.pulsar.broker.service;
-import java.util.concurrent.TimeUnit;
+import java.util.concurrent.ScheduledExecutorService;
import org.apache.pulsar.common.policies.data.Policies;
import org.apache.pulsar.common.policies.data.PublishRate;
import org.apache.pulsar.common.util.RateLimitFunction;
@@ -27,30 +27,37 @@ import org.apache.pulsar.common.util.RateLimiter;
public class PrecisPublishLimiter implements PublishRateLimiter {
protected volatile int publishMaxMessageRate = 0;
protected volatile long publishMaxByteRate = 0;
- protected volatile boolean publishThrottlingEnabled = false;
// precise mode for publish rate limiter
- private RateLimiter topicPublishRateLimiterOnMessage;
- private RateLimiter topicPublishRateLimiterOnByte;
+ private volatile RateLimiter topicPublishRateLimiterOnMessage;
+ private volatile RateLimiter topicPublishRateLimiterOnByte;
private final RateLimitFunction rateLimitFunction;
+ private final ScheduledExecutorService scheduledExecutorService;
public PrecisPublishLimiter(Policies policies, String clusterName, RateLimitFunction rateLimitFunction) {
this.rateLimitFunction = rateLimitFunction;
update(policies, clusterName);
+ this.scheduledExecutorService = null;
}
public PrecisPublishLimiter(PublishRate publishRate, RateLimitFunction rateLimitFunction) {
+ this(publishRate, rateLimitFunction, null);
+ }
+
+ public PrecisPublishLimiter(PublishRate publishRate, RateLimitFunction rateLimitFunction,
+ ScheduledExecutorService scheduledExecutorService) {
this.rateLimitFunction = rateLimitFunction;
update(publishRate);
+ this.scheduledExecutorService = scheduledExecutorService;
}
@Override
public void checkPublishRate() {
- // No-op
+ // No-op
}
@Override
public void incrementPublishCount(int numOfMessages, long msgSizeInBytes) {
- // No-op
+ // No-op
}
@Override
@@ -62,10 +69,15 @@ public class PrecisPublishLimiter implements PublishRateLimiter {
public boolean isPublishRateExceeded() {
return false;
}
+
// If all rate limiters are not exceeded, re-enable auto read from socket.
private void tryReleaseConnectionThrottle() {
- if ((topicPublishRateLimiterOnMessage != null && topicPublishRateLimiterOnMessage.getAvailablePermits() <= 0)
- || (topicPublishRateLimiterOnByte != null && topicPublishRateLimiterOnByte.getAvailablePermits() <= 0)) {
+ RateLimiter currentTopicPublishRateLimiterOnMessage = topicPublishRateLimiterOnMessage;
+ RateLimiter currentTopicPublishRateLimiterOnByte = topicPublishRateLimiterOnByte;
+ if ((currentTopicPublishRateLimiterOnMessage != null
+ && currentTopicPublishRateLimiterOnMessage.getAvailablePermits() <= 0)
+ || (currentTopicPublishRateLimiterOnByte != null
+ && currentTopicPublishRateLimiterOnByte.getAvailablePermits() <= 0)) {
return;
}
this.rateLimitFunction.apply();
@@ -78,34 +90,73 @@ public class PrecisPublishLimiter implements PublishRateLimiter {
: null;
this.update(maxPublishRate);
}
+
public void update(PublishRate maxPublishRate) {
- if (maxPublishRate != null
- && (maxPublishRate.publishThrottlingRateInMsg > 0 || maxPublishRate.publishThrottlingRateInByte > 0)) {
- this.publishThrottlingEnabled = true;
- this.publishMaxMessageRate = Math.max(maxPublishRate.publishThrottlingRateInMsg, 0);
- this.publishMaxByteRate = Math.max(maxPublishRate.publishThrottlingRateInByte, 0);
- if (this.publishMaxMessageRate > 0) {
- topicPublishRateLimiterOnMessage =
- new RateLimiter(publishMaxMessageRate, 1, TimeUnit.SECONDS,
- this::tryReleaseConnectionThrottle, true);
+ replaceLimiters(() -> {
+ if (maxPublishRate != null
+ && (maxPublishRate.publishThrottlingRateInMsg > 0
+ || maxPublishRate.publishThrottlingRateInByte > 0)) {
+ this.publishMaxMessageRate = Math.max(maxPublishRate.publishThrottlingRateInMsg, 0);
+ this.publishMaxByteRate = Math.max(maxPublishRate.publishThrottlingRateInByte, 0);
+ if (this.publishMaxMessageRate > 0) {
+ topicPublishRateLimiterOnMessage =
+ RateLimiter.builder()
+ .scheduledExecutorService(scheduledExecutorService)
+ .permits(publishMaxMessageRate)
+ .rateLimitFunction(this::tryReleaseConnectionThrottle)
+ .isDispatchOrPrecisePublishRateLimiter(true)
+ .build();
+ }
+ if (this.publishMaxByteRate > 0) {
+ topicPublishRateLimiterOnByte = RateLimiter.builder()
+ .scheduledExecutorService(scheduledExecutorService)
+ .permits(publishMaxByteRate)
+ .rateLimitFunction(this::tryReleaseConnectionThrottle)
+ .isDispatchOrPrecisePublishRateLimiter(true)
+ .build();
+ }
+ } else {
+ this.publishMaxMessageRate = 0;
+ this.publishMaxByteRate = 0;
}
- if (this.publishMaxByteRate > 0) {
- topicPublishRateLimiterOnByte =
- new RateLimiter(publishMaxByteRate, 1, TimeUnit.SECONDS,
- this::tryReleaseConnectionThrottle, true);
- }
- } else {
- this.publishMaxMessageRate = 0;
- this.publishMaxByteRate = 0;
- this.publishThrottlingEnabled = false;
- topicPublishRateLimiterOnMessage = null;
- topicPublishRateLimiterOnByte = null;
- }
+ });
}
@Override
public boolean tryAcquire(int numbers, long bytes) {
- return (topicPublishRateLimiterOnMessage == null || topicPublishRateLimiterOnMessage.tryAcquire(numbers))
- && (topicPublishRateLimiterOnByte == null || topicPublishRateLimiterOnByte.tryAcquire(bytes));
+ RateLimiter currentTopicPublishRateLimiterOnMessage = topicPublishRateLimiterOnMessage;
+ RateLimiter currentTopicPublishRateLimiterOnByte = topicPublishRateLimiterOnByte;
+ return (currentTopicPublishRateLimiterOnMessage == null
+ || currentTopicPublishRateLimiterOnMessage.tryAcquire(numbers))
+ && (currentTopicPublishRateLimiterOnByte == null
+ || currentTopicPublishRateLimiterOnByte.tryAcquire(bytes));
+ }
+
+ @Override
+ public void close() throws Exception {
+ rateLimitFunction.apply();
+ replaceLimiters(null);
+ }
+
+ private void replaceLimiters(Runnable updater) {
+ RateLimiter previousTopicPublishRateLimiterOnMessage = topicPublishRateLimiterOnMessage;
+ topicPublishRateLimiterOnMessage = null;
+ RateLimiter previousTopicPublishRateLimiterOnByte = topicPublishRateLimiterOnByte;
+ topicPublishRateLimiterOnByte = null;
+ try {
+ if (updater != null) {
+ updater.run();
+ }
+ } finally {
+ // Close previous limiters to prevent resource leakages.
+ // Delay closing of previous limiters after new ones are in place so that updating the limiter
+ // doesn't cause unavailability.
+ if (previousTopicPublishRateLimiterOnMessage != null) {
+ previousTopicPublishRateLimiterOnMessage.close();
+ }
+ if (previousTopicPublishRateLimiterOnByte != null) {
+ previousTopicPublishRateLimiterOnByte.close();
+ }
+ }
}
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PublishRateLimiter.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PublishRateLimiter.java
index ec23b26..3978879 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PublishRateLimiter.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PublishRateLimiter.java
@@ -21,7 +21,7 @@ package org.apache.pulsar.broker.service;
import org.apache.pulsar.common.policies.data.Policies;
import org.apache.pulsar.common.policies.data.PublishRate;
-public interface PublishRateLimiter {
+public interface PublishRateLimiter extends AutoCloseable {
PublishRateLimiter DISABLED_RATE_LIMITER = PublishRateLimiterDisable.DISABLED_RATE_LIMITER;
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PublishRateLimiterDisable.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PublishRateLimiterDisable.java
index c72f6ba..cf18192 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PublishRateLimiterDisable.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PublishRateLimiterDisable.java
@@ -62,4 +62,8 @@ public class PublishRateLimiterDisable implements PublishRateLimiter {
return true;
}
+ @Override
+ public void close() throws Exception {
+ // No-op
+ }
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PublishRateLimiterImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PublishRateLimiterImpl.java
index c1458cf..0e1200e 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PublishRateLimiterImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PublishRateLimiterImpl.java
@@ -108,4 +108,9 @@ public class PublishRateLimiterImpl implements PublishRateLimiter {
public boolean tryAcquire(int numbers, long bytes) {
return false;
}
+
+ @Override
+ public void close() throws Exception {
+ // no-op
+ }
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
index 7abe710..5224eaa 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
@@ -445,6 +445,13 @@ public class NonPersistentTopic extends AbstractTopic implements Topic {
replicators.forEach((cluster, replicator) -> futures.add(replicator.disconnect()));
producers.values().forEach(producer -> futures.add(producer.disconnect()));
+ if (topicPublishRateLimiter != null) {
+ try {
+ topicPublishRateLimiter.close();
+ } catch (Exception e) {
+ log.warn("Error closing topicPublishRateLimiter for topic {}", topic, e);
+ }
+ }
subscriptions.forEach((s, sub) -> futures.add(sub.disconnect()));
CompletableFuture<Void> clientCloseFuture =
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 40ed2a1..ca22511 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -1157,6 +1157,13 @@ public class PersistentTopic extends AbstractTopic
futures.add(transactionBuffer.closeAsync());
replicators.forEach((cluster, replicator) -> futures.add(replicator.disconnect()));
producers.values().forEach(producer -> futures.add(producer.disconnect()));
+ if (topicPublishRateLimiter != null) {
+ try {
+ topicPublishRateLimiter.close();
+ } catch (Exception e) {
+ log.warn("Error closing topicPublishRateLimiter for topic {}", topic, e);
+ }
+ }
subscriptions.forEach((s, sub) -> futures.add(sub.disconnect()));
CompletableFuture<Void> clientCloseFuture = closeWithoutWaitingClientDisconnect
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/SubscribeRateLimiter.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/SubscribeRateLimiter.java
index f014717..a13328c 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/SubscribeRateLimiter.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/SubscribeRateLimiter.java
@@ -140,7 +140,7 @@ public class SubscribeRateLimiter {
if (this.subscribeRateLimiter.get(consumerIdentifier) == null) {
this.subscribeRateLimiter.put(consumerIdentifier,
new RateLimiter(brokerService.pulsar().getExecutor(), ratePerConsumer,
- ratePeriod, TimeUnit.SECONDS, null));
+ ratePeriod, TimeUnit.SECONDS));
} else {
this.subscribeRateLimiter.get(consumerIdentifier)
.setRate(ratePerConsumer, ratePeriod, TimeUnit.SECONDS,
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PrecisPublishLimiterTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PrecisPublishLimiterTest.java
new file mode 100644
index 0000000..61804e7
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PrecisPublishLimiterTest.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service;
+
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertTrue;
+import org.apache.pulsar.common.policies.data.PublishRate;
+import org.testng.annotations.Test;
+
+public class PrecisPublishLimiterTest {
+
+ @Test
+ void shouldResetMsgLimitAfterUpdate() {
+ PrecisPublishLimiter precisPublishLimiter = new PrecisPublishLimiter(new PublishRate(), () -> {
+ });
+ precisPublishLimiter.update(new PublishRate(1, 1));
+ assertFalse(precisPublishLimiter.tryAcquire(99, 99));
+ precisPublishLimiter.update(new PublishRate(-1, 100));
+ assertTrue(precisPublishLimiter.tryAcquire(99, 99));
+ }
+
+ @Test
+ void shouldResetBytesLimitAfterUpdate() {
+ PrecisPublishLimiter precisPublishLimiter = new PrecisPublishLimiter(new PublishRate(), () -> {
+ });
+ precisPublishLimiter.update(new PublishRate(1, 1));
+ assertFalse(precisPublishLimiter.tryAcquire(99, 99));
+ precisPublishLimiter.update(new PublishRate(100, -1));
+ assertTrue(precisPublishLimiter.tryAcquire(99, 99));
+ }
+
+ @Test
+ void shouldCloseResources() throws Exception {
+ for (int i = 0; i < 20000; i++) {
+ PrecisPublishLimiter precisPublishLimiter = new PrecisPublishLimiter(new PublishRate(100, 100), () -> {
+ });
+ precisPublishLimiter.tryAcquire(99, 99);
+ precisPublishLimiter.close();
+ }
+ }
+}
\ No newline at end of file
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/RateLimiter.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/RateLimiter.java
index cb88a95..1bb2fcd 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/RateLimiter.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/RateLimiter.java
@@ -25,6 +25,7 @@ import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
+import lombok.Builder;
/**
* A Rate Limiter that distributes permits at a configurable rate. Each {@link #acquire()} blocks if necessary until a
@@ -48,7 +49,6 @@ import java.util.function.Supplier;
* </ul>
*/
public class RateLimiter implements AutoCloseable{
-
private final ScheduledExecutorService executorService;
private long rateTime;
private TimeUnit timeUnit;
@@ -63,7 +63,7 @@ public class RateLimiter implements AutoCloseable{
private boolean isDispatchOrPrecisePublishRateLimiter;
public RateLimiter(final long permits, final long rateTime, final TimeUnit timeUnit) {
- this(null, permits, rateTime, timeUnit, null);
+ this(null, permits, rateTime, timeUnit);
}
public RateLimiter(final long permits, final long rateTime, final TimeUnit timeUnit, boolean isDispatchOrPrecisePublishRateLimiter) {
@@ -83,12 +83,25 @@ public class RateLimiter implements AutoCloseable{
}
public RateLimiter(final ScheduledExecutorService service, final long permits, final long rateTime,
+ final TimeUnit timeUnit) {
+ this(service, permits, rateTime, timeUnit, (Supplier<Long>) null);
+ }
+
+ public RateLimiter(final ScheduledExecutorService service, final long permits, final long rateTime,
final TimeUnit timeUnit, Supplier<Long> permitUpdater) {
this(service, permits, rateTime, timeUnit, permitUpdater, false);
}
public RateLimiter(final ScheduledExecutorService service, final long permits, final long rateTime,
final TimeUnit timeUnit, Supplier<Long> permitUpdater, boolean isDispatchOrPrecisePublishRateLimiter) {
+ this(service, permits, rateTime, timeUnit, permitUpdater, isDispatchOrPrecisePublishRateLimiter,
+ null);
+ }
+
+ @Builder
+ RateLimiter(final ScheduledExecutorService scheduledExecutorService, final long permits, final long rateTime,
+ final TimeUnit timeUnit, Supplier<Long> permitUpdater, boolean isDispatchOrPrecisePublishRateLimiter,
+ RateLimitFunction rateLimitFunction) {
checkArgument(permits > 0, "rate must be > 0");
checkArgument(rateTime > 0, "Renew permit time must be > 0");
@@ -98,8 +111,8 @@ public class RateLimiter implements AutoCloseable{
this.permitUpdater = permitUpdater;
this.isDispatchOrPrecisePublishRateLimiter = isDispatchOrPrecisePublishRateLimiter;
- if (service != null) {
- this.executorService = service;
+ if (scheduledExecutorService != null) {
+ this.executorService = scheduledExecutorService;
this.externalExecutor = true;
} else {
final ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1);
@@ -109,6 +122,14 @@ public class RateLimiter implements AutoCloseable{
this.externalExecutor = false;
}
+ this.rateLimitFunction = rateLimitFunction;
+
+ }
+
+ // default values for Lombok generated builder class
+ public static class RateLimiterBuilder {
+ private long rateTime = 1;
+ private TimeUnit timeUnit = TimeUnit.SECONDS;
}
@Override
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/FunctionStatsManager.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/FunctionStatsManager.java
index 08ea9ea..b008371 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/FunctionStatsManager.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/FunctionStatsManager.java
@@ -68,13 +68,13 @@ public class FunctionStatsManager extends ComponentStatsManager{
final Counter statTotalSysExceptions;
final Counter statTotalUserExceptions;
-
+
final Summary statProcessLatency;
final Gauge statlastInvocation;
final Counter statTotalRecordsReceived;
-
+
// windowed metrics
final Counter statTotalProcessedSuccessfully1min;
@@ -82,7 +82,7 @@ public class FunctionStatsManager extends ComponentStatsManager{
final Counter statTotalSysExceptions1min;
final Counter statTotalUserExceptions1min;
-
+
final Summary statProcessLatency1min;
final Counter statTotalRecordsReceived1min;
@@ -262,8 +262,8 @@ public class FunctionStatsManager extends ComponentStatsManager{
.help("Exception from sink.")
.create());
- userExceptionRateLimiter = new RateLimiter(scheduledExecutorService, 5, 1, TimeUnit.MINUTES, null);
- sysExceptionRateLimiter = new RateLimiter(scheduledExecutorService, 5, 1, TimeUnit.MINUTES, null);
+ userExceptionRateLimiter = new RateLimiter(scheduledExecutorService, 5, 1, TimeUnit.MINUTES);
+ sysExceptionRateLimiter = new RateLimiter(scheduledExecutorService, 5, 1, TimeUnit.MINUTES);
}
public void addUserException(Throwable ex) {
@@ -371,7 +371,7 @@ public class FunctionStatsManager extends ComponentStatsManager{
public double getTotalUserExceptions() {
return _statTotalUserExceptions.get();
}
-
+
@Override
public double getLastInvocation() {
return _statlastInvocation.get();
@@ -417,7 +417,7 @@ public class FunctionStatsManager extends ComponentStatsManager{
public double getTotalUserExceptions1min() {
return _statTotalUserExceptions1min.get();
}
-
+
@Override
public double getAvgProcessLatency1min() {
return _statProcessLatency1min.get().count <= 0.0
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/SinkStatsManager.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/SinkStatsManager.java
index 255ad62..536f55a 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/SinkStatsManager.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/SinkStatsManager.java
@@ -196,8 +196,8 @@ public class SinkStatsManager extends ComponentStatsManager {
.help("Exception from sink.")
.create());
- sysExceptionRateLimiter = new RateLimiter(scheduledExecutorService, 5, 1, TimeUnit.MINUTES, null);
- sinkExceptionRateLimiter = new RateLimiter(scheduledExecutorService, 5, 1, TimeUnit.MINUTES, null);
+ sysExceptionRateLimiter = new RateLimiter(scheduledExecutorService, 5, 1, TimeUnit.MINUTES);
+ sinkExceptionRateLimiter = new RateLimiter(scheduledExecutorService, 5, 1, TimeUnit.MINUTES);
}
@Override
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/SourceStatsManager.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/SourceStatsManager.java
index f4e1da0..451a8ad 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/SourceStatsManager.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/SourceStatsManager.java
@@ -195,8 +195,8 @@ public class SourceStatsManager extends ComponentStatsManager {
.help("Exception from source.")
.create());
- sysExceptionRateLimiter = new RateLimiter(scheduledExecutorService, 5, 1, TimeUnit.MINUTES, null);
- sourceExceptionRateLimiter = new RateLimiter(scheduledExecutorService, 5, 1, TimeUnit.MINUTES, null);
+ sysExceptionRateLimiter = new RateLimiter(scheduledExecutorService, 5, 1, TimeUnit.MINUTES);
+ sourceExceptionRateLimiter = new RateLimiter(scheduledExecutorService, 5, 1, TimeUnit.MINUTES);
}
@Override