You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by rx...@apache.org on 2019/12/19 03:36:47 UTC
[pulsar-client-go] branch master updated: fix requestID using consumerID && loopclosure issue && add old style topic support (#128)
This is an automated email from the ASF dual-hosted git repository.
rxl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git
The following commit(s) were added to refs/heads/master by this push:
new 4ffced1 fix requestID using consumerID && loopclosure issue && add old style topic support (#128)
4ffced1 is described below
commit 4ffced1ae42a96553fd15f6cc295a22b2eb41674
Author: hailang.wei <53...@users.noreply.github.com>
AuthorDate: Thu Dec 19 11:36:37 2019 +0800
fix requestID using consumerID && loopclosure issue && add old style topic support (#128)
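* fix requestID using consumerID: when closing a partition consumer, allocate the request ID with NewRequestID() instead of NewConsumerID()
* fix loopclosure issue: in subscriber, report the closure's own topic argument rather than the enclosing loop variable t
* add old style topic support: accept short topic names of the form <tenant>/<cluster>/<namespace>/<topic>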
---
pulsar/consumer_partition.go | 2 +-
pulsar/consumer_regex.go | 2 +-
pulsar/internal/topic_name.go | 5 +++--
pulsar/internal/topic_name_test.go | 14 ++++++++++----
4 files changed, 15 insertions(+), 8 deletions(-)
diff --git a/pulsar/consumer_partition.go b/pulsar/consumer_partition.go
index a217d55..68ee7af 100644
--- a/pulsar/consumer_partition.go
+++ b/pulsar/consumer_partition.go
@@ -465,7 +465,7 @@ func (pc *partitionConsumer) internalClose(req *closeRequest) {
pc.state = consumerClosing
pc.log.Infof("Closing consumer=%d", pc.consumerID)
- requestID := pc.client.rpcClient.NewConsumerID()
+ requestID := pc.client.rpcClient.NewRequestID()
cmdClose := &pb.CommandCloseConsumer{
ConsumerId: proto.Uint64(pc.consumerID),
RequestId: proto.Uint64(requestID),
diff --git a/pulsar/consumer_regex.go b/pulsar/consumer_regex.go
index aa57356..e68c495 100644
--- a/pulsar/consumer_regex.go
+++ b/pulsar/consumer_regex.go
@@ -351,7 +351,7 @@ func subscriber(c *client, topics []string, opts ConsumerOptions, ch chan Consum
c, err := internalTopicSubscribe(c, opts, topic, ch)
consumerErrorCh <- consumerError{
err: err,
- topic: t,
+ topic: topic,
consumer: c,
}
}(t)
diff --git a/pulsar/internal/topic_name.go b/pulsar/internal/topic_name.go
index f5b9890..78d2abb 100644
--- a/pulsar/internal/topic_name.go
+++ b/pulsar/internal/topic_name.go
@@ -44,9 +44,10 @@ func ParseTopicName(topic string) (*TopicName, error) {
if !strings.Contains(topic, "://") {
// The short topic name can be:
// - <topic>
- // - <property>/<namespace>/<topic>
+ // - <tenant>/<namespace>/<topic>
+ // - <tenant>/<cluster>/<namespace>/<topic>
parts := strings.Split(topic, "/")
- if len(parts) == 3 {
+ if len(parts) == 3 || len(parts) == 4 {
topic = "persistent://" + topic
} else if len(parts) == 1 {
topic = "persistent://" + publicTenant + "/" + defaultNamespace + "/" + parts[0]
diff --git a/pulsar/internal/topic_name_test.go b/pulsar/internal/topic_name_test.go
index 738ab22..e57bf4d 100644
--- a/pulsar/internal/topic_name_test.go
+++ b/pulsar/internal/topic_name_test.go
@@ -61,6 +61,12 @@ func TestParseTopicName(t *testing.T) {
assert.Equal(t, "persistent://my-tenant/my-cluster/my-ns/my-topic", topic.Name)
assert.Equal(t, "my-tenant/my-cluster/my-ns", topic.Namespace)
assert.Equal(t, -1, topic.Partition)
+
+ topic, err = ParseTopicName("my-tenant/my-cluster/my-ns/my-topic")
+ assert.Nil(t, err)
+ assert.Equal(t, "persistent://my-tenant/my-cluster/my-ns/my-topic", topic.Name)
+ assert.Equal(t, "my-tenant/my-cluster/my-ns", topic.Namespace)
+ assert.Equal(t, -1, topic.Partition)
}
func TestParseTopicNameErrors(t *testing.T) {
@@ -84,16 +90,16 @@ func TestParseTopicNameErrors(t *testing.T) {
}
func TestTopicNameWithoutPartitionPart(t *testing.T) {
- tests := []struct{
- tn TopicName
+ tests := []struct {
+ tn TopicName
expected string
}{
{
- tn: TopicName{Name:"persistent://public/default/my-topic", Partition:-1},
+ tn: TopicName{Name: "persistent://public/default/my-topic", Partition: -1},
expected: "persistent://public/default/my-topic",
},
{
- tn: TopicName{Name:"persistent://public/default/my-topic-partition-0", Partition:0},
+ tn: TopicName{Name: "persistent://public/default/my-topic-partition-0", Partition: 0},
expected: "persistent://public/default/my-topic",
},
}