You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by go...@apache.org on 2021/07/23 05:05:57 UTC
[incubator-inlong] branch INLONG-25 updated: [INLONG-785]Fix Go SDK Metadata Bug (#585)
This is an automated email from the ASF dual-hosted git repository.
gosonzhang pushed a commit to branch INLONG-25
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git
The following commit(s) were added to refs/heads/INLONG-25 by this push:
new f711a68 [INLONG-785]Fix Go SDK Metadata Bug (#585)
f711a68 is described below
commit f711a689bd33047bbf7cd28e45421896ce8797cc
Author: Zijie Lu <ws...@gmail.com>
AuthorDate: Fri Jul 23 13:05:15 2021 +0800
[INLONG-785]Fix Go SDK Metadata Bug (#585)
Signed-off-by: Zijie Lu <ws...@gmail.com>
---
.../tubemq-client-go/metadata/partition.go | 46 ++++++++++------------
.../tubemq-client-go/metadata/subcribe_info.go | 23 +++--------
2 files changed, 26 insertions(+), 43 deletions(-)
diff --git a/tubemq-client-twins/tubemq-client-go/metadata/partition.go b/tubemq-client-twins/tubemq-client-go/metadata/partition.go
index f9a951d..321af95 100644
--- a/tubemq-client-twins/tubemq-client-go/metadata/partition.go
+++ b/tubemq-client-twins/tubemq-client-go/metadata/partition.go
@@ -75,32 +75,26 @@ func NewConsumeData(time int64, errCode int32, escLimit bool, msgSize int32, dlt
// NewPartition parses a partition from the given string.
func NewPartition(partition string) (*Partition, error) {
- var b *Node
- var topic string
- var partitionID int
- var err error
- pos := strings.Index(partition, "#")
- if pos != -1 {
- broker := strings.TrimSpace(partition[:pos])
- b, err = NewNode(true, broker)
- if err != nil {
- return nil, err
- }
- p := strings.TrimSpace(partition[pos+1:])
- pos = strings.Index(p, ":")
- if pos != -1 {
- topic = strings.TrimSpace(p[0:pos])
- partitionID, err = strconv.Atoi(strings.TrimSpace(p[pos+1:]))
- if err != nil {
- return nil, err
- }
- }
+ b, err := NewNode(true, strings.Split(partition, "#")[0])
+ if err != nil {
+ return nil, err
}
+ s := strings.Split(partition, "#")[1]
+ topic := strings.Split(s, ":")[0]
+ partitionID, err := strconv.Atoi(strings.Split(s, ":")[1])
+ if err != nil {
+ return nil, err
+ }
+ partitionKey := strconv.Itoa(int(b.id)) + ":" + topic + ":" + strconv.Itoa(partitionID)
return &Partition{
topic: topic,
broker: b,
partitionID: int32(partitionID),
strategyData: &strategyData{},
+ partitionKey: partitionKey,
+ consumeData: &ConsumeData{
+ curDataDlt: util.InvalidValue,
+ },
}, nil
}
@@ -131,7 +125,7 @@ func (p *Partition) GetBroker() *Node {
// String returns the metadata of a Partition as a string.
func (p *Partition) String() string {
- return p.broker.String() + "#" + p.topic + "@" + strconv.Itoa(int(p.partitionID))
+ return p.broker.String() + "#" + p.topic + ":" + strconv.Itoa(int(p.partitionID))
}
// SetLastConsumed sets the last consumed.
@@ -167,7 +161,7 @@ func (p *Partition) ProcConsumeResult(defHandler *flowctrl.RuleHandler, groupHan
return 0
} else {
if p.strategyData.curStageMsgSize >= p.flowCtrl.GetDataSizeLimit() ||
- p.strategyData.curStageMsgSize >= p.strategyData.limitSliceMsgSize {
+ p.strategyData.curSliceMsgSize >= p.strategyData.limitSliceMsgSize {
if p.flowCtrl.GetFreqMsLimit() > p.consumeData.dltLimit {
return p.flowCtrl.GetFreqMsLimit() - dltTime
}
@@ -198,20 +192,20 @@ func (p *Partition) updateStrategyData(defHandler *flowctrl.RuleHandler, groupHa
if curTime > p.strategyData.nextStageUpdate {
p.strategyData.curStageMsgSize = 0
p.strategyData.curSliceMsgSize = 0
- if p.consumeData.curDataDlt >= 0 {
+ if p.consumeData.curDataDlt < 0 {
+ p.flowCtrl = flowctrl.NewResult(math.MaxInt64, 20)
+ } else {
p.flowCtrl = groupHandler.GetCurDataLimit(p.consumeData.curDataDlt)
if p.flowCtrl == nil {
p.flowCtrl = defHandler.GetCurDataLimit(p.consumeData.curDataDlt)
}
if p.flowCtrl == nil {
- p.flowCtrl.SetDataSizeLimit(util.InvalidValue)
- p.flowCtrl.SetFreqMsLimit(0)
+ p.flowCtrl = flowctrl.NewResult(math.MaxInt64, 0)
}
p.freqCtrl = groupHandler.GetFilterCtrlItem()
if p.freqCtrl.GetFreqMsLimit() < 0 {
p.freqCtrl = defHandler.GetFilterCtrlItem()
}
- curTime = time.Now().UnixNano() / int64(time.Millisecond)
}
p.strategyData.limitSliceMsgSize = p.flowCtrl.GetDataSizeLimit() / 12
p.strategyData.nextStageUpdate = curTime + 60000
diff --git a/tubemq-client-twins/tubemq-client-go/metadata/subcribe_info.go b/tubemq-client-twins/tubemq-client-go/metadata/subcribe_info.go
index 932659a..72d1b8b 100644
--- a/tubemq-client-twins/tubemq-client-go/metadata/subcribe_info.go
+++ b/tubemq-client-twins/tubemq-client-go/metadata/subcribe_info.go
@@ -52,25 +52,14 @@ func (s *SubscribeInfo) String() string {
// NewSubscribeInfo constructs a SubscribeInfo from a given string.
// If the given is invalid, it will return error.
func NewSubscribeInfo(subscribeInfo string) (*SubscribeInfo, error) {
- consumerID := ""
- group := ""
- var partition *Partition
- var err error
- pos := strings.Index(subscribeInfo, "#")
- if pos != -1 {
- consumerInfo := strings.TrimSpace(subscribeInfo[:pos])
- partitionInfo := strings.TrimSpace(subscribeInfo[pos+1:])
- partition, err = NewPartition(partitionInfo)
- if err != nil {
- return nil, err
- }
- pos = strings.Index(consumerInfo, "@")
- consumerID = strings.TrimSpace(consumerInfo[:pos])
- group = strings.TrimSpace(consumerInfo[pos+1:])
+ consumerInfo := strings.Split(subscribeInfo, "#")[0]
+ partition, err := NewPartition(subscribeInfo[strings.Index(subscribeInfo, "#")+1:])
+ if err != nil {
+ return nil, err
}
return &SubscribeInfo{
- group: group,
- consumerID: consumerID,
+ group: strings.Split(consumerInfo, "@")[1],
+ consumerID: strings.Split(consumerInfo, "@")[0],
partition: partition,
}, nil
}