You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2020/07/10 16:57:03 UTC

[pulsar-client-go] branch master updated: add Name method to Consumer interface (#321)

This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new 6188199  add Name method to Consumer interface (#321)
6188199 is described below

commit 6188199e6b6260ef8fc9cc1b3350480116b3ca67
Author: Shohi Wang <os...@gmail.com>
AuthorDate: Sat Jul 11 00:56:55 2020 +0800

    add Name method to Consumer interface (#321)
---
 pulsar/consumer.go            |  3 +++
 pulsar/consumer_impl.go       | 16 ++++++++++------
 pulsar/consumer_multitopic.go | 21 ++++++++++++++-------
 pulsar/consumer_regex.go      | 10 +++++++++-
 pulsar/consumer_test.go       | 25 +++++++++++++++++++++++++
 5 files changed, 61 insertions(+), 14 deletions(-)

diff --git a/pulsar/consumer.go b/pulsar/consumer.go
index 97020b6..8d3c771 100644
--- a/pulsar/consumer.go
+++ b/pulsar/consumer.go
@@ -197,4 +197,7 @@ type Consumer interface {
 	//            the message publish time where to reposition the subscription
 	//
 	SeekByTime(time time.Time) error
+
+	// Name returns the name of the consumer.
+	Name() string
 }
diff --git a/pulsar/consumer_impl.go b/pulsar/consumer_impl.go
index 20dc1af..f9ee004 100644
--- a/pulsar/consumer_impl.go
+++ b/pulsar/consumer_impl.go
@@ -74,6 +74,10 @@ func newConsumer(client *client, options ConsumerOptions) (Consumer, error) {
 		options.ReceiverQueueSize = 1000
 	}
 
+	if options.Name == "" {
+		options.Name = generateRandomName()
+	}
+
 	// did the user pass in a message channel?
 	messageCh := options.MessageChannel
 	if options.MessageChannel == nil {
@@ -136,12 +140,7 @@ func newInternalConsumer(client *client, options ConsumerOptions, topic string,
 		errorCh:                   make(chan error),
 		dlq:                       dlq,
 		log:                       log.WithField("topic", topic),
-	}
-
-	if options.Name != "" {
-		consumer.consumerName = options.Name
-	} else {
-		consumer.consumerName = generateRandomName()
+		consumerName:              options.Name,
 	}
 
 	err := consumer.internalTopicSubscribeToPartitions()
@@ -166,6 +165,11 @@ func newInternalConsumer(client *client, options ConsumerOptions, topic string,
 	return consumer, nil
 }
 
+// Name returns the name of the consumer.
+func (c *consumer) Name() string {
+	return c.consumerName
+}
+
 func (c *consumer) internalTopicSubscribeToPartitions() error {
 	partitions, err := c.client.TopicPartitions(c.topic)
 	if err != nil {
diff --git a/pulsar/consumer_multitopic.go b/pulsar/consumer_multitopic.go
index 94e7b9d..a5386cb 100644
--- a/pulsar/consumer_multitopic.go
+++ b/pulsar/consumer_multitopic.go
@@ -32,7 +32,8 @@ import (
 type multiTopicConsumer struct {
 	options ConsumerOptions
 
-	messageCh chan ConsumerMessage
+	consumerName string
+	messageCh    chan ConsumerMessage
 
 	consumers map[string]Consumer
 
@@ -46,12 +47,13 @@ type multiTopicConsumer struct {
 func newMultiTopicConsumer(client *client, options ConsumerOptions, topics []string,
 	messageCh chan ConsumerMessage, dlq *dlqRouter) (Consumer, error) {
 	mtc := &multiTopicConsumer{
-		options:   options,
-		messageCh: messageCh,
-		consumers: make(map[string]Consumer, len(topics)),
-		closeCh:   make(chan struct{}),
-		dlq:       dlq,
-		log:       &log.Entry{},
+		options:      options,
+		messageCh:    messageCh,
+		consumers:    make(map[string]Consumer, len(topics)),
+		closeCh:      make(chan struct{}),
+		dlq:          dlq,
+		log:          &log.Entry{},
+		consumerName: options.Name,
 	}
 
 	var errs error
@@ -173,3 +175,8 @@ func (c *multiTopicConsumer) Seek(msgID MessageID) error {
 func (c *multiTopicConsumer) SeekByTime(time time.Time) error {
 	return errors.New("seek command not allowed for multi topic consumer")
 }
+
+// Name returns the name of the consumer.
+func (c *multiTopicConsumer) Name() string {
+	return c.consumerName
+}
diff --git a/pulsar/consumer_regex.go b/pulsar/consumer_regex.go
index 3d0aebe..75ca657 100644
--- a/pulsar/consumer_regex.go
+++ b/pulsar/consumer_regex.go
@@ -59,6 +59,8 @@ type regexConsumer struct {
 	ticker *time.Ticker
 
 	log *log.Entry
+
+	consumerName string
 }
 
 func newRegexConsumer(c *client, opts ConsumerOptions, tn *internal.TopicName, pattern *regexp.Regexp,
@@ -78,7 +80,8 @@ func newRegexConsumer(c *client, opts ConsumerOptions, tn *internal.TopicName, p
 
 		closeCh: make(chan struct{}),
 
-		log: log.WithField("topic", tn.Name),
+		log:          log.WithField("topic", tn.Name),
+		consumerName: opts.Name,
 	}
 
 	topics, err := rc.topics()
@@ -222,6 +225,11 @@ func (c *regexConsumer) SeekByTime(time time.Time) error {
 	return errors.New("seek command not allowed for regex consumer")
 }
 
+// Name returns the name of the consumer.
+func (c *regexConsumer) Name() string {
+	return c.consumerName
+}
+
 func (c *regexConsumer) closed() bool {
 	select {
 	case <-c.closeCh:
diff --git a/pulsar/consumer_test.go b/pulsar/consumer_test.go
index d598dd3..4031f7d 100644
--- a/pulsar/consumer_test.go
+++ b/pulsar/consumer_test.go
@@ -1342,3 +1342,28 @@ func TestProducerName(t *testing.T) {
 		consumer.Ack(msg)
 	}
 }
+
+func TestConsumerName(t *testing.T) {
+	assert := assert.New(t)
+
+	client, err := NewClient(ClientOptions{
+		URL: lookupURL,
+	})
+	assert.Nil(err)
+	defer client.Close()
+
+	topic := newTopicName()
+
+	// create consumer
+	consumerName := "test-consumer-name"
+	consumer, err := client.Subscribe(ConsumerOptions{
+		Name:             consumerName,
+		Topic:            topic,
+		SubscriptionName: "my-sub",
+	})
+
+	assert.Nil(err)
+	defer consumer.Close()
+
+	assert.Equal(consumerName, consumer.Name())
+}