You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by bo...@apache.org on 2022/08/28 12:18:55 UTC

[pulsar] 01/02: [fix][client] Fix reach redeliverCount client can't send messages to DLQ (#17287)

This is an automated email from the ASF dual-hosted git repository.

bogong pushed a commit to branch branch-2.11
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit eb77b522ba0931ed22b32be5999ea68ad6337f2f
Author: congbo <39...@users.noreply.github.com>
AuthorDate: Sat Aug 27 12:38:08 2022 +0800

    [fix][client] Fix reach redeliverCount client can't send messages to DLQ (#17287)
    
    (cherry picked from commit 4a28c087fe1308ea4eabc104b3d4889b47316afe)
---
 .../client/impl/TransactionEndToEndTest.java       | 55 ++++++++++++++++++++++
 .../apache/pulsar/client/impl/ConsumerImpl.java    | 13 +++--
 2 files changed, 64 insertions(+), 4 deletions(-)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java
index 72eef04bad9..e2683afa2df 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java
@@ -51,6 +51,7 @@ import org.apache.pulsar.broker.service.Topic;
 import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
 import org.apache.pulsar.broker.transaction.TransactionTestBase;
 import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.DeadLetterPolicy;
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.Producer;
@@ -67,6 +68,7 @@ import org.apache.pulsar.client.api.transaction.TransactionCoordinatorClientExce
 import org.apache.pulsar.client.api.transaction.TxnID;
 import org.apache.pulsar.client.impl.transaction.TransactionImpl;
 import org.apache.pulsar.client.internal.DefaultImplementation;
+import org.apache.pulsar.client.util.RetryMessageUtil;
 import org.apache.pulsar.common.api.proto.CommandAck;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
@@ -1169,4 +1171,57 @@ public class TransactionEndToEndTest extends TransactionTestBase {
             assertTrue(ex instanceof PulsarClientException.TimeoutException);
         }
     }
+
+    @Test
+    public void testSendTxnAckMessageToDLQ() throws Exception {
+        String topic = NAMESPACE1 + "/testSendTxnAckMessageToDLQ";
+        String subName = "test";
+        String value = "test";
+        @Cleanup
+        ProducerImpl<byte[]> producer = (ProducerImpl<byte[]>) pulsarClient.newProducer()
+                .topic(topic)
+                .sendTimeout(1, TimeUnit.SECONDS)
+                .create();
+
+        @Cleanup
+        Consumer<byte[]> consumer = pulsarClient.newConsumer()
+                .topic(topic)
+                .subscriptionType(SubscriptionType.Shared)
+                // with maxRedeliverCount = 1 the consumer must not receive the same message a third time
+                .deadLetterPolicy(DeadLetterPolicy.builder().maxRedeliverCount(1).build())
+                .subscriptionName(subName)
+                .subscribe();
+
+        @Cleanup
+        Consumer<byte[]> deadLetterConsumer = pulsarClient.newConsumer()
+                .topic(String.format("%s-%s" + RetryMessageUtil.DLQ_GROUP_TOPIC_SUFFIX,
+                        topic, subName))
+                .subscriptionType(SubscriptionType.Shared)
+                .deadLetterPolicy(DeadLetterPolicy.builder().maxRedeliverCount(1).build())
+                .subscriptionName("test")
+                .subscribe();
+
+        producer.send(value.getBytes());
+        Transaction transaction = pulsarClient.newTransaction().withTransactionTimeout(1, TimeUnit.MINUTES)
+                .build().get();
+
+        // the consumer receives the message for the first time, redeliverCount = 0
+        consumer.acknowledgeAsync(consumer.receive().getMessageId(), transaction).get();
+
+        transaction.abort().get();
+
+        transaction = pulsarClient.newTransaction().withTransactionTimeout(5, TimeUnit.MINUTES)
+                .build().get();
+
+        // the consumer receives the message a second time, redeliverCount = 1, which is still deliverable
+        consumer.acknowledgeAsync(consumer.receive().getMessageId(), transaction).get();
+
+        transaction.abort().get();
+
+        // on the third delivery redeliverCount = 2 exceeds maxRedeliverCount, so the message is expected
+        // to be sent to the DLQ instead of being received by this consumer
+        assertNull(consumer.receive(3, TimeUnit.SECONDS));
+
+        assertEquals(value, new String(deadLetterConsumer.receive(3, TimeUnit.SECONDS).getValue()));
+    }
 }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index 4cb334b38e4..809e7c674b9 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -1386,10 +1386,15 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
                             schema, redeliveryCount, consumerEpoch);
             uncompressedPayload.release();
 
-            if (deadLetterPolicy != null && possibleSendToDeadLetterTopicMessages != null
-                    && redeliveryCount >= deadLetterPolicy.getMaxRedeliverCount()) {
-                possibleSendToDeadLetterTopicMessages.put((MessageIdImpl) message.getMessageId(),
-                        Collections.singletonList(message));
+            if (deadLetterPolicy != null && possibleSendToDeadLetterTopicMessages != null) {
+                if (redeliveryCount >= deadLetterPolicy.getMaxRedeliverCount()) {
+                    possibleSendToDeadLetterTopicMessages.put((MessageIdImpl) message.getMessageId(),
+                            Collections.singletonList(message));
+                    if (redeliveryCount > deadLetterPolicy.getMaxRedeliverCount()) {
+                        redeliverUnacknowledgedMessages(Collections.singleton(message.getMessageId()));
+                        return;
+                    }
+                }
             }
             executeNotifyCallback(message);
         } else {