You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by go...@apache.org on 2021/07/23 07:52:29 UTC

[incubator-inlong] branch INLONG-25 updated: [INLONG-791]Go SDK Support multiple topic address (#587)

This is an automated email from the ASF dual-hosted git repository.

gosonzhang pushed a commit to branch INLONG-25
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git


The following commit(s) were added to refs/heads/INLONG-25 by this push:
     new 1e83a2c  [INLONG-791]Go SDK Support multiple topic address (#587)
1e83a2c is described below

commit 1e83a2c2811839463cb53266dd9ec312041d3b65
Author: Zijie Lu <ws...@gmail.com>
AuthorDate: Fri Jul 23 15:52:25 2021 +0800

    [INLONG-791]Go SDK Support multiple topic address (#587)
---
 tubemq-client-twins/tubemq-client-go/config/config.go    | 16 +++++++++++++---
 .../tubemq-client-go/config/config_test.go               |  6 +++++-
 tubemq-client-twins/tubemq-client-go/log/config.go       |  2 +-
 tubemq-client-twins/tubemq-client-go/remote/remote.go    |  7 ++++---
 4 files changed, 23 insertions(+), 8 deletions(-)

diff --git a/tubemq-client-twins/tubemq-client-go/config/config.go b/tubemq-client-twins/tubemq-client-go/config/config.go
index 7a93831..c1fcb72 100644
--- a/tubemq-client-twins/tubemq-client-go/config/config.go
+++ b/tubemq-client-twins/tubemq-client-go/config/config.go
@@ -116,7 +116,8 @@ type Config struct {
 	}
 }
 
-func newDefaultConfig() *Config {
+// NewDefaultConfig returns a default config of the client.
+func NewDefaultConfig() *Config {
 	c := &Config{}
 
 	c.Net.ReadTimeout = 15000 * time.Millisecond
@@ -149,7 +150,7 @@ func newDefaultConfig() *Config {
 
 // ParseAddress parses the address to user-defined config.
 func ParseAddress(address string) (config *Config, err error) {
-	c := newDefaultConfig()
+	c := NewDefaultConfig()
 
 	tokens := strings.SplitN(address, "?", 2)
 	if len(tokens) != 2 {
@@ -195,7 +196,16 @@ func getConfigFromToken(config *Config, values []string) error {
 	case "group":
 		config.Consumer.Group = values[1]
 	case "topics":
-		config.Consumer.Topics = strings.Split(values[1], ",")
+		topicFilters := strings.Split(values[1], ";")
+		config.Consumer.TopicFilters = make(map[string][]string)
+		for _, topicFilter := range topicFilters {
+			tf := strings.Split(topicFilter, "@")
+			config.Consumer.Topics = append(config.Consumer.Topics, tf[0])
+			if len(tf) > 1 {
+				filters := strings.Split(tf[1], ",")
+				config.Consumer.TopicFilters[tf[0]] = filters
+			}
+		}
 	case "consumePosition":
 		config.Consumer.ConsumePosition, err = strconv.Atoi(values[1])
 	case "boundConsume":
diff --git a/tubemq-client-twins/tubemq-client-go/config/config_test.go b/tubemq-client-twins/tubemq-client-go/config/config_test.go
index a8ac703..8850608 100644
--- a/tubemq-client-twins/tubemq-client-go/config/config_test.go
+++ b/tubemq-client-twins/tubemq-client-go/config/config_test.go
@@ -25,11 +25,15 @@ import (
 )
 
 func TestParseAddress(t *testing.T) {
-	address := "127.0.0.1:9092,127.0.0.1:9093?topics=Topic1,Topic2&group=Group&tlsEnable=false&msgNotFoundWait=10000&heartbeatMaxRetryTimes=6"
+	address := "127.0.0.1:9092,127.0.0.1:9093?topics=Topic1@12312323,1212;Topic2@121212,2321323&group=Group&tlsEnable=false&msgNotFoundWait=10000&heartbeatMaxRetryTimes=6"
+	topicFilters := make(map[string][]string)
+	topicFilters["Topic1"] = []string{"12312323", "1212"}
+	topicFilters["Topic2"] = []string{"121212", "2321323"}
 	c, err := ParseAddress(address)
 	assert.Nil(t, err)
 	assert.Equal(t, c.Consumer.Masters, "127.0.0.1:9092,127.0.0.1:9093")
 	assert.Equal(t, c.Consumer.Topics, []string{"Topic1", "Topic2"})
+	assert.Equal(t, c.Consumer.TopicFilters, topicFilters)
 	assert.Equal(t, c.Consumer.Group, "Group")
 	assert.Equal(t, c.Consumer.MsgNotFoundWait, 10000*time.Millisecond)
 
diff --git a/tubemq-client-twins/tubemq-client-go/log/config.go b/tubemq-client-twins/tubemq-client-go/log/config.go
index b71cf9d..d2ae1a9 100644
--- a/tubemq-client-twins/tubemq-client-go/log/config.go
+++ b/tubemq-client-twins/tubemq-client-go/log/config.go
@@ -33,7 +33,7 @@ type OutputConfig struct {
 }
 
 var defaultConfig = &OutputConfig{
-	LogPath:    "../log/tubemq",
+	LogPath:    "../log/tubemq.log",
 	MaxSize:    100,
 	MaxBackups: 5,
 	MaxAge:     3,
diff --git a/tubemq-client-twins/tubemq-client-go/remote/remote.go b/tubemq-client-twins/tubemq-client-go/remote/remote.go
index c8fec4e..637859e 100644
--- a/tubemq-client-twins/tubemq-client-go/remote/remote.go
+++ b/tubemq-client-twins/tubemq-client-go/remote/remote.go
@@ -253,6 +253,7 @@ func (r *RmtDataCache) removeMetaInfo(partitionKey string) {
 
 func (r *RmtDataCache) resetIdlePartition(partitionKey string, reuse bool) {
 	delete(r.usedPartitions, partitionKey)
+	delete(r.partitionTimeouts, partitionKey)
 	r.removeFromIndexPartitions(partitionKey)
 	if reuse {
 		if _, ok := r.partitions[partitionKey]; ok {
@@ -292,7 +293,7 @@ func (r *RmtDataCache) AddNewPartition(newPartition *metadata.Partition) {
 	partitionKey := newPartition.GetPartitionKey()
 	if _, ok := r.partitions[partitionKey]; !ok {
 		r.partitions[partitionKey] = newPartition
-		if partitions, ok := r.topicPartitions[partitionKey]; !ok {
+		if partitions, ok := r.topicPartitions[newPartition.GetTopic()]; !ok {
 			newPartitions := make(map[string]bool)
 			newPartitions[partitionKey] = true
 			r.topicPartitions[newPartition.GetTopic()] = newPartitions
@@ -429,11 +430,11 @@ func (r *RmtDataCache) ReleasePartition(checkDelay bool, filterConsume bool, con
 				}
 				if delay > 10 {
 					r.partitionTimeouts[partitionKey] = time.AfterFunc(time.Duration(delay)*time.Millisecond, func() {
+						r.metaMu.Lock()
+						defer r.metaMu.Unlock()
 						r.resetIdlePartition(partitionKey, true)
 					})
 				} else {
-					r.metaMu.Lock()
-					defer r.metaMu.Unlock()
 					r.indexPartitions = append(r.indexPartitions, partitionKey)
 				}
 			} else {