You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by rx...@apache.org on 2019/12/19 03:36:47 UTC

[pulsar-client-go] branch master updated: fix requestID using consumerID && loopclosure issue && add old style topic support (#128)

This is an automated email from the ASF dual-hosted git repository.

rxl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new 4ffced1  fix requestID using consumerID && loopclosure issue && add old style topic support (#128)
4ffced1 is described below

commit 4ffced1ae42a96553fd15f6cc295a22b2eb41674
Author: hailang.wei <53...@users.noreply.github.com>
AuthorDate: Thu Dec 19 11:36:37 2019 +0800

    fix requestID using consumerID && loopclosure issue && add old style topic support (#128)
    
    * fix the close-consumer request ID being generated with NewConsumerID instead of NewRequestID
    
    * fix loopclosure issue: use the closure's own topic parameter instead of the captured outer variable t
    
    * add support for old-style short topic names of the form <tenant>/<cluster>/<namespace>/<topic>
---
 pulsar/consumer_partition.go       |  2 +-
 pulsar/consumer_regex.go           |  2 +-
 pulsar/internal/topic_name.go      |  5 +++--
 pulsar/internal/topic_name_test.go | 14 ++++++++++----
 4 files changed, 15 insertions(+), 8 deletions(-)

diff --git a/pulsar/consumer_partition.go b/pulsar/consumer_partition.go
index a217d55..68ee7af 100644
--- a/pulsar/consumer_partition.go
+++ b/pulsar/consumer_partition.go
@@ -465,7 +465,7 @@ func (pc *partitionConsumer) internalClose(req *closeRequest) {
 	pc.state = consumerClosing
 	pc.log.Infof("Closing consumer=%d", pc.consumerID)
 
-	requestID := pc.client.rpcClient.NewConsumerID()
+	requestID := pc.client.rpcClient.NewRequestID()
 	cmdClose := &pb.CommandCloseConsumer{
 		ConsumerId: proto.Uint64(pc.consumerID),
 		RequestId:  proto.Uint64(requestID),
diff --git a/pulsar/consumer_regex.go b/pulsar/consumer_regex.go
index aa57356..e68c495 100644
--- a/pulsar/consumer_regex.go
+++ b/pulsar/consumer_regex.go
@@ -351,7 +351,7 @@ func subscriber(c *client, topics []string, opts ConsumerOptions, ch chan Consum
 			c, err := internalTopicSubscribe(c, opts, topic, ch)
 			consumerErrorCh <- consumerError{
 				err:      err,
-				topic:    t,
+				topic:    topic,
 				consumer: c,
 			}
 		}(t)
diff --git a/pulsar/internal/topic_name.go b/pulsar/internal/topic_name.go
index f5b9890..78d2abb 100644
--- a/pulsar/internal/topic_name.go
+++ b/pulsar/internal/topic_name.go
@@ -44,9 +44,10 @@ func ParseTopicName(topic string) (*TopicName, error) {
 	if !strings.Contains(topic, "://") {
 		// The short topic name can be:
 		// - <topic>
-		// - <property>/<namespace>/<topic>
+		// - <tenant>/<namespace>/<topic>
+		// - <tenant>/<cluster>/<namespace>/<topic>
 		parts := strings.Split(topic, "/")
-		if len(parts) == 3 {
+		if len(parts) == 3 || len(parts) == 4 {
 			topic = "persistent://" + topic
 		} else if len(parts) == 1 {
 			topic = "persistent://" + publicTenant + "/" + defaultNamespace + "/" + parts[0]
diff --git a/pulsar/internal/topic_name_test.go b/pulsar/internal/topic_name_test.go
index 738ab22..e57bf4d 100644
--- a/pulsar/internal/topic_name_test.go
+++ b/pulsar/internal/topic_name_test.go
@@ -61,6 +61,12 @@ func TestParseTopicName(t *testing.T) {
 	assert.Equal(t, "persistent://my-tenant/my-cluster/my-ns/my-topic", topic.Name)
 	assert.Equal(t, "my-tenant/my-cluster/my-ns", topic.Namespace)
 	assert.Equal(t, -1, topic.Partition)
+
+	topic, err = ParseTopicName("my-tenant/my-cluster/my-ns/my-topic")
+	assert.Nil(t, err)
+	assert.Equal(t, "persistent://my-tenant/my-cluster/my-ns/my-topic", topic.Name)
+	assert.Equal(t, "my-tenant/my-cluster/my-ns", topic.Namespace)
+	assert.Equal(t, -1, topic.Partition)
 }
 
 func TestParseTopicNameErrors(t *testing.T) {
@@ -84,16 +90,16 @@ func TestParseTopicNameErrors(t *testing.T) {
 }
 
 func TestTopicNameWithoutPartitionPart(t *testing.T) {
-	tests := []struct{
-		tn TopicName
+	tests := []struct {
+		tn       TopicName
 		expected string
 	}{
 		{
-			tn: TopicName{Name:"persistent://public/default/my-topic", Partition:-1},
+			tn:       TopicName{Name: "persistent://public/default/my-topic", Partition: -1},
 			expected: "persistent://public/default/my-topic",
 		},
 		{
-			tn: TopicName{Name:"persistent://public/default/my-topic-partition-0", Partition:0},
+			tn:       TopicName{Name: "persistent://public/default/my-topic-partition-0", Partition: 0},
 			expected: "persistent://public/default/my-topic",
 		},
 	}