You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by zi...@apache.org on 2022/12/18 09:51:20 UTC
[pulsar-client-go] branch master updated: Default retry and dlq topic name fixed as per the doc (#891)
This is an automated email from the ASF dual-hosted git repository.
zixuan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git
The following commit(s) were added to refs/heads/master by this push:
new d92fb14 Default retry and dlq topic name fixed as per the doc (#891)
d92fb14 is described below
commit d92fb1407d3d39c8a498dd7c7abdc0bbb3fc7e1a
Author: Nitin Goyal <ni...@gmail.com>
AuthorDate: Sun Dec 18 15:21:16 2022 +0530
Default retry and dlq topic name fixed as per the doc (#891)
* default retry and dlq name fixed based on doc
default retry and dlq name fixed based on doc
* backword compatibility fixed
Signed-off-by: Nitin Goyal <ni...@gmail.com>
* testcase fixed
Signed-off-by: Nitin Goyal <ni...@gmail.com>
* testcase fixed
Signed-off-by: Nitin Goyal <ni...@gmail.com>
* testcase fixed
Signed-off-by: Nitin Goyal <ni...@gmail.com>
* Lint fixed
* Fix Retry topic when topic is partitioned
* Fix Retry topic when topic is partitioned
* RETRY and DLQ topic name bug fixed for partition topics
* RETRY and DLQ topic name bug fixed for partition topics
* Bug fix
Signed-off-by: Nitin Goyal <ni...@gmail.com>
---
pulsar/consumer_impl.go | 22 ++++++++++++++++++++--
pulsar/consumer_test.go | 6 +++---
2 files changed, 23 insertions(+), 5 deletions(-)
diff --git a/pulsar/consumer_impl.go b/pulsar/consumer_impl.go
index 707c13c..e5290a1 100644
--- a/pulsar/consumer_impl.go
+++ b/pulsar/consumer_impl.go
@@ -122,8 +122,26 @@ func newConsumer(client *client, options ConsumerOptions) (Consumer, error) {
return nil, err
}
- retryTopic := tn.Domain + "://" + tn.Namespace + "/" + options.SubscriptionName + RetryTopicSuffix
- dlqTopic := tn.Domain + "://" + tn.Namespace + "/" + options.SubscriptionName + DlqTopicSuffix
+ topicName := internal.TopicNameWithoutPartitionPart(tn)
+
+ retryTopic := topicName + "-" + options.SubscriptionName + RetryTopicSuffix
+ dlqTopic := topicName + "-" + options.SubscriptionName + DlqTopicSuffix
+
+ oldRetryTopic := tn.Domain + "://" + tn.Namespace + "/" + options.SubscriptionName + RetryTopicSuffix
+ oldDlqTopic := tn.Domain + "://" + tn.Namespace + "/" + options.SubscriptionName + DlqTopicSuffix
+
+ if r, err := client.lookupService.GetPartitionedTopicMetadata(oldRetryTopic); err == nil &&
+ r != nil &&
+ r.Partitions > 0 {
+ retryTopic = oldRetryTopic
+ }
+
+ if r, err := client.lookupService.GetPartitionedTopicMetadata(oldDlqTopic); err == nil &&
+ r != nil &&
+ r.Partitions > 0 {
+ dlqTopic = oldDlqTopic
+ }
+
if options.DLQ == nil {
options.DLQ = &DLQPolicy{
MaxDeliveries: MaxReconsumeTimes,
diff --git a/pulsar/consumer_test.go b/pulsar/consumer_test.go
index 7a7f118..355e33b 100644
--- a/pulsar/consumer_test.go
+++ b/pulsar/consumer_test.go
@@ -1498,7 +1498,7 @@ func TestRLQ(t *testing.T) {
// 3. Create consumer on the DLQ topic to verify the routing
dlqConsumer, err := client.Subscribe(ConsumerOptions{
- Topic: "persistent://public/default/" + subName + "-DLQ",
+ Topic: "persistent://public/default/" + topic + "-" + subName + "-DLQ",
SubscriptionName: subName,
SubscriptionInitialPosition: SubscriptionPositionEarliest,
})
@@ -1603,7 +1603,7 @@ func TestRLQMultiTopics(t *testing.T) {
// subscribe DLQ Topic
dlqConsumer, err := client.Subscribe(ConsumerOptions{
- Topic: subName + "-DLQ",
+ Topic: topics[0] + "-" + subName + "-DLQ",
SubscriptionName: subName,
SubscriptionInitialPosition: SubscriptionPositionEarliest,
})
@@ -1710,7 +1710,7 @@ func TestRLQSpecifiedPartitionTopic(t *testing.T) {
// subscribe DLQ Topic
dlqConsumer, err := client.Subscribe(ConsumerOptions{
- Topic: subName + "-DLQ",
+ Topic: normalTopic + "-" + subName + "-DLQ",
SubscriptionName: subName,
SubscriptionInitialPosition: SubscriptionPositionEarliest,
})