You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by te...@apache.org on 2023/10/18 01:33:48 UTC
[pulsar] branch master updated: [improve][client] Add `REAL_SUBSCRIPTION` when produces msg to DLQ (#21369)
This is an automated email from the ASF dual-hosted git repository.
technoboy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 1b2a9915c46 [improve][client] Add `REAL_SUBSCRIPTION` when produces msg to DLQ (#21369)
1b2a9915c46 is described below
commit 1b2a9915c461c4357eb34d14c509963fb6cb47cd
Author: Jiwei Guo <te...@apache.org>
AuthorDate: Wed Oct 18 09:33:42 2023 +0800
[improve][client] Add `REAL_SUBSCRIPTION` when produces msg to DLQ (#21369)
---
.../test/java/org/apache/pulsar/client/api/DeadLetterTopicTest.java | 4 +++-
.../src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java | 1 +
.../src/main/java/org/apache/pulsar/client/util/RetryMessageUtil.java | 1 +
3 files changed, 5 insertions(+), 1 deletion(-)
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DeadLetterTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DeadLetterTopicTest.java
index 2a0cb3187d2..7be292a6026 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DeadLetterTopicTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DeadLetterTopicTest.java
@@ -234,10 +234,11 @@ public class DeadLetterTopicTest extends ProducerConsumerBase {
final int maxRedeliveryCount = 1;
final int sendMessages = 10;
+ final String subscriptionName = "my-subscription";
Consumer<byte[]> consumer = pulsarClient.newConsumer(Schema.BYTES)
.topic(topic)
- .subscriptionName("my-subscription")
+ .subscriptionName(subscriptionName)
.subscriptionType(SubscriptionType.Shared)
.ackTimeout(1, TimeUnit.SECONDS)
.deadLetterPolicy(DeadLetterPolicy.builder().maxRedeliverCount(maxRedeliveryCount).build())
@@ -273,6 +274,7 @@ public class DeadLetterTopicTest extends ProducerConsumerBase {
Message<byte[]> message = deadLetterConsumer.receive();
//Original info should exist
assertEquals(message.getProperties().get(RetryMessageUtil.SYSTEM_PROPERTY_REAL_TOPIC), topic);
+ assertEquals(message.getProperties().get(RetryMessageUtil.SYSTEM_PROPERTY_REAL_SUBSCRIPTION), subscriptionName);
assertTrue(messageIds.contains(message.getProperties().get(RetryMessageUtil.SYSTEM_PROPERTY_ORIGIN_MESSAGE_ID)));
deadLetterConsumer.acknowledge(message);
totalInDeadLetter++;
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index ded6a546c24..f390b80a7f0 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -719,6 +719,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
//Compatible with the old version, will be deleted in the future
propertiesMap.putIfAbsent(RetryMessageUtil.SYSTEM_PROPERTY_ORIGIN_MESSAGE_ID, originMessageIdStr);
propertiesMap.putIfAbsent(RetryMessageUtil.PROPERTY_ORIGIN_MESSAGE_ID, originMessageIdStr);
+ propertiesMap.putIfAbsent(RetryMessageUtil.SYSTEM_PROPERTY_REAL_SUBSCRIPTION, subscription);
return propertiesMap;
}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/util/RetryMessageUtil.java b/pulsar-client/src/main/java/org/apache/pulsar/client/util/RetryMessageUtil.java
index f73c2668779..e9071f171a2 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/util/RetryMessageUtil.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/util/RetryMessageUtil.java
@@ -23,6 +23,7 @@ public class RetryMessageUtil {
public static final String SYSTEM_PROPERTY_RECONSUMETIMES = "RECONSUMETIMES";
public static final String SYSTEM_PROPERTY_DELAY_TIME = "DELAY_TIME";
public static final String SYSTEM_PROPERTY_REAL_TOPIC = "REAL_TOPIC";
+ public static final String SYSTEM_PROPERTY_REAL_SUBSCRIPTION = "REAL_SUBSCRIPTION";
public static final String SYSTEM_PROPERTY_RETRY_TOPIC = "RETRY_TOPIC";
@Deprecated
public static final String SYSTEM_PROPERTY_ORIGIN_MESSAGE_ID = "ORIGIN_MESSAGE_IDY_TIME";