You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by zi...@apache.org on 2023/11/21 10:17:30 UTC
(pulsar) branch master updated: [improve] [client] Add producerName for deadLetterProducer (#21589)
This is an automated email from the ASF dual-hosted git repository.
zike pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new c87cfb3f50b [improve] [client] Add producerName for deadLetterProducer (#21589)
c87cfb3f50b is described below
commit c87cfb3f50b32f5c032b5d1c3a6a0b91cde2bbe9
Author: crossoverJie <cr...@gmail.com>
AuthorDate: Tue Nov 21 18:17:23 2023 +0800
[improve] [client] Add producerName for deadLetterProducer (#21589)
Fixes #21441
Related PR: https://github.com/apache/pulsar/pull/21507
### Motivation
Add producerName for dead letter producer, easier to locate problems.
### Modifications
```java
.producerName(String.format("%s-%s-DLQ", this.topicName, this.subscription))
```
When creating a `deadLetterProducer`, specify the `producerName` to replace the randomly generated name.
---
.../pulsar/client/api/DeadLetterTopicTest.java | 59 ++++++++++++++++++++++
.../apache/pulsar/client/impl/ConsumerImpl.java | 1 +
2 files changed, 60 insertions(+)
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DeadLetterTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DeadLetterTopicTest.java
index 7be292a6026..ea93ece549e 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DeadLetterTopicTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DeadLetterTopicTest.java
@@ -137,6 +137,65 @@ public class DeadLetterTopicTest extends ProducerConsumerBase {
consumer.close();
}
+ public void testDeadLetterTopicWithProducerName() throws Exception {
+ final String topic = "persistent://my-property/my-ns/dead-letter-topic";
+ final String subscription = "my-subscription";
+ String deadLetterProducerName = String.format("%s-%s-DLQ", topic, subscription);
+
+ final int maxRedeliveryCount = 1;
+
+ final int sendMessages = 100;
+
+ Consumer<byte[]> consumer = pulsarClient.newConsumer(Schema.BYTES)
+ .topic(topic)
+ .subscriptionName(subscription)
+ .subscriptionType(SubscriptionType.Shared)
+ .ackTimeout(1, TimeUnit.SECONDS)
+ .deadLetterPolicy(DeadLetterPolicy.builder().maxRedeliverCount(maxRedeliveryCount).build())
+ .receiverQueueSize(100)
+ .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
+ .subscribe();
+
+ @Cleanup
+ PulsarClient newPulsarClient = newPulsarClient(lookupUrl.toString(), 0);// Creates new client connection
+ Consumer<byte[]> deadLetterConsumer = newPulsarClient.newConsumer(Schema.BYTES)
+ .topic("persistent://my-property/my-ns/dead-letter-topic-my-subscription-DLQ")
+ .subscriptionName("my-subscription")
+ .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
+ .subscribe();
+
+ Producer<byte[]> producer = pulsarClient.newProducer(Schema.BYTES)
+ .topic(topic)
+ .create();
+
+ for (int i = 0; i < sendMessages; i++) {
+ producer.newMessage()
+ .value(String.format("Hello Pulsar [%d]", i).getBytes())
+ .send();
+ }
+
+ producer.close();
+
+ int totalReceived = 0;
+ do {
+ Message<byte[]> message = consumer.receive();
+ log.info("consumer received message : {} {}", message.getMessageId(), new String(message.getData()));
+ totalReceived++;
+ } while (totalReceived < sendMessages * (maxRedeliveryCount + 1));
+
+ int totalInDeadLetter = 0;
+ do {
+ Message message = deadLetterConsumer.receive();
+ assertEquals(message.getProducerName(), deadLetterProducerName);
+ log.info("dead letter consumer received message : {} {}", message.getMessageId(), new String(message.getData()));
+ deadLetterConsumer.acknowledge(message);
+ totalInDeadLetter++;
+ } while (totalInDeadLetter < sendMessages);
+
+ deadLetterConsumer.close();
+ consumer.close();
+ }
+
@DataProvider(name = "produceLargeMessages")
public Object[][] produceLargeMessages() {
return new Object[][] { { false }, { true } };
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index 85d6c5668d5..fbc2a8c285d 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -2171,6 +2171,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
((ProducerBuilderImpl<byte[]>) client.newProducer(Schema.AUTO_PRODUCE_BYTES(schema)))
.initialSubscriptionName(this.deadLetterPolicy.getInitialSubscriptionName())
.topic(this.deadLetterPolicy.getDeadLetterTopic())
+ .producerName(String.format("%s-%s-DLQ", this.topicName, this.subscription))
.blockIfQueueFull(false)
.enableBatching(false)
.enableChunking(true)