You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by go...@apache.org on 2021/07/23 05:05:57 UTC

[incubator-inlong] branch INLONG-25 updated: [INLONG-785]Fix Go SDK Metadata Bug (#585)

This is an automated email from the ASF dual-hosted git repository.

gosonzhang pushed a commit to branch INLONG-25
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git


The following commit(s) were added to refs/heads/INLONG-25 by this push:
     new f711a68  [INLONG-785]Fix Go SDK Metadata Bug (#585)
f711a68 is described below

commit f711a689bd33047bbf7cd28e45421896ce8797cc
Author: Zijie Lu <ws...@gmail.com>
AuthorDate: Fri Jul 23 13:05:15 2021 +0800

    [INLONG-785]Fix Go SDK Metadata Bug (#585)
    
    Signed-off-by: Zijie Lu <ws...@gmail.com>
---
 .../tubemq-client-go/metadata/partition.go         | 46 ++++++++++------------
 .../tubemq-client-go/metadata/subcribe_info.go     | 23 +++--------
 2 files changed, 26 insertions(+), 43 deletions(-)

diff --git a/tubemq-client-twins/tubemq-client-go/metadata/partition.go b/tubemq-client-twins/tubemq-client-go/metadata/partition.go
index f9a951d..321af95 100644
--- a/tubemq-client-twins/tubemq-client-go/metadata/partition.go
+++ b/tubemq-client-twins/tubemq-client-go/metadata/partition.go
@@ -75,32 +75,26 @@ func NewConsumeData(time int64, errCode int32, escLimit bool, msgSize int32, dlt
 
 // NewPartition parses a partition from the given string.
 func NewPartition(partition string) (*Partition, error) {
-	var b *Node
-	var topic string
-	var partitionID int
-	var err error
-	pos := strings.Index(partition, "#")
-	if pos != -1 {
-		broker := strings.TrimSpace(partition[:pos])
-		b, err = NewNode(true, broker)
-		if err != nil {
-			return nil, err
-		}
-		p := strings.TrimSpace(partition[pos+1:])
-		pos = strings.Index(p, ":")
-		if pos != -1 {
-			topic = strings.TrimSpace(p[0:pos])
-			partitionID, err = strconv.Atoi(strings.TrimSpace(p[pos+1:]))
-			if err != nil {
-				return nil, err
-			}
-		}
+	b, err := NewNode(true, strings.Split(partition, "#")[0])
+	if err != nil {
+		return nil, err
 	}
+	s := strings.Split(partition, "#")[1]
+	topic := strings.Split(s, ":")[0]
+	partitionID, err := strconv.Atoi(strings.Split(s, ":")[1])
+	if err != nil {
+		return nil, err
+	}
+	partitionKey := strconv.Itoa(int(b.id)) + ":" + topic + ":" + strconv.Itoa(partitionID)
 	return &Partition{
 		topic:        topic,
 		broker:       b,
 		partitionID:  int32(partitionID),
 		strategyData: &strategyData{},
+		partitionKey: partitionKey,
+		consumeData: &ConsumeData{
+			curDataDlt: util.InvalidValue,
+		},
 	}, nil
 }
 
@@ -131,7 +125,7 @@ func (p *Partition) GetBroker() *Node {
 
 // String returns the metadata of a Partition as a string.
 func (p *Partition) String() string {
-	return p.broker.String() + "#" + p.topic + "@" + strconv.Itoa(int(p.partitionID))
+	return p.broker.String() + "#" + p.topic + ":" + strconv.Itoa(int(p.partitionID))
 }
 
 // SetLastConsumed sets the last consumed.
@@ -167,7 +161,7 @@ func (p *Partition) ProcConsumeResult(defHandler *flowctrl.RuleHandler, groupHan
 			return 0
 		} else {
 			if p.strategyData.curStageMsgSize >= p.flowCtrl.GetDataSizeLimit() ||
-				p.strategyData.curStageMsgSize >= p.strategyData.limitSliceMsgSize {
+				p.strategyData.curSliceMsgSize >= p.strategyData.limitSliceMsgSize {
 				if p.flowCtrl.GetFreqMsLimit() > p.consumeData.dltLimit {
 					return p.flowCtrl.GetFreqMsLimit() - dltTime
 				}
@@ -198,20 +192,20 @@ func (p *Partition) updateStrategyData(defHandler *flowctrl.RuleHandler, groupHa
 	if curTime > p.strategyData.nextStageUpdate {
 		p.strategyData.curStageMsgSize = 0
 		p.strategyData.curSliceMsgSize = 0
-		if p.consumeData.curDataDlt >= 0 {
+		if p.consumeData.curDataDlt < 0 {
+			p.flowCtrl = flowctrl.NewResult(math.MaxInt64, 20)
+		} else {
 			p.flowCtrl = groupHandler.GetCurDataLimit(p.consumeData.curDataDlt)
 			if p.flowCtrl == nil {
 				p.flowCtrl = defHandler.GetCurDataLimit(p.consumeData.curDataDlt)
 			}
 			if p.flowCtrl == nil {
-				p.flowCtrl.SetDataSizeLimit(util.InvalidValue)
-				p.flowCtrl.SetFreqMsLimit(0)
+				p.flowCtrl = flowctrl.NewResult(math.MaxInt64, 0)
 			}
 			p.freqCtrl = groupHandler.GetFilterCtrlItem()
 			if p.freqCtrl.GetFreqMsLimit() < 0 {
 				p.freqCtrl = defHandler.GetFilterCtrlItem()
 			}
-			curTime = time.Now().UnixNano() / int64(time.Millisecond)
 		}
 		p.strategyData.limitSliceMsgSize = p.flowCtrl.GetDataSizeLimit() / 12
 		p.strategyData.nextStageUpdate = curTime + 60000
diff --git a/tubemq-client-twins/tubemq-client-go/metadata/subcribe_info.go b/tubemq-client-twins/tubemq-client-go/metadata/subcribe_info.go
index 932659a..72d1b8b 100644
--- a/tubemq-client-twins/tubemq-client-go/metadata/subcribe_info.go
+++ b/tubemq-client-twins/tubemq-client-go/metadata/subcribe_info.go
@@ -52,25 +52,14 @@ func (s *SubscribeInfo) String() string {
 // NewSubscribeInfo constructs a SubscribeInfo from a given string.
 // If the given is invalid, it will return error.
 func NewSubscribeInfo(subscribeInfo string) (*SubscribeInfo, error) {
-	consumerID := ""
-	group := ""
-	var partition *Partition
-	var err error
-	pos := strings.Index(subscribeInfo, "#")
-	if pos != -1 {
-		consumerInfo := strings.TrimSpace(subscribeInfo[:pos])
-		partitionInfo := strings.TrimSpace(subscribeInfo[pos+1:])
-		partition, err = NewPartition(partitionInfo)
-		if err != nil {
-			return nil, err
-		}
-		pos = strings.Index(consumerInfo, "@")
-		consumerID = strings.TrimSpace(consumerInfo[:pos])
-		group = strings.TrimSpace(consumerInfo[pos+1:])
+	consumerInfo := strings.Split(subscribeInfo, "#")[0]
+	partition, err := NewPartition(subscribeInfo[strings.Index(subscribeInfo, "#")+1:])
+	if err != nil {
+		return nil, err
 	}
 	return &SubscribeInfo{
-		group:      group,
-		consumerID: consumerID,
+		group:      strings.Split(consumerInfo, "@")[1],
+		consumerID: strings.Split(consumerInfo, "@")[0],
 		partition:  partition,
 	}, nil
 }