You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2021/05/15 02:02:40 UTC

[pulsar] branch master updated: Fix the default retry letter and dead letter topic name (#10129)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 86d0830  Fix the default retry letter and dead letter topic name  (#10129)
86d0830 is described below

commit 86d0830b7b173970424266fcf2f065f6df171020
Author: WangJialing <65...@users.noreply.github.com>
AuthorDate: Sat May 15 10:01:54 2021 +0800

    Fix the default retry letter and dead letter topic name  (#10129)
    
    ### Motivation
    
    Fixes #9327
    
    ### Modifications
    
    Correct the default retry letter and dead letter topic name depend on full topic name
---
 .../apache/pulsar/client/api/RetryTopicTest.java   | 86 +++++++++++++++++++++-
 .../pulsar/client/impl/ConsumerBuilderImpl.java    | 24 +++---
 2 files changed, 98 insertions(+), 12 deletions(-)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/RetryTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/RetryTopicTest.java
index 3840efa..fc84a62 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/RetryTopicTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/RetryTopicTest.java
@@ -19,6 +19,7 @@
 package org.apache.pulsar.client.api;
 
 import lombok.Cleanup;
+import java.util.concurrent.TimeUnit;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.testng.annotations.AfterMethod;
@@ -68,7 +69,85 @@ public class RetryTopicTest extends ProducerConsumerBase {
         @Cleanup
         PulsarClient newPulsarClient = newPulsarClient(lookupUrl.toString(), 0);// Creates new client connection
         Consumer<byte[]> deadLetterConsumer = newPulsarClient.newConsumer(Schema.BYTES)
-                .topic("persistent://my-property/my-ns/my-subscription-DLQ")
+                .topic("persistent://my-property/my-ns/retry-topic-my-subscription-DLQ")
+                .subscriptionName("my-subscription")
+                .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
+                .subscribe();
+
+        Producer<byte[]> producer = pulsarClient.newProducer(Schema.BYTES)
+                .topic(topic)
+                .create();
+
+        for (int i = 0; i < sendMessages; i++) {
+            producer.send(String.format("Hello Pulsar [%d]", i).getBytes());
+        }
+
+        producer.close();
+
+        int totalReceived = 0;
+        do {
+            Message<byte[]> message = consumer.receive();
+            log.info("consumer received message : {} {}", message.getMessageId(), new String(message.getData()));
+            consumer.reconsumeLater(message, 1 , TimeUnit.SECONDS);
+            totalReceived++;
+        } while (totalReceived < sendMessages * (maxRedeliveryCount + 1));
+
+        int totalInDeadLetter = 0;
+        do {
+            Message message = deadLetterConsumer.receive();
+            log.info("dead letter consumer received message : {} {}", message.getMessageId(), new String(message.getData()));
+            deadLetterConsumer.acknowledge(message);
+            totalInDeadLetter++;
+        } while (totalInDeadLetter < sendMessages);
+
+        deadLetterConsumer.close();
+        consumer.close();
+
+        Consumer<byte[]> checkConsumer = this.pulsarClient.newConsumer(Schema.BYTES)
+                .topic(topic)
+                .subscriptionName("my-subscription")
+                .subscriptionType(SubscriptionType.Shared)
+                .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
+                .subscribe();
+
+        Message<byte[]> checkMessage = checkConsumer.receive(3, TimeUnit.SECONDS);
+        if (checkMessage != null) {
+            log.info("check consumer received message : {} {}", checkMessage.getMessageId(), new String(checkMessage.getData()));
+        }
+        assertNull(checkMessage);
+
+        checkConsumer.close();
+    }
+
+    //Issue 9327: do compatibility check in case of the default retry and dead letter topic name changed
+    @Test
+    public void testRetryTopicNameForCompatibility () throws Exception {
+        final String topic = "persistent://my-property/my-ns/retry-topic";
+
+        final String oldRetryTopic = "persistent://my-property/my-ns/my-subscription-RETRY";
+
+        final String oldDeadLetterTopic = "persistent://my-property/my-ns/my-subscription-DLQ";
+
+        final int maxRedeliveryCount = 2;
+
+        final int sendMessages = 100;
+
+        admin.topics().createPartitionedTopic(oldRetryTopic, 2);
+        admin.topics().createPartitionedTopic(oldDeadLetterTopic, 2);
+
+        Consumer<byte[]> consumer = pulsarClient.newConsumer(Schema.BYTES)
+                .topic(topic)
+                .subscriptionName("my-subscription")
+                .subscriptionType(SubscriptionType.Shared)
+                .enableRetry(true)
+                .deadLetterPolicy(DeadLetterPolicy.builder().maxRedeliverCount(maxRedeliveryCount).build())
+                .receiverQueueSize(100)
+                .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
+                .subscribe();
+
+        PulsarClient newPulsarClient = newPulsarClient(lookupUrl.toString(), 0);// Creates new client connection
+        Consumer<byte[]> deadLetterConsumer = newPulsarClient.newConsumer(Schema.BYTES)
+                .topic(oldDeadLetterTopic)
                 .subscriptionName("my-subscription")
                 .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
                 .subscribe();
@@ -116,6 +195,7 @@ public class RetryTopicTest extends ProducerConsumerBase {
         assertNull(checkMessage);
 
         checkConsumer.close();
+        newPulsarClient.close();
     }
 
     /**
@@ -145,7 +225,7 @@ public class RetryTopicTest extends ProducerConsumerBase {
 
         // subscribe to the DLQ topics before consuming original topics
         Consumer<byte[]> deadLetterConsumer = pulsarClient.newConsumer(Schema.BYTES)
-                .topic("persistent://my-property/my-ns/my-subscription-DLQ")
+                .topic("persistent://my-property/my-ns/retry-topic-1-my-subscription-DLQ")
                 .subscriptionName("my-subscription")
                 .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
                 .subscribe();
@@ -224,7 +304,7 @@ public class RetryTopicTest extends ProducerConsumerBase {
         @Cleanup
         PulsarClient newPulsarClient = newPulsarClient(lookupUrl.toString(), 0);// Creates new client connection
         Consumer<byte[]> deadLetterConsumer = newPulsarClient.newConsumer(Schema.BYTES)
-                .topic("persistent://my-property/my-ns/my-subscription-DLQ")
+                .topic("persistent://my-property/my-ns/retry-topic-my-subscription-DLQ")
                 .subscriptionName("my-subscription")
                 .subscribe();
 
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java
index 76b53ec..47f9991 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java
@@ -19,7 +19,6 @@
 package org.apache.pulsar.client.impl;
 
 import static com.google.common.base.Preconditions.checkArgument;
-
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
@@ -28,9 +27,9 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.regex.Pattern;
 import java.util.stream.Collectors;
-
 import lombok.AccessLevel;
 import lombok.Getter;
+import lombok.NonNull;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.pulsar.client.api.BatchReceivePolicy;
 import org.apache.pulsar.client.api.Consumer;
@@ -44,22 +43,18 @@ import org.apache.pulsar.client.api.KeySharedPolicy;
 import org.apache.pulsar.client.api.MessageCrypto;
 import org.apache.pulsar.client.api.MessageListener;
 import org.apache.pulsar.client.api.PulsarClientException;
-import org.apache.pulsar.client.api.RegexSubscriptionMode;
 import org.apache.pulsar.client.api.PulsarClientException.InvalidConfigurationException;
+import org.apache.pulsar.client.api.RegexSubscriptionMode;
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.api.SubscriptionInitialPosition;
 import org.apache.pulsar.client.api.SubscriptionMode;
 import org.apache.pulsar.client.api.SubscriptionType;
-import org.apache.pulsar.client.impl.DefaultCryptoKeyReader;
 import org.apache.pulsar.client.impl.conf.ConfigurationDataUtils;
 import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
 import org.apache.pulsar.client.util.RetryMessageUtil;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.util.FutureUtil;
 
-import com.google.common.collect.Lists;
-import lombok.NonNull;
-
 @Getter(AccessLevel.PUBLIC)
 public class ConsumerBuilderImpl<T> implements ConsumerBuilder<T> {
 
@@ -121,8 +116,19 @@ public class ConsumerBuilderImpl<T> implements ConsumerBuilder<T> {
         }
         if(conf.isRetryEnable() && conf.getTopicNames().size() > 0 ) {
             TopicName topicFirst = TopicName.get(conf.getTopicNames().iterator().next());
-            String retryLetterTopic = topicFirst.getNamespace() + "/" + conf.getSubscriptionName() + RetryMessageUtil.RETRY_GROUP_TOPIC_SUFFIX;
-            String deadLetterTopic = topicFirst.getNamespace() + "/" + conf.getSubscriptionName() + RetryMessageUtil.DLQ_GROUP_TOPIC_SUFFIX;
+            String retryLetterTopic = topicFirst + "-" + conf.getSubscriptionName() + RetryMessageUtil.RETRY_GROUP_TOPIC_SUFFIX;
+            String deadLetterTopic = topicFirst + "-" + conf.getSubscriptionName() + RetryMessageUtil.DLQ_GROUP_TOPIC_SUFFIX;
+
+            //Issue 9327: do compatibility check in case of the default retry and dead letter topic name changed
+            String oldRetryLetterTopic = topicFirst.getNamespace() + "/" + conf.getSubscriptionName() + RetryMessageUtil.RETRY_GROUP_TOPIC_SUFFIX;
+            String oldDeadLetterTopic = topicFirst.getNamespace() + "/" + conf.getSubscriptionName() + RetryMessageUtil.DLQ_GROUP_TOPIC_SUFFIX;
+            if (client.getPartitionedTopicMetadata(oldRetryLetterTopic).join().partitions > 0) {
+                retryLetterTopic = oldRetryLetterTopic;
+            }
+            if (client.getPartitionedTopicMetadata(oldDeadLetterTopic).join().partitions > 0) {
+                deadLetterTopic = oldDeadLetterTopic;
+            }
+
             if(conf.getDeadLetterPolicy() == null) {
                 conf.setDeadLetterPolicy(DeadLetterPolicy.builder()
                                         .maxRedeliverCount(RetryMessageUtil.MAX_RECONSUMETIMES)