You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by rx...@apache.org on 2022/03/29 06:22:53 UTC

[pulsar-client-go] branch master updated: allow config reader subscription name (#754)

This is an automated email from the ASF dual-hosted git repository.

rxl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new 925da1a  allow config reader subscription name (#754)
925da1a is described below

commit 925da1a039a9551ab410727d7766c7e0a13ad69d
Author: ZhangJian He <sh...@gmail.com>
AuthorDate: Tue Mar 29 14:22:48 2022 +0800

    allow config reader subscription name (#754)
    
    ### Motivation
    allow config reader's subscription name, follow java's feature https://github.com/apache/pulsar/pull/8801
    
    ### Modifications
    add param `SubscriptionName` in `ReaderOptions`
    
    ### Verifying this change
    add the test for setting the subscritpion name
---
 pulsar/reader.go      |  4 ++++
 pulsar/reader_impl.go |  9 ++++++---
 pulsar/reader_test.go | 22 ++++++++++++++++++++++
 3 files changed, 32 insertions(+), 3 deletions(-)

diff --git a/pulsar/reader.go b/pulsar/reader.go
index f1cb575..3539037 100644
--- a/pulsar/reader.go
+++ b/pulsar/reader.go
@@ -68,6 +68,10 @@ type ReaderOptions struct {
 	// SubscriptionRolePrefix sets the subscription role prefix. The default prefix is "reader".
 	SubscriptionRolePrefix string
 
+	// SubscriptionName sets the subscription name.
+	// If subscriptionRolePrefix is set at the same time, this configuration will prevail
+	SubscriptionName string
+
 	// ReadCompacted, if enabled, the reader will read messages from the compacted topic rather than reading the
 	// full message backlog of the topic. This means that, if the topic has been compacted, the reader will only
 	// see the latest value for each key in the topic, up until the point in the topic message backlog that has
diff --git a/pulsar/reader_impl.go b/pulsar/reader_impl.go
index 596884a..dd552b6 100644
--- a/pulsar/reader_impl.go
+++ b/pulsar/reader_impl.go
@@ -66,11 +66,14 @@ func newReader(client *client, options ReaderOptions) (Reader, error) {
 		}
 	}
 
-	subscriptionName := options.SubscriptionRolePrefix
+	subscriptionName := options.SubscriptionName
 	if subscriptionName == "" {
-		subscriptionName = "reader"
+		subscriptionName = options.SubscriptionRolePrefix
+		if subscriptionName == "" {
+			subscriptionName = "reader"
+		}
+		subscriptionName += "-" + generateRandomName()
 	}
-	subscriptionName += "-" + generateRandomName()
 
 	receiverQueueSize := options.ReceiverQueueSize
 	if receiverQueueSize <= 0 {
diff --git a/pulsar/reader_test.go b/pulsar/reader_test.go
index aa12078..3c85871 100644
--- a/pulsar/reader_test.go
+++ b/pulsar/reader_test.go
@@ -24,6 +24,7 @@ import (
 	"time"
 
 	"github.com/apache/pulsar-client-go/pulsar/crypto"
+	"github.com/google/uuid"
 	"github.com/stretchr/testify/assert"
 )
 
@@ -48,6 +49,27 @@ func TestReaderConfigErrors(t *testing.T) {
 	assert.NotNil(t, err)
 }
 
+func TestReaderConfigSubscribeName(t *testing.T) {
+	client, err := NewClient(ClientOptions{
+		URL: lookupURL,
+	})
+	if err != nil {
+		t.Fatal(err)
+	}
+	defer client.Close()
+
+	consumer, err := client.CreateReader(ReaderOptions{
+		StartMessageID:   EarliestMessageID(),
+		Topic:            uuid.New().String(),
+		SubscriptionName: uuid.New().String(),
+	})
+	if err != nil {
+		t.Fatal(err)
+	}
+	defer consumer.Close()
+	assert.NotNil(t, consumer)
+}
+
 func TestReader(t *testing.T) {
 	client, err := NewClient(ClientOptions{
 		URL: lookupURL,