You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2020/07/10 16:57:03 UTC
[pulsar-client-go] branch master updated: add Name method to Consumer interface (#321)
This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git
The following commit(s) were added to refs/heads/master by this push:
new 6188199 add Name method to Consumer interface (#321)
6188199 is described below
commit 6188199e6b6260ef8fc9cc1b3350480116b3ca67
Author: Shohi Wang <os...@gmail.com>
AuthorDate: Sat Jul 11 00:56:55 2020 +0800
add Name method to Consumer interface (#321)
---
pulsar/consumer.go | 3 +++
pulsar/consumer_impl.go | 16 ++++++++++------
pulsar/consumer_multitopic.go | 21 ++++++++++++++-------
pulsar/consumer_regex.go | 10 +++++++++-
pulsar/consumer_test.go | 25 +++++++++++++++++++++++++
5 files changed, 61 insertions(+), 14 deletions(-)
diff --git a/pulsar/consumer.go b/pulsar/consumer.go
index 97020b6..8d3c771 100644
--- a/pulsar/consumer.go
+++ b/pulsar/consumer.go
@@ -197,4 +197,7 @@ type Consumer interface {
// the message publish time where to reposition the subscription
//
SeekByTime(time time.Time) error
+
+ // Name returns the name of consumer.
+ Name() string
}
diff --git a/pulsar/consumer_impl.go b/pulsar/consumer_impl.go
index 20dc1af..f9ee004 100644
--- a/pulsar/consumer_impl.go
+++ b/pulsar/consumer_impl.go
@@ -74,6 +74,10 @@ func newConsumer(client *client, options ConsumerOptions) (Consumer, error) {
options.ReceiverQueueSize = 1000
}
+ if options.Name == "" {
+ options.Name = generateRandomName()
+ }
+
// did the user pass in a message channel?
messageCh := options.MessageChannel
if options.MessageChannel == nil {
@@ -136,12 +140,7 @@ func newInternalConsumer(client *client, options ConsumerOptions, topic string,
errorCh: make(chan error),
dlq: dlq,
log: log.WithField("topic", topic),
- }
-
- if options.Name != "" {
- consumer.consumerName = options.Name
- } else {
- consumer.consumerName = generateRandomName()
+ consumerName: options.Name,
}
err := consumer.internalTopicSubscribeToPartitions()
@@ -166,6 +165,11 @@ func newInternalConsumer(client *client, options ConsumerOptions, topic string,
return consumer, nil
}
+// Name returns the name of consumer.
+func (c *consumer) Name() string {
+ return c.consumerName
+}
+
func (c *consumer) internalTopicSubscribeToPartitions() error {
partitions, err := c.client.TopicPartitions(c.topic)
if err != nil {
diff --git a/pulsar/consumer_multitopic.go b/pulsar/consumer_multitopic.go
index 94e7b9d..a5386cb 100644
--- a/pulsar/consumer_multitopic.go
+++ b/pulsar/consumer_multitopic.go
@@ -32,7 +32,8 @@ import (
type multiTopicConsumer struct {
options ConsumerOptions
- messageCh chan ConsumerMessage
+ consumerName string
+ messageCh chan ConsumerMessage
consumers map[string]Consumer
@@ -46,12 +47,13 @@ type multiTopicConsumer struct {
func newMultiTopicConsumer(client *client, options ConsumerOptions, topics []string,
messageCh chan ConsumerMessage, dlq *dlqRouter) (Consumer, error) {
mtc := &multiTopicConsumer{
- options: options,
- messageCh: messageCh,
- consumers: make(map[string]Consumer, len(topics)),
- closeCh: make(chan struct{}),
- dlq: dlq,
- log: &log.Entry{},
+ options: options,
+ messageCh: messageCh,
+ consumers: make(map[string]Consumer, len(topics)),
+ closeCh: make(chan struct{}),
+ dlq: dlq,
+ log: &log.Entry{},
+ consumerName: options.Name,
}
var errs error
@@ -173,3 +175,8 @@ func (c *multiTopicConsumer) Seek(msgID MessageID) error {
func (c *multiTopicConsumer) SeekByTime(time time.Time) error {
return errors.New("seek command not allowed for multi topic consumer")
}
+
+// Name returns the name of consumer.
+func (c *multiTopicConsumer) Name() string {
+ return c.consumerName
+}
diff --git a/pulsar/consumer_regex.go b/pulsar/consumer_regex.go
index 3d0aebe..75ca657 100644
--- a/pulsar/consumer_regex.go
+++ b/pulsar/consumer_regex.go
@@ -59,6 +59,8 @@ type regexConsumer struct {
ticker *time.Ticker
log *log.Entry
+
+ consumerName string
}
func newRegexConsumer(c *client, opts ConsumerOptions, tn *internal.TopicName, pattern *regexp.Regexp,
@@ -78,7 +80,8 @@ func newRegexConsumer(c *client, opts ConsumerOptions, tn *internal.TopicName, p
closeCh: make(chan struct{}),
- log: log.WithField("topic", tn.Name),
+ log: log.WithField("topic", tn.Name),
+ consumerName: opts.Name,
}
topics, err := rc.topics()
@@ -222,6 +225,11 @@ func (c *regexConsumer) SeekByTime(time time.Time) error {
return errors.New("seek command not allowed for regex consumer")
}
+// Name returns the name of consumer.
+func (c *regexConsumer) Name() string {
+ return c.consumerName
+}
+
func (c *regexConsumer) closed() bool {
select {
case <-c.closeCh:
diff --git a/pulsar/consumer_test.go b/pulsar/consumer_test.go
index d598dd3..4031f7d 100644
--- a/pulsar/consumer_test.go
+++ b/pulsar/consumer_test.go
@@ -1342,3 +1342,28 @@ func TestProducerName(t *testing.T) {
consumer.Ack(msg)
}
}
+
+func TestConsumerName(t *testing.T) {
+ assert := assert.New(t)
+
+ client, err := NewClient(ClientOptions{
+ URL: lookupURL,
+ })
+ assert.Nil(err)
+ defer client.Close()
+
+ topic := newTopicName()
+
+ // create consumer
+ consumerName := "test-consumer-name"
+ consumer, err := client.Subscribe(ConsumerOptions{
+ Name: consumerName,
+ Topic: topic,
+ SubscriptionName: "my-sub",
+ })
+
+ assert.Nil(err)
+ defer consumer.Close()
+
+ assert.Equal(consumerName, consumer.Name())
+}