You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by zi...@apache.org on 2022/12/18 09:51:20 UTC

[pulsar-client-go] branch master updated: Default retry and dlq topic name fixed as per the doc (#891)

This is an automated email from the ASF dual-hosted git repository.

zixuan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new d92fb14  Default retry and dlq topic name fixed as per the doc (#891)
d92fb14 is described below

commit d92fb1407d3d39c8a498dd7c7abdc0bbb3fc7e1a
Author: Nitin Goyal <ni...@gmail.com>
AuthorDate: Sun Dec 18 15:21:16 2022 +0530

    Default retry and dlq topic name fixed as per the doc (#891)
    
    * default retry and dlq name fixed based on doc
    
    default retry and dlq name fixed based on doc
    
    * backword compatibility fixed
    
    Signed-off-by: Nitin Goyal <ni...@gmail.com>
    
    * testcase fixed
    
    Signed-off-by: Nitin Goyal <ni...@gmail.com>
    
    * testcase fixed
    
    Signed-off-by: Nitin Goyal <ni...@gmail.com>
    
    * testcase fixed
    
    Signed-off-by: Nitin Goyal <ni...@gmail.com>
    
    * Lint fixed
    
    * Fix Retry topic when topic is partitioned
    
    * Fix Retry topic when topic is partitioned
    
    * RETRY and DLQ topic name bug fixed for partition topics
    
    * RETRY and DLQ topic name bug fixed for partition topics
    
    * Bug fix
    
    Signed-off-by: Nitin Goyal <ni...@gmail.com>
---
 pulsar/consumer_impl.go | 22 ++++++++++++++++++++--
 pulsar/consumer_test.go |  6 +++---
 2 files changed, 23 insertions(+), 5 deletions(-)

diff --git a/pulsar/consumer_impl.go b/pulsar/consumer_impl.go
index 707c13c..e5290a1 100644
--- a/pulsar/consumer_impl.go
+++ b/pulsar/consumer_impl.go
@@ -122,8 +122,26 @@ func newConsumer(client *client, options ConsumerOptions) (Consumer, error) {
 			return nil, err
 		}
 
-		retryTopic := tn.Domain + "://" + tn.Namespace + "/" + options.SubscriptionName + RetryTopicSuffix
-		dlqTopic := tn.Domain + "://" + tn.Namespace + "/" + options.SubscriptionName + DlqTopicSuffix
+		topicName := internal.TopicNameWithoutPartitionPart(tn)
+
+		retryTopic := topicName + "-" + options.SubscriptionName + RetryTopicSuffix
+		dlqTopic := topicName + "-" + options.SubscriptionName + DlqTopicSuffix
+
+		oldRetryTopic := tn.Domain + "://" + tn.Namespace + "/" + options.SubscriptionName + RetryTopicSuffix
+		oldDlqTopic := tn.Domain + "://" + tn.Namespace + "/" + options.SubscriptionName + DlqTopicSuffix
+
+		if r, err := client.lookupService.GetPartitionedTopicMetadata(oldRetryTopic); err == nil &&
+			r != nil &&
+			r.Partitions > 0 {
+			retryTopic = oldRetryTopic
+		}
+
+		if r, err := client.lookupService.GetPartitionedTopicMetadata(oldDlqTopic); err == nil &&
+			r != nil &&
+			r.Partitions > 0 {
+			dlqTopic = oldDlqTopic
+		}
+
 		if options.DLQ == nil {
 			options.DLQ = &DLQPolicy{
 				MaxDeliveries:    MaxReconsumeTimes,
diff --git a/pulsar/consumer_test.go b/pulsar/consumer_test.go
index 7a7f118..355e33b 100644
--- a/pulsar/consumer_test.go
+++ b/pulsar/consumer_test.go
@@ -1498,7 +1498,7 @@ func TestRLQ(t *testing.T) {
 
 	// 3. Create consumer on the DLQ topic to verify the routing
 	dlqConsumer, err := client.Subscribe(ConsumerOptions{
-		Topic:                       "persistent://public/default/" + subName + "-DLQ",
+		Topic:                       "persistent://public/default/" + topic + "-" + subName + "-DLQ",
 		SubscriptionName:            subName,
 		SubscriptionInitialPosition: SubscriptionPositionEarliest,
 	})
@@ -1603,7 +1603,7 @@ func TestRLQMultiTopics(t *testing.T) {
 
 	// subscribe DLQ Topic
 	dlqConsumer, err := client.Subscribe(ConsumerOptions{
-		Topic:                       subName + "-DLQ",
+		Topic:                       topics[0] + "-" + subName + "-DLQ",
 		SubscriptionName:            subName,
 		SubscriptionInitialPosition: SubscriptionPositionEarliest,
 	})
@@ -1710,7 +1710,7 @@ func TestRLQSpecifiedPartitionTopic(t *testing.T) {
 
 	// subscribe DLQ Topic
 	dlqConsumer, err := client.Subscribe(ConsumerOptions{
-		Topic:                       subName + "-DLQ",
+		Topic:                       normalTopic + "-" + subName + "-DLQ",
 		SubscriptionName:            subName,
 		SubscriptionInitialPosition: SubscriptionPositionEarliest,
 	})