You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by go...@apache.org on 2021/09/14 06:09:41 UTC
[incubator-inlong] branch INLONG-25 updated: [inlong-1550]Go SDK should obey the flow control rule (#1551)
This is an automated email from the ASF dual-hosted git repository.
gosonzhang pushed a commit to branch INLONG-25
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git
The following commit(s) were added to refs/heads/INLONG-25 by this push:
new be0430c [inlong-1550]Go SDK should obey the flow control rule (#1551)
be0430c is described below
commit be0430c3c5639de5dd51bcc0779afbb0d4deb358
Author: Zijie Lu <ws...@gmail.com>
AuthorDate: Tue Sep 14 14:09:34 2021 +0800
[inlong-1550]Go SDK should obey the flow control rule (#1551)
Signed-off-by: Zijie Lu <ws...@gmail.com>
---
.../tubemq-client-go/client/consumer_impl.go | 1 +
tubemq-client-twins/tubemq-client-go/flowctrl/handler.go | 16 +++++++++++-----
.../tubemq-client-go/metadata/partition.go | 6 +++---
3 files changed, 15 insertions(+), 8 deletions(-)
diff --git a/tubemq-client-twins/tubemq-client-go/client/consumer_impl.go b/tubemq-client-twins/tubemq-client-go/client/consumer_impl.go
index 8f48513..f8ef878 100644
--- a/tubemq-client-twins/tubemq-client-go/client/consumer_impl.go
+++ b/tubemq-client-twins/tubemq-client-go/client/consumer_impl.go
@@ -614,6 +614,7 @@ func (c *consumer) processGetMessageRspB2C(pi *PeerInfo, filtered bool, partitio
cd := metadata.NewConsumeData(now, rsp.GetErrCode(), false, 0, limitDlt, defDltTime, rsp.GetRequireSlow())
c.rmtDataCache.BookPartitionInfo(partition.GetPartitionKey(), util.InvalidValue, util.InvalidValue)
c.rmtDataCache.BookConsumeData(partition.GetPartitionKey(), cd)
+ c.rmtDataCache.ReleasePartition(true, filtered, confirmContext, false)
return nil, errs.New(rsp.GetErrCode(), rsp.GetErrMsg())
case errs.RetErrNotFound:
limitDlt = c.config.Consumer.MsgNotFoundWait.Milliseconds()
diff --git a/tubemq-client-twins/tubemq-client-go/flowctrl/handler.go b/tubemq-client-twins/tubemq-client-go/flowctrl/handler.go
index a454e60..88e1c83 100644
--- a/tubemq-client-twins/tubemq-client-go/flowctrl/handler.go
+++ b/tubemq-client-twins/tubemq-client-go/flowctrl/handler.go
@@ -198,7 +198,7 @@ func (h *RuleHandler) GetFilterCtrlItem() *Item {
return h.flowCtrlItem
}
-// GetCurFreqLimitTim returns curFreqLimitTime.
+// GetCurFreqLimitTime returns curFreqLimitTime.
func (h *RuleHandler) GetCurFreqLimitTime(msgZeroCnt int32, receivedLimit int64) int64 {
limitData := receivedLimit
if int64(msgZeroCnt) < atomic.LoadInt64(&h.minZeroCount) {
@@ -210,15 +210,16 @@ func (h *RuleHandler) GetCurFreqLimitTime(msgZeroCnt int32, receivedLimit int64)
return limitData
}
for _, rule := range h.flowCtrlRules[1] {
- limitData = rule.getFreLimit(msgZeroCnt)
- if limitData >= 0 {
+ limit := rule.getFreLimit(msgZeroCnt)
+ if limit >= 0 {
+ limitData = limit
break
}
}
return limitData
}
-// getMinZeroCnt returns the minZeroCount,
+// GetMinZeroCnt returns the minZeroCount.
func (h *RuleHandler) GetMinZeroCnt() int64 {
return atomic.LoadInt64(&h.minZeroCount)
}
@@ -331,6 +332,11 @@ func parseDataLimit(rules []interface{}) ([]*Item, error) {
item.SetFreqLimit(freqMsLimit)
items = append(items, item)
}
+ if len(items) > 0 {
+ sort.Slice(items, func(i, j int) bool {
+ return items[i].startTime <= items[j].startTime
+ })
+ }
return items, nil
}
@@ -409,7 +415,7 @@ func parseFreqLimit(rules []interface{}) ([]*Item, error) {
}
if len(items) > 0 {
sort.Slice(items, func(i, j int) bool {
- return items[i].zeroCnt < items[j].zeroCnt
+ return items[i].zeroCnt > items[j].zeroCnt
})
}
return items, nil
diff --git a/tubemq-client-twins/tubemq-client-go/metadata/partition.go b/tubemq-client-twins/tubemq-client-go/metadata/partition.go
index c256f70..a8cccbd 100644
--- a/tubemq-client-twins/tubemq-client-go/metadata/partition.go
+++ b/tubemq-client-twins/tubemq-client-go/metadata/partition.go
@@ -149,14 +149,14 @@ func (p *Partition) BookConsumeData(data *ConsumeData) {
p.consumeData = data
}
-// ProcConsumeResult processes the consume result.
+// ProcConsumeResult processes consume result.
func (p *Partition) ProcConsumeResult(defHandler *flowctrl.RuleHandler, groupHandler *flowctrl.RuleHandler, filterConsume bool, lastConsumed bool) int64 {
dltTime := time.Now().UnixNano()/int64(time.Millisecond) - p.consumeData.time
p.updateStrategyData(defHandler, groupHandler)
p.lastConsumed = lastConsumed
switch p.consumeData.errCode {
case errs.RetSuccess, errs.RetErrNotFound:
- if p.consumeData.msgSize == 0 && p.consumeData.errCode == 200 {
+ if p.consumeData.msgSize == 0 && p.consumeData.errCode != errs.RetSuccess {
p.totalZeroCnt++
} else {
p.totalZeroCnt = 0
@@ -192,7 +192,7 @@ func (p *Partition) ProcConsumeResult(defHandler *flowctrl.RuleHandler, groupHan
return p.consumeData.dltLimit - dltTime
}
default:
- return p.consumeData.curDataDlt - dltTime
+ return p.consumeData.dltLimit - dltTime
}
}