You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by go...@apache.org on 2021/05/27 07:23:52 UTC

[incubator-inlong] 03/04: Address review comments

This is an automated email from the ASF dual-hosted git repository.

gosonzhang pushed a commit to branch INLONG-25
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git

commit 3cf68fbad554f7a6cdaddf6fd1021b4595650dd6
Author: Zijie Lu <ws...@gmail.com>
AuthorDate: Wed May 26 10:08:47 2021 +0800

    Address review comments
    
    Signed-off-by: Zijie Lu <ws...@gmail.com>
---
 .../tubemq-client-go/config/config.go              | 161 +++++++++++++--------
 .../tubemq-client-go/config/config_test.go         |  12 +-
 2 files changed, 109 insertions(+), 64 deletions(-)

diff --git a/tubemq-client-twins/tubemq-client-go/config/config.go b/tubemq-client-twins/tubemq-client-go/config/config.go
index f4b026e..d867d0c 100644
--- a/tubemq-client-twins/tubemq-client-go/config/config.go
+++ b/tubemq-client-twins/tubemq-client-go/config/config.go
@@ -27,13 +27,14 @@ import (
 )
 
 // Config defines multiple configuration options.
+// Refer to: https://github.com/apache/incubator-inlong/blob/3249de37acf054a9c43677131cfbb09fc6d366d1/tubemq-client/src/main/java/org/apache/tubemq/client/config/ConsumerConfig.java
 type Config struct {
 	// Net is the namespace for network-level properties used by Broker and Master.
 	Net struct {
 		// How long to wait for a response.
 		ReadTimeout time.Duration
 		// TLS based authentication with broker and master.
-		TLS         struct {
+		TLS struct {
 			// Whether or not to use TLS.
 			Enable bool
 			// CACertFile for TLS.
@@ -45,35 +46,68 @@ type Config struct {
 			// TTSServerName for TLS.
 			TLSServerName string
 		}
+		// Account based authentication with broker and master.
+		Auth struct {
+			// Whether or not to use authentication.
+			Enable bool
+			// Username of authentication.
+			UserName string
+			// Password of authentication.
+			Password string
+		}
 	}
 
 	// Consumer is the namespace for configuration related to consume messages,
 	// used by the consumer
 	Consumer struct {
-		masters []string
-		topic string
-		offset int
-		Group                    string
-		boundConsume             bool
-		sessionKey               string
-		sourceCount              int
-		selectBig                bool
-		rollbackIfConfirmTimeout bool
-		maxSubInfoReportInterval int
-		maxPartCheckPeriod       time.Duration
-		partCheckSlice           time.Duration
-		msgNotFoundWait          time.Duration
-		rebConfirmWait           time.Duration
-		maxConfirmWait           time.Duration
-		shutdownRebWait          time.Duration
+		// The addresses of master.
+		Masters []string
+		// The consumption topic.
+		Topic string
+		// The initial offset to use if no offset was previously committed.
+		ConsumePosition int
+		// The consumer group name.
+		Group string
+		// Whether or not to specify the offset.
+		BoundConsume bool
+		// SessionKey is defined by the client.
+		// The session key will be the same in a batch.
+		SessionKey string
+		// The number of consumers in a batch.
+		SourceCount int
+		// If multiple consumers want to reset the offset of the same partition,
+		// this option decides which of the requested offsets the server uses:
+		// the server will use the biggest offset if set.
+		// Otherwise the server will use the smallest offset.
+		SelectBig bool
+		// Whether this batch of data should be considered as successful if the confirm request times out.
+		// This batch of data will not be considered as successful if set.
+		RollbackIfConfirmTimeout bool
+		// The maximum interval for the client to report subscription information.
+		MaxSubInfoReportInterval int
+		// The maximum interval to check the partition.
+		MaxPartCheckPeriod time.Duration
+		// The interval to check the partition.
+		PartCheckSlice time.Duration
+		// The maximum time to wait after the offset of a partition has reached the maximum offset.
+		MsgNotFoundWait time.Duration
+		// How long to wait when the server is rebalancing and the partition is being occupied by the client.
+		RebConfirmWait time.Duration
+		// The maximum wait time a partition consumption command is released.
+		MaxConfirmWait time.Duration
+		// How long to wait when shutdown is called and the server is rebalancing.
+		ShutdownRebWait time.Duration
 	}
 
 	// Heartbeat is the namespace for configuration related to heartbeat messages,
 	// used by the consumer
 	Heartbeat struct {
-		interval      time.Duration
-		maxRetryTimes int
-		afterFail     time.Duration
+		// How frequently to send heartbeat.
+		Interval time.Duration
+		// The total number of times to retry sending heartbeat.
+		MaxRetryTimes int
+		// The heartbeat timeout after a heartbeat failure.
+		AfterFail time.Duration
 	}
 }
 
@@ -82,23 +116,28 @@ func newDefaultConfig() *Config {
 
 	c.Net.ReadTimeout = 15000 * time.Millisecond
 	c.Net.TLS.Enable = false
-
-	c.Consumer.boundConsume = false
-	c.Consumer.sessionKey = ""
-	c.Consumer.sourceCount = 0
-	c.Consumer.selectBig = true
-	c.Consumer.offset = 0
-	c.Consumer.rollbackIfConfirmTimeout = true
-	c.Consumer.maxSubInfoReportInterval = 6
-	c.Consumer.maxPartCheckPeriod = 60000 * time.Millisecond
-	c.Consumer.partCheckSlice = 300 * time.Millisecond
-	c.Consumer.rebConfirmWait = 3000 * time.Millisecond
-	c.Consumer.maxConfirmWait = 60000 * time.Millisecond
-	c.Consumer.shutdownRebWait = 10000 * time.Millisecond
-
-	c.Heartbeat.interval = 10000 * time.Millisecond
-	c.Heartbeat.maxRetryTimes = 5
-	c.Heartbeat.afterFail = 60000 * time.Millisecond
+	c.Net.Auth.Enable = false
+	c.Net.Auth.UserName = ""
+	c.Net.Auth.Password = ""
+
+	c.Consumer.Group = ""
+	c.Consumer.BoundConsume = false
+	c.Consumer.SessionKey = ""
+	c.Consumer.SourceCount = 0
+	c.Consumer.SelectBig = true
+	c.Consumer.ConsumePosition = 0
+	c.Consumer.RollbackIfConfirmTimeout = true
+	c.Consumer.MaxSubInfoReportInterval = 6
+	c.Consumer.MaxPartCheckPeriod = 60000 * time.Millisecond
+	c.Consumer.PartCheckSlice = 300 * time.Millisecond
+	c.Consumer.MsgNotFoundWait = 400 * time.Millisecond
+	c.Consumer.RebConfirmWait = 3000 * time.Millisecond
+	c.Consumer.MaxConfirmWait = 60000 * time.Millisecond
+	c.Consumer.ShutdownRebWait = 10000 * time.Millisecond
+
+	c.Heartbeat.Interval = 10000 * time.Millisecond
+	c.Heartbeat.MaxRetryTimes = 5
+	c.Heartbeat.AfterFail = 60000 * time.Millisecond
 
 	return c
 }
@@ -112,11 +151,11 @@ func ParseAddress(address string) (config *Config, err error) {
 		return nil, fmt.Errorf("address format invalid: address: %v, token: %v", address, tokens)
 	}
 
-	c.Consumer.masters = strings.Split(tokens[0], ",")
+	c.Consumer.Masters = strings.Split(tokens[0], ",")
 
 	tokens = strings.Split(tokens[1], "&")
 	if len(tokens) == 0 {
-		return nil, fmt.Errorf("address formata invalid: masters: %v with empty params", config.Consumer.masters)
+		return nil, fmt.Errorf("address formata invalid: Masters: %v with empty params", config.Consumer.Masters)
 	}
 
 	for _, token := range tokens {
@@ -140,7 +179,7 @@ func getConfigFromToken(config *Config, values []string) error {
 		config.Net.ReadTimeout, err = parseDuration(values[1])
 	case "tlsEnable":
 		config.Net.TLS.Enable, err = strconv.ParseBool(values[1])
-	case "caCertFile":
+	case "CACertFile":
 		config.Net.TLS.CACertFile = values[1]
 	case "tlsCertFile":
 		config.Net.TLS.TLSCertFile = values[1]
@@ -151,39 +190,45 @@ func getConfigFromToken(config *Config, values []string) error {
 	case "group":
 		config.Consumer.Group = values[1]
 	case "topic":
-		config.Consumer.topic = values[1]
-	case "offset":
-		config.Consumer.offset, err = strconv.Atoi(values[1])
+		config.Consumer.Topic = values[1]
+	case "consumePosition":
+		config.Consumer.ConsumePosition, err = strconv.Atoi(values[1])
 	case "boundConsume":
-		config.Consumer.boundConsume, err = strconv.ParseBool(values[1])
+		config.Consumer.BoundConsume, err = strconv.ParseBool(values[1])
 	case "sessionKey":
-		config.Consumer.sessionKey = values[1]
+		config.Consumer.SessionKey = values[1]
 	case "sourceCount":
-		config.Consumer.sourceCount, err = strconv.Atoi(values[1])
+		config.Consumer.SourceCount, err = strconv.Atoi(values[1])
 	case "selectBig":
-		config.Consumer.selectBig, err = strconv.ParseBool(values[1])
+		config.Consumer.SelectBig, err = strconv.ParseBool(values[1])
 	case "rollbackIfConfirmTimeout":
-		config.Consumer.rollbackIfConfirmTimeout, err = strconv.ParseBool(values[1])
+		config.Consumer.RollbackIfConfirmTimeout, err = strconv.ParseBool(values[1])
 	case "maxSubInfoReportInterval":
-		config.Consumer.maxSubInfoReportInterval, err = strconv.Atoi(values[1])
+		config.Consumer.MaxSubInfoReportInterval, err = strconv.Atoi(values[1])
 	case "maxPartCheckPeriod":
-		config.Consumer.maxPartCheckPeriod, err = parseDuration(values[1])
+		config.Consumer.MaxPartCheckPeriod, err = parseDuration(values[1])
 	case "partCheckSlice":
-		config.Consumer.partCheckSlice, err = parseDuration(values[1])
+		config.Consumer.PartCheckSlice, err = parseDuration(values[1])
 	case "msgNotFoundWait":
-		config.Consumer.msgNotFoundWait, err = parseDuration(values[1])
+		config.Consumer.MsgNotFoundWait, err = parseDuration(values[1])
 	case "rebConfirmWait":
-		config.Consumer.rebConfirmWait, err = parseDuration(values[1])
+		config.Consumer.RebConfirmWait, err = parseDuration(values[1])
 	case "maxConfirmWait":
-		config.Consumer.maxConfirmWait, err = parseDuration(values[1])
+		config.Consumer.MaxConfirmWait, err = parseDuration(values[1])
 	case "shutdownRebWait":
-		config.Consumer.shutdownRebWait, err = parseDuration(values[1])
+		config.Consumer.ShutdownRebWait, err = parseDuration(values[1])
 	case "heartbeatInterval":
-		config.Heartbeat.interval, err = parseDuration(values[1])
+		config.Heartbeat.Interval, err = parseDuration(values[1])
 	case "heartbeatMaxRetryTimes":
-		config.Heartbeat.maxRetryTimes, err = strconv.Atoi(values[1])
+		config.Heartbeat.MaxRetryTimes, err = strconv.Atoi(values[1])
 	case "heartbeatAfterFail":
-		config.Heartbeat.afterFail, err = parseDuration(values[1])
+		config.Heartbeat.AfterFail, err = parseDuration(values[1])
+	case "authEnable":
+		config.Net.Auth.Enable, err = strconv.ParseBool(values[1])
+	case "authUserName":
+		config.Net.Auth.UserName = values[1]
+	case "authPassword":
+		config.Net.Auth.Password = values[1]
 	default:
 		return fmt.Errorf("address format invalid, unknown keys: %v", values[0])
 	}
diff --git a/tubemq-client-twins/tubemq-client-go/config/config_test.go b/tubemq-client-twins/tubemq-client-go/config/config_test.go
index acefe1c..56bc600 100644
--- a/tubemq-client-twins/tubemq-client-go/config/config_test.go
+++ b/tubemq-client-twins/tubemq-client-go/config/config_test.go
@@ -28,24 +28,24 @@ func TestParseAddress(t *testing.T) {
 	address := "127.0.0.1:9092,127.0.0.1:9093?topic=Topic&group=Group&tlsEnable=false&msgNotFoundWait=10000&heartbeatMaxRetryTimes=6"
 	c, err := ParseAddress(address)
 	assert.Nil(t, err)
-	assert.Equal(t, c.Consumer.masters, []string{"127.0.0.1:9092", "127.0.0.1:9093"})
-	assert.Equal(t, c.Consumer.topic, "Topic")
+	assert.Equal(t, c.Consumer.Masters, []string{"127.0.0.1:9092", "127.0.0.1:9093"})
+	assert.Equal(t, c.Consumer.Topic, "Topic")
 	assert.Equal(t, c.Consumer.Group, "Group")
-	assert.Equal(t, c.Consumer.msgNotFoundWait, 10000 * time.Millisecond)
+	assert.Equal(t, c.Consumer.MsgNotFoundWait, 10000*time.Millisecond)
 
 	assert.Equal(t, c.Net.TLS.Enable, false)
 
-	assert.Equal(t, c.Heartbeat.maxRetryTimes, 6)
+	assert.Equal(t, c.Heartbeat.MaxRetryTimes, 6)
 
 	address = ""
 	_, err = ParseAddress(address)
 	assert.NotNil(t, err)
 
-	address = "127.0.0.1:9092,127.0.0.1:9093?topic=Topic&ttt"
+	address = "127.0.0.1:9092,127.0.0.1:9093?Topic=Topic&ttt"
 	_, err = ParseAddress(address)
 	assert.NotNil(t, err)
 
-	address = "127.0.0.1:9092,127.0.0.1:9093?topic=Topic&ttt=ttt"
+	address = "127.0.0.1:9092,127.0.0.1:9093?Topic=Topic&ttt=ttt"
 	_, err = ParseAddress(address)
 	assert.NotNil(t, err)
 }