You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by go...@apache.org on 2021/09/06 06:03:34 UTC

[incubator-inlong] branch INLONG-25 updated: [INLONG-1525]Fix bug which will cause Go SDK fail to parse SubscribeInfo (#1526)

This is an automated email from the ASF dual-hosted git repository.

gosonzhang pushed a commit to branch INLONG-25
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git


The following commit(s) were added to refs/heads/INLONG-25 by this push:
     new 11353eb  [INLONG-1525]Fix bug which will cause Go SDK fail to parse SubscribeInfo (#1526)
11353eb is described below

commit 11353eb2856c26c92e49c5d9837bf6bd0020444c
Author: Zijie Lu <ws...@gmail.com>
AuthorDate: Mon Sep 6 14:03:28 2021 +0800

    [INLONG-1525]Fix bug which will cause Go SDK fail to parse SubscribeInfo (#1526)
    
    Signed-off-by: Zijie Lu <ws...@gmail.com>
---
 .../tubemq-client-go/client/consumer_impl.go                | 13 +++++++++----
 tubemq-client-twins/tubemq-client-go/client/heartbeat.go    |  4 ++++
 .../tubemq-client-go/metadata/subscribe_info.go             |  2 +-
 tubemq-client-twins/tubemq-client-go/remote/remote.go       |  2 +-
 tubemq-client-twins/tubemq-client-go/util/util.go           |  5 +++--
 5 files changed, 18 insertions(+), 8 deletions(-)

diff --git a/tubemq-client-twins/tubemq-client-go/client/consumer_impl.go b/tubemq-client-twins/tubemq-client-go/client/consumer_impl.go
index e42319b..a7fb27c 100644
--- a/tubemq-client-twins/tubemq-client-go/client/consumer_impl.go
+++ b/tubemq-client-twins/tubemq-client-go/client/consumer_impl.go
@@ -162,6 +162,10 @@ func (c *consumer) sendRegRequest2Master() (*protocol.RegisterResponseM2C, error
 	node := &metadata.Node{}
 	node.SetHost(util.GetLocalHost())
 	node.SetAddress(c.master.Address)
+	auth := &protocol.AuthenticateInfo{}
+	if c.needGenMasterCertificateInfo(true) {
+		util.GenMasterAuthenticateToken(auth, c.config.Net.Auth.UserName, c.config.Net.Auth.Password)
+	}
 	m.SetNode(node)
 	sub := &metadata.SubscribeInfo{}
 	sub.SetGroup(c.config.Consumer.Group)
@@ -512,7 +516,7 @@ func (c *consumer) genBrokerAuthenticInfo(force bool) *protocol.AuthorizedInfo {
 	return auth
 }
 
-func (c *consumer) genMasterAuthenticateToken(auth *protocol.AuthenticateInfo, force bool) {
+func (c *consumer) needGenMasterCertificateInfo(force bool) bool {
 	needAdd := false
 	if c.config.Net.Auth.Enable {
 		if force {
@@ -526,6 +530,7 @@ func (c *consumer) genMasterAuthenticateToken(auth *protocol.AuthenticateInfo, f
 		if needAdd {
 		}
 	}
+	return needAdd
 }
 
 func (c *consumer) getConsumeReadStatus(isFirstReg bool) int32 {
@@ -700,10 +705,10 @@ func (c *consumer) close2Master() error {
 	sub := &metadata.SubscribeInfo{}
 	sub.SetGroup(c.config.Consumer.Group)
 	m.SetSubscribeInfo(sub)
+	mci := &protocol.MasterCertificateInfo{}
 	auth := &protocol.AuthenticateInfo{}
-	c.genMasterAuthenticateToken(auth, true)
-	mci := &protocol.MasterCertificateInfo{
-		AuthInfo: auth,
+	if c.needGenMasterCertificateInfo(true) {
+		util.GenMasterAuthenticateToken(auth, c.config.Net.Auth.UserName, c.config.Net.Auth.Password)
 	}
 	c.subInfo.SetMasterCertificateInfo(mci)
 	rsp, err := c.client.CloseRequestC2M(ctx, m, c.subInfo)
diff --git a/tubemq-client-twins/tubemq-client-go/client/heartbeat.go b/tubemq-client-twins/tubemq-client-go/client/heartbeat.go
index 49d4f7c..d74b320 100644
--- a/tubemq-client-twins/tubemq-client-go/client/heartbeat.go
+++ b/tubemq-client-twins/tubemq-client-go/client/heartbeat.go
@@ -88,6 +88,10 @@ func (h *heartbeatManager) consumerHB2Master() {
 	sub := &metadata.SubscribeInfo{}
 	sub.SetGroup(h.consumer.config.Consumer.Group)
 	m.SetSubscribeInfo(sub)
+	auth := &protocol.AuthenticateInfo{}
+	if h.consumer.needGenMasterCertificateInfo(true) {
+		util.GenMasterAuthenticateToken(auth, h.consumer.config.Net.Auth.UserName, h.consumer.config.Net.Auth.Password)
+	}
 	h.consumer.unreportedTimes++
 	if h.consumer.unreportedTimes > h.consumer.config.Consumer.MaxSubInfoReportInterval {
 		m.SetReportTimes(true)
diff --git a/tubemq-client-twins/tubemq-client-go/metadata/subscribe_info.go b/tubemq-client-twins/tubemq-client-go/metadata/subscribe_info.go
index 3670204..6544a4d 100644
--- a/tubemq-client-twins/tubemq-client-go/metadata/subscribe_info.go
+++ b/tubemq-client-twins/tubemq-client-go/metadata/subscribe_info.go
@@ -55,7 +55,7 @@ func (s *SubscribeInfo) String() string {
 // If the given is invalid, it will return error.
 // The format of subscribeInfo string: consumerId@group#broker_info#topic:partitionId
 func NewSubscribeInfo(subscribeInfo string) (*SubscribeInfo, error) {
-	s := strings.Split(subscribeInfo, "#")
+	s := strings.SplitN(subscribeInfo, "#", 2)
 	if len(s) == 1 {
 		return nil, errs.ErrInvalidSubscribeInfoString
 	}
diff --git a/tubemq-client-twins/tubemq-client-go/remote/remote.go b/tubemq-client-twins/tubemq-client-go/remote/remote.go
index 6e66af9..5b26b0e 100644
--- a/tubemq-client-twins/tubemq-client-go/remote/remote.go
+++ b/tubemq-client-twins/tubemq-client-go/remote/remote.go
@@ -463,7 +463,7 @@ func (r *RmtDataCache) removeFromIndexPartitions(partitionKey string) {
 		r.indexPartitions = nil
 		return
 	}
-	if pos >= len(r.indexPartitions) {
+	if pos == -1 || pos >= len(r.indexPartitions) {
 		return
 	}
 	r.indexPartitions = append(r.indexPartitions[:pos], r.indexPartitions[pos+1:]...)
diff --git a/tubemq-client-twins/tubemq-client-go/util/util.go b/tubemq-client-twins/tubemq-client-go/util/util.go
index 156540c..4fa1def 100644
--- a/tubemq-client-twins/tubemq-client-go/util/util.go
+++ b/tubemq-client-twins/tubemq-client-go/util/util.go
@@ -23,6 +23,8 @@ import (
 	"net"
 	"strconv"
 	"strings"
+
+	"github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/protocol"
 )
 
 // InvalidValue defines the invalid value of TubeMQ config.
@@ -72,8 +74,7 @@ func GenBrokerAuthenticateToken(username string, password string) string {
 }
 
 // GenMasterAuthenticateToken generates the master authenticate token.
-func GenMasterAuthenticateToken(username string, password string) string {
-	return ""
+func GenMasterAuthenticateToken(authInfo *protocol.AuthenticateInfo, username string, password string) {
 }
 
 // ParseConfirmContext parses the confirm context to partition key and bookedTime.