You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by rx...@apache.org on 2022/03/29 06:22:53 UTC
[pulsar-client-go] branch master updated: allow config reader subscription name (#754)
This is an automated email from the ASF dual-hosted git repository.
rxl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git
The following commit(s) were added to refs/heads/master by this push:
new 925da1a allow config reader subscription name (#754)
925da1a is described below
commit 925da1a039a9551ab410727d7766c7e0a13ad69d
Author: ZhangJian He <sh...@gmail.com>
AuthorDate: Tue Mar 29 14:22:48 2022 +0800
allow config reader subscription name (#754)
### Motivation
allow config reader's subscription name, follow java's feature https://github.com/apache/pulsar/pull/8801
### Modifications
add param `SubscriptionName` in `ReaderOptions`
### Verifying this change
add the test for setting the subscritpion name
---
pulsar/reader.go | 4 ++++
pulsar/reader_impl.go | 9 ++++++---
pulsar/reader_test.go | 22 ++++++++++++++++++++++
3 files changed, 32 insertions(+), 3 deletions(-)
diff --git a/pulsar/reader.go b/pulsar/reader.go
index f1cb575..3539037 100644
--- a/pulsar/reader.go
+++ b/pulsar/reader.go
@@ -68,6 +68,10 @@ type ReaderOptions struct {
// SubscriptionRolePrefix sets the subscription role prefix. The default prefix is "reader".
SubscriptionRolePrefix string
+ // SubscriptionName sets the subscription name.
+ // If subscriptionRolePrefix is set at the same time, this configuration will prevail
+ SubscriptionName string
+
// ReadCompacted, if enabled, the reader will read messages from the compacted topic rather than reading the
// full message backlog of the topic. This means that, if the topic has been compacted, the reader will only
// see the latest value for each key in the topic, up until the point in the topic message backlog that has
diff --git a/pulsar/reader_impl.go b/pulsar/reader_impl.go
index 596884a..dd552b6 100644
--- a/pulsar/reader_impl.go
+++ b/pulsar/reader_impl.go
@@ -66,11 +66,14 @@ func newReader(client *client, options ReaderOptions) (Reader, error) {
}
}
- subscriptionName := options.SubscriptionRolePrefix
+ subscriptionName := options.SubscriptionName
if subscriptionName == "" {
- subscriptionName = "reader"
+ subscriptionName = options.SubscriptionRolePrefix
+ if subscriptionName == "" {
+ subscriptionName = "reader"
+ }
+ subscriptionName += "-" + generateRandomName()
}
- subscriptionName += "-" + generateRandomName()
receiverQueueSize := options.ReceiverQueueSize
if receiverQueueSize <= 0 {
diff --git a/pulsar/reader_test.go b/pulsar/reader_test.go
index aa12078..3c85871 100644
--- a/pulsar/reader_test.go
+++ b/pulsar/reader_test.go
@@ -24,6 +24,7 @@ import (
"time"
"github.com/apache/pulsar-client-go/pulsar/crypto"
+ "github.com/google/uuid"
"github.com/stretchr/testify/assert"
)
@@ -48,6 +49,27 @@ func TestReaderConfigErrors(t *testing.T) {
assert.NotNil(t, err)
}
+func TestReaderConfigSubscribeName(t *testing.T) {
+ client, err := NewClient(ClientOptions{
+ URL: lookupURL,
+ })
+ if err != nil {
+ t.Fatal(err)
+ }
+ defer client.Close()
+
+ consumer, err := client.CreateReader(ReaderOptions{
+ StartMessageID: EarliestMessageID(),
+ Topic: uuid.New().String(),
+ SubscriptionName: uuid.New().String(),
+ })
+ if err != nil {
+ t.Fatal(err)
+ }
+ defer consumer.Close()
+ assert.NotNil(t, consumer)
+}
+
func TestReader(t *testing.T) {
client, err := NewClient(ClientOptions{
URL: lookupURL,