You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by rx...@apache.org on 2020/11/05 08:55:23 UTC

[pulsar-client-go] branch master updated: Fix reconsume broken while using non-FQDN topics (#386)

This is an automated email from the ASF dual-hosted git repository.

rxl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new 8465c55  Fix reconsume broken while using non-FQDN topics (#386)
8465c55 is described below

commit 8465c55036eb715663aeca95277106255fee2842
Author: wuYin <wu...@gmail.com>
AuthorDate: Thu Nov 5 16:47:20 2020 +0800

    Fix reconsume broken while using non-FQDN topics (#386)
    
    ### Issue
    Retry policy not effective with non-FQDN topic.
    
    - reproduction
    	```go
    	client, _ := pulsar.NewClient(pulsar.ClientOptions{URL: "pulsar://localhost:6650"})
    	consumer, _ := client.Subscribe(pulsar.ConsumerOptions{
    		Topic:            "topic-01",
    		SubscriptionName: "my-sub",
    		RetryEnable:      true,
    		DLQ:              &pulsar.DLQPolicy{MaxDeliveries: 2},
    	})
    	msg, _ := consumer.Receive(context.Background())
    	consumer.ReconsumeLater(msg, 5*time.Second)
    	```
    - logs
    
    	```
    	RN[0000] consumer of topic [persistent://public/default/topic-01] not exist unexpectedly  topic="[topic-01 persistent://public/default/my-sub-RETRY]"
    	```
    
    ### Cause
    For MultiTopicConsumer `consumers` map filed:
    - key: user provided topic, maybe non-FQDN.
    - value: consumer instance.
    
    `ReconsumeLater` using msg's FQDN topic as key to find `consumer` in `consumers`,
     if mismatch with non-FQDN topic, this invoke will be ignored, lead to Retry policy not effective.
    
    ### Modifications
    - Normalize user provided topics as FQDN topics before initializing consumers.
    - Add non-FQDN topic consumption case in Retry policy tests.
    
    
    ### Verifying this change
    
    - [x] Make sure that the change passes the CI checks.
---
 pulsar/consumer_impl.go | 11 ++++++++---
 pulsar/consumer_test.go |  6 +++---
 pulsar/helper.go        | 15 ++++++++-------
 3 files changed, 19 insertions(+), 13 deletions(-)

diff --git a/pulsar/consumer_impl.go b/pulsar/consumer_impl.go
index dceb1e2..8ae9161 100644
--- a/pulsar/consumer_impl.go
+++ b/pulsar/consumer_impl.go
@@ -160,6 +160,8 @@ func newConsumer(client *client, options ConsumerOptions) (Consumer, error) {
 		return nil, err
 	}
 
+	// normalize as FQDN topics
+	var tns []*internal.TopicName
 	// single topic consumer
 	if options.Topic != "" || len(options.Topics) == 1 {
 		topic := options.Topic
@@ -167,17 +169,20 @@ func newConsumer(client *client, options ConsumerOptions) (Consumer, error) {
 			topic = options.Topics[0]
 		}
 
-		if err := validateTopicNames(topic); err != nil {
+		if tns, err = validateTopicNames(topic); err != nil {
 			return nil, err
 		}
-
+		topic = tns[0].Name
 		return topicSubscribe(client, options, topic, messageCh, dlq, rlq)
 	}
 
 	if len(options.Topics) > 1 {
-		if err := validateTopicNames(options.Topics...); err != nil {
+		if tns, err = validateTopicNames(options.Topics...); err != nil {
 			return nil, err
 		}
+		for i := range options.Topics {
+			options.Topics[i] = tns[i].Name
+		}
 
 		return newMultiTopicConsumer(client, options, options.Topics, messageCh, dlq, rlq)
 	}
diff --git a/pulsar/consumer_test.go b/pulsar/consumer_test.go
index 97d2de2..f31bb69 100644
--- a/pulsar/consumer_test.go
+++ b/pulsar/consumer_test.go
@@ -1147,7 +1147,7 @@ func TestDLQMultiTopics(t *testing.T) {
 }
 
 func TestRLQ(t *testing.T) {
-	topic := "persistent://public/default/" + newTopicName()
+	topic := newTopicName()
 	subName := fmt.Sprintf("sub01-%d", time.Now().Unix())
 	maxRedeliveries := 2
 	N := 100
@@ -1243,7 +1243,7 @@ func TestRLQ(t *testing.T) {
 func TestRLQMultiTopics(t *testing.T) {
 	now := time.Now().Unix()
 	topic01 := fmt.Sprintf("persistent://public/default/topic-%d-1", now)
-	topic02 := fmt.Sprintf("persistent://public/default/topic-%d-2", now)
+	topic02 := fmt.Sprintf("topic-%d-2", now)
 	topics := []string{topic01, topic02}
 
 	subName := fmt.Sprintf("sub01-%d", time.Now().Unix())
@@ -1270,7 +1270,7 @@ func TestRLQMultiTopics(t *testing.T) {
 
 	// subscribe DLQ Topic
 	dlqConsumer, err := client.Subscribe(ConsumerOptions{
-		Topic:                       "persistent://public/default/" + subName + "-DLQ",
+		Topic:                       subName + "-DLQ",
 		SubscriptionName:            subName,
 		SubscriptionInitialPosition: SubscriptionPositionEarliest,
 	})
diff --git a/pulsar/helper.go b/pulsar/helper.go
index 0f8cf20..fb42cb5 100644
--- a/pulsar/helper.go
+++ b/pulsar/helper.go
@@ -53,15 +53,16 @@ func (e *unexpectedErrMsg) Error() string {
 	return msg
 }
 
-func validateTopicNames(topics ...string) error {
-	var errs error
-	for _, t := range topics {
-		if _, err := internal.ParseTopicName(t); err != nil {
-			errs = pkgerrors.Wrapf(err, "invalid topic name: %s", t)
+func validateTopicNames(topics ...string) ([]*internal.TopicName, error) {
+	tns := make([]*internal.TopicName, len(topics))
+	for i, t := range topics {
+		tn, err := internal.ParseTopicName(t)
+		if err != nil {
+			return nil, pkgerrors.Wrapf(err, "invalid topic name: %s", t)
 		}
+		tns[i] = tn
 	}
-
-	return errs
+	return tns, nil
 }
 
 func toKeyValues(metadata map[string]string) []*pb.KeyValue {