Posted to commits@pulsar.apache.org by pe...@apache.org on 2022/04/19 09:51:21 UTC

[pulsar] 19/26: [fix][broker] Fix rewind failed when ``redeliverUnacknowledgedMessages`` (#15046)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 944d9b8a45b47385e130aa768d0eb5eaa19a5841
Author: Qiang Zhao <74...@users.noreply.github.com>
AuthorDate: Fri Apr 8 16:01:11 2022 +0800

    [fix][broker] Fix rewind failed when ``redeliverUnacknowledgedMessages`` (#15046)
    
    The test ``SimpleProducerConsumerTest#testRedeliveryFailOverConsumer`` has been flaky in many PRs. The logs of a failing run look like this:
    
    ```
    2022-04-07T11:19:16,561+0800 [pulsar-io-308-1] INFO  org.apache.pulsar.broker.service.PulsarCommandSenderImpl - Send message to consumer, message id 3/0 epoch 0
    2022-04-07T11:19:16,562+0800 [pulsar-client-io-336-1] INFO  org.apache.pulsar.client.impl.ConsumerImpl - receiver queue received message, message id 3:0:-1  epoch 0
    2022-04-07T11:19:16,562+0800 [pulsar-io-308-1] INFO  org.apache.pulsar.broker.service.PulsarCommandSenderImpl - Send message to consumer, message id 3/1 epoch 0
    2022-04-07T11:19:16,562+0800 [pulsar-client-io-336-1] INFO  org.apache.pulsar.client.impl.ConsumerImpl - receiver queue received message, message id 3:1:-1  epoch 0
    2022-04-07T11:19:16,576+0800 [pulsar-io-308-1] INFO  org.apache.pulsar.broker.service.PulsarCommandSenderImpl - Send message to consumer, message id 3/2 epoch 0
    2022-04-07T11:19:16,576+0800 [pulsar-io-308-1] INFO  org.apache.pulsar.broker.service.PulsarCommandSenderImpl - Send message to consumer, message id 3/3 epoch 0
    2022-04-07T11:19:16,576+0800 [pulsar-io-308-1] INFO  org.apache.pulsar.broker.service.PulsarCommandSenderImpl - Send message to consumer, message id 3/4 epoch 0
    2022-04-07T11:19:16,576+0800 [pulsar-io-308-1] INFO  org.apache.pulsar.broker.service.PulsarCommandSenderImpl - Send message to consumer, message id 3/5 epoch 0
    2022-04-07T11:19:16,576+0800 [pulsar-io-308-1] INFO  org.apache.pulsar.broker.service.PulsarCommandSenderImpl - Send message to consumer, message id 3/6 epoch 0
    2022-04-07T11:19:16,576+0800 [pulsar-io-308-1] INFO  org.apache.pulsar.broker.service.PulsarCommandSenderImpl - Send message to consumer, message id 3/7 epoch 0
    2022-04-07T11:19:16,576+0800 [pulsar-io-308-1] INFO  org.apache.pulsar.broker.service.PulsarCommandSenderImpl - Send message to consumer, message id 3/8 epoch 0
    2022-04-07T11:19:16,577+0800 [pulsar-client-io-336-1] INFO  org.apache.pulsar.client.impl.ConsumerImpl - receiver queue received message, message id 3:2:-1  epoch 0
    2022-04-07T11:19:16,577+0800 [pulsar-client-io-336-1] INFO  org.apache.pulsar.client.impl.ConsumerImpl - receiver queue received message, message id 3:3:-1  epoch 0
    2022-04-07T11:19:16,577+0800 [pulsar-client-io-336-1] INFO  org.apache.pulsar.client.impl.ConsumerImpl - receiver queue received message, message id 3:4:-1  epoch 0
    2022-04-07T11:19:16,577+0800 [main] INFO  org.apache.pulsar.client.api.SimpleProducerConsumerTest - ----Consumer receive and ack message 3/0 epoch 0
    2022-04-07T11:19:16,577+0800 [main] INFO  org.apache.pulsar.client.api.SimpleProducerConsumerTest - ----Consumer receive and ack message 3/1 epoch 0
    2022-04-07T11:19:16,577+0800 [main] INFO  org.apache.pulsar.client.api.SimpleProducerConsumerTest - ----Consumer receive and ack message 3/2 epoch 0
    2022-04-07T11:19:16,577+0800 [main] INFO  org.apache.pulsar.client.api.SimpleProducerConsumerTest - ----Consumer receive and ack message 3/3 epoch 0
    2022-04-07T11:19:16,577+0800 [main] INFO  org.apache.pulsar.client.api.SimpleProducerConsumerTest - ----Trigger unack messages redeliver and clear receive queue----
    2022-04-07T11:19:16,577+0800 [pulsar-client-io-336-1] INFO  org.apache.pulsar.client.impl.ConsumerImpl - receiver queue received message, message id 3:5:-1  epoch 0
    2022-04-07T11:19:16,577+0800 [pulsar-client-io-336-1] INFO  org.apache.pulsar.client.impl.ConsumerImpl - receiver queue received message, message id 3:6:-1  epoch 0
    2022-04-07T11:19:16,577+0800 [pulsar-client-io-336-1] INFO  org.apache.pulsar.client.impl.ConsumerImpl - receiver queue received message, message id 3:7:-1  epoch 0
    2022-04-07T11:19:16,577+0800 [pulsar-client-io-336-1] INFO  org.apache.pulsar.client.impl.ConsumerImpl - receiver queue received message, message id 3:8:-1  epoch 0
    2022-04-07T11:19:16,578+0800 [broker-topic-workers-OrderedScheduler-8-0] INFO  org.apache.pulsar.broker.service.persistent.PersistentDispatcherSingleActiveConsumer - [persistent://my-property/my-ns/unacked-topic / subscriber-1-Consumer{subscription=PersistentSubscription{topic=persistent://my-property/my-ns/unacked-topic, name=subscriber-1}, consumerId=0, consumerName=dde4a, address=/127.0.0.1:49153}] Ignoring reDeliverUnAcknowledgedMessages: cancelPendingRequest on cursor failed
    2022-04-07T11:19:16,578+0800 [pulsar-io-308-1] INFO  org.apache.pulsar.broker.service.PulsarCommandSenderImpl - Send message to consumer, message id 3/9 epoch 0
    2022-04-07T11:19:16,578+0800 [pulsar-client-io-336-1] INFO  org.apache.pulsar.client.impl.ConsumerImpl - receiver queue received message, message id 3:9:-1  epoch 0
    2022-04-07T11:19:17,580+0800 [main] WARN  org.apache.pulsar.client.impl.ConsumerBase - Consumer filter old epoch message, topic : [persistent://my-property/my-ns/unacked-topic], messageId : [3:5:-1:0], messageConsumerEpoch : [0], consumerEpoch : [1]
    2022-04-07T11:19:17,580+0800 [main] WARN  org.apache.pulsar.client.impl.ConsumerBase - Consumer filter old epoch message, topic : [persistent://my-property/my-ns/unacked-topic], messageId : [3:6:-1:0], messageConsumerEpoch : [0], consumerEpoch : [1]
    2022-04-07T11:19:17,580+0800 [main] WARN  org.apache.pulsar.client.impl.ConsumerBase - Consumer filter old epoch message, topic : [persistent://my-property/my-ns/unacked-topic], messageId : [3:7:-1:0], messageConsumerEpoch : [0], consumerEpoch : [1]
    2022-04-07T11:19:17,580+0800 [main] WARN  org.apache.pulsar.client.impl.ConsumerBase - Consumer filter old epoch message, topic : [persistent://my-property/my-ns/unacked-topic], messageId : [3:8:-1:0], messageConsumerEpoch : [0], consumerEpoch : [1]
    2022-04-07T11:19:17,580+0800 [main] WARN  org.apache.pulsar.client.impl.ConsumerBase - Consumer filter old epoch message, topic : [persistent://my-property/my-ns/unacked-topic], messageId : [3:9:-1:0], messageConsumerEpoch : [0], consumerEpoch : [1]
    ```
    
    Normally, the consumer keeps receiving messages from the broker. When the consumer calls `redeliverUnacknowledgedMessages`, it immediately increments its consumer epoch, and the request rewinds the broker cursor so that the broker redelivers these messages with the new epoch.
    The consumer then filters out old messages by epoch.
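    
    The epoch filtering described above can be modelled with a minimal sketch. `EpochFilterSketch`, `Msg` and `dropOldEpoch` are hypothetical names used only for illustration; they are not the Pulsar client classes, and the comparison rule is an assumption based on the "Consumer filter old epoch message" warnings in the log.
    
    ```java
    import java.util.ArrayList;
    import java.util.List;
    
    public class EpochFilterSketch {
        /** Hypothetical stand-in for a received message: entry id plus the epoch stamped on it. */
        static final class Msg {
            final long entryId;
            final long epoch;
    
            Msg(long entryId, long epoch) {
                this.entryId = entryId;
                this.epoch = epoch;
            }
        }
    
        /** Keeps only the messages whose epoch is not older than the consumer's current epoch. */
        static List<Msg> dropOldEpoch(List<Msg> received, long consumerEpoch) {
            List<Msg> kept = new ArrayList<>();
            for (Msg m : received) {
                if (m.epoch >= consumerEpoch) {
                    kept.add(m);
                }
            }
            return kept;
        }
    
        public static void main(String[] args) {
            long consumerEpoch = 0;
            List<Msg> queue = new ArrayList<>();
            // Entries 5..9 arrive with epoch 0, as in the log above.
            for (long id = 5; id <= 9; id++) {
                queue.add(new Msg(id, 0));
            }
            // Assumption: triggering redelivery bumps the consumer epoch to 1.
            consumerEpoch++;
            System.out.println("kept without redelivery: " + dropOldEpoch(queue, consumerEpoch).size());
            // Only a message redelivered with the new epoch passes the filter.
            queue.add(new Msg(4, 1));
            System.out.println("kept after redelivery: " + dropOldEpoch(queue, consumerEpoch).size());
        }
    }
    ```
    
    In this model, if the broker never redelivers with the new epoch, every queued message is dropped and the consumer has nothing left to receive.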
    
    In this case, however, the log contains the abnormal line below. It means that the cursor's pending read could not be cancelled (`cursor.cancelPendingReadRequest()` returned false), so `havePendingRead` was not set to false. As a result, the cursor was never rewound and the dispatcher was not triggered to push the messages again.
    
    > Ignoring reDeliverUnAcknowledgedMessages: cancelPendingRequest on cursor failed
    
    Because of this, the consumer never receives the redelivered messages with epoch 1 shown below.
    
    ```
    message id 3:4:-1  epoch 1
    message id 3:5:-1  epoch 1
    message id 3:6:-1  epoch 1
    message id 3:7:-1  epoch 1
    message id 3:8:-1  epoch 1
    message id 3:9:-1  epoch 1
    ```
    
    Related code:
    
    https://github.com/apache/pulsar/blob/81da8d3cd199fd6c1e4510a1c1c2ac71418efd5e/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java#L317-L328
    
    https://github.com/apache/pulsar/blob/81da8d3cd199fd6c1e4510a1c1c2ac71418efd5e/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java#L136-L141
    
    https://github.com/apache/pulsar/blob/81da8d3cd199fd6c1e4510a1c1c2ac71418efd5e/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java#L847-L851
    
    -  Always rewind the cursor when `redeliverUnacknowledgedMessages` is called
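    
    The difference between the old and new behaviour can be illustrated with a simplified simulation. `RewindSketch` and `FakeCursor` are hypothetical stand-ins, not the real dispatcher or `ManagedCursorImpl`. The sketch assumes that the cancel call returns false when the read can no longer be cancelled.
    
    ```java
    public class RewindSketch {
        /** Hypothetical cursor: cancelPendingReadRequest() fails when the read is not cancellable. */
        static final class FakeCursor {
            final boolean cancellable;
            int rewinds = 0;
    
            FakeCursor(boolean cancellable) {
                this.cancellable = cancellable;
            }
    
            boolean cancelPendingReadRequest() {
                return cancellable;
            }
    
            void rewind() {
                rewinds++;
            }
        }
    
        /** Old behaviour: rewind only if the pending-read flag could be cleared. */
        static boolean redeliverOld(FakeCursor cursor, boolean havePendingRead) {
            if (havePendingRead && cursor.cancelPendingReadRequest()) {
                havePendingRead = false;
            }
            if (!havePendingRead) {
                cursor.rewind();
                return true;
            }
            // Corresponds to "Ignoring reDeliverUnAcknowledgedMessages" in the log.
            return false;
        }
    
        /** New behaviour: always clear the flag and rewind, whatever the cancel call returns. */
        static boolean redeliverNew(FakeCursor cursor, boolean havePendingRead) {
            cursor.cancelPendingReadRequest();
            havePendingRead = false;
            cursor.rewind();
            return true;
        }
    
        public static void main(String[] args) {
            FakeCursor stuck = new FakeCursor(false);
            System.out.println("old path rewound: " + redeliverOld(stuck, true));
            System.out.println("new path rewound: " + redeliverNew(stuck, true));
        }
    }
    ```
    
    With a cursor whose cancel call fails, the old path skips the rewind and the new path performs it.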
    
    (cherry picked from commit 22d6beb15b9d44e630e46931f5905d19f33fe724)
---
 .../PersistentDispatcherSingleActiveConsumer.java     | 19 ++++++-------------
 .../pulsar/client/api/SimpleProducerConsumerTest.java |  6 +++---
 2 files changed, 9 insertions(+), 16 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
index 6900a54e35c..8d0adcf0ddb 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
@@ -295,20 +295,13 @@ public class PersistentDispatcherSingleActiveConsumer extends AbstractDispatcher
                     name, consumer);
             return;
         }
-
-        cancelPendingRead();
-
-        if (!havePendingRead) {
-            cursor.rewind();
-            if (log.isDebugEnabled()) {
-                log.debug("[{}-{}] Cursor rewinded, redelivering unacknowledged messages. ", name, consumer);
-            }
-            readMoreEntries(consumer);
-        } else {
-            log.info("[{}-{}] Ignoring reDeliverUnAcknowledgedMessages: cancelPendingRequest on cursor failed", name,
-                    consumer);
+        cursor.cancelPendingReadRequest();
+        havePendingRead = false;
+        cursor.rewind();
+        if (log.isDebugEnabled()) {
+            log.debug("[{}-{}] Cursor rewinded, redelivering unacknowledged messages. ", name, consumer);
         }
-
+        readMoreEntries(consumer);
     }
 
     @Override
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
index f5677d36273..8a47c6fbb62 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
@@ -2441,7 +2441,7 @@ public class SimpleProducerConsumerTest extends ProducerConsumerBase {
         Message<byte[]> msg;
         List<Message<byte[]>> messages1 = Lists.newArrayList();
         for (int i = 0; i < consumeMsgInParts; i++) {
-            msg = consumer.receive(RECEIVE_TIMEOUT_SECONDS, TimeUnit.SECONDS);
+            msg = consumer.receive();
             if (msg != null) {
                 messages1.add(msg);
                 consumer.acknowledge(msg);
@@ -2457,7 +2457,7 @@ public class SimpleProducerConsumerTest extends ProducerConsumerBase {
         // (1.b) consume second consumeMsgInParts msgs and trigger redeliver
         messages1.clear();
         for (int i = 0; i < consumeMsgInParts; i++) {
-            msg = consumer.receive(RECEIVE_TIMEOUT_SECONDS, TimeUnit.SECONDS);
+            msg = consumer.receive();
             if (msg != null) {
                 messages1.add(msg);
                 consumer.acknowledge(msg);
@@ -2480,7 +2480,7 @@ public class SimpleProducerConsumerTest extends ProducerConsumerBase {
         int remainingMsgs = (2 * receiverQueueSize) - (2 * consumeMsgInParts);
         messages1.clear();
         for (int i = 0; i < remainingMsgs; i++) {
-            msg = consumer.receive(RECEIVE_TIMEOUT_SECONDS, TimeUnit.SECONDS);
+            msg = consumer.receive();
             if (msg != null) {
                 messages1.add(msg);
                 consumer.acknowledge(msg);