You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by lh...@apache.org on 2024/03/27 22:39:31 UTC

(pulsar) 03/05: [fix][broker] Fix wrong double-checked locking for readOnActiveConsumerTask in dispatcher (#22279)

This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit e2070a87d556816a831a60c8056fb6cbddfb61c1
Author: Yunze Xu <xy...@163.com>
AuthorDate: Sat Mar 16 14:56:34 2024 +0800

    [fix][broker] Fix wrong double-checked locking for readOnActiveConsumerTask in dispatcher (#22279)
    
    (cherry picked from commit 4e0c145c89a35ec9b41fa22862edac59e28d892d)
---
 .../PersistentDispatcherSingleActiveConsumer.java  | 26 +++++++++++++---------
 1 file changed, 16 insertions(+), 10 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
index bf6482bda01..cc7b6841e5c 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
@@ -71,6 +71,7 @@ public class PersistentDispatcherSingleActiveConsumer extends AbstractDispatcher
     protected volatile int readBatchSize;
     protected final Backoff readFailureBackoff;
     private volatile ScheduledFuture<?> readOnActiveConsumerTask = null;
+    private final Object lockForReadOnActiveConsumerTask = new Object();
 
     private final RedeliveryTracker redeliveryTracker;
 
@@ -120,18 +121,23 @@ public class PersistentDispatcherSingleActiveConsumer extends AbstractDispatcher
             return;
         }
 
-        readOnActiveConsumerTask = topic.getBrokerService().executor().schedule(() -> {
-            if (log.isDebugEnabled()) {
-                log.debug("[{}] Rewind cursor and read more entries after {} ms delay", name,
-                        serviceConfig.getActiveConsumerFailoverDelayTimeMillis());
+        synchronized (lockForReadOnActiveConsumerTask) {
+            if (readOnActiveConsumerTask != null) {
+                return;
             }
-            Consumer activeConsumer = ACTIVE_CONSUMER_UPDATER.get(this);
-            cursor.rewind(activeConsumer != null && activeConsumer.readCompacted());
+            readOnActiveConsumerTask = topic.getBrokerService().executor().schedule(() -> {
+                if (log.isDebugEnabled()) {
+                    log.debug("[{}] Rewind cursor and read more entries after {} ms delay", name,
+                            serviceConfig.getActiveConsumerFailoverDelayTimeMillis());
+                }
+                Consumer activeConsumer = ACTIVE_CONSUMER_UPDATER.get(this);
+                cursor.rewind(activeConsumer != null && activeConsumer.readCompacted());
 
-            notifyActiveConsumerChanged(activeConsumer);
-            readMoreEntries(activeConsumer);
-            readOnActiveConsumerTask = null;
-        }, serviceConfig.getActiveConsumerFailoverDelayTimeMillis(), TimeUnit.MILLISECONDS);
+                notifyActiveConsumerChanged(activeConsumer);
+                readMoreEntries(activeConsumer);
+                readOnActiveConsumerTask = null;
+            }, serviceConfig.getActiveConsumerFailoverDelayTimeMillis(), TimeUnit.MILLISECONDS);
+        }
     }
 
     @Override