You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by te...@apache.org on 2024/03/19 10:57:17 UTC

(pulsar) branch master updated: [improve][broker] Remove the atomicity on active consumer of a dispatcher (#22285)

This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 1b1bd4b610d [improve][broker] Remove the atomicity on active consumer of a dispatcher (#22285)
1b1bd4b610d is described below

commit 1b1bd4b610dd768a6908964ef841a6790bb0f4f0
Author: Yunze Xu <xy...@163.com>
AuthorDate: Tue Mar 19 18:57:10 2024 +0800

    [improve][broker] Remove the atomicity on active consumer of a dispatcher (#22285)
---
 .../AbstractDispatcherSingleActiveConsumer.java    | 30 ++++++------
 ...onPersistentDispatcherSingleActiveConsumer.java |  4 +-
 .../PersistentDispatcherSingleActiveConsumer.java  | 56 +++++++++++-----------
 3 files changed, 45 insertions(+), 45 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractDispatcherSingleActiveConsumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractDispatcherSingleActiveConsumer.java
index 7726eb814a0..9980b6ae97c 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractDispatcherSingleActiveConsumer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractDispatcherSingleActiveConsumer.java
@@ -30,7 +30,6 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
-import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
 import org.apache.bookkeeper.mledger.ManagedCursor;
 import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLookupData;
@@ -47,9 +46,6 @@ import org.slf4j.LoggerFactory;
 public abstract class AbstractDispatcherSingleActiveConsumer extends AbstractBaseDispatcher {
 
     protected final String topicName;
-    protected static final AtomicReferenceFieldUpdater<AbstractDispatcherSingleActiveConsumer, Consumer>
-            ACTIVE_CONSUMER_UPDATER = AtomicReferenceFieldUpdater.newUpdater(
-            AbstractDispatcherSingleActiveConsumer.class, Consumer.class, "activeConsumer");
     private volatile Consumer activeConsumer = null;
     protected final CopyOnWriteArrayList<Consumer> consumers;
     protected StickyKeyConsumerSelector stickyKeyConsumerSelector;
@@ -78,11 +74,16 @@ public abstract class AbstractDispatcherSingleActiveConsumer extends AbstractBas
         this.partitionIndex = partitionIndex;
         this.subscriptionType = subscriptionType;
         this.cursor = cursor;
-        ACTIVE_CONSUMER_UPDATER.set(this, null);
     }
 
+    /**
+     * @apiNote this method does not need to be thread safe
+     */
     protected abstract void scheduleReadOnActiveConsumer();
 
+    /**
+     * @apiNote this method does not need to be thread safe
+     */
     protected abstract void cancelPendingRead();
 
     protected void notifyActiveConsumerChanged(Consumer activeConsumer) {
@@ -99,6 +100,7 @@ public abstract class AbstractDispatcherSingleActiveConsumer extends AbstractBas
      * distributed partitions evenly across consumers with highest priority level.
      *
      * @return the true consumer if the consumer is changed, otherwise false.
+     * @apiNote this method is not thread safe
      */
     protected boolean pickAndScheduleActiveConsumer() {
         checkArgument(!consumers.isEmpty());
@@ -128,14 +130,14 @@ public abstract class AbstractDispatcherSingleActiveConsumer extends AbstractBas
                 ? partitionIndex % consumersSize
                 : peekConsumerIndexFromHashRing(makeHashRing(consumersSize));
 
-        Consumer prevConsumer = ACTIVE_CONSUMER_UPDATER.getAndSet(this, consumers.get(index));
+        Consumer selectedConsumer = consumers.get(index);
 
-        Consumer activeConsumer = ACTIVE_CONSUMER_UPDATER.get(this);
-        if (prevConsumer == activeConsumer) {
+        if (selectedConsumer == activeConsumer) {
             // Active consumer did not change. Do nothing at this point
             return false;
         } else {
             // If the active consumer is changed, send notification.
+            activeConsumer = selectedConsumer;
             scheduleReadOnActiveConsumer();
             return true;
         }
@@ -167,7 +169,7 @@ public abstract class AbstractDispatcherSingleActiveConsumer extends AbstractBas
         }
 
         if (subscriptionType == SubType.Exclusive && !consumers.isEmpty()) {
-            Consumer actConsumer = ACTIVE_CONSUMER_UPDATER.get(this);
+            Consumer actConsumer = getActiveConsumer();
             if (actConsumer != null) {
                 return actConsumer.cnx().checkConnectionLiveness().thenCompose(actConsumerStillAlive -> {
                     if (actConsumerStillAlive.isEmpty() || actConsumerStillAlive.get()) {
@@ -210,7 +212,7 @@ public abstract class AbstractDispatcherSingleActiveConsumer extends AbstractBas
 
         if (!pickAndScheduleActiveConsumer()) {
             // the active consumer is not changed
-            Consumer currentActiveConsumer = ACTIVE_CONSUMER_UPDATER.get(this);
+            Consumer currentActiveConsumer = getActiveConsumer();
             if (null == currentActiveConsumer) {
                 if (log.isDebugEnabled()) {
                     log.debug("Current active consumer disappears while adding consumer {}", consumer);
@@ -230,7 +232,7 @@ public abstract class AbstractDispatcherSingleActiveConsumer extends AbstractBas
         }
 
         if (consumers.isEmpty()) {
-            ACTIVE_CONSUMER_UPDATER.set(this, null);
+            activeConsumer = null;
         }
 
         if (closeFuture == null && !consumers.isEmpty()) {
@@ -255,7 +257,7 @@ public abstract class AbstractDispatcherSingleActiveConsumer extends AbstractBas
      *            Calling consumer object
      */
     public synchronized boolean canUnsubscribe(Consumer consumer) {
-        return (consumers.size() == 1) && Objects.equals(consumer, ACTIVE_CONSUMER_UPDATER.get(this));
+        return (consumers.size() == 1) && Objects.equals(consumer, activeConsumer);
     }
 
     @Override
@@ -322,7 +324,7 @@ public abstract class AbstractDispatcherSingleActiveConsumer extends AbstractBas
     }
 
     public Consumer getActiveConsumer() {
-        return ACTIVE_CONSUMER_UPDATER.get(this);
+        return activeConsumer;
     }
 
     @Override
@@ -331,7 +333,7 @@ public abstract class AbstractDispatcherSingleActiveConsumer extends AbstractBas
     }
 
     public boolean isConsumerConnected() {
-        return ACTIVE_CONSUMER_UPDATER.get(this) != null;
+        return activeConsumer != null;
     }
 
     private static final Logger log = LoggerFactory.getLogger(AbstractDispatcherSingleActiveConsumer.class);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherSingleActiveConsumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherSingleActiveConsumer.java
index 5e8eda2ab70..ec9b7ac40ce 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherSingleActiveConsumer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherSingleActiveConsumer.java
@@ -53,7 +53,7 @@ public final class NonPersistentDispatcherSingleActiveConsumer extends AbstractD
 
     @Override
     public void sendMessages(List<Entry> entries) {
-        Consumer currentConsumer = ACTIVE_CONSUMER_UPDATER.get(this);
+        Consumer currentConsumer = getActiveConsumer();
         if (currentConsumer != null && currentConsumer.getAvailablePermits() > 0 && currentConsumer.isWritable()) {
             SendMessageInfo sendMessageInfo = SendMessageInfo.getThreadLocal();
             EntryBatchSizes batchSizes = EntryBatchSizes.get(entries.size());
@@ -83,7 +83,7 @@ public final class NonPersistentDispatcherSingleActiveConsumer extends AbstractD
 
     @Override
     public boolean hasPermits() {
-        return ACTIVE_CONSUMER_UPDATER.get(this) != null && ACTIVE_CONSUMER_UPDATER.get(this).getAvailablePermits() > 0;
+        return getActiveConsumer() != null && getActiveConsumer().getAvailablePermits() > 0;
     }
 
     @Override
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
index 637ede8a41f..a414848e105 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
@@ -73,8 +73,7 @@ public class PersistentDispatcherSingleActiveConsumer extends AbstractDispatcher
 
     protected volatile int readBatchSize;
     protected final Backoff readFailureBackoff;
-    private volatile ScheduledFuture<?> readOnActiveConsumerTask = null;
-    private final Object lockForReadOnActiveConsumerTask = new Object();
+    private ScheduledFuture<?> readOnActiveConsumerTask = null;
 
     private final RedeliveryTracker redeliveryTracker;
 
@@ -109,7 +108,7 @@ public class PersistentDispatcherSingleActiveConsumer extends AbstractDispatcher
             if (log.isDebugEnabled()) {
                 log.debug("[{}] Rewind cursor and read more entries without delay", name);
             }
-            Consumer activeConsumer = ACTIVE_CONSUMER_UPDATER.get(this);
+            Consumer activeConsumer = getActiveConsumer();
             cursor.rewind(activeConsumer != null && activeConsumer.readCompacted());
 
             notifyActiveConsumerChanged(activeConsumer);
@@ -124,23 +123,21 @@ public class PersistentDispatcherSingleActiveConsumer extends AbstractDispatcher
             return;
         }
 
-        synchronized (lockForReadOnActiveConsumerTask) {
-            if (readOnActiveConsumerTask != null) {
-                return;
+        readOnActiveConsumerTask = topic.getBrokerService().executor().schedule(() -> {
+            if (log.isDebugEnabled()) {
+                log.debug("[{}] Rewind cursor and read more entries after {} ms delay", name,
+                        serviceConfig.getActiveConsumerFailoverDelayTimeMillis());
             }
-            readOnActiveConsumerTask = topic.getBrokerService().executor().schedule(() -> {
-                if (log.isDebugEnabled()) {
-                    log.debug("[{}] Rewind cursor and read more entries after {} ms delay", name,
-                            serviceConfig.getActiveConsumerFailoverDelayTimeMillis());
-                }
-                Consumer activeConsumer = ACTIVE_CONSUMER_UPDATER.get(this);
-                cursor.rewind(activeConsumer != null && activeConsumer.readCompacted());
+            Consumer activeConsumer = getActiveConsumer();
+            cursor.rewind(activeConsumer != null && activeConsumer.readCompacted());
 
-                notifyActiveConsumerChanged(activeConsumer);
-                readMoreEntries(activeConsumer);
+            notifyActiveConsumerChanged(activeConsumer);
+            readMoreEntries(activeConsumer);
+            synchronized (this) {
                 readOnActiveConsumerTask = null;
-            }, serviceConfig.getActiveConsumerFailoverDelayTimeMillis(), TimeUnit.MILLISECONDS);
-        }
+            }
+        }, serviceConfig.getActiveConsumerFailoverDelayTimeMillis(), TimeUnit.MILLISECONDS);
+
     }
 
     @Override
@@ -184,7 +181,7 @@ public class PersistentDispatcherSingleActiveConsumer extends AbstractDispatcher
 
         readFailureBackoff.reduceToHalf();
 
-        Consumer currentConsumer = ACTIVE_CONSUMER_UPDATER.get(this);
+        Consumer currentConsumer = getActiveConsumer();
 
         if (isKeyHashRangeFiltered) {
             Iterator<Entry> iterator = entries.iterator();
@@ -241,12 +238,7 @@ public class PersistentDispatcherSingleActiveConsumer extends AbstractDispatcher
                             sendMessageInfo.getTotalMessages(), sendMessageInfo.getTotalBytes());
 
                     // Schedule a new read batch operation only after the previous batch has been written to the socket.
-                    executor.execute(() -> {
-                            synchronized (PersistentDispatcherSingleActiveConsumer.this) {
-                                Consumer newConsumer = getActiveConsumer();
-                                readMoreEntries(newConsumer);
-                            }
-                        });
+                    executor.execute(() -> readMoreEntries(getActiveConsumer()));
                 }
             });
     }
@@ -262,7 +254,7 @@ public class PersistentDispatcherSingleActiveConsumer extends AbstractDispatcher
                 log.debug("[{}-{}] Ignoring flow control message since we already have a pending read req", name,
                         consumer);
             }
-        } else if (ACTIVE_CONSUMER_UPDATER.get(this) != consumer) {
+        } else if (getActiveConsumer() != consumer) {
             if (log.isDebugEnabled()) {
                 log.debug("[{}-{}] Ignoring flow control message since consumer is not active partition consumer", name,
                         consumer);
@@ -295,7 +287,7 @@ public class PersistentDispatcherSingleActiveConsumer extends AbstractDispatcher
             consumer.setConsumerEpoch(consumerEpoch);
         }
 
-        if (consumer != ACTIVE_CONSUMER_UPDATER.get(this)) {
+        if (consumer != getActiveConsumer()) {
             log.info("[{}-{}] Ignoring reDeliverUnAcknowledgedMessages: Only the active consumer can call resend",
                     name, consumer);
             return;
@@ -347,6 +339,12 @@ public class PersistentDispatcherSingleActiveConsumer extends AbstractDispatcher
 
         if (consumer.getAvailablePermits() > 0) {
             synchronized (this) {
+                final Consumer activeConsumer = getActiveConsumer();
+                if (consumer != activeConsumer) {
+                    log.info("[{}] cancel the readMoreEntries because consumer {} is no longer the active consumer {}",
+                            topic.getName(), consumer.consumerName(), activeConsumer.consumerName());
+                    return;
+                }
                 if (havePendingRead) {
                     if (log.isDebugEnabled()) {
                         log.debug("[{}] Skipping read for the topic, Due to we have pending read.", topic.getName());
@@ -404,7 +402,7 @@ public class PersistentDispatcherSingleActiveConsumer extends AbstractDispatcher
             }
             topic.getBrokerService().executor().schedule(() -> {
                 isRescheduleReadInProgress.set(false);
-                Consumer currentConsumer = ACTIVE_CONSUMER_UPDATER.get(this);
+                Consumer currentConsumer = getActiveConsumer();
                 readMoreEntries(currentConsumer);
             }, MESSAGE_RATE_BACKOFF_MS, TimeUnit.MILLISECONDS);
         }
@@ -542,7 +540,7 @@ public class PersistentDispatcherSingleActiveConsumer extends AbstractDispatcher
             // Jump again into dispatcher dedicated thread
             executor.execute(() -> {
                 synchronized (PersistentDispatcherSingleActiveConsumer.this) {
-                    Consumer currentConsumer = ACTIVE_CONSUMER_UPDATER.get(this);
+                    Consumer currentConsumer = getActiveConsumer();
                     // we should retry the read if we have an active consumer and there is no pending read
                     if (currentConsumer != null && !havePendingRead) {
                         if (log.isDebugEnabled()) {
@@ -590,7 +588,7 @@ public class PersistentDispatcherSingleActiveConsumer extends AbstractDispatcher
 
     @Override
     public boolean checkAndUnblockIfStuck() {
-        Consumer consumer = ACTIVE_CONSUMER_UPDATER.get(this);
+        Consumer consumer = getActiveConsumer();
         if (consumer == null || cursor.checkAndUpdateReadPositionChanged()) {
             return false;
         }