You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2021/05/15 02:02:40 UTC
[pulsar] branch master updated: Fix the default retry letter and dead letter topic name (#10129)
This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 86d0830 Fix the default retry letter and dead letter topic name (#10129)
86d0830 is described below
commit 86d0830b7b173970424266fcf2f065f6df171020
Author: WangJialing <65...@users.noreply.github.com>
AuthorDate: Sat May 15 10:01:54 2021 +0800
Fix the default retry letter and dead letter topic name (#10129)
### Motivation
Fixes #9327
### Modifications
Correct the default retry letter and dead letter topic names so that they are derived from the full topic name.
---
.../apache/pulsar/client/api/RetryTopicTest.java | 86 +++++++++++++++++++++-
.../pulsar/client/impl/ConsumerBuilderImpl.java | 24 +++---
2 files changed, 98 insertions(+), 12 deletions(-)
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/RetryTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/RetryTopicTest.java
index 3840efa..fc84a62 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/RetryTopicTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/RetryTopicTest.java
@@ -19,6 +19,7 @@
package org.apache.pulsar.client.api;
import lombok.Cleanup;
+import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.annotations.AfterMethod;
@@ -68,7 +69,85 @@ public class RetryTopicTest extends ProducerConsumerBase {
@Cleanup
PulsarClient newPulsarClient = newPulsarClient(lookupUrl.toString(), 0);// Creates new client connection
Consumer<byte[]> deadLetterConsumer = newPulsarClient.newConsumer(Schema.BYTES)
- .topic("persistent://my-property/my-ns/my-subscription-DLQ")
+ .topic("persistent://my-property/my-ns/retry-topic-my-subscription-DLQ")
+ .subscriptionName("my-subscription")
+ .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
+ .subscribe();
+
+ Producer<byte[]> producer = pulsarClient.newProducer(Schema.BYTES)
+ .topic(topic)
+ .create();
+
+ for (int i = 0; i < sendMessages; i++) {
+ producer.send(String.format("Hello Pulsar [%d]", i).getBytes());
+ }
+
+ producer.close();
+
+ int totalReceived = 0;
+ do {
+ Message<byte[]> message = consumer.receive();
+ log.info("consumer received message : {} {}", message.getMessageId(), new String(message.getData()));
+ consumer.reconsumeLater(message, 1 , TimeUnit.SECONDS);
+ totalReceived++;
+ } while (totalReceived < sendMessages * (maxRedeliveryCount + 1));
+
+ int totalInDeadLetter = 0;
+ do {
+ Message message = deadLetterConsumer.receive();
+ log.info("dead letter consumer received message : {} {}", message.getMessageId(), new String(message.getData()));
+ deadLetterConsumer.acknowledge(message);
+ totalInDeadLetter++;
+ } while (totalInDeadLetter < sendMessages);
+
+ deadLetterConsumer.close();
+ consumer.close();
+
+ Consumer<byte[]> checkConsumer = this.pulsarClient.newConsumer(Schema.BYTES)
+ .topic(topic)
+ .subscriptionName("my-subscription")
+ .subscriptionType(SubscriptionType.Shared)
+ .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
+ .subscribe();
+
+ Message<byte[]> checkMessage = checkConsumer.receive(3, TimeUnit.SECONDS);
+ if (checkMessage != null) {
+ log.info("check consumer received message : {} {}", checkMessage.getMessageId(), new String(checkMessage.getData()));
+ }
+ assertNull(checkMessage);
+
+ checkConsumer.close();
+ }
+
+ // Issue 9327: compatibility check in case the default retry and dead letter topic names have changed
+ @Test
+ public void testRetryTopicNameForCompatibility () throws Exception {
+ final String topic = "persistent://my-property/my-ns/retry-topic";
+
+ final String oldRetryTopic = "persistent://my-property/my-ns/my-subscription-RETRY";
+
+ final String oldDeadLetterTopic = "persistent://my-property/my-ns/my-subscription-DLQ";
+
+ final int maxRedeliveryCount = 2;
+
+ final int sendMessages = 100;
+
+ admin.topics().createPartitionedTopic(oldRetryTopic, 2);
+ admin.topics().createPartitionedTopic(oldDeadLetterTopic, 2);
+
+ Consumer<byte[]> consumer = pulsarClient.newConsumer(Schema.BYTES)
+ .topic(topic)
+ .subscriptionName("my-subscription")
+ .subscriptionType(SubscriptionType.Shared)
+ .enableRetry(true)
+ .deadLetterPolicy(DeadLetterPolicy.builder().maxRedeliverCount(maxRedeliveryCount).build())
+ .receiverQueueSize(100)
+ .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
+ .subscribe();
+
+ PulsarClient newPulsarClient = newPulsarClient(lookupUrl.toString(), 0);// Creates new client connection
+ Consumer<byte[]> deadLetterConsumer = newPulsarClient.newConsumer(Schema.BYTES)
+ .topic(oldDeadLetterTopic)
.subscriptionName("my-subscription")
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
.subscribe();
@@ -116,6 +195,7 @@ public class RetryTopicTest extends ProducerConsumerBase {
assertNull(checkMessage);
checkConsumer.close();
+ newPulsarClient.close();
}
/**
@@ -145,7 +225,7 @@ public class RetryTopicTest extends ProducerConsumerBase {
// subscribe to the DLQ topics before consuming original topics
Consumer<byte[]> deadLetterConsumer = pulsarClient.newConsumer(Schema.BYTES)
- .topic("persistent://my-property/my-ns/my-subscription-DLQ")
+ .topic("persistent://my-property/my-ns/retry-topic-1-my-subscription-DLQ")
.subscriptionName("my-subscription")
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
.subscribe();
@@ -224,7 +304,7 @@ public class RetryTopicTest extends ProducerConsumerBase {
@Cleanup
PulsarClient newPulsarClient = newPulsarClient(lookupUrl.toString(), 0);// Creates new client connection
Consumer<byte[]> deadLetterConsumer = newPulsarClient.newConsumer(Schema.BYTES)
- .topic("persistent://my-property/my-ns/my-subscription-DLQ")
+ .topic("persistent://my-property/my-ns/retry-topic-my-subscription-DLQ")
.subscriptionName("my-subscription")
.subscribe();
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java
index 76b53ec..47f9991 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java
@@ -19,7 +19,6 @@
package org.apache.pulsar.client.impl;
import static com.google.common.base.Preconditions.checkArgument;
-
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
@@ -28,9 +27,9 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
-
import lombok.AccessLevel;
import lombok.Getter;
+import lombok.NonNull;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.client.api.BatchReceivePolicy;
import org.apache.pulsar.client.api.Consumer;
@@ -44,22 +43,18 @@ import org.apache.pulsar.client.api.KeySharedPolicy;
import org.apache.pulsar.client.api.MessageCrypto;
import org.apache.pulsar.client.api.MessageListener;
import org.apache.pulsar.client.api.PulsarClientException;
-import org.apache.pulsar.client.api.RegexSubscriptionMode;
import org.apache.pulsar.client.api.PulsarClientException.InvalidConfigurationException;
+import org.apache.pulsar.client.api.RegexSubscriptionMode;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.client.api.SubscriptionMode;
import org.apache.pulsar.client.api.SubscriptionType;
-import org.apache.pulsar.client.impl.DefaultCryptoKeyReader;
import org.apache.pulsar.client.impl.conf.ConfigurationDataUtils;
import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
import org.apache.pulsar.client.util.RetryMessageUtil;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.util.FutureUtil;
-import com.google.common.collect.Lists;
-import lombok.NonNull;
-
@Getter(AccessLevel.PUBLIC)
public class ConsumerBuilderImpl<T> implements ConsumerBuilder<T> {
@@ -121,8 +116,19 @@ public class ConsumerBuilderImpl<T> implements ConsumerBuilder<T> {
}
if(conf.isRetryEnable() && conf.getTopicNames().size() > 0 ) {
TopicName topicFirst = TopicName.get(conf.getTopicNames().iterator().next());
- String retryLetterTopic = topicFirst.getNamespace() + "/" + conf.getSubscriptionName() + RetryMessageUtil.RETRY_GROUP_TOPIC_SUFFIX;
- String deadLetterTopic = topicFirst.getNamespace() + "/" + conf.getSubscriptionName() + RetryMessageUtil.DLQ_GROUP_TOPIC_SUFFIX;
+ String retryLetterTopic = topicFirst + "-" + conf.getSubscriptionName() + RetryMessageUtil.RETRY_GROUP_TOPIC_SUFFIX;
+ String deadLetterTopic = topicFirst + "-" + conf.getSubscriptionName() + RetryMessageUtil.DLQ_GROUP_TOPIC_SUFFIX;
+
+ // Issue 9327: compatibility check in case the default retry and dead letter topic names have changed
+ String oldRetryLetterTopic = topicFirst.getNamespace() + "/" + conf.getSubscriptionName() + RetryMessageUtil.RETRY_GROUP_TOPIC_SUFFIX;
+ String oldDeadLetterTopic = topicFirst.getNamespace() + "/" + conf.getSubscriptionName() + RetryMessageUtil.DLQ_GROUP_TOPIC_SUFFIX;
+ if (client.getPartitionedTopicMetadata(oldRetryLetterTopic).join().partitions > 0) {
+ retryLetterTopic = oldRetryLetterTopic;
+ }
+ if (client.getPartitionedTopicMetadata(oldDeadLetterTopic).join().partitions > 0) {
+ deadLetterTopic = oldDeadLetterTopic;
+ }
+
if(conf.getDeadLetterPolicy() == null) {
conf.setDeadLetterPolicy(DeadLetterPolicy.builder()
.maxRedeliverCount(RetryMessageUtil.MAX_RECONSUMETIMES)