You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by go...@apache.org on 2021/09/06 06:03:34 UTC
[incubator-inlong] branch INLONG-25 updated: [INLONG-1525] Fix bug
that causes the Go SDK to fail to parse SubscribeInfo (#1526)
This is an automated email from the ASF dual-hosted git repository.
gosonzhang pushed a commit to branch INLONG-25
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git
The following commit(s) were added to refs/heads/INLONG-25 by this push:
new 11353eb [INLONG-1525] Fix bug that causes the Go SDK to fail to parse SubscribeInfo (#1526)
11353eb is described below
commit 11353eb2856c26c92e49c5d9837bf6bd0020444c
Author: Zijie Lu <ws...@gmail.com>
AuthorDate: Mon Sep 6 14:03:28 2021 +0800
[INLONG-1525] Fix bug that causes the Go SDK to fail to parse SubscribeInfo (#1526)
Signed-off-by: Zijie Lu <ws...@gmail.com>
---
.../tubemq-client-go/client/consumer_impl.go | 13 +++++++++----
tubemq-client-twins/tubemq-client-go/client/heartbeat.go | 4 ++++
.../tubemq-client-go/metadata/subscribe_info.go | 2 +-
tubemq-client-twins/tubemq-client-go/remote/remote.go | 2 +-
tubemq-client-twins/tubemq-client-go/util/util.go | 5 +++--
5 files changed, 18 insertions(+), 8 deletions(-)
diff --git a/tubemq-client-twins/tubemq-client-go/client/consumer_impl.go b/tubemq-client-twins/tubemq-client-go/client/consumer_impl.go
index e42319b..a7fb27c 100644
--- a/tubemq-client-twins/tubemq-client-go/client/consumer_impl.go
+++ b/tubemq-client-twins/tubemq-client-go/client/consumer_impl.go
@@ -162,6 +162,10 @@ func (c *consumer) sendRegRequest2Master() (*protocol.RegisterResponseM2C, error
node := &metadata.Node{}
node.SetHost(util.GetLocalHost())
node.SetAddress(c.master.Address)
+ auth := &protocol.AuthenticateInfo{}
+ if c.needGenMasterCertificateInfo(true) {
+ util.GenMasterAuthenticateToken(auth, c.config.Net.Auth.UserName, c.config.Net.Auth.Password)
+ }
m.SetNode(node)
sub := &metadata.SubscribeInfo{}
sub.SetGroup(c.config.Consumer.Group)
@@ -512,7 +516,7 @@ func (c *consumer) genBrokerAuthenticInfo(force bool) *protocol.AuthorizedInfo {
return auth
}
-func (c *consumer) genMasterAuthenticateToken(auth *protocol.AuthenticateInfo, force bool) {
+func (c *consumer) needGenMasterCertificateInfo(force bool) bool {
needAdd := false
if c.config.Net.Auth.Enable {
if force {
@@ -526,6 +530,7 @@ func (c *consumer) genMasterAuthenticateToken(auth *protocol.AuthenticateInfo, f
if needAdd {
}
}
+ return needAdd
}
func (c *consumer) getConsumeReadStatus(isFirstReg bool) int32 {
@@ -700,10 +705,10 @@ func (c *consumer) close2Master() error {
sub := &metadata.SubscribeInfo{}
sub.SetGroup(c.config.Consumer.Group)
m.SetSubscribeInfo(sub)
+ mci := &protocol.MasterCertificateInfo{}
auth := &protocol.AuthenticateInfo{}
- c.genMasterAuthenticateToken(auth, true)
- mci := &protocol.MasterCertificateInfo{
- AuthInfo: auth,
+ if c.needGenMasterCertificateInfo(true) {
+ util.GenMasterAuthenticateToken(auth, c.config.Net.Auth.UserName, c.config.Net.Auth.Password)
}
c.subInfo.SetMasterCertificateInfo(mci)
rsp, err := c.client.CloseRequestC2M(ctx, m, c.subInfo)
diff --git a/tubemq-client-twins/tubemq-client-go/client/heartbeat.go b/tubemq-client-twins/tubemq-client-go/client/heartbeat.go
index 49d4f7c..d74b320 100644
--- a/tubemq-client-twins/tubemq-client-go/client/heartbeat.go
+++ b/tubemq-client-twins/tubemq-client-go/client/heartbeat.go
@@ -88,6 +88,10 @@ func (h *heartbeatManager) consumerHB2Master() {
sub := &metadata.SubscribeInfo{}
sub.SetGroup(h.consumer.config.Consumer.Group)
m.SetSubscribeInfo(sub)
+ auth := &protocol.AuthenticateInfo{}
+ if h.consumer.needGenMasterCertificateInfo(true) {
+ util.GenMasterAuthenticateToken(auth, h.consumer.config.Net.Auth.UserName, h.consumer.config.Net.Auth.Password)
+ }
h.consumer.unreportedTimes++
if h.consumer.unreportedTimes > h.consumer.config.Consumer.MaxSubInfoReportInterval {
m.SetReportTimes(true)
diff --git a/tubemq-client-twins/tubemq-client-go/metadata/subscribe_info.go b/tubemq-client-twins/tubemq-client-go/metadata/subscribe_info.go
index 3670204..6544a4d 100644
--- a/tubemq-client-twins/tubemq-client-go/metadata/subscribe_info.go
+++ b/tubemq-client-twins/tubemq-client-go/metadata/subscribe_info.go
@@ -55,7 +55,7 @@ func (s *SubscribeInfo) String() string {
// If the given is invalid, it will return error.
// The format of subscribeInfo string: consumerId@group#broker_info#topic:partitionId
func NewSubscribeInfo(subscribeInfo string) (*SubscribeInfo, error) {
- s := strings.Split(subscribeInfo, "#")
+ s := strings.SplitN(subscribeInfo, "#", 2)
if len(s) == 1 {
return nil, errs.ErrInvalidSubscribeInfoString
}
diff --git a/tubemq-client-twins/tubemq-client-go/remote/remote.go b/tubemq-client-twins/tubemq-client-go/remote/remote.go
index 6e66af9..5b26b0e 100644
--- a/tubemq-client-twins/tubemq-client-go/remote/remote.go
+++ b/tubemq-client-twins/tubemq-client-go/remote/remote.go
@@ -463,7 +463,7 @@ func (r *RmtDataCache) removeFromIndexPartitions(partitionKey string) {
r.indexPartitions = nil
return
}
- if pos >= len(r.indexPartitions) {
+ if pos == -1 || pos >= len(r.indexPartitions) {
return
}
r.indexPartitions = append(r.indexPartitions[:pos], r.indexPartitions[pos+1:]...)
diff --git a/tubemq-client-twins/tubemq-client-go/util/util.go b/tubemq-client-twins/tubemq-client-go/util/util.go
index 156540c..4fa1def 100644
--- a/tubemq-client-twins/tubemq-client-go/util/util.go
+++ b/tubemq-client-twins/tubemq-client-go/util/util.go
@@ -23,6 +23,8 @@ import (
"net"
"strconv"
"strings"
+
+ "github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/protocol"
)
// InvalidValue defines the invalid value of TubeMQ config.
@@ -72,8 +74,7 @@ func GenBrokerAuthenticateToken(username string, password string) string {
}
// GenMasterAuthenticateToken generates the master authenticate token.
-func GenMasterAuthenticateToken(username string, password string) string {
- return ""
+func GenMasterAuthenticateToken(authInfo *protocol.AuthenticateInfo, username string, password string) {
}
// ParseConfirmContext parses the confirm context to partition key and bookedTime.