You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by go...@apache.org on 2021/09/14 06:09:41 UTC

[incubator-inlong] branch INLONG-25 updated: [inlong-1550]Go SDK should obey the flow control rule (#1551)

This is an automated email from the ASF dual-hosted git repository.

gosonzhang pushed a commit to branch INLONG-25
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git


The following commit(s) were added to refs/heads/INLONG-25 by this push:
     new be0430c  [inlong-1550]Go SDK should obey the flow control rule (#1551)
be0430c is described below

commit be0430c3c5639de5dd51bcc0779afbb0d4deb358
Author: Zijie Lu <ws...@gmail.com>
AuthorDate: Tue Sep 14 14:09:34 2021 +0800

    [inlong-1550]Go SDK should obey the flow control rule (#1551)
    
    Signed-off-by: Zijie Lu <ws...@gmail.com>
---
 .../tubemq-client-go/client/consumer_impl.go             |  1 +
 tubemq-client-twins/tubemq-client-go/flowctrl/handler.go | 16 +++++++++++-----
 .../tubemq-client-go/metadata/partition.go               |  6 +++---
 3 files changed, 15 insertions(+), 8 deletions(-)

diff --git a/tubemq-client-twins/tubemq-client-go/client/consumer_impl.go b/tubemq-client-twins/tubemq-client-go/client/consumer_impl.go
index 8f48513..f8ef878 100644
--- a/tubemq-client-twins/tubemq-client-go/client/consumer_impl.go
+++ b/tubemq-client-twins/tubemq-client-go/client/consumer_impl.go
@@ -614,6 +614,7 @@ func (c *consumer) processGetMessageRspB2C(pi *PeerInfo, filtered bool, partitio
 		cd := metadata.NewConsumeData(now, rsp.GetErrCode(), false, 0, limitDlt, defDltTime, rsp.GetRequireSlow())
 		c.rmtDataCache.BookPartitionInfo(partition.GetPartitionKey(), util.InvalidValue, util.InvalidValue)
 		c.rmtDataCache.BookConsumeData(partition.GetPartitionKey(), cd)
+		c.rmtDataCache.ReleasePartition(true, filtered, confirmContext, false)
 		return nil, errs.New(rsp.GetErrCode(), rsp.GetErrMsg())
 	case errs.RetErrNotFound:
 		limitDlt = c.config.Consumer.MsgNotFoundWait.Milliseconds()
diff --git a/tubemq-client-twins/tubemq-client-go/flowctrl/handler.go b/tubemq-client-twins/tubemq-client-go/flowctrl/handler.go
index a454e60..88e1c83 100644
--- a/tubemq-client-twins/tubemq-client-go/flowctrl/handler.go
+++ b/tubemq-client-twins/tubemq-client-go/flowctrl/handler.go
@@ -198,7 +198,7 @@ func (h *RuleHandler) GetFilterCtrlItem() *Item {
 	return h.flowCtrlItem
 }
 
-// GetCurFreqLimitTim returns curFreqLimitTime.
+// GetCurFreqLimitTime returns curFreqLimitTime.
 func (h *RuleHandler) GetCurFreqLimitTime(msgZeroCnt int32, receivedLimit int64) int64 {
 	limitData := receivedLimit
 	if int64(msgZeroCnt) < atomic.LoadInt64(&h.minZeroCount) {
@@ -210,15 +210,16 @@ func (h *RuleHandler) GetCurFreqLimitTime(msgZeroCnt int32, receivedLimit int64)
 		return limitData
 	}
 	for _, rule := range h.flowCtrlRules[1] {
-		limitData = rule.getFreLimit(msgZeroCnt)
-		if limitData >= 0 {
+		limit := rule.getFreLimit(msgZeroCnt)
+		if limit >= 0 {
+			limitData = limit
 			break
 		}
 	}
 	return limitData
 }
 
-// getMinZeroCnt returns the minZeroCount,
+// GetMinZeroCnt returns the minZeroCount.
 func (h *RuleHandler) GetMinZeroCnt() int64 {
 	return atomic.LoadInt64(&h.minZeroCount)
 }
@@ -331,6 +332,11 @@ func parseDataLimit(rules []interface{}) ([]*Item, error) {
 		item.SetFreqLimit(freqMsLimit)
 		items = append(items, item)
 	}
+	if len(items) > 0 {
+		sort.Slice(items, func(i, j int) bool {
+			return items[i].startTime <= items[j].startTime
+		})
+	}
 	return items, nil
 }
 
@@ -409,7 +415,7 @@ func parseFreqLimit(rules []interface{}) ([]*Item, error) {
 	}
 	if len(items) > 0 {
 		sort.Slice(items, func(i, j int) bool {
-			return items[i].zeroCnt < items[j].zeroCnt
+			return items[i].zeroCnt > items[j].zeroCnt
 		})
 	}
 	return items, nil
diff --git a/tubemq-client-twins/tubemq-client-go/metadata/partition.go b/tubemq-client-twins/tubemq-client-go/metadata/partition.go
index c256f70..a8cccbd 100644
--- a/tubemq-client-twins/tubemq-client-go/metadata/partition.go
+++ b/tubemq-client-twins/tubemq-client-go/metadata/partition.go
@@ -149,14 +149,14 @@ func (p *Partition) BookConsumeData(data *ConsumeData) {
 	p.consumeData = data
 }
 
-// ProcConsumeResult processes the consume result.
+// ProcConsumeResult processes consume result.
 func (p *Partition) ProcConsumeResult(defHandler *flowctrl.RuleHandler, groupHandler *flowctrl.RuleHandler, filterConsume bool, lastConsumed bool) int64 {
 	dltTime := time.Now().UnixNano()/int64(time.Millisecond) - p.consumeData.time
 	p.updateStrategyData(defHandler, groupHandler)
 	p.lastConsumed = lastConsumed
 	switch p.consumeData.errCode {
 	case errs.RetSuccess, errs.RetErrNotFound:
-		if p.consumeData.msgSize == 0 && p.consumeData.errCode == 200 {
+		if p.consumeData.msgSize == 0 && p.consumeData.errCode != errs.RetSuccess {
 			p.totalZeroCnt++
 		} else {
 			p.totalZeroCnt = 0
@@ -192,7 +192,7 @@ func (p *Partition) ProcConsumeResult(defHandler *flowctrl.RuleHandler, groupHan
 			return p.consumeData.dltLimit - dltTime
 		}
 	default:
-		return p.consumeData.curDataDlt - dltTime
+		return p.consumeData.dltLimit - dltTime
 	}
 }