You are viewing a plain-text version of this content. The canonical link for it was not preserved in this plain-text copy.
Posted to commits@pulsar.apache.org by lh...@apache.org on 2024/03/27 22:39:31 UTC
(pulsar) 03/05: [fix][broker] Fix wrong double-checked locking for readOnActiveConsumerTask in dispatcher (#22279)
This is an automated email from the ASF dual-hosted git repository.
lhotari pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit e2070a87d556816a831a60c8056fb6cbddfb61c1
Author: Yunze Xu <xy...@163.com>
AuthorDate: Sat Mar 16 14:56:34 2024 +0800
[fix][broker] Fix wrong double-checked locking for readOnActiveConsumerTask in dispatcher (#22279)
(cherry picked from commit 4e0c145c89a35ec9b41fa22862edac59e28d892d)
---
.../PersistentDispatcherSingleActiveConsumer.java | 26 +++++++++++++---------
1 file changed, 16 insertions(+), 10 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
index bf6482bda01..cc7b6841e5c 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
@@ -71,6 +71,7 @@ public class PersistentDispatcherSingleActiveConsumer extends AbstractDispatcher
protected volatile int readBatchSize;
protected final Backoff readFailureBackoff;
private volatile ScheduledFuture<?> readOnActiveConsumerTask = null;
+ private final Object lockForReadOnActiveConsumerTask = new Object();
private final RedeliveryTracker redeliveryTracker;
@@ -120,18 +121,23 @@ public class PersistentDispatcherSingleActiveConsumer extends AbstractDispatcher
return;
}
- readOnActiveConsumerTask = topic.getBrokerService().executor().schedule(() -> {
- if (log.isDebugEnabled()) {
- log.debug("[{}] Rewind cursor and read more entries after {} ms delay", name,
- serviceConfig.getActiveConsumerFailoverDelayTimeMillis());
+ synchronized (lockForReadOnActiveConsumerTask) {
+ if (readOnActiveConsumerTask != null) {
+ return;
}
- Consumer activeConsumer = ACTIVE_CONSUMER_UPDATER.get(this);
- cursor.rewind(activeConsumer != null && activeConsumer.readCompacted());
+ readOnActiveConsumerTask = topic.getBrokerService().executor().schedule(() -> {
+ if (log.isDebugEnabled()) {
+ log.debug("[{}] Rewind cursor and read more entries after {} ms delay", name,
+ serviceConfig.getActiveConsumerFailoverDelayTimeMillis());
+ }
+ Consumer activeConsumer = ACTIVE_CONSUMER_UPDATER.get(this);
+ cursor.rewind(activeConsumer != null && activeConsumer.readCompacted());
- notifyActiveConsumerChanged(activeConsumer);
- readMoreEntries(activeConsumer);
- readOnActiveConsumerTask = null;
- }, serviceConfig.getActiveConsumerFailoverDelayTimeMillis(), TimeUnit.MILLISECONDS);
+ notifyActiveConsumerChanged(activeConsumer);
+ readMoreEntries(activeConsumer);
+ readOnActiveConsumerTask = null;
+ }, serviceConfig.getActiveConsumerFailoverDelayTimeMillis(), TimeUnit.MILLISECONDS);
+ }
}
@Override