You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by si...@apache.org on 2021/02/22 08:23:35 UTC
[pulsar] branch master updated: Add original info when publishing
message to dead letter topic (#9655)
This is an automated email from the ASF dual-hosted git repository.
sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new b607bba Add original info when publishing message to dead letter topic (#9655)
b607bba is described below
commit b607bba285499c07e56a782fe954eded32526724
Author: feynmanlin <fe...@tencent.com>
AuthorDate: Mon Feb 22 16:23:12 2021 +0800
Add original info when publishing message to dead letter topic (#9655)
Fixes #9543
### Motivation
When message is produced to dead letter topic, its origin related information like original message-id and topic is lost. This info is useful when debugging failures and correlating failure logs with messages in dead letter topic.
---
.../pulsar/client/api/DeadLetterTopicTest.java | 58 +++++++++++++++++
.../apache/pulsar/client/impl/ConsumerImpl.java | 75 +++++++++++++---------
2 files changed, 104 insertions(+), 29 deletions(-)
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DeadLetterTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DeadLetterTopicTest.java
index 1bc7cfc..1878ad6 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DeadLetterTopicTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DeadLetterTopicTest.java
@@ -19,12 +19,15 @@
package org.apache.pulsar.client.api;
import java.util.ArrayList;
+import java.util.HashSet;
import java.util.List;
+import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.pulsar.client.util.RetryMessageUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.annotations.AfterMethod;
@@ -34,6 +37,7 @@ import org.testng.annotations.Test;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertNull;
+import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
public class DeadLetterTopicTest extends ProducerConsumerBase {
@@ -123,6 +127,60 @@ public class DeadLetterTopicTest extends ProducerConsumerBase {
newPulsarClient.close();
}
+ @Test(timeOut = 20000)
+ public void testDeadLetterTopicHasOriginalInfo() throws Exception {
+ final String topic = "persistent://my-property/my-ns/dead-letter-topic";
+
+ final int maxRedeliveryCount = 1;
+ final int sendMessages = 10;
+
+ Consumer<byte[]> consumer = pulsarClient.newConsumer(Schema.BYTES)
+ .topic(topic)
+ .subscriptionName("my-subscription")
+ .subscriptionType(SubscriptionType.Shared)
+ .ackTimeout(1, TimeUnit.SECONDS)
+ .deadLetterPolicy(DeadLetterPolicy.builder().maxRedeliverCount(maxRedeliveryCount).build())
+ .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
+ .subscribe();
+
+ PulsarClient newPulsarClient = newPulsarClient(lookupUrl.toString(), 0);// Creates new client connection
+ Consumer<byte[]> deadLetterConsumer = newPulsarClient.newConsumer(Schema.BYTES)
+ .topic("persistent://my-property/my-ns/dead-letter-topic-my-subscription-DLQ")
+ .subscriptionName("my-subscription")
+ .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
+ .subscribe();
+
+ Producer<byte[]> producer = pulsarClient.newProducer(Schema.BYTES)
+ .topic(topic)
+ .create();
+ Set<String> messageIds = new HashSet<>();
+ for (int i = 0; i < sendMessages; i++) {
+ MessageId messageId = producer.send(String.format("Hello Pulsar [%d]", i).getBytes());
+ messageIds.add(messageId.toString());
+ }
+ producer.close();
+
+ int totalReceived = 0;
+ do {
+ consumer.receive();
+ totalReceived++;
+ } while (totalReceived < sendMessages * (maxRedeliveryCount + 1));
+
+ int totalInDeadLetter = 0;
+ do {
+ Message<byte[]> message = deadLetterConsumer.receive();
+ //Original info should exists
+ assertEquals(message.getProperties().get(RetryMessageUtil.SYSTEM_PROPERTY_REAL_TOPIC), topic);
+ assertTrue(messageIds.contains(message.getProperties().get(RetryMessageUtil.SYSTEM_PROPERTY_ORIGIN_MESSAGE_ID)));
+ deadLetterConsumer.acknowledge(message);
+ totalInDeadLetter++;
+ } while (totalInDeadLetter < sendMessages);
+ assertEquals(totalInDeadLetter, sendMessages);
+ deadLetterConsumer.close();
+ consumer.close();
+ newPulsarClient.close();
+ }
+
@Test(timeOut = 30000)
public void testDLQDisabledForKeySharedSubtype() throws Exception {
final String topic = "persistent://my-property/my-ns/dead-letter-topic";
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index 59f296d..94f73dc 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -583,47 +583,25 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
CompletableFuture<Void> result = new CompletableFuture<>();
if (retryLetterProducer != null) {
try {
- MessageImpl<T> retryMessage = null;
- String originMessageIdStr = null;
- String originTopicNameStr = null;
- if (message instanceof TopicMessageImpl) {
- retryMessage = (MessageImpl<T>) ((TopicMessageImpl<T>) message).getMessage();
- originMessageIdStr = ((TopicMessageIdImpl) message.getMessageId()).getInnerMessageId().toString();
- originTopicNameStr = ((TopicMessageIdImpl) message.getMessageId()).getTopicName();
- } else if (message instanceof MessageImpl) {
- retryMessage = (MessageImpl<T>) message;
- originMessageIdStr = ((MessageImpl<T>) message).getMessageId().toString();
- originTopicNameStr = ((MessageImpl<T>) message).getTopicName();
- }
- SortedMap<String, String> propertiesMap = new TreeMap<>();
+ MessageImpl<T> retryMessage = (MessageImpl<T>) getMessageImpl(message);
+ String originMessageIdStr = getOriginMessageIdStr(message);
+ String originTopicNameStr = getOriginTopicNameStr(message);
+ SortedMap<String, String> propertiesMap
+ = getPropertiesMap(message, originMessageIdStr, originTopicNameStr);
int reconsumetimes = 1;
- if (message.getProperties() != null) {
- propertiesMap.putAll(message.getProperties());
- }
-
if (propertiesMap.containsKey(RetryMessageUtil.SYSTEM_PROPERTY_RECONSUMETIMES)) {
reconsumetimes = Integer.parseInt(propertiesMap.get(RetryMessageUtil.SYSTEM_PROPERTY_RECONSUMETIMES));
reconsumetimes = reconsumetimes + 1;
-
- } else {
- propertiesMap.put(RetryMessageUtil.SYSTEM_PROPERTY_REAL_TOPIC, originTopicNameStr);
- propertiesMap.put(RetryMessageUtil.SYSTEM_PROPERTY_ORIGIN_MESSAGE_ID, originMessageIdStr);
}
-
propertiesMap.put(RetryMessageUtil.SYSTEM_PROPERTY_RECONSUMETIMES, String.valueOf(reconsumetimes));
propertiesMap.put(RetryMessageUtil.SYSTEM_PROPERTY_DELAY_TIME, String.valueOf(unit.toMillis(delayTime)));
if (reconsumetimes > this.deadLetterPolicy.getMaxRedeliverCount() && StringUtils.isNotBlank(deadLetterPolicy.getDeadLetterTopic())) {
initDeadLetterProducerIfNeeded();
MessageId finalMessageId = messageId;
- String finalOriginTopicNameStr = originTopicNameStr;
- String finalOriginMessageIdStr = originMessageIdStr;
- MessageImpl<T> finalRetryMessage = retryMessage;
deadLetterProducer.thenAccept(dlqProducer -> {
- propertiesMap.put(RetryMessageUtil.SYSTEM_PROPERTY_REAL_TOPIC, finalOriginTopicNameStr);
- propertiesMap.put(RetryMessageUtil.SYSTEM_PROPERTY_ORIGIN_MESSAGE_ID, finalOriginMessageIdStr);
TypedMessageBuilder<T> typedMessageBuilderNew = dlqProducer.newMessage()
- .value(finalRetryMessage.getValue())
+ .value(retryMessage.getValue())
.properties(propertiesMap);
typedMessageBuilderNew.sendAsync().thenAccept(msgId -> {
doAcknowledge(finalMessageId, ackType, properties, null).thenAccept(v -> {
@@ -671,6 +649,43 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
return result;
}
+ private SortedMap<String, String> getPropertiesMap(Message<?> message, String originMessageIdStr, String originTopicNameStr) {
+ SortedMap<String, String> propertiesMap = new TreeMap<>();
+ if (message.getProperties() != null) {
+ propertiesMap.putAll(message.getProperties());
+ }
+ propertiesMap.put(RetryMessageUtil.SYSTEM_PROPERTY_REAL_TOPIC, originTopicNameStr);
+ propertiesMap.put(RetryMessageUtil.SYSTEM_PROPERTY_ORIGIN_MESSAGE_ID, originMessageIdStr);
+ return propertiesMap;
+ }
+
+ private String getOriginMessageIdStr(Message<?> message) {
+ if (message instanceof TopicMessageImpl) {
+ return ((TopicMessageIdImpl) message.getMessageId()).getInnerMessageId().toString();
+ } else if (message instanceof MessageImpl) {
+ return message.getMessageId().toString();
+ }
+ return null;
+ }
+
+ private String getOriginTopicNameStr(Message<?> message) {
+ if (message instanceof TopicMessageImpl) {
+ return ((TopicMessageIdImpl) message.getMessageId()).getTopicName();
+ } else if (message instanceof MessageImpl) {
+ return message.getTopicName();
+ }
+ return null;
+ }
+
+ private MessageImpl<?> getMessageImpl(Message<?> message){
+ if (message instanceof TopicMessageImpl) {
+ return (MessageImpl<?>) ((TopicMessageImpl<?>) message).getMessage();
+ } else if (message instanceof MessageImpl) {
+ return (MessageImpl<?>) message;
+ }
+ return null;
+ }
+
@Override
public void negativeAcknowledge(MessageId messageId) {
negativeAcksTracker.add(messageId);
@@ -1676,9 +1691,11 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
MessageIdImpl finalMessageId = messageId;
deadLetterProducer.thenAccept(producerDLQ -> {
for (MessageImpl<T> message : finalDeadLetterMessages) {
+ String originMessageIdStr = getOriginMessageIdStr(message);
+ String originTopicNameStr = getOriginTopicNameStr(message);
producerDLQ.newMessage()
.value(message.getValue())
- .properties(message.getProperties())
+ .properties(getPropertiesMap(message, originMessageIdStr, originTopicNameStr))
.sendAsync()
.thenAccept(messageIdInDLQ -> {
possibleSendToDeadLetterTopicMessages.remove(finalMessageId);