You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by go...@apache.org on 2021/07/23 08:47:45 UTC
[incubator-inlong] branch INLONG-25 updated: [INLONG-793]Fix Some
Corner Case in Go SDK (#588)
This is an automated email from the ASF dual-hosted git repository.
gosonzhang pushed a commit to branch INLONG-25
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git
The following commit(s) were added to refs/heads/INLONG-25 by this push:
new 85aa1e4 [INLONG-793]Fix Some Corner Case in Go SDK (#588)
85aa1e4 is described below
commit 85aa1e45d3258040da094a06069bdff92ea1fe9c
Author: Zijie Lu <ws...@gmail.com>
AuthorDate: Fri Jul 23 16:47:35 2021 +0800
[INLONG-793]Fix Some Corner Case in Go SDK (#588)
Signed-off-by: Zijie Lu <ws...@gmail.com>
---
.../tubemq-client-go/codec/tubemq_codec.go | 6 +--
tubemq-client-twins/tubemq-client-go/errs/errs.go | 51 +++++++++++++---------
.../tubemq-client-go/metadata/metadata.go | 2 +-
.../tubemq-client-go/metadata/node.go | 6 +++
.../tubemq-client-go/metadata/partition.go | 16 +++++--
.../{subcribe_info.go => subscribe_info.go} | 18 ++++++--
.../tubemq-client-go/remote/remote.go | 9 ++--
7 files changed, 70 insertions(+), 38 deletions(-)
diff --git a/tubemq-client-twins/tubemq-client-go/codec/tubemq_codec.go b/tubemq-client-twins/tubemq-client-go/codec/tubemq_codec.go
index db114b3..b7e796b 100644
--- a/tubemq-client-twins/tubemq-client-go/codec/tubemq_codec.go
+++ b/tubemq-client-twins/tubemq-client-go/codec/tubemq_codec.go
@@ -35,9 +35,9 @@ import (
)
const (
- // The default begin token of TubeMQ RPC protocol.
+ // RPCProtocolBeginToken is the default begin token of TubeMQ RPC protocol.
RPCProtocolBeginToken uint32 = 0xFF7FF4FE
- // The default max buffer size the RPC response.
+ // RPCMaxBufferSize is the default max buffer size the RPC response.
RPCMaxBufferSize int = 8192
frameHeadLen uint32 = 12
maxBufferSize int = 128 * 1024
@@ -138,7 +138,7 @@ func (t TubeMQResponse) GetSerialNo() uint32 {
return t.serialNo
}
-// GetResponseBuf will return the body of Response.
+// GetBuffer will return the body of Response.
func (t TubeMQResponse) GetBuffer() []byte {
return t.Buffer
}
diff --git a/tubemq-client-twins/tubemq-client-go/errs/errs.go b/tubemq-client-twins/tubemq-client-go/errs/errs.go
index ec94229..1989ff5 100644
--- a/tubemq-client-twins/tubemq-client-go/errs/errs.go
+++ b/tubemq-client-twins/tubemq-client-go/errs/errs.go
@@ -34,31 +34,40 @@ const (
// RetRequestFailure represents the error code of request error.
RetRequestFailure = 5
// RetSelectorNotExist represents the selector not exists.
- RetSelectorNotExist = 6
- RetSuccess = 200
- RetErrMoved = 301
- RetBadRequest = 400
- RetErrForbidden = 403
- RetErrNotFound = 404
- RetErrNoPartAssigned = 406
- RetErrAllPartWaiting = 407
- RetErrAllPartInUse = 408
- RetErrHBNoNode = 411
- RetErrDuplicatePartition = 412
- RetCertificateFailure = 415
- RetConsumeGroupForbidden = 450
- RetConsumeContentForbidden = 455
- RetErrServiceUnavailable = 503
- RetErrConsumeSpeedLimit = 550
- RetErrConfirmTimeout = 2004
+ RetSelectorNotExist = 6
+ // RetInvalidNodeString represents the node string is invalid.
+ RetInvalidNodeString = 7
+ // RetInvalidPartitionString represents the partition string is invalid.
+ RetInvalidPartitionString = 8
+ // RetInvalidSubscribeInfoString represents the subscribeInfo string is invalid.
+ RetInvalidSubscribeInfoString = 9
+ RetSuccess = 200
+ RetErrMoved = 301
+ RetBadRequest = 400
+ RetErrForbidden = 403
+ RetErrNotFound = 404
+ RetErrNoPartAssigned = 406
+ RetErrAllPartWaiting = 407
+ RetErrAllPartInUse = 408
+ RetErrHBNoNode = 411
+ RetErrDuplicatePartition = 412
+ RetCertificateFailure = 415
+ RetConsumeGroupForbidden = 450
+ RetConsumeContentForbidden = 455
+ RetErrServiceUnavailable = 503
+ RetErrConsumeSpeedLimit = 550
+ RetErrConfirmTimeout = 2004
)
// ErrAssertionFailure represents RetAssertionFailure error.
var (
- ErrAssertionFailure = New(RetAssertionFailure, "AssertionFailure")
- ErrNoPartAssigned = New(RetErrNoPartAssigned, "No partition info in local cache, please retry later!")
- ErrAllPartWaiting = New(RetErrAllPartWaiting, "All partitions reach max position, please retry later!")
- ErrAllPartInUse = New(RetErrAllPartInUse, "No idle partition to consume, please retry later!")
+ ErrAssertionFailure = New(RetAssertionFailure, "AssertionFailure")
+ ErrNoPartAssigned = New(RetErrNoPartAssigned, "No partition info in local cache, please retry later!")
+ ErrAllPartWaiting = New(RetErrAllPartWaiting, "All partitions reach max position, please retry later!")
+ ErrAllPartInUse = New(RetErrAllPartInUse, "No idle partition to consume, please retry later!")
+ ErrInvalidNodeString = New(RetInvalidNodeString, "Node string should have format: node_id:host:port")
+ ErrInvalidPartitionString = New(RetInvalidPartitionString, "Partition string should have format: broker_info#topic:partitionId")
+ ErrInvalidSubscribeInfoString = New(RetInvalidSubscribeInfoString, "SubscribeInfo string should have format: consumerId@group#broker_info#topic:partitionId")
)
// Error provides a TubeMQ-specific error container
diff --git a/tubemq-client-twins/tubemq-client-go/metadata/metadata.go b/tubemq-client-twins/tubemq-client-go/metadata/metadata.go
index f64674a..5b997e3 100644
--- a/tubemq-client-twins/tubemq-client-go/metadata/metadata.go
+++ b/tubemq-client-twins/tubemq-client-go/metadata/metadata.go
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-// package metadata defines all the metadata of the TubeMQ broker and producer.
+// Package metadata defines all the metadata of the TubeMQ broker and producer.
package metadata
// Metadata represents the metadata of
diff --git a/tubemq-client-twins/tubemq-client-go/metadata/node.go b/tubemq-client-twins/tubemq-client-go/metadata/node.go
index bcfda82..ca6a423 100644
--- a/tubemq-client-twins/tubemq-client-go/metadata/node.go
+++ b/tubemq-client-twins/tubemq-client-go/metadata/node.go
@@ -20,6 +20,8 @@ package metadata
import (
"strconv"
"strings"
+
+ "github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/errs"
)
// Node represents the metadata of a node.
@@ -32,8 +34,12 @@ type Node struct {
// NewNode constructs a node from a given string.
// If the given string is invalid, it will return error.
+// The format of node string: nodeID:host:port
func NewNode(isBroker bool, node string) (*Node, error) {
res := strings.Split(node, ":")
+ if len(res) == 1 {
+ return nil, errs.ErrInvalidNodeString
+ }
nodeID := 0
host := ""
port := 8123
diff --git a/tubemq-client-twins/tubemq-client-go/metadata/partition.go b/tubemq-client-twins/tubemq-client-go/metadata/partition.go
index 321af95..243176c 100644
--- a/tubemq-client-twins/tubemq-client-go/metadata/partition.go
+++ b/tubemq-client-twins/tubemq-client-go/metadata/partition.go
@@ -74,14 +74,22 @@ func NewConsumeData(time int64, errCode int32, escLimit bool, msgSize int32, dlt
}
// NewPartition parses a partition from the given string.
+// The format of partition string: brokerInfo#topic:partitionId
func NewPartition(partition string) (*Partition, error) {
- b, err := NewNode(true, strings.Split(partition, "#")[0])
+ s := strings.Split(partition, "#")
+ if len(s) == 1 {
+ return nil, errs.ErrInvalidPartitionString
+ }
+ b, err := NewNode(true, s[0])
if err != nil {
return nil, err
}
- s := strings.Split(partition, "#")[1]
- topic := strings.Split(s, ":")[0]
- partitionID, err := strconv.Atoi(strings.Split(s, ":")[1])
+ topicPartitionID := strings.Split(s[1], ":")
+ if len(topicPartitionID) == 1 {
+ return nil, errs.ErrInvalidPartitionString
+ }
+ topic := topicPartitionID[0]
+ partitionID, err := strconv.Atoi(topicPartitionID[1])
if err != nil {
return nil, err
}
diff --git a/tubemq-client-twins/tubemq-client-go/metadata/subcribe_info.go b/tubemq-client-twins/tubemq-client-go/metadata/subscribe_info.go
similarity index 81%
rename from tubemq-client-twins/tubemq-client-go/metadata/subcribe_info.go
rename to tubemq-client-twins/tubemq-client-go/metadata/subscribe_info.go
index 72d1b8b..6ffc2d2 100644
--- a/tubemq-client-twins/tubemq-client-go/metadata/subcribe_info.go
+++ b/tubemq-client-twins/tubemq-client-go/metadata/subscribe_info.go
@@ -20,6 +20,8 @@ package metadata
import (
"fmt"
"strings"
+
+ "github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/errs"
)
// SubscribeInfo represents the metadata of the subscribe info.
@@ -51,15 +53,23 @@ func (s *SubscribeInfo) String() string {
// NewSubscribeInfo constructs a SubscribeInfo from a given string.
// If the given is invalid, it will return error.
+// The format of subscribeInfo string: consumerId@group#broker_info#topic:partitionId
func NewSubscribeInfo(subscribeInfo string) (*SubscribeInfo, error) {
- consumerInfo := strings.Split(subscribeInfo, "#")[0]
- partition, err := NewPartition(subscribeInfo[strings.Index(subscribeInfo, "#")+1:])
+ s := strings.Split(subscribeInfo, "#")
+ if len(s) == 1 {
+ return nil, errs.ErrInvalidSubscribeInfoString
+ }
+ consumerInfo := strings.Split(s[0], "@")
+ if len(consumerInfo) == 1 {
+ return nil, errs.ErrInvalidSubscribeInfoString
+ }
+ partition, err := NewPartition(s[1])
if err != nil {
return nil, err
}
return &SubscribeInfo{
- group: strings.Split(consumerInfo, "@")[1],
- consumerID: strings.Split(consumerInfo, "@")[0],
+ group: consumerInfo[1],
+ consumerID: consumerInfo[0],
partition: partition,
}, nil
}
diff --git a/tubemq-client-twins/tubemq-client-go/remote/remote.go b/tubemq-client-twins/tubemq-client-go/remote/remote.go
index 637859e..6c8d920 100644
--- a/tubemq-client-twins/tubemq-client-go/remote/remote.go
+++ b/tubemq-client-twins/tubemq-client-go/remote/remote.go
@@ -67,11 +67,9 @@ func NewRmtDataCache() *RmtDataCache {
groupFlowCtrlID: util.InvalidValue,
qryPriorityID: int32(util.InvalidValue),
partitionSubInfo: make(map[string]*metadata.SubscribeInfo),
- rebalanceResults: make([]*metadata.ConsumerEvent, 0, 0),
brokerPartitions: make(map[string]map[string]bool),
partitions: make(map[string]*metadata.Partition),
usedPartitions: make(map[string]int64),
- indexPartitions: make([]string, 0, 0),
partitionTimeouts: make(map[string]*time.Timer),
topicPartitions: make(map[string]map[string]bool),
partitionRegBooked: make(map[string]bool),
@@ -454,12 +452,13 @@ func (r *RmtDataCache) removeFromIndexPartitions(partitionKey string) {
}
}
if len(r.indexPartitions) == 1 && pos == 0 {
- r.indexPartitions = []string{}
+ r.indexPartitions = nil
return
}
- if pos >= 0 {
- r.indexPartitions = append(r.indexPartitions[:pos], r.indexPartitions[pos+1:]...)
+ if pos >= len(r.indexPartitions) {
+ return
}
+ r.indexPartitions = append(r.indexPartitions[:pos], r.indexPartitions[pos+1:]...)
}
func (r *RmtDataCache) BookPartitionInfo(partitionKey string, currOffset int64) {