You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2020/06/05 01:43:04 UTC

[pulsar] branch master updated: Fix Duplicated messages are sent to dead letter topic #6960 (#7021)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 6926180  Fix Duplicated messages are sent to dead letter topic #6960 (#7021)
6926180 is described below

commit 6926180966f45eb9c1499b7f0eb32ea2a1368fd6
Author: feynmanlin <fe...@tencent.com>
AuthorDate: Fri Jun 5 09:42:52 2020 +0800

    Fix Duplicated messages are sent to dead letter topic #6960 (#7021)
    
    Fixes #6960
    
    
    ### Motivation
    
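    Bug fix: messages that exceed maxRedeliveryCount could be sent to the dead letter topic more than once (see #6960).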
    
    ### Modifications
    
    "UnAckedMessageTracker" calls "consumerBase.redeliverUnacknowledgedMessages", which performs 2 steps:
    1) Filter out the messages that need to enter the DLQ
    2) The remaining messages will be re-delivered via RedeliverUnacknowledgedMessages request
    
    The problem appears in the second step when all messages have already been filtered out in the first step. If the MessageIdsList received by the broker is empty, the broker redelivers all unacked messages held by this consumer.
    
    Other consumers then consume the messages that have already exceeded maxRedeliveryCount again, so those messages enter the DLQ repeatedly.
    
    Therefore, if all messages have been filtered out, the RedeliverUnacknowledgedMessages request should not be sent; "processPossibleToDLQ" alone sends them to the DLQ and acknowledges them.
    
    ### Verifying this change
    unit test:
    org.apache.pulsar.client.api.DeadLetterTopicTest#testDuplicatedMessageSendToDeadLetterTopic
---
 .../pulsar/client/api/DeadLetterTopicTest.java     | 84 +++++++++++++++++++++-
 .../apache/pulsar/client/impl/ConsumerImpl.java    |  9 +--
 2 files changed, 86 insertions(+), 7 deletions(-)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DeadLetterTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DeadLetterTopicTest.java
index b1f1ddd..ddeb052 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DeadLetterTopicTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DeadLetterTopicTest.java
@@ -18,16 +18,23 @@
  */
 package org.apache.pulsar.client.api;
 
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
-import java.util.concurrent.TimeUnit;
-
-import static org.testng.Assert.assertNull;
+import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertNull;
+import static org.testng.Assert.fail;
 
 public class DeadLetterTopicTest extends ProducerConsumerBase {
 
@@ -116,6 +123,77 @@ public class DeadLetterTopicTest extends ProducerConsumerBase {
         newPulsarClient.close();
     }
 
+    @Test(timeOut = 30000) // Regression test for issue #6960 (duplicated messages sent to the dead letter topic).
+    public void testDuplicatedMessageSendToDeadLetterTopic() throws Exception {
+        final String topic = "persistent://my-property/my-ns/dead-letter-topic-DuplicatedMessage";
+        final int maxRedeliveryCount = 1;
+        final int messageCount = 10;
+        final int consumerCount = 3;
+        // 1. Subscribe consumerCount consumers in parallel on the same Shared subscription, each with a DLQ policy.
+        List<Consumer<String>> consumers = new ArrayList<>();
+        final AtomicInteger totalReceived = new AtomicInteger(0); // deliveries counted across all consumers
+        ExecutorService executor = Executors.newFixedThreadPool(consumerCount);
+        for (int i = 0; i < consumerCount; i++) {
+            executor.execute(() -> {
+                try {
+                    Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
+                            .topic(topic)
+                            .subscriptionName("my-subscription-DuplicatedMessage")
+                            .subscriptionType(SubscriptionType.Shared)
+                            .ackTimeout(1001, TimeUnit.MILLISECONDS)
+                            .deadLetterPolicy(DeadLetterPolicy.builder().maxRedeliverCount(maxRedeliveryCount).deadLetterTopic(topic + "-DLQ").build())
+                            .negativeAckRedeliveryDelay(1001, TimeUnit.MILLISECONDS)
+                            .receiverQueueSize(100)
+                            .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
+                            .messageListener((MessageListener<String>) (consumer1, msg) -> {
+                                totalReceived.getAndIncrement();
+                                // Intentionally never ack: the message stays unacknowledged so it is redelivered and finally dead-lettered.
+                            })
+                            .subscribe();
+                    consumers.add(consumer); // NOTE(review): plain ArrayList mutated from several executor threads without synchronization.
+                } catch (PulsarClientException e) {
+                    fail(); // NOTE(review): runs on an executor thread; presumably does not fail the TestNG test itself - confirm.
+                }
+            });
+        }
+
+        // 2. Publish messageCount messages to the topic.
+        Producer<String> producer = pulsarClient.newProducer(Schema.STRING).topic(topic).create();
+        for (int i = 0; i < messageCount; i++) {
+            producer.send(String.format("Message [%d]", i));
+        }
+
+        // 3. Start a DLQ consumer and drain the dead letter topic, acking and counting every message.
+        Consumer<String> deadLetterConsumer = pulsarClient.newConsumer(Schema.STRING)
+                .topic(topic + "-DLQ")
+                .subscriptionName("my-subscription-DuplicatedMessage-DLQ")
+                .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
+                .subscribe();
+        int totalInDeadLetter = 0;
+        while (true) {
+            Message<String> message = deadLetterConsumer.receive(10, TimeUnit.SECONDS);
+            if (message == null) { // nothing arrived within 10 seconds: treat the DLQ as drained
+                break;
+            }
+            deadLetterConsumer.acknowledge(message);
+            totalInDeadLetter++;
+        }
+
+        // 4. The total number of deliveries to the consumers should equal messageCount * (maxRedeliveryCount + 1).
+        assertEquals(totalReceived.get(), messageCount * (maxRedeliveryCount + 1));
+
+        // 5. The number of messages in the DLQ should equal messageCount, i.e. no duplicates.
+        assertEquals(totalInDeadLetter, messageCount);
+
+        // 6. Clean up. NOTE(review): not in a finally block, so this is skipped when an assertion above fails.
+        executor.shutdownNow();
+        producer.close();
+        deadLetterConsumer.close();
+        for (Consumer<String> consumer : consumers) {
+            consumer.close();
+        }
+    }
+
     /**
      * The test is disabled {@link https://github.com/apache/pulsar/issues/2647}.
      * @throws Exception
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index e36ce0c..e0f1bcb 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -1676,10 +1676,11 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
                             builder.setEntryId(messageId.getEntryId());
                             return builder.build();
                         }).collect(Collectors.toList());
-
-                ByteBuf cmd = Commands.newRedeliverUnacknowledgedMessages(consumerId, messageIdDatas);
-                cnx.ctx().writeAndFlush(cmd, cnx.ctx().voidPromise());
-                messageIdDatas.forEach(MessageIdData::recycle);
+                if (!messageIdDatas.isEmpty()) {
+                    ByteBuf cmd = Commands.newRedeliverUnacknowledgedMessages(consumerId, messageIdDatas);
+                    cnx.ctx().writeAndFlush(cmd, cnx.ctx().voidPromise());
+                    messageIdDatas.forEach(MessageIdData::recycle);
+                }
             });
             if (messagesFromQueue > 0) {
                 increaseAvailablePermits(cnx, messagesFromQueue);