You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by si...@apache.org on 2021/02/22 08:23:35 UTC

[pulsar] branch master updated: Add original info when publishing message to dead letter topic (#9655)

This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new b607bba  Add original info when publishing message to dead letter topic (#9655)
b607bba is described below

commit b607bba285499c07e56a782fe954eded32526724
Author: feynmanlin <fe...@tencent.com>
AuthorDate: Mon Feb 22 16:23:12 2021 +0800

    Add original info when publishing message to dead letter topic (#9655)
    
    Fixes #9543
    
    
    ### Motivation
    When a message is published to the dead letter topic, its origin-related information, such as the original message ID and topic, is lost. This information is useful when debugging failures and when correlating failure logs with messages in the dead letter topic.
---
 .../pulsar/client/api/DeadLetterTopicTest.java     | 58 +++++++++++++++++
 .../apache/pulsar/client/impl/ConsumerImpl.java    | 75 +++++++++++++---------
 2 files changed, 104 insertions(+), 29 deletions(-)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DeadLetterTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DeadLetterTopicTest.java
index 1bc7cfc..1878ad6 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DeadLetterTopicTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DeadLetterTopicTest.java
@@ -19,12 +19,15 @@
 package org.apache.pulsar.client.api;
 
 import java.util.ArrayList;
+import java.util.HashSet;
 import java.util.List;
+import java.util.Set;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import org.apache.pulsar.client.util.RetryMessageUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.testng.annotations.AfterMethod;
@@ -34,6 +37,7 @@ import org.testng.annotations.Test;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertNotNull;
 import static org.testng.Assert.assertNull;
+import static org.testng.Assert.assertTrue;
 import static org.testng.Assert.fail;
 
 public class DeadLetterTopicTest extends ProducerConsumerBase {
@@ -123,6 +127,60 @@ public class DeadLetterTopicTest extends ProducerConsumerBase {
         newPulsarClient.close();
     }
 
+    @Test(timeOut = 20000)
+    public void testDeadLetterTopicHasOriginalInfo() throws Exception {
+        final String topic = "persistent://my-property/my-ns/dead-letter-topic";
+
+        final int maxRedeliveryCount = 1;
+        final int sendMessages = 10;
+
+        Consumer<byte[]> consumer = pulsarClient.newConsumer(Schema.BYTES)
+                .topic(topic)
+                .subscriptionName("my-subscription")
+                .subscriptionType(SubscriptionType.Shared)
+                .ackTimeout(1, TimeUnit.SECONDS)
+                .deadLetterPolicy(DeadLetterPolicy.builder().maxRedeliverCount(maxRedeliveryCount).build())
+                .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
+                .subscribe();
+
+        PulsarClient newPulsarClient = newPulsarClient(lookupUrl.toString(), 0);// Creates new client connection
+        Consumer<byte[]> deadLetterConsumer = newPulsarClient.newConsumer(Schema.BYTES)
+                .topic("persistent://my-property/my-ns/dead-letter-topic-my-subscription-DLQ")
+                .subscriptionName("my-subscription")
+                .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
+                .subscribe();
+
+        Producer<byte[]> producer = pulsarClient.newProducer(Schema.BYTES)
+                .topic(topic)
+                .create();
+        Set<String> messageIds = new HashSet<>();
+        for (int i = 0; i < sendMessages; i++) {
+            MessageId messageId = producer.send(String.format("Hello Pulsar [%d]", i).getBytes());
+            messageIds.add(messageId.toString());
+        }
+        producer.close();
+
+        int totalReceived = 0;
+        do {
+            consumer.receive();
+            totalReceived++;
+        } while (totalReceived < sendMessages * (maxRedeliveryCount + 1));
+
+        int totalInDeadLetter = 0;
+        do {
+            Message<byte[]> message = deadLetterConsumer.receive();
+            //Original info should exists
+            assertEquals(message.getProperties().get(RetryMessageUtil.SYSTEM_PROPERTY_REAL_TOPIC), topic);
+            assertTrue(messageIds.contains(message.getProperties().get(RetryMessageUtil.SYSTEM_PROPERTY_ORIGIN_MESSAGE_ID)));
+            deadLetterConsumer.acknowledge(message);
+            totalInDeadLetter++;
+        } while (totalInDeadLetter < sendMessages);
+        assertEquals(totalInDeadLetter, sendMessages);
+        deadLetterConsumer.close();
+        consumer.close();
+        newPulsarClient.close();
+    }
+
     @Test(timeOut = 30000)
     public void testDLQDisabledForKeySharedSubtype() throws Exception {
         final String topic = "persistent://my-property/my-ns/dead-letter-topic";
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index 59f296d..94f73dc 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -583,47 +583,25 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
         CompletableFuture<Void> result = new CompletableFuture<>();
         if (retryLetterProducer != null) {
             try {
-                MessageImpl<T> retryMessage = null;
-                String originMessageIdStr = null;
-                String originTopicNameStr = null;
-                if (message instanceof TopicMessageImpl) {
-                    retryMessage = (MessageImpl<T>) ((TopicMessageImpl<T>) message).getMessage();
-                    originMessageIdStr = ((TopicMessageIdImpl) message.getMessageId()).getInnerMessageId().toString();
-                    originTopicNameStr = ((TopicMessageIdImpl) message.getMessageId()).getTopicName();
-                } else if (message instanceof MessageImpl) {
-                    retryMessage = (MessageImpl<T>) message;
-                    originMessageIdStr = ((MessageImpl<T>) message).getMessageId().toString();
-                    originTopicNameStr =  ((MessageImpl<T>) message).getTopicName();
-                }
-                SortedMap<String, String> propertiesMap = new TreeMap<>();
+                MessageImpl<T> retryMessage = (MessageImpl<T>) getMessageImpl(message);
+                String originMessageIdStr = getOriginMessageIdStr(message);
+                String originTopicNameStr = getOriginTopicNameStr(message);
+                SortedMap<String, String> propertiesMap
+                        = getPropertiesMap(message, originMessageIdStr, originTopicNameStr);
                 int reconsumetimes = 1;
-                if (message.getProperties() != null) {
-                    propertiesMap.putAll(message.getProperties());
-                }
-
                 if (propertiesMap.containsKey(RetryMessageUtil.SYSTEM_PROPERTY_RECONSUMETIMES)) {
                     reconsumetimes = Integer.parseInt(propertiesMap.get(RetryMessageUtil.SYSTEM_PROPERTY_RECONSUMETIMES));
                     reconsumetimes = reconsumetimes + 1;
-
-                } else {
-                    propertiesMap.put(RetryMessageUtil.SYSTEM_PROPERTY_REAL_TOPIC, originTopicNameStr);
-                    propertiesMap.put(RetryMessageUtil.SYSTEM_PROPERTY_ORIGIN_MESSAGE_ID, originMessageIdStr);
                 }
-
                 propertiesMap.put(RetryMessageUtil.SYSTEM_PROPERTY_RECONSUMETIMES, String.valueOf(reconsumetimes));
                 propertiesMap.put(RetryMessageUtil.SYSTEM_PROPERTY_DELAY_TIME, String.valueOf(unit.toMillis(delayTime)));
 
                 if (reconsumetimes > this.deadLetterPolicy.getMaxRedeliverCount() && StringUtils.isNotBlank(deadLetterPolicy.getDeadLetterTopic())) {
                     initDeadLetterProducerIfNeeded();
                     MessageId finalMessageId = messageId;
-                    String finalOriginTopicNameStr = originTopicNameStr;
-                    String finalOriginMessageIdStr = originMessageIdStr;
-                    MessageImpl<T> finalRetryMessage = retryMessage;
                     deadLetterProducer.thenAccept(dlqProducer -> {
-                        propertiesMap.put(RetryMessageUtil.SYSTEM_PROPERTY_REAL_TOPIC, finalOriginTopicNameStr);
-                        propertiesMap.put(RetryMessageUtil.SYSTEM_PROPERTY_ORIGIN_MESSAGE_ID, finalOriginMessageIdStr);
                         TypedMessageBuilder<T> typedMessageBuilderNew = dlqProducer.newMessage()
-                                .value(finalRetryMessage.getValue())
+                                .value(retryMessage.getValue())
                                 .properties(propertiesMap);
                         typedMessageBuilderNew.sendAsync().thenAccept(msgId -> {
                             doAcknowledge(finalMessageId, ackType, properties, null).thenAccept(v -> {
@@ -671,6 +649,43 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
         return result;
     }
 
+    private SortedMap<String, String> getPropertiesMap(Message<?> message, String originMessageIdStr, String originTopicNameStr) {
+        SortedMap<String, String> propertiesMap = new TreeMap<>();
+        if (message.getProperties() != null) {
+            propertiesMap.putAll(message.getProperties());
+        }
+        propertiesMap.put(RetryMessageUtil.SYSTEM_PROPERTY_REAL_TOPIC, originTopicNameStr);
+        propertiesMap.put(RetryMessageUtil.SYSTEM_PROPERTY_ORIGIN_MESSAGE_ID, originMessageIdStr);
+        return propertiesMap;
+    }
+
+    private String getOriginMessageIdStr(Message<?> message) {
+        if (message instanceof TopicMessageImpl) {
+            return  ((TopicMessageIdImpl) message.getMessageId()).getInnerMessageId().toString();
+        } else if (message instanceof MessageImpl) {
+            return message.getMessageId().toString();
+        }
+        return null;
+    }
+
+    private String getOriginTopicNameStr(Message<?> message) {
+        if (message instanceof TopicMessageImpl) {
+            return  ((TopicMessageIdImpl) message.getMessageId()).getTopicName();
+        } else if (message instanceof MessageImpl) {
+            return message.getTopicName();
+        }
+        return null;
+    }
+
+    private MessageImpl<?> getMessageImpl(Message<?> message){
+        if (message instanceof TopicMessageImpl) {
+            return (MessageImpl<?>) ((TopicMessageImpl<?>) message).getMessage();
+        } else if (message instanceof MessageImpl) {
+            return (MessageImpl<?>) message;
+        }
+        return null;
+    }
+
     @Override
     public void negativeAcknowledge(MessageId messageId) {
         negativeAcksTracker.add(messageId);
@@ -1676,9 +1691,11 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
             MessageIdImpl finalMessageId = messageId;
             deadLetterProducer.thenAccept(producerDLQ -> {
                 for (MessageImpl<T> message : finalDeadLetterMessages) {
+                    String originMessageIdStr = getOriginMessageIdStr(message);
+                    String originTopicNameStr = getOriginTopicNameStr(message);
                     producerDLQ.newMessage()
                             .value(message.getValue())
-                            .properties(message.getProperties())
+                            .properties(getPropertiesMap(message, originMessageIdStr, originTopicNameStr))
                             .sendAsync()
                             .thenAccept(messageIdInDLQ -> {
                                 possibleSendToDeadLetterTopicMessages.remove(finalMessageId);