You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by te...@apache.org on 2024/03/19 10:57:17 UTC
(pulsar) branch master updated: [improve][broker] Remove the atomicity on active consumer of a dispatcher (#22285)
This is an automated email from the ASF dual-hosted git repository.
technoboy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 1b1bd4b610d [improve][broker] Remove the atomicity on active consumer of a dispatcher (#22285)
1b1bd4b610d is described below
commit 1b1bd4b610dd768a6908964ef841a6790bb0f4f0
Author: Yunze Xu <xy...@163.com>
AuthorDate: Tue Mar 19 18:57:10 2024 +0800
[improve][broker] Remove the atomicity on active consumer of a dispatcher (#22285)
---
.../AbstractDispatcherSingleActiveConsumer.java | 30 ++++++------
...onPersistentDispatcherSingleActiveConsumer.java | 4 +-
.../PersistentDispatcherSingleActiveConsumer.java | 56 +++++++++++-----------
3 files changed, 45 insertions(+), 45 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractDispatcherSingleActiveConsumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractDispatcherSingleActiveConsumer.java
index 7726eb814a0..9980b6ae97c 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractDispatcherSingleActiveConsumer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractDispatcherSingleActiveConsumer.java
@@ -30,7 +30,6 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
-import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLookupData;
@@ -47,9 +46,6 @@ import org.slf4j.LoggerFactory;
public abstract class AbstractDispatcherSingleActiveConsumer extends AbstractBaseDispatcher {
protected final String topicName;
- protected static final AtomicReferenceFieldUpdater<AbstractDispatcherSingleActiveConsumer, Consumer>
- ACTIVE_CONSUMER_UPDATER = AtomicReferenceFieldUpdater.newUpdater(
- AbstractDispatcherSingleActiveConsumer.class, Consumer.class, "activeConsumer");
private volatile Consumer activeConsumer = null;
protected final CopyOnWriteArrayList<Consumer> consumers;
protected StickyKeyConsumerSelector stickyKeyConsumerSelector;
@@ -78,11 +74,16 @@ public abstract class AbstractDispatcherSingleActiveConsumer extends AbstractBas
this.partitionIndex = partitionIndex;
this.subscriptionType = subscriptionType;
this.cursor = cursor;
- ACTIVE_CONSUMER_UPDATER.set(this, null);
}
+ /**
+ * @apiNote this method does not need to be thread safe
+ */
protected abstract void scheduleReadOnActiveConsumer();
+ /**
+ * @apiNote this method does not need to be thread safe
+ */
protected abstract void cancelPendingRead();
protected void notifyActiveConsumerChanged(Consumer activeConsumer) {
@@ -99,6 +100,7 @@ public abstract class AbstractDispatcherSingleActiveConsumer extends AbstractBas
* distributed partitions evenly across consumers with highest priority level.
*
* @return the true consumer if the consumer is changed, otherwise false.
+ * @apiNote this method is not thread safe
*/
protected boolean pickAndScheduleActiveConsumer() {
checkArgument(!consumers.isEmpty());
@@ -128,14 +130,14 @@ public abstract class AbstractDispatcherSingleActiveConsumer extends AbstractBas
? partitionIndex % consumersSize
: peekConsumerIndexFromHashRing(makeHashRing(consumersSize));
- Consumer prevConsumer = ACTIVE_CONSUMER_UPDATER.getAndSet(this, consumers.get(index));
+ Consumer selectedConsumer = consumers.get(index);
- Consumer activeConsumer = ACTIVE_CONSUMER_UPDATER.get(this);
- if (prevConsumer == activeConsumer) {
+ if (selectedConsumer == activeConsumer) {
// Active consumer did not change. Do nothing at this point
return false;
} else {
// If the active consumer is changed, send notification.
+ activeConsumer = selectedConsumer;
scheduleReadOnActiveConsumer();
return true;
}
@@ -167,7 +169,7 @@ public abstract class AbstractDispatcherSingleActiveConsumer extends AbstractBas
}
if (subscriptionType == SubType.Exclusive && !consumers.isEmpty()) {
- Consumer actConsumer = ACTIVE_CONSUMER_UPDATER.get(this);
+ Consumer actConsumer = getActiveConsumer();
if (actConsumer != null) {
return actConsumer.cnx().checkConnectionLiveness().thenCompose(actConsumerStillAlive -> {
if (actConsumerStillAlive.isEmpty() || actConsumerStillAlive.get()) {
@@ -210,7 +212,7 @@ public abstract class AbstractDispatcherSingleActiveConsumer extends AbstractBas
if (!pickAndScheduleActiveConsumer()) {
// the active consumer is not changed
- Consumer currentActiveConsumer = ACTIVE_CONSUMER_UPDATER.get(this);
+ Consumer currentActiveConsumer = getActiveConsumer();
if (null == currentActiveConsumer) {
if (log.isDebugEnabled()) {
log.debug("Current active consumer disappears while adding consumer {}", consumer);
@@ -230,7 +232,7 @@ public abstract class AbstractDispatcherSingleActiveConsumer extends AbstractBas
}
if (consumers.isEmpty()) {
- ACTIVE_CONSUMER_UPDATER.set(this, null);
+ activeConsumer = null;
}
if (closeFuture == null && !consumers.isEmpty()) {
@@ -255,7 +257,7 @@ public abstract class AbstractDispatcherSingleActiveConsumer extends AbstractBas
* Calling consumer object
*/
public synchronized boolean canUnsubscribe(Consumer consumer) {
- return (consumers.size() == 1) && Objects.equals(consumer, ACTIVE_CONSUMER_UPDATER.get(this));
+ return (consumers.size() == 1) && Objects.equals(consumer, activeConsumer);
}
@Override
@@ -322,7 +324,7 @@ public abstract class AbstractDispatcherSingleActiveConsumer extends AbstractBas
}
public Consumer getActiveConsumer() {
- return ACTIVE_CONSUMER_UPDATER.get(this);
+ return activeConsumer;
}
@Override
@@ -331,7 +333,7 @@ public abstract class AbstractDispatcherSingleActiveConsumer extends AbstractBas
}
public boolean isConsumerConnected() {
- return ACTIVE_CONSUMER_UPDATER.get(this) != null;
+ return activeConsumer != null;
}
private static final Logger log = LoggerFactory.getLogger(AbstractDispatcherSingleActiveConsumer.class);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherSingleActiveConsumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherSingleActiveConsumer.java
index 5e8eda2ab70..ec9b7ac40ce 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherSingleActiveConsumer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherSingleActiveConsumer.java
@@ -53,7 +53,7 @@ public final class NonPersistentDispatcherSingleActiveConsumer extends AbstractD
@Override
public void sendMessages(List<Entry> entries) {
- Consumer currentConsumer = ACTIVE_CONSUMER_UPDATER.get(this);
+ Consumer currentConsumer = getActiveConsumer();
if (currentConsumer != null && currentConsumer.getAvailablePermits() > 0 && currentConsumer.isWritable()) {
SendMessageInfo sendMessageInfo = SendMessageInfo.getThreadLocal();
EntryBatchSizes batchSizes = EntryBatchSizes.get(entries.size());
@@ -83,7 +83,7 @@ public final class NonPersistentDispatcherSingleActiveConsumer extends AbstractD
@Override
public boolean hasPermits() {
- return ACTIVE_CONSUMER_UPDATER.get(this) != null && ACTIVE_CONSUMER_UPDATER.get(this).getAvailablePermits() > 0;
+ return getActiveConsumer() != null && getActiveConsumer().getAvailablePermits() > 0;
}
@Override
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
index 637ede8a41f..a414848e105 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
@@ -73,8 +73,7 @@ public class PersistentDispatcherSingleActiveConsumer extends AbstractDispatcher
protected volatile int readBatchSize;
protected final Backoff readFailureBackoff;
- private volatile ScheduledFuture<?> readOnActiveConsumerTask = null;
- private final Object lockForReadOnActiveConsumerTask = new Object();
+ private ScheduledFuture<?> readOnActiveConsumerTask = null;
private final RedeliveryTracker redeliveryTracker;
@@ -109,7 +108,7 @@ public class PersistentDispatcherSingleActiveConsumer extends AbstractDispatcher
if (log.isDebugEnabled()) {
log.debug("[{}] Rewind cursor and read more entries without delay", name);
}
- Consumer activeConsumer = ACTIVE_CONSUMER_UPDATER.get(this);
+ Consumer activeConsumer = getActiveConsumer();
cursor.rewind(activeConsumer != null && activeConsumer.readCompacted());
notifyActiveConsumerChanged(activeConsumer);
@@ -124,23 +123,21 @@ public class PersistentDispatcherSingleActiveConsumer extends AbstractDispatcher
return;
}
- synchronized (lockForReadOnActiveConsumerTask) {
- if (readOnActiveConsumerTask != null) {
- return;
+ readOnActiveConsumerTask = topic.getBrokerService().executor().schedule(() -> {
+ if (log.isDebugEnabled()) {
+ log.debug("[{}] Rewind cursor and read more entries after {} ms delay", name,
+ serviceConfig.getActiveConsumerFailoverDelayTimeMillis());
}
- readOnActiveConsumerTask = topic.getBrokerService().executor().schedule(() -> {
- if (log.isDebugEnabled()) {
- log.debug("[{}] Rewind cursor and read more entries after {} ms delay", name,
- serviceConfig.getActiveConsumerFailoverDelayTimeMillis());
- }
- Consumer activeConsumer = ACTIVE_CONSUMER_UPDATER.get(this);
- cursor.rewind(activeConsumer != null && activeConsumer.readCompacted());
+ Consumer activeConsumer = getActiveConsumer();
+ cursor.rewind(activeConsumer != null && activeConsumer.readCompacted());
- notifyActiveConsumerChanged(activeConsumer);
- readMoreEntries(activeConsumer);
+ notifyActiveConsumerChanged(activeConsumer);
+ readMoreEntries(activeConsumer);
+ synchronized (this) {
readOnActiveConsumerTask = null;
- }, serviceConfig.getActiveConsumerFailoverDelayTimeMillis(), TimeUnit.MILLISECONDS);
- }
+ }
+ }, serviceConfig.getActiveConsumerFailoverDelayTimeMillis(), TimeUnit.MILLISECONDS);
+
}
@Override
@@ -184,7 +181,7 @@ public class PersistentDispatcherSingleActiveConsumer extends AbstractDispatcher
readFailureBackoff.reduceToHalf();
- Consumer currentConsumer = ACTIVE_CONSUMER_UPDATER.get(this);
+ Consumer currentConsumer = getActiveConsumer();
if (isKeyHashRangeFiltered) {
Iterator<Entry> iterator = entries.iterator();
@@ -241,12 +238,7 @@ public class PersistentDispatcherSingleActiveConsumer extends AbstractDispatcher
sendMessageInfo.getTotalMessages(), sendMessageInfo.getTotalBytes());
// Schedule a new read batch operation only after the previous batch has been written to the socket.
- executor.execute(() -> {
- synchronized (PersistentDispatcherSingleActiveConsumer.this) {
- Consumer newConsumer = getActiveConsumer();
- readMoreEntries(newConsumer);
- }
- });
+ executor.execute(() -> readMoreEntries(getActiveConsumer()));
}
});
}
@@ -262,7 +254,7 @@ public class PersistentDispatcherSingleActiveConsumer extends AbstractDispatcher
log.debug("[{}-{}] Ignoring flow control message since we already have a pending read req", name,
consumer);
}
- } else if (ACTIVE_CONSUMER_UPDATER.get(this) != consumer) {
+ } else if (getActiveConsumer() != consumer) {
if (log.isDebugEnabled()) {
log.debug("[{}-{}] Ignoring flow control message since consumer is not active partition consumer", name,
consumer);
@@ -295,7 +287,7 @@ public class PersistentDispatcherSingleActiveConsumer extends AbstractDispatcher
consumer.setConsumerEpoch(consumerEpoch);
}
- if (consumer != ACTIVE_CONSUMER_UPDATER.get(this)) {
+ if (consumer != getActiveConsumer()) {
log.info("[{}-{}] Ignoring reDeliverUnAcknowledgedMessages: Only the active consumer can call resend",
name, consumer);
return;
@@ -347,6 +339,12 @@ public class PersistentDispatcherSingleActiveConsumer extends AbstractDispatcher
if (consumer.getAvailablePermits() > 0) {
synchronized (this) {
+ final Consumer activeConsumer = getActiveConsumer();
+ if (consumer != activeConsumer) {
+ log.info("[{}] cancel the readMoreEntries because consumer {} is no longer the active consumer {}",
+ topic.getName(), consumer.consumerName(), activeConsumer.consumerName());
+ return;
+ }
if (havePendingRead) {
if (log.isDebugEnabled()) {
log.debug("[{}] Skipping read for the topic, Due to we have pending read.", topic.getName());
@@ -404,7 +402,7 @@ public class PersistentDispatcherSingleActiveConsumer extends AbstractDispatcher
}
topic.getBrokerService().executor().schedule(() -> {
isRescheduleReadInProgress.set(false);
- Consumer currentConsumer = ACTIVE_CONSUMER_UPDATER.get(this);
+ Consumer currentConsumer = getActiveConsumer();
readMoreEntries(currentConsumer);
}, MESSAGE_RATE_BACKOFF_MS, TimeUnit.MILLISECONDS);
}
@@ -542,7 +540,7 @@ public class PersistentDispatcherSingleActiveConsumer extends AbstractDispatcher
// Jump again into dispatcher dedicated thread
executor.execute(() -> {
synchronized (PersistentDispatcherSingleActiveConsumer.this) {
- Consumer currentConsumer = ACTIVE_CONSUMER_UPDATER.get(this);
+ Consumer currentConsumer = getActiveConsumer();
// we should retry the read if we have an active consumer and there is no pending read
if (currentConsumer != null && !havePendingRead) {
if (log.isDebugEnabled()) {
@@ -590,7 +588,7 @@ public class PersistentDispatcherSingleActiveConsumer extends AbstractDispatcher
@Override
public boolean checkAndUnblockIfStuck() {
- Consumer consumer = ACTIVE_CONSUMER_UPDATER.get(this);
+ Consumer consumer = getActiveConsumer();
if (consumer == null || cursor.checkAndUpdateReadPositionChanged()) {
return false;
}