You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by go...@apache.org on 2021/07/23 08:47:45 UTC

[incubator-inlong] branch INLONG-25 updated: [INLONG-793]Fix Some Corner Case in Go SDK (#588)

This is an automated email from the ASF dual-hosted git repository.

gosonzhang pushed a commit to branch INLONG-25
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git


The following commit(s) were added to refs/heads/INLONG-25 by this push:
     new 85aa1e4  [INLONG-793]Fix Some Corner Case in Go SDK (#588)
85aa1e4 is described below

commit 85aa1e45d3258040da094a06069bdff92ea1fe9c
Author: Zijie Lu <ws...@gmail.com>
AuthorDate: Fri Jul 23 16:47:35 2021 +0800

    [INLONG-793]Fix Some Corner Case in Go SDK (#588)
    
    Signed-off-by: Zijie Lu <ws...@gmail.com>
---
 .../tubemq-client-go/codec/tubemq_codec.go         |  6 +--
 tubemq-client-twins/tubemq-client-go/errs/errs.go  | 51 +++++++++++++---------
 .../tubemq-client-go/metadata/metadata.go          |  2 +-
 .../tubemq-client-go/metadata/node.go              |  6 +++
 .../tubemq-client-go/metadata/partition.go         | 16 +++++--
 .../{subcribe_info.go => subscribe_info.go}        | 18 ++++++--
 .../tubemq-client-go/remote/remote.go              |  9 ++--
 7 files changed, 70 insertions(+), 38 deletions(-)

diff --git a/tubemq-client-twins/tubemq-client-go/codec/tubemq_codec.go b/tubemq-client-twins/tubemq-client-go/codec/tubemq_codec.go
index db114b3..b7e796b 100644
--- a/tubemq-client-twins/tubemq-client-go/codec/tubemq_codec.go
+++ b/tubemq-client-twins/tubemq-client-go/codec/tubemq_codec.go
@@ -35,9 +35,9 @@ import (
 )
 
 const (
-	// The default begin token of TubeMQ RPC protocol.
+	// RPCProtocolBeginToken is the default begin token of TubeMQ RPC protocol.
 	RPCProtocolBeginToken uint32 = 0xFF7FF4FE
-	// The default max buffer size the RPC response.
+	// RPCMaxBufferSize is the default max buffer size the RPC response.
 	RPCMaxBufferSize int    = 8192
 	frameHeadLen     uint32 = 12
 	maxBufferSize    int    = 128 * 1024
@@ -138,7 +138,7 @@ func (t TubeMQResponse) GetSerialNo() uint32 {
 	return t.serialNo
 }
 
-// GetResponseBuf will return the body of Response.
+// GetBuffer will return the body of Response.
 func (t TubeMQResponse) GetBuffer() []byte {
 	return t.Buffer
 }
diff --git a/tubemq-client-twins/tubemq-client-go/errs/errs.go b/tubemq-client-twins/tubemq-client-go/errs/errs.go
index ec94229..1989ff5 100644
--- a/tubemq-client-twins/tubemq-client-go/errs/errs.go
+++ b/tubemq-client-twins/tubemq-client-go/errs/errs.go
@@ -34,31 +34,40 @@ const (
 	// RetRequestFailure represents the error code of request error.
 	RetRequestFailure = 5
 	// RetSelectorNotExist represents the selector not exists.
-	RetSelectorNotExist        = 6
-	RetSuccess                 = 200
-	RetErrMoved                = 301
-	RetBadRequest              = 400
-	RetErrForbidden            = 403
-	RetErrNotFound             = 404
-	RetErrNoPartAssigned       = 406
-	RetErrAllPartWaiting       = 407
-	RetErrAllPartInUse         = 408
-	RetErrHBNoNode             = 411
-	RetErrDuplicatePartition   = 412
-	RetCertificateFailure      = 415
-	RetConsumeGroupForbidden   = 450
-	RetConsumeContentForbidden = 455
-	RetErrServiceUnavailable   = 503
-	RetErrConsumeSpeedLimit    = 550
-	RetErrConfirmTimeout       = 2004
+	RetSelectorNotExist = 6
+	// RetInvalidNodeString represents the node string is invalid.
+	RetInvalidNodeString = 7
+	// RetInvalidPartitionString represents the partition string is invalid.
+	RetInvalidPartitionString = 8
+	// RetInvalidSubscribeInfoString represents the subscribeInfo string is invalid.
+	RetInvalidSubscribeInfoString = 9
+	RetSuccess                    = 200
+	RetErrMoved                   = 301
+	RetBadRequest                 = 400
+	RetErrForbidden               = 403
+	RetErrNotFound                = 404
+	RetErrNoPartAssigned          = 406
+	RetErrAllPartWaiting          = 407
+	RetErrAllPartInUse            = 408
+	RetErrHBNoNode                = 411
+	RetErrDuplicatePartition      = 412
+	RetCertificateFailure         = 415
+	RetConsumeGroupForbidden      = 450
+	RetConsumeContentForbidden    = 455
+	RetErrServiceUnavailable      = 503
+	RetErrConsumeSpeedLimit       = 550
+	RetErrConfirmTimeout          = 2004
 )
 
 // ErrAssertionFailure represents RetAssertionFailure error.
 var (
-	ErrAssertionFailure = New(RetAssertionFailure, "AssertionFailure")
-	ErrNoPartAssigned   = New(RetErrNoPartAssigned, "No partition info in local cache, please retry later!")
-	ErrAllPartWaiting   = New(RetErrAllPartWaiting, "All partitions reach max position, please retry later!")
-	ErrAllPartInUse     = New(RetErrAllPartInUse, "No idle partition to consume, please retry later!")
+	ErrAssertionFailure           = New(RetAssertionFailure, "AssertionFailure")
+	ErrNoPartAssigned             = New(RetErrNoPartAssigned, "No partition info in local cache, please retry later!")
+	ErrAllPartWaiting             = New(RetErrAllPartWaiting, "All partitions reach max position, please retry later!")
+	ErrAllPartInUse               = New(RetErrAllPartInUse, "No idle partition to consume, please retry later!")
+	ErrInvalidNodeString          = New(RetInvalidNodeString, "Node string should have format: node_id:host:port")
+	ErrInvalidPartitionString     = New(RetInvalidPartitionString, "Partition string should have format: broker_info#topic:partitionId")
+	ErrInvalidSubscribeInfoString = New(RetInvalidSubscribeInfoString, "SubscribeInfo string should have format: consumerId@group#broker_info#topic:partitionId")
 )
 
 // Error provides a TubeMQ-specific error container
diff --git a/tubemq-client-twins/tubemq-client-go/metadata/metadata.go b/tubemq-client-twins/tubemq-client-go/metadata/metadata.go
index f64674a..5b997e3 100644
--- a/tubemq-client-twins/tubemq-client-go/metadata/metadata.go
+++ b/tubemq-client-twins/tubemq-client-go/metadata/metadata.go
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-// package metadata defines all the metadata of the TubeMQ broker and producer.
+// Package metadata defines all the metadata of the TubeMQ broker and producer.
 package metadata
 
 // Metadata represents the metadata of
diff --git a/tubemq-client-twins/tubemq-client-go/metadata/node.go b/tubemq-client-twins/tubemq-client-go/metadata/node.go
index bcfda82..ca6a423 100644
--- a/tubemq-client-twins/tubemq-client-go/metadata/node.go
+++ b/tubemq-client-twins/tubemq-client-go/metadata/node.go
@@ -20,6 +20,8 @@ package metadata
 import (
 	"strconv"
 	"strings"
+
+	"github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/errs"
 )
 
 // Node represents the metadata of a node.
@@ -32,8 +34,12 @@ type Node struct {
 
 // NewNode constructs a node from a given string.
 // If the given string is invalid, it will return error.
+// The format of node string: nodeID:host:port
 func NewNode(isBroker bool, node string) (*Node, error) {
 	res := strings.Split(node, ":")
+	if len(res) == 1 {
+		return nil, errs.ErrInvalidNodeString
+	}
 	nodeID := 0
 	host := ""
 	port := 8123
diff --git a/tubemq-client-twins/tubemq-client-go/metadata/partition.go b/tubemq-client-twins/tubemq-client-go/metadata/partition.go
index 321af95..243176c 100644
--- a/tubemq-client-twins/tubemq-client-go/metadata/partition.go
+++ b/tubemq-client-twins/tubemq-client-go/metadata/partition.go
@@ -74,14 +74,22 @@ func NewConsumeData(time int64, errCode int32, escLimit bool, msgSize int32, dlt
 }
 
 // NewPartition parses a partition from the given string.
+// The format of partition string: brokerInfo#topic:partitionId
 func NewPartition(partition string) (*Partition, error) {
-	b, err := NewNode(true, strings.Split(partition, "#")[0])
+	s := strings.Split(partition, "#")
+	if len(s) == 1 {
+		return nil, errs.ErrInvalidPartitionString
+	}
+	b, err := NewNode(true, s[0])
 	if err != nil {
 		return nil, err
 	}
-	s := strings.Split(partition, "#")[1]
-	topic := strings.Split(s, ":")[0]
-	partitionID, err := strconv.Atoi(strings.Split(s, ":")[1])
+	topicPartitionID := strings.Split(s[1], ":")
+	if len(topicPartitionID) == 1 {
+		return nil, errs.ErrInvalidPartitionString
+	}
+	topic := topicPartitionID[0]
+	partitionID, err := strconv.Atoi(topicPartitionID[1])
 	if err != nil {
 		return nil, err
 	}
diff --git a/tubemq-client-twins/tubemq-client-go/metadata/subcribe_info.go b/tubemq-client-twins/tubemq-client-go/metadata/subscribe_info.go
similarity index 81%
rename from tubemq-client-twins/tubemq-client-go/metadata/subcribe_info.go
rename to tubemq-client-twins/tubemq-client-go/metadata/subscribe_info.go
index 72d1b8b..6ffc2d2 100644
--- a/tubemq-client-twins/tubemq-client-go/metadata/subcribe_info.go
+++ b/tubemq-client-twins/tubemq-client-go/metadata/subscribe_info.go
@@ -20,6 +20,8 @@ package metadata
 import (
 	"fmt"
 	"strings"
+
+	"github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/errs"
 )
 
 // SubscribeInfo represents the metadata of the subscribe info.
@@ -51,15 +53,23 @@ func (s *SubscribeInfo) String() string {
 
 // NewSubscribeInfo constructs a SubscribeInfo from a given string.
 // If the given is invalid, it will return error.
+// The format of subscribeInfo string: consumerId@group#broker_info#topic:partitionId
 func NewSubscribeInfo(subscribeInfo string) (*SubscribeInfo, error) {
-	consumerInfo := strings.Split(subscribeInfo, "#")[0]
-	partition, err := NewPartition(subscribeInfo[strings.Index(subscribeInfo, "#")+1:])
+	s := strings.SplitN(subscribeInfo, "#", 2) // split once: s[1] must stay "broker_info#topic:partitionId" for NewPartition
+	if len(s) == 1 {
+		return nil, errs.ErrInvalidSubscribeInfoString
+	}
+	consumerInfo := strings.Split(s[0], "@")
+	if len(consumerInfo) == 1 {
+		return nil, errs.ErrInvalidSubscribeInfoString
+	}
+	partition, err := NewPartition(s[1])
 	if err != nil {
 		return nil, err
 	}
 	return &SubscribeInfo{
-		group:      strings.Split(consumerInfo, "@")[1],
-		consumerID: strings.Split(consumerInfo, "@")[0],
+		group:      consumerInfo[1],
+		consumerID: consumerInfo[0],
 		partition:  partition,
 	}, nil
 }
diff --git a/tubemq-client-twins/tubemq-client-go/remote/remote.go b/tubemq-client-twins/tubemq-client-go/remote/remote.go
index 637859e..6c8d920 100644
--- a/tubemq-client-twins/tubemq-client-go/remote/remote.go
+++ b/tubemq-client-twins/tubemq-client-go/remote/remote.go
@@ -67,11 +67,9 @@ func NewRmtDataCache() *RmtDataCache {
 		groupFlowCtrlID:    util.InvalidValue,
 		qryPriorityID:      int32(util.InvalidValue),
 		partitionSubInfo:   make(map[string]*metadata.SubscribeInfo),
-		rebalanceResults:   make([]*metadata.ConsumerEvent, 0, 0),
 		brokerPartitions:   make(map[string]map[string]bool),
 		partitions:         make(map[string]*metadata.Partition),
 		usedPartitions:     make(map[string]int64),
-		indexPartitions:    make([]string, 0, 0),
 		partitionTimeouts:  make(map[string]*time.Timer),
 		topicPartitions:    make(map[string]map[string]bool),
 		partitionRegBooked: make(map[string]bool),
@@ -454,12 +452,13 @@ func (r *RmtDataCache) removeFromIndexPartitions(partitionKey string) {
 		}
 	}
 	if len(r.indexPartitions) == 1 && pos == 0 {
-		r.indexPartitions = []string{}
+		r.indexPartitions = nil
 		return
 	}
-	if pos >= 0 {
-		r.indexPartitions = append(r.indexPartitions[:pos], r.indexPartitions[pos+1:]...)
+	if pos >= len(r.indexPartitions) {
+		return
 	}
+	r.indexPartitions = append(r.indexPartitions[:pos], r.indexPartitions[pos+1:]...)
 }
 
 func (r *RmtDataCache) BookPartitionInfo(partitionKey string, currOffset int64) {