You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2022/05/16 02:07:54 UTC
[pulsar] branch master updated: Use dispatchRateLimiterLock to update dispatchRateLimiter. (#15601)
This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new ff4e6000f2d Use dispatchRateLimiterLock to update dispatchRateLimiter. (#15601)
ff4e6000f2d is described below
commit ff4e6000f2d58eff930178ae0c02ef9c5fffb47c
Author: Jiwei Guo <te...@apache.org>
AuthorDate: Mon May 16 10:07:47 2022 +0800
Use dispatchRateLimiterLock to update dispatchRateLimiter. (#15601)
### Motivation
https://github.com/apache/pulsar/blob/58c82a71beb7506e422def391af532945be2b7a7/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java#L377-L399
The object lock may change when execute at line-382, and cause the lock to become useless.
So use `dispatchRateLimiterLock` to synchronize.
---
.../org/apache/pulsar/broker/service/persistent/PersistentTopic.java | 5 +++--
1 file changed, 3 insertions(+), 2 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index cd75b745bba..9847f07950a 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -187,6 +187,7 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
public boolean msgChunkPublished;
private Optional<DispatchRateLimiter> dispatchRateLimiter = Optional.empty();
+ private final Object dispatchRateLimiterLock = new Object();
private Optional<SubscribeRateLimiter> subscribeRateLimiter = Optional.empty();
private final long backloggedCursorThresholdEntries;
public static final int MESSAGE_RATE_BACKOFF_MS = 1000;
@@ -375,7 +376,7 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
}
private void initializeRateLimiterIfNeeded() {
- synchronized (dispatchRateLimiter) {
+ synchronized (dispatchRateLimiterLock) {
// dispatch rate limiter for topic
if (!dispatchRateLimiter.isPresent()
&& DispatchRateLimiter.isDispatchRateEnabled(topicPolicies.getDispatchRate().get())) {
@@ -3066,7 +3067,7 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
}
private void initializeTopicDispatchRateLimiterIfNeeded(TopicPolicies policies) {
- synchronized (dispatchRateLimiter) {
+ synchronized (dispatchRateLimiterLock) {
if (!dispatchRateLimiter.isPresent() && policies.getDispatchRate() != null) {
this.dispatchRateLimiter = Optional.of(new DispatchRateLimiter(this, Type.TOPIC));
}