You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2022/06/07 05:04:33 UTC

[pulsar] 03/17: Use dispatchRateLimiterLock to update dispatchRateLimiter. (#15601)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit a5034d6ccd7ed69230953386f2eced134f1a0f9e
Author: Jiwei Guo <te...@apache.org>
AuthorDate: Mon May 16 10:07:47 2022 +0800

    Use dispatchRateLimiterLock to update dispatchRateLimiter. (#15601)
    
    https://github.com/apache/pulsar/blob/58c82a71beb7506e422def391af532945be2b7a7/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java#L377-L399
    
    The object lock may change when execute at line-382, and cause the lock to become useless.
    
    So use `dispatchRateLimiterLock` to synchronize.
    
    (cherry picked from commit ff4e6000f2d58eff930178ae0c02ef9c5fffb47c)
---
 .../org/apache/pulsar/broker/service/persistent/PersistentTopic.java | 5 +++--
 1 file changed, 3 insertions(+), 2 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 568ac5ae063..b1ccf0b23ec 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -184,6 +184,7 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
     public boolean msgChunkPublished;
 
     private Optional<DispatchRateLimiter> dispatchRateLimiter = Optional.empty();
+    private final Object dispatchRateLimiterLock = new Object();
     private Optional<SubscribeRateLimiter> subscribeRateLimiter = Optional.empty();
     private final long backloggedCursorThresholdEntries;
     public static final int MESSAGE_RATE_BACKOFF_MS = 1000;
@@ -371,7 +372,7 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
     }
 
     private void initializeRateLimiterIfNeeded(Optional<Policies> policies) {
-        synchronized (dispatchRateLimiter) {
+        synchronized (dispatchRateLimiterLock) {
             // dispatch rate limiter for topic
             if (!dispatchRateLimiter.isPresent() && DispatchRateLimiter
                     .isDispatchRateNeeded(brokerService, policies, topic, Type.TOPIC)) {
@@ -3096,7 +3097,7 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
     }
 
     private void initializeTopicDispatchRateLimiterIfNeeded(TopicPolicies policies) {
-        synchronized (dispatchRateLimiter) {
+        synchronized (dispatchRateLimiterLock) {
             if (!dispatchRateLimiter.isPresent() && policies.getDispatchRate() != null) {
                 this.dispatchRateLimiter = Optional.of(new DispatchRateLimiter(this, Type.TOPIC));
             }