You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by te...@apache.org on 2023/10/18 01:33:48 UTC

[pulsar] branch master updated: [improve][client] Add `REAL_SUBSCRIPTION` when produces msg to DLQ (#21369)

This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 1b2a9915c46 [improve][client] Add `REAL_SUBSCRIPTION` when produces msg to DLQ (#21369)
1b2a9915c46 is described below

commit 1b2a9915c461c4357eb34d14c509963fb6cb47cd
Author: Jiwei Guo <te...@apache.org>
AuthorDate: Wed Oct 18 09:33:42 2023 +0800

    [improve][client] Add `REAL_SUBSCRIPTION` when produces msg to DLQ (#21369)
---
 .../test/java/org/apache/pulsar/client/api/DeadLetterTopicTest.java   | 4 +++-
 .../src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java     | 1 +
 .../src/main/java/org/apache/pulsar/client/util/RetryMessageUtil.java | 1 +
 3 files changed, 5 insertions(+), 1 deletion(-)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DeadLetterTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DeadLetterTopicTest.java
index 2a0cb3187d2..7be292a6026 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DeadLetterTopicTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DeadLetterTopicTest.java
@@ -234,10 +234,11 @@ public class DeadLetterTopicTest extends ProducerConsumerBase {
 
         final int maxRedeliveryCount = 1;
         final int sendMessages = 10;
+        final String subscriptionName = "my-subscription";
 
         Consumer<byte[]> consumer = pulsarClient.newConsumer(Schema.BYTES)
                 .topic(topic)
-                .subscriptionName("my-subscription")
+                .subscriptionName(subscriptionName)
                 .subscriptionType(SubscriptionType.Shared)
                 .ackTimeout(1, TimeUnit.SECONDS)
                 .deadLetterPolicy(DeadLetterPolicy.builder().maxRedeliverCount(maxRedeliveryCount).build())
@@ -273,6 +274,7 @@ public class DeadLetterTopicTest extends ProducerConsumerBase {
             Message<byte[]> message = deadLetterConsumer.receive();
             //Original info should exists
             assertEquals(message.getProperties().get(RetryMessageUtil.SYSTEM_PROPERTY_REAL_TOPIC), topic);
+            assertEquals(message.getProperties().get(RetryMessageUtil.SYSTEM_PROPERTY_REAL_SUBSCRIPTION), subscriptionName);
             assertTrue(messageIds.contains(message.getProperties().get(RetryMessageUtil.SYSTEM_PROPERTY_ORIGIN_MESSAGE_ID)));
             deadLetterConsumer.acknowledge(message);
             totalInDeadLetter++;
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index ded6a546c24..f390b80a7f0 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -719,6 +719,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
         //Compatible with the old version, will be deleted in the future
         propertiesMap.putIfAbsent(RetryMessageUtil.SYSTEM_PROPERTY_ORIGIN_MESSAGE_ID, originMessageIdStr);
         propertiesMap.putIfAbsent(RetryMessageUtil.PROPERTY_ORIGIN_MESSAGE_ID, originMessageIdStr);
+        propertiesMap.putIfAbsent(RetryMessageUtil.SYSTEM_PROPERTY_REAL_SUBSCRIPTION, subscription);
         return propertiesMap;
     }
 
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/util/RetryMessageUtil.java b/pulsar-client/src/main/java/org/apache/pulsar/client/util/RetryMessageUtil.java
index f73c2668779..e9071f171a2 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/util/RetryMessageUtil.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/util/RetryMessageUtil.java
@@ -23,6 +23,7 @@ public class RetryMessageUtil {
     public static final String SYSTEM_PROPERTY_RECONSUMETIMES = "RECONSUMETIMES";
     public static final String SYSTEM_PROPERTY_DELAY_TIME = "DELAY_TIME";
     public static final String SYSTEM_PROPERTY_REAL_TOPIC = "REAL_TOPIC";
+    public static final String SYSTEM_PROPERTY_REAL_SUBSCRIPTION = "REAL_SUBSCRIPTION";
     public static final String SYSTEM_PROPERTY_RETRY_TOPIC = "RETRY_TOPIC";
     @Deprecated
     public static final String SYSTEM_PROPERTY_ORIGIN_MESSAGE_ID = "ORIGIN_MESSAGE_IDY_TIME";