You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by go...@apache.org on 2021/05/27 07:23:52 UTC
[incubator-inlong] 03/04: Address review comments
This is an automated email from the ASF dual-hosted git repository.
gosonzhang pushed a commit to branch INLONG-25
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git
commit 3cf68fbad554f7a6cdaddf6fd1021b4595650dd6
Author: Zijie Lu <ws...@gmail.com>
AuthorDate: Wed May 26 10:08:47 2021 +0800
Address review comments
Signed-off-by: Zijie Lu <ws...@gmail.com>
---
.../tubemq-client-go/config/config.go | 161 +++++++++++++--------
.../tubemq-client-go/config/config_test.go | 12 +-
2 files changed, 109 insertions(+), 64 deletions(-)
diff --git a/tubemq-client-twins/tubemq-client-go/config/config.go b/tubemq-client-twins/tubemq-client-go/config/config.go
index f4b026e..d867d0c 100644
--- a/tubemq-client-twins/tubemq-client-go/config/config.go
+++ b/tubemq-client-twins/tubemq-client-go/config/config.go
@@ -27,13 +27,14 @@ import (
)
// Config defines multiple configuration options.
+// Refer to: https://github.com/apache/incubator-inlong/blob/3249de37acf054a9c43677131cfbb09fc6d366d1/tubemq-client/src/main/java/org/apache/tubemq/client/config/ConsumerConfig.java
type Config struct {
// Net is the namespace for network-level properties used by Broker and Master.
Net struct {
// How long to wait for a response.
ReadTimeout time.Duration
// TLS based authentication with broker and master.
- TLS struct {
+ TLS struct {
// Whether or not to use TLS.
Enable bool
// CACertFile for TLS.
@@ -45,35 +46,68 @@ type Config struct {
// TTSServerName for TLS.
TLSServerName string
}
+ // Account based authentication with broker and master.
+ Auth struct {
+ // Whether or not to use authentication.
+ Enable bool
+ // Username of authentication.
+ UserName string
+ // Password of authentication.
+ Password string
+ }
}
// Consumer is the namespace for configuration related to consume messages,
// used by the consumer
Consumer struct {
- masters []string
- topic string
- offset int
- Group string
- boundConsume bool
- sessionKey string
- sourceCount int
- selectBig bool
- rollbackIfConfirmTimeout bool
- maxSubInfoReportInterval int
- maxPartCheckPeriod time.Duration
- partCheckSlice time.Duration
- msgNotFoundWait time.Duration
- rebConfirmWait time.Duration
- maxConfirmWait time.Duration
- shutdownRebWait time.Duration
+ // The addresses of master.
+ Masters []string
+ // The consumption topic.
+ Topic string
+ // The initial offset to use if no offset was previously committed.
+ ConsumePosition int
+ // The consumer group name.
+ Group string
+ // Whether or not to specify the offset.
+ BoundConsume bool
+ // SessionKey is defined by the client.
+ // The session key will be the same in a batch.
+ SessionKey string
+ // The number of consumers in a batch.
+ SourceCount int
+ // If multiple consumers want to reset the offset of the same partition,
+ // whether the server or not to use the biggest offset
+ // the server will use the biggest offset if set.
+ // Otherwise the server will use the smallest offset.
+ SelectBig bool
+ // If the confirm request timeouts, whether this batch of data should be considered as successful.
+ // This batch of data will not be considered as successful if set.
+ RollbackIfConfirmTimeout bool
+ // The maximum interval for the client to report subscription information.
+ MaxSubInfoReportInterval int
+ // The maximum interval to check the partition.
+ MaxPartCheckPeriod time.Duration
+ // The interval to check the partition.
+ PartCheckSlice time.Duration
+ // The maximum wait time the offset of a partition has reached the maximum offset.
+ MsgNotFoundWait time.Duration
+ // How long to wait when the server is rebalancing and the partition is being occupied by the client.
+ RebConfirmWait time.Duration
+ // The maximum wait time a partition consumption command is released.
+ MaxConfirmWait time.Duration
+ // How long to wait when shutdown is called and the server is rebalancing.
+ ShutdownRebWait time.Duration
}
// Heartbeat is the namespace for configuration related to heartbeat messages,
// used by the consumer
Heartbeat struct {
- interval time.Duration
- maxRetryTimes int
- afterFail time.Duration
+ // How frequently to send heartbeat.
+ Interval time.Duration
+ // The total number of times to retry sending heartbeat.
+ MaxRetryTimes int
+ // The heartbeat timeout after a heartbeat failure.
+ AfterFail time.Duration
}
}
@@ -82,23 +116,28 @@ func newDefaultConfig() *Config {
c.Net.ReadTimeout = 15000 * time.Millisecond
c.Net.TLS.Enable = false
-
- c.Consumer.boundConsume = false
- c.Consumer.sessionKey = ""
- c.Consumer.sourceCount = 0
- c.Consumer.selectBig = true
- c.Consumer.offset = 0
- c.Consumer.rollbackIfConfirmTimeout = true
- c.Consumer.maxSubInfoReportInterval = 6
- c.Consumer.maxPartCheckPeriod = 60000 * time.Millisecond
- c.Consumer.partCheckSlice = 300 * time.Millisecond
- c.Consumer.rebConfirmWait = 3000 * time.Millisecond
- c.Consumer.maxConfirmWait = 60000 * time.Millisecond
- c.Consumer.shutdownRebWait = 10000 * time.Millisecond
-
- c.Heartbeat.interval = 10000 * time.Millisecond
- c.Heartbeat.maxRetryTimes = 5
- c.Heartbeat.afterFail = 60000 * time.Millisecond
+ c.Net.Auth.Enable = false
+ c.Net.Auth.UserName = ""
+ c.Net.Auth.Password = ""
+
+ c.Consumer.Group = ""
+ c.Consumer.BoundConsume = false
+ c.Consumer.SessionKey = ""
+ c.Consumer.SourceCount = 0
+ c.Consumer.SelectBig = true
+ c.Consumer.ConsumePosition = 0
+ c.Consumer.RollbackIfConfirmTimeout = true
+ c.Consumer.MaxSubInfoReportInterval = 6
+ c.Consumer.MaxPartCheckPeriod = 60000 * time.Millisecond
+ c.Consumer.PartCheckSlice = 300 * time.Millisecond
+ c.Consumer.MsgNotFoundWait = 400 * time.Millisecond
+ c.Consumer.RebConfirmWait = 3000 * time.Millisecond
+ c.Consumer.MaxConfirmWait = 60000 * time.Millisecond
+ c.Consumer.ShutdownRebWait = 10000 * time.Millisecond
+
+ c.Heartbeat.Interval = 10000 * time.Millisecond
+ c.Heartbeat.MaxRetryTimes = 5
+ c.Heartbeat.AfterFail = 60000 * time.Millisecond
return c
}
@@ -112,11 +151,11 @@ func ParseAddress(address string) (config *Config, err error) {
return nil, fmt.Errorf("address format invalid: address: %v, token: %v", address, tokens)
}
- c.Consumer.masters = strings.Split(tokens[0], ",")
+ c.Consumer.Masters = strings.Split(tokens[0], ",")
tokens = strings.Split(tokens[1], "&")
if len(tokens) == 0 {
- return nil, fmt.Errorf("address formata invalid: masters: %v with empty params", config.Consumer.masters)
+ return nil, fmt.Errorf("address formata invalid: Masters: %v with empty params", config.Consumer.Masters)
}
for _, token := range tokens {
@@ -140,7 +179,7 @@ func getConfigFromToken(config *Config, values []string) error {
config.Net.ReadTimeout, err = parseDuration(values[1])
case "tlsEnable":
config.Net.TLS.Enable, err = strconv.ParseBool(values[1])
- case "caCertFile":
+ case "CACertFile":
config.Net.TLS.CACertFile = values[1]
case "tlsCertFile":
config.Net.TLS.TLSCertFile = values[1]
@@ -151,39 +190,45 @@ func getConfigFromToken(config *Config, values []string) error {
case "group":
config.Consumer.Group = values[1]
case "topic":
- config.Consumer.topic = values[1]
- case "offset":
- config.Consumer.offset, err = strconv.Atoi(values[1])
+ config.Consumer.Topic = values[1]
+ case "consumePosition":
+ config.Consumer.ConsumePosition, err = strconv.Atoi(values[1])
case "boundConsume":
- config.Consumer.boundConsume, err = strconv.ParseBool(values[1])
+ config.Consumer.BoundConsume, err = strconv.ParseBool(values[1])
case "sessionKey":
- config.Consumer.sessionKey = values[1]
+ config.Consumer.SessionKey = values[1]
case "sourceCount":
- config.Consumer.sourceCount, err = strconv.Atoi(values[1])
+ config.Consumer.SourceCount, err = strconv.Atoi(values[1])
case "selectBig":
- config.Consumer.selectBig, err = strconv.ParseBool(values[1])
+ config.Consumer.SelectBig, err = strconv.ParseBool(values[1])
case "rollbackIfConfirmTimeout":
- config.Consumer.rollbackIfConfirmTimeout, err = strconv.ParseBool(values[1])
+ config.Consumer.RollbackIfConfirmTimeout, err = strconv.ParseBool(values[1])
case "maxSubInfoReportInterval":
- config.Consumer.maxSubInfoReportInterval, err = strconv.Atoi(values[1])
+ config.Consumer.MaxSubInfoReportInterval, err = strconv.Atoi(values[1])
case "maxPartCheckPeriod":
- config.Consumer.maxPartCheckPeriod, err = parseDuration(values[1])
+ config.Consumer.MaxPartCheckPeriod, err = parseDuration(values[1])
case "partCheckSlice":
- config.Consumer.partCheckSlice, err = parseDuration(values[1])
+ config.Consumer.PartCheckSlice, err = parseDuration(values[1])
case "msgNotFoundWait":
- config.Consumer.msgNotFoundWait, err = parseDuration(values[1])
+ config.Consumer.MsgNotFoundWait, err = parseDuration(values[1])
case "rebConfirmWait":
- config.Consumer.rebConfirmWait, err = parseDuration(values[1])
+ config.Consumer.RebConfirmWait, err = parseDuration(values[1])
case "maxConfirmWait":
- config.Consumer.maxConfirmWait, err = parseDuration(values[1])
+ config.Consumer.MaxConfirmWait, err = parseDuration(values[1])
case "shutdownRebWait":
- config.Consumer.shutdownRebWait, err = parseDuration(values[1])
+ config.Consumer.ShutdownRebWait, err = parseDuration(values[1])
case "heartbeatInterval":
- config.Heartbeat.interval, err = parseDuration(values[1])
+ config.Heartbeat.Interval, err = parseDuration(values[1])
case "heartbeatMaxRetryTimes":
- config.Heartbeat.maxRetryTimes, err = strconv.Atoi(values[1])
+ config.Heartbeat.MaxRetryTimes, err = strconv.Atoi(values[1])
case "heartbeatAfterFail":
- config.Heartbeat.afterFail, err = parseDuration(values[1])
+ config.Heartbeat.AfterFail, err = parseDuration(values[1])
+ case "authEnable":
+ config.Net.Auth.Enable, err = strconv.ParseBool(values[1])
+ case "authUserName":
+ config.Net.Auth.UserName = values[1]
+ case "authPassword":
+ config.Net.Auth.Password = values[1]
default:
return fmt.Errorf("address format invalid, unknown keys: %v", values[0])
}
diff --git a/tubemq-client-twins/tubemq-client-go/config/config_test.go b/tubemq-client-twins/tubemq-client-go/config/config_test.go
index acefe1c..56bc600 100644
--- a/tubemq-client-twins/tubemq-client-go/config/config_test.go
+++ b/tubemq-client-twins/tubemq-client-go/config/config_test.go
@@ -28,24 +28,24 @@ func TestParseAddress(t *testing.T) {
address := "127.0.0.1:9092,127.0.0.1:9093?topic=Topic&group=Group&tlsEnable=false&msgNotFoundWait=10000&heartbeatMaxRetryTimes=6"
c, err := ParseAddress(address)
assert.Nil(t, err)
- assert.Equal(t, c.Consumer.masters, []string{"127.0.0.1:9092", "127.0.0.1:9093"})
- assert.Equal(t, c.Consumer.topic, "Topic")
+ assert.Equal(t, c.Consumer.Masters, []string{"127.0.0.1:9092", "127.0.0.1:9093"})
+ assert.Equal(t, c.Consumer.Topic, "Topic")
assert.Equal(t, c.Consumer.Group, "Group")
- assert.Equal(t, c.Consumer.msgNotFoundWait, 10000 * time.Millisecond)
+ assert.Equal(t, c.Consumer.MsgNotFoundWait, 10000*time.Millisecond)
assert.Equal(t, c.Net.TLS.Enable, false)
- assert.Equal(t, c.Heartbeat.maxRetryTimes, 6)
+ assert.Equal(t, c.Heartbeat.MaxRetryTimes, 6)
address = ""
_, err = ParseAddress(address)
assert.NotNil(t, err)
- address = "127.0.0.1:9092,127.0.0.1:9093?topic=Topic&ttt"
+ address = "127.0.0.1:9092,127.0.0.1:9093?Topic=Topic&ttt"
_, err = ParseAddress(address)
assert.NotNil(t, err)
- address = "127.0.0.1:9092,127.0.0.1:9093?topic=Topic&ttt=ttt"
+ address = "127.0.0.1:9092,127.0.0.1:9093?Topic=Topic&ttt=ttt"
_, err = ParseAddress(address)
assert.NotNil(t, err)
}