You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2022/03/19 06:02:18 UTC

[pulsar] 04/14: Fix: consumer epoch may be inconsistent with the broker (#14716)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 3e92301ad4d867abe1678e219b1819db95a53746
Author: Baozi <wu...@gmail.com>
AuthorDate: Thu Mar 17 14:41:59 2022 +0800

    Fix: consumer epoch may be inconsistent with the broker (#14716)
    
    ### Motivation
    
    Fixes: #14465
    
    In the current implementation [PIP-84](https://github.com/apache/pulsar/wiki/PIP-84-:-Pulsar-client:-Redeliver-command-add-epoch), the  consumer epoch and broker epoch may be inconsistent. Unless reconnected, otherwise consumers will always filter message.
    
    When the consumer calls the `consumer.redeliverUnacknowledgedMessages()` method,  Consumer every time epoch + 1.
    But broker may be not increment. In the following code.
    
    https://github.com/apache/pulsar/blob/4f1e39b6921ea401b8c27f17a041d06d85f8abf8/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java#L310-L322
    
    ### Modifications
    
    - When broker receives a message from consumer`redeliver message`,   every time epoch increment.
    - Add synchronized to readMoreEntries method.
    - Change log print.
    
    (cherry picked from commit ec81e90687e7c2282feed76cf9daac036614db44)
---
 .../PersistentDispatcherSingleActiveConsumer.java  | 33 +++++++++++++---------
 .../pulsar/broker/service/ResendRequestTest.java   |  7 +++--
 .../apache/pulsar/client/impl/ConsumerBase.java    |  4 +--
 3 files changed, 26 insertions(+), 18 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
index 4932501..6a715df 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
@@ -293,6 +293,15 @@ public class PersistentDispatcherSingleActiveConsumer extends AbstractDispatcher
     }
 
     private synchronized void internalRedeliverUnacknowledgedMessages(Consumer consumer, long consumerEpoch) {
+
+        if (consumerEpoch > consumer.getConsumerEpoch()) {
+            if (log.isDebugEnabled()) {
+                log.debug("[{}-{}] Update epoch, old epoch [{}], new epoch [{}]",
+                        name, consumer, consumer.getConsumerEpoch(), consumerEpoch);
+            }
+            consumer.setConsumerEpoch(consumerEpoch);
+        }
+
         if (consumer != ACTIVE_CONSUMER_UPDATER.get(this)) {
             log.info("[{}-{}] Ignoring reDeliverUnAcknowledgedMessages: Only the active consumer can call resend",
                     name, consumer);
@@ -308,9 +317,6 @@ public class PersistentDispatcherSingleActiveConsumer extends AbstractDispatcher
         cancelPendingRead();
 
         if (!havePendingRead) {
-            if (consumerEpoch > consumer.getConsumerEpoch()) {
-                consumer.setConsumerEpoch(consumerEpoch);
-            }
             cursor.rewind();
             if (log.isDebugEnabled()) {
                 log.debug("[{}-{}] Cursor rewinded, redelivering unacknowledged messages. ", name, consumer);
@@ -320,7 +326,6 @@ public class PersistentDispatcherSingleActiveConsumer extends AbstractDispatcher
             log.info("[{}-{}] Ignoring reDeliverUnAcknowledgedMessages: cancelPendingRequest on cursor failed", name,
                     consumer);
         }
-
     }
 
     @Override
@@ -352,15 +357,17 @@ public class PersistentDispatcherSingleActiveConsumer extends AbstractDispatcher
             if (log.isDebugEnabled()) {
                 log.debug("[{}-{}] Schedule read of {} messages", name, consumer, messagesToRead);
             }
-            havePendingRead = true;
-            if (consumer.readCompacted()) {
-                topic.getCompactedTopic().asyncReadEntriesOrWait(cursor, messagesToRead, isFirstRead,
-                        this, consumer);
-            } else {
-                ReadEntriesCtx readEntriesCtx =
-                        ReadEntriesCtx.create(consumer, consumer.getConsumerEpoch());
-                cursor.asyncReadEntriesOrWait(messagesToRead,
-                        bytesToRead, this, readEntriesCtx, topic.getMaxReadPosition());
+            synchronized (this) {
+                havePendingRead = true;
+                if (consumer.readCompacted()) {
+                    topic.getCompactedTopic().asyncReadEntriesOrWait(cursor, messagesToRead, isFirstRead,
+                            this, consumer);
+                } else {
+                    ReadEntriesCtx readEntriesCtx =
+                            ReadEntriesCtx.create(consumer, consumer.getConsumerEpoch());
+                    cursor.asyncReadEntriesOrWait(messagesToRead,
+                            bytesToRead, this, readEntriesCtx, topic.getMaxReadPosition());
+                }
             }
         } else {
             if (log.isDebugEnabled()) {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ResendRequestTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ResendRequestTest.java
index 3a74286..ea0cd62 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ResendRequestTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ResendRequestTest.java
@@ -40,6 +40,7 @@ import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.client.impl.ConsumerBase;
 import org.apache.pulsar.common.policies.data.TenantInfoImpl;
+import org.apache.pulsar.common.util.collections.GrowableArrayBlockingQueue;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.testng.annotations.AfterMethod;
@@ -726,12 +727,12 @@ public class ResendRequestTest extends BrokerTestBase {
 
     @SuppressWarnings("unchecked")
     private BlockingQueue<Message<byte[]>> printIncomingMessageQueue(Consumer<byte[]> consumer) throws Exception {
-        BlockingQueue<Message<byte[]>> imq = null;
+        GrowableArrayBlockingQueue<Message<byte[]>> imq = null;
         ConsumerBase<byte[]> c = (ConsumerBase<byte[]>) consumer;
         Field field = ConsumerBase.class.getDeclaredField("incomingMessages");
         field.setAccessible(true);
-        imq = (BlockingQueue<Message<byte[]>>) field.get(c);
-        log.info("Incoming MEssage Queue: {}", imq);
+        imq = (GrowableArrayBlockingQueue<Message<byte[]>>) field.get(c);
+        log.info("Incoming MEssage Queue: {}", imq.toList());
         return imq;
     }
 
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
index 7f0575a..357da2b 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
@@ -1073,8 +1073,8 @@ public abstract class ConsumerBase<T> extends HandlerState implements Consumer<T
                 || getSubType() == CommandSubscribe.SubType.Exclusive)
                 && message.getConsumerEpoch() != DEFAULT_CONSUMER_EPOCH
                 && message.getConsumerEpoch() < CONSUMER_EPOCH.get(this)) {
-            log.warn("Consumer filter old epoch message, topic : [{}], messageId : [{}], consumerEpoch : [{}]",
-                    topic, message.getMessageId(), consumerEpoch);
+            log.warn("Consumer filter old epoch message, topic : [{}], messageId : [{}], messageConsumerEpoch : [{}], "
+                    + "consumerEpoch : [{}]", topic, message.getMessageId(), message.getConsumerEpoch(), consumerEpoch);
             message.release();
             message.recycle();
             return false;