You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by go...@apache.org on 2021/07/23 07:52:29 UTC
[incubator-inlong] branch INLONG-25 updated: [INLONG-791]Go SDK
Support multiple topic address (#587)
This is an automated email from the ASF dual-hosted git repository.
gosonzhang pushed a commit to branch INLONG-25
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git
The following commit(s) were added to refs/heads/INLONG-25 by this push:
new 1e83a2c [INLONG-791]Go SDK Support multiple topic address (#587)
1e83a2c is described below
commit 1e83a2c2811839463cb53266dd9ec312041d3b65
Author: Zijie Lu <ws...@gmail.com>
AuthorDate: Fri Jul 23 15:52:25 2021 +0800
[INLONG-791]Go SDK Support multiple topic address (#587)
---
tubemq-client-twins/tubemq-client-go/config/config.go | 16 +++++++++++++---
.../tubemq-client-go/config/config_test.go | 6 +++++-
tubemq-client-twins/tubemq-client-go/log/config.go | 2 +-
tubemq-client-twins/tubemq-client-go/remote/remote.go | 7 ++++---
4 files changed, 23 insertions(+), 8 deletions(-)
diff --git a/tubemq-client-twins/tubemq-client-go/config/config.go b/tubemq-client-twins/tubemq-client-go/config/config.go
index 7a93831..c1fcb72 100644
--- a/tubemq-client-twins/tubemq-client-go/config/config.go
+++ b/tubemq-client-twins/tubemq-client-go/config/config.go
@@ -116,7 +116,8 @@ type Config struct {
}
}
-func newDefaultConfig() *Config {
+// NewDefaultConfig returns a default config of the client.
+func NewDefaultConfig() *Config {
c := &Config{}
c.Net.ReadTimeout = 15000 * time.Millisecond
@@ -149,7 +150,7 @@ func newDefaultConfig() *Config {
// ParseAddress parses the address to user-defined config.
func ParseAddress(address string) (config *Config, err error) {
- c := newDefaultConfig()
+ c := NewDefaultConfig()
tokens := strings.SplitN(address, "?", 2)
if len(tokens) != 2 {
@@ -195,7 +196,16 @@ func getConfigFromToken(config *Config, values []string) error {
case "group":
config.Consumer.Group = values[1]
case "topics":
- config.Consumer.Topics = strings.Split(values[1], ",")
+ topicFilters := strings.Split(values[1], ";")
+ config.Consumer.TopicFilters = make(map[string][]string)
+ for _, topicFilter := range topicFilters {
+ tf := strings.Split(topicFilter, "@")
+ config.Consumer.Topics = append(config.Consumer.Topics, tf[0])
+ if len(tf) > 1 {
+ filters := strings.Split(tf[1], ",")
+ config.Consumer.TopicFilters[tf[0]] = filters
+ }
+ }
case "consumePosition":
config.Consumer.ConsumePosition, err = strconv.Atoi(values[1])
case "boundConsume":
diff --git a/tubemq-client-twins/tubemq-client-go/config/config_test.go b/tubemq-client-twins/tubemq-client-go/config/config_test.go
index a8ac703..8850608 100644
--- a/tubemq-client-twins/tubemq-client-go/config/config_test.go
+++ b/tubemq-client-twins/tubemq-client-go/config/config_test.go
@@ -25,11 +25,15 @@ import (
)
func TestParseAddress(t *testing.T) {
- address := "127.0.0.1:9092,127.0.0.1:9093?topics=Topic1,Topic2&group=Group&tlsEnable=false&msgNotFoundWait=10000&heartbeatMaxRetryTimes=6"
+ address := "127.0.0.1:9092,127.0.0.1:9093?topics=Topic1@12312323,1212;Topic2@121212,2321323&group=Group&tlsEnable=false&msgNotFoundWait=10000&heartbeatMaxRetryTimes=6"
+ topicFilters := make(map[string][]string)
+ topicFilters["Topic1"] = []string{"12312323", "1212"}
+ topicFilters["Topic2"] = []string{"121212", "2321323"}
c, err := ParseAddress(address)
assert.Nil(t, err)
assert.Equal(t, c.Consumer.Masters, "127.0.0.1:9092,127.0.0.1:9093")
assert.Equal(t, c.Consumer.Topics, []string{"Topic1", "Topic2"})
+ assert.Equal(t, c.Consumer.TopicFilters, topicFilters)
assert.Equal(t, c.Consumer.Group, "Group")
assert.Equal(t, c.Consumer.MsgNotFoundWait, 10000*time.Millisecond)
diff --git a/tubemq-client-twins/tubemq-client-go/log/config.go b/tubemq-client-twins/tubemq-client-go/log/config.go
index b71cf9d..d2ae1a9 100644
--- a/tubemq-client-twins/tubemq-client-go/log/config.go
+++ b/tubemq-client-twins/tubemq-client-go/log/config.go
@@ -33,7 +33,7 @@ type OutputConfig struct {
}
var defaultConfig = &OutputConfig{
- LogPath: "../log/tubemq",
+ LogPath: "../log/tubemq.log",
MaxSize: 100,
MaxBackups: 5,
MaxAge: 3,
diff --git a/tubemq-client-twins/tubemq-client-go/remote/remote.go b/tubemq-client-twins/tubemq-client-go/remote/remote.go
index c8fec4e..637859e 100644
--- a/tubemq-client-twins/tubemq-client-go/remote/remote.go
+++ b/tubemq-client-twins/tubemq-client-go/remote/remote.go
@@ -253,6 +253,7 @@ func (r *RmtDataCache) removeMetaInfo(partitionKey string) {
func (r *RmtDataCache) resetIdlePartition(partitionKey string, reuse bool) {
delete(r.usedPartitions, partitionKey)
+ delete(r.partitionTimeouts, partitionKey)
r.removeFromIndexPartitions(partitionKey)
if reuse {
if _, ok := r.partitions[partitionKey]; ok {
@@ -292,7 +293,7 @@ func (r *RmtDataCache) AddNewPartition(newPartition *metadata.Partition) {
partitionKey := newPartition.GetPartitionKey()
if _, ok := r.partitions[partitionKey]; !ok {
r.partitions[partitionKey] = newPartition
- if partitions, ok := r.topicPartitions[partitionKey]; !ok {
+ if partitions, ok := r.topicPartitions[newPartition.GetTopic()]; !ok {
newPartitions := make(map[string]bool)
newPartitions[partitionKey] = true
r.topicPartitions[newPartition.GetTopic()] = newPartitions
@@ -429,11 +430,11 @@ func (r *RmtDataCache) ReleasePartition(checkDelay bool, filterConsume bool, con
}
if delay > 10 {
r.partitionTimeouts[partitionKey] = time.AfterFunc(time.Duration(delay)*time.Millisecond, func() {
+ r.metaMu.Lock()
+ defer r.metaMu.Unlock()
r.resetIdlePartition(partitionKey, true)
})
} else {
- r.metaMu.Lock()
- defer r.metaMu.Unlock()
r.indexPartitions = append(r.indexPartitions, partitionKey)
}
} else {