You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by zi...@apache.org on 2023/11/21 10:17:30 UTC

(pulsar) branch master updated: [improve] [client] Add producerName for deadLetterProducer (#21589)

This is an automated email from the ASF dual-hosted git repository.

zike pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new c87cfb3f50b [improve] [client] Add producerName for deadLetterProducer (#21589)
c87cfb3f50b is described below

commit c87cfb3f50b32f5c032b5d1c3a6a0b91cde2bbe9
Author: crossoverJie <cr...@gmail.com>
AuthorDate: Tue Nov 21 18:17:23 2023 +0800

    [improve] [client] Add producerName for deadLetterProducer (#21589)
    
    Fixes #21441
    Related PR: https://github.com/apache/pulsar/pull/21507
    
    ### Motivation
    
    Add a producerName for the dead letter producer, making it easier to locate problems.
    
    ### Modifications
    
    ```java
    .producerName(String.format("%s-%s-DLQ", this.topicName, this.subscription))
    ```
    
    When creating a `deadLetterProducer`, specify the `producerName` to replace the randomly generated name.
---
 .../pulsar/client/api/DeadLetterTopicTest.java     | 59 ++++++++++++++++++++++
 .../apache/pulsar/client/impl/ConsumerImpl.java    |  1 +
 2 files changed, 60 insertions(+)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DeadLetterTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DeadLetterTopicTest.java
index 7be292a6026..ea93ece549e 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DeadLetterTopicTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DeadLetterTopicTest.java
@@ -137,6 +137,65 @@ public class DeadLetterTopicTest extends ProducerConsumerBase {
         consumer.close();
     }
 
+    /**
+     * Verifies that every message read from the dead letter topic reports the producer name
+     * {@code <topic>-<subscription>-DLQ}.
+     */
+    public void testDeadLetterTopicWithProducerName() throws Exception {
+        final String topic = "persistent://my-property/my-ns/dead-letter-topic";
+        final String subscription = "my-subscription";
+        final String deadLetterProducerName = String.format("%s-%s-DLQ", topic, subscription);
+        final int maxRedeliveryCount = 1;
+        final int sendMessages = 100;
+
+        Consumer<byte[]> consumer = pulsarClient.newConsumer(Schema.BYTES)
+                .topic(topic)
+                .subscriptionName(subscription)
+                .subscriptionType(SubscriptionType.Shared)
+                .ackTimeout(1, TimeUnit.SECONDS)
+                .deadLetterPolicy(DeadLetterPolicy.builder().maxRedeliverCount(maxRedeliveryCount).build())
+                .receiverQueueSize(100)
+                .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
+                .subscribe();
+
+        @Cleanup
+        PulsarClient newPulsarClient = newPulsarClient(lookupUrl.toString(), 0);// Creates new client connection
+        Consumer<byte[]> deadLetterConsumer = newPulsarClient.newConsumer(Schema.BYTES)
+                .topic(topic + "-" + subscription + "-DLQ")
+                .subscriptionName(subscription)
+                .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
+                .subscribe();
+
+        Producer<byte[]> producer = pulsarClient.newProducer(Schema.BYTES)
+                .topic(topic)
+                .create();
+        for (int i = 0; i < sendMessages; i++) {
+            producer.newMessage()
+                    .value(String.format("Hello Pulsar [%d]", i).getBytes())
+                    .send();
+        }
+        producer.close();
+
+        // Intentionally never acknowledged: each message is expected (maxRedeliveryCount + 1) times here.
+        int totalReceived = 0;
+        do {
+            Message<byte[]> message = consumer.receive();
+            log.info("consumer received message : {} {}", message.getMessageId(), new String(message.getData()));
+            totalReceived++;
+        } while (totalReceived < sendMessages * (maxRedeliveryCount + 1));
+
+        int totalInDeadLetter = 0;
+        do {
+            Message<byte[]> message = deadLetterConsumer.receive();
+            assertEquals(message.getProducerName(), deadLetterProducerName);
+            log.info("dead letter consumer received message : {} {}", message.getMessageId(), new String(message.getData()));
+            deadLetterConsumer.acknowledge(message);
+            totalInDeadLetter++;
+        } while (totalInDeadLetter < sendMessages);
+        deadLetterConsumer.close();
+        consumer.close();
+    }
+
     @DataProvider(name = "produceLargeMessages")
     public Object[][] produceLargeMessages() {
         return new Object[][] { { false }, { true } };
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index 85d6c5668d5..fbc2a8c285d 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -2171,6 +2171,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
                             ((ProducerBuilderImpl<byte[]>) client.newProducer(Schema.AUTO_PRODUCE_BYTES(schema)))
                                     .initialSubscriptionName(this.deadLetterPolicy.getInitialSubscriptionName())
                                     .topic(this.deadLetterPolicy.getDeadLetterTopic())
+                                    .producerName(String.format("%s-%s-DLQ", this.topicName, this.subscription))
                                     .blockIfQueueFull(false)
                                     .enableBatching(false)
                                     .enableChunking(true)