You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2022/03/19 06:02:18 UTC
[pulsar] 04/14: Fix: consumer epoch may be inconsistent with the broker (#14716)
This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 3e92301ad4d867abe1678e219b1819db95a53746
Author: Baozi <wu...@gmail.com>
AuthorDate: Thu Mar 17 14:41:59 2022 +0800
Fix: consumer epoch may be inconsistent with the broker (#14716)
### Motivation
Fixes: #14465
In the current implementation of [PIP-84](https://github.com/apache/pulsar/wiki/PIP-84-:-Pulsar-client:-Redeliver-command-add-epoch), the consumer epoch and the broker epoch may become inconsistent. Once that happens, the consumer will keep filtering out messages until it reconnects.
Every time the consumer calls the `consumer.redeliverUnacknowledgedMessages()` method, it increments its epoch by 1.
However, the broker may not increment its epoch, as shown in the following code:
https://github.com/apache/pulsar/blob/4f1e39b6921ea401b8c27f17a041d06d85f8abf8/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java#L310-L322
### Modifications
- When the broker receives a `redeliver message` request from the consumer, the epoch is incremented every time.
- Add synchronized to readMoreEntries method.
- Change log print.
(cherry picked from commit ec81e90687e7c2282feed76cf9daac036614db44)
---
.../PersistentDispatcherSingleActiveConsumer.java | 33 +++++++++++++---------
.../pulsar/broker/service/ResendRequestTest.java | 7 +++--
.../apache/pulsar/client/impl/ConsumerBase.java | 4 +--
3 files changed, 26 insertions(+), 18 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
index 4932501..6a715df 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
@@ -293,6 +293,15 @@ public class PersistentDispatcherSingleActiveConsumer extends AbstractDispatcher
}
private synchronized void internalRedeliverUnacknowledgedMessages(Consumer consumer, long consumerEpoch) {
+
+ if (consumerEpoch > consumer.getConsumerEpoch()) {
+ if (log.isDebugEnabled()) {
+ log.debug("[{}-{}] Update epoch, old epoch [{}], new epoch [{}]",
+ name, consumer, consumer.getConsumerEpoch(), consumerEpoch);
+ }
+ consumer.setConsumerEpoch(consumerEpoch);
+ }
+
if (consumer != ACTIVE_CONSUMER_UPDATER.get(this)) {
log.info("[{}-{}] Ignoring reDeliverUnAcknowledgedMessages: Only the active consumer can call resend",
name, consumer);
@@ -308,9 +317,6 @@ public class PersistentDispatcherSingleActiveConsumer extends AbstractDispatcher
cancelPendingRead();
if (!havePendingRead) {
- if (consumerEpoch > consumer.getConsumerEpoch()) {
- consumer.setConsumerEpoch(consumerEpoch);
- }
cursor.rewind();
if (log.isDebugEnabled()) {
log.debug("[{}-{}] Cursor rewinded, redelivering unacknowledged messages. ", name, consumer);
@@ -320,7 +326,6 @@ public class PersistentDispatcherSingleActiveConsumer extends AbstractDispatcher
log.info("[{}-{}] Ignoring reDeliverUnAcknowledgedMessages: cancelPendingRequest on cursor failed", name,
consumer);
}
-
}
@Override
@@ -352,15 +357,17 @@ public class PersistentDispatcherSingleActiveConsumer extends AbstractDispatcher
if (log.isDebugEnabled()) {
log.debug("[{}-{}] Schedule read of {} messages", name, consumer, messagesToRead);
}
- havePendingRead = true;
- if (consumer.readCompacted()) {
- topic.getCompactedTopic().asyncReadEntriesOrWait(cursor, messagesToRead, isFirstRead,
- this, consumer);
- } else {
- ReadEntriesCtx readEntriesCtx =
- ReadEntriesCtx.create(consumer, consumer.getConsumerEpoch());
- cursor.asyncReadEntriesOrWait(messagesToRead,
- bytesToRead, this, readEntriesCtx, topic.getMaxReadPosition());
+ synchronized (this) {
+ havePendingRead = true;
+ if (consumer.readCompacted()) {
+ topic.getCompactedTopic().asyncReadEntriesOrWait(cursor, messagesToRead, isFirstRead,
+ this, consumer);
+ } else {
+ ReadEntriesCtx readEntriesCtx =
+ ReadEntriesCtx.create(consumer, consumer.getConsumerEpoch());
+ cursor.asyncReadEntriesOrWait(messagesToRead,
+ bytesToRead, this, readEntriesCtx, topic.getMaxReadPosition());
+ }
}
} else {
if (log.isDebugEnabled()) {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ResendRequestTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ResendRequestTest.java
index 3a74286..ea0cd62 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ResendRequestTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ResendRequestTest.java
@@ -40,6 +40,7 @@ import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.impl.ConsumerBase;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
+import org.apache.pulsar.common.util.collections.GrowableArrayBlockingQueue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.annotations.AfterMethod;
@@ -726,12 +727,12 @@ public class ResendRequestTest extends BrokerTestBase {
@SuppressWarnings("unchecked")
private BlockingQueue<Message<byte[]>> printIncomingMessageQueue(Consumer<byte[]> consumer) throws Exception {
- BlockingQueue<Message<byte[]>> imq = null;
+ GrowableArrayBlockingQueue<Message<byte[]>> imq = null;
ConsumerBase<byte[]> c = (ConsumerBase<byte[]>) consumer;
Field field = ConsumerBase.class.getDeclaredField("incomingMessages");
field.setAccessible(true);
- imq = (BlockingQueue<Message<byte[]>>) field.get(c);
- log.info("Incoming MEssage Queue: {}", imq);
+ imq = (GrowableArrayBlockingQueue<Message<byte[]>>) field.get(c);
+ log.info("Incoming MEssage Queue: {}", imq.toList());
return imq;
}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
index 7f0575a..357da2b 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
@@ -1073,8 +1073,8 @@ public abstract class ConsumerBase<T> extends HandlerState implements Consumer<T
|| getSubType() == CommandSubscribe.SubType.Exclusive)
&& message.getConsumerEpoch() != DEFAULT_CONSUMER_EPOCH
&& message.getConsumerEpoch() < CONSUMER_EPOCH.get(this)) {
- log.warn("Consumer filter old epoch message, topic : [{}], messageId : [{}], consumerEpoch : [{}]",
- topic, message.getMessageId(), consumerEpoch);
+ log.warn("Consumer filter old epoch message, topic : [{}], messageId : [{}], messageConsumerEpoch : [{}], "
+ + "consumerEpoch : [{}]", topic, message.getMessageId(), message.getConsumerEpoch(), consumerEpoch);
message.release();
message.recycle();
return false;