You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by go...@apache.org on 2021/07/22 08:53:33 UTC
[incubator-inlong] branch INLONG-25 updated: [INLONG-780]Fix flow
control bug (#575)
This is an automated email from the ASF dual-hosted git repository.
gosonzhang pushed a commit to branch INLONG-25
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git
The following commit(s) were added to refs/heads/INLONG-25 by this push:
new 8742079 [INLONG-780]Fix flow control bug (#575)
8742079 is described below
commit 87420790c25ffbe64b11a1020dbc5ef6e5594b51
Author: Zijie Lu <ws...@gmail.com>
AuthorDate: Thu Jul 22 16:53:28 2021 +0800
[INLONG-780]Fix flow control bug (#575)
---
.../tubemq-client-go/client/consumer_impl.go | 28 ++++----
.../tubemq-client-go/flowctrl/handler.go | 76 +++++++++++-----------
2 files changed, 52 insertions(+), 52 deletions(-)
diff --git a/tubemq-client-twins/tubemq-client-go/client/consumer_impl.go b/tubemq-client-twins/tubemq-client-go/client/consumer_impl.go
index 4a2255d..11e7620 100644
--- a/tubemq-client-twins/tubemq-client-go/client/consumer_impl.go
+++ b/tubemq-client-twins/tubemq-client-go/client/consumer_impl.go
@@ -48,6 +48,7 @@ const (
consumeStatusNormal = 0
consumeStatusFromMax = 1
consumeStatusFromMaxAlways = 2
+ msgFlagIncProperties = 0x01
)
type consumer struct {
@@ -123,19 +124,18 @@ func (c *consumer) register2Master(needChange bool) error {
retryCount := 0
for {
rsp, err := c.sendRegRequest2Master()
- if err != nil {
- log.Infof("[CONSUMER]register2Master error %s", err.Error())
- return err
- }
-
- log.Info("register2Master response %s", rsp.String())
- if !rsp.GetSuccess() {
- if rsp.GetErrCode() == errs.RetConsumeGroupForbidden || rsp.GetErrCode() == errs.RetConsumeContentForbidden {
- log.Warnf("[CONSUMER] register2master(%s) failure exist register, client=%s, error: %s", c.master.Address, c.clientID, rsp.ErrMsg)
+ if err != nil || !rsp.GetSuccess() {
+ if err != nil {
+ log.Errorf("[CONSUMER]register2Master error %s", err.Error())
+ } else if rsp.GetErrCode() == errs.RetConsumeGroupForbidden || rsp.GetErrCode() == errs.RetConsumeContentForbidden {
+ log.Warnf("[CONSUMER] register2master(%s) failure exist register, client=%s, error: %s", c.master.Address, c.clientID, rsp.GetErrMsg())
return errs.New(rsp.GetErrCode(), rsp.GetErrMsg())
}
if !c.master.HasNext {
+ if rsp != nil {
+ log.Errorf("[CONSUMER] register2master(%s) failure exist register, client=%s, error: %s", c.master.Address, c.clientID, rsp.GetErrMsg())
+ }
break
}
retryCount++
@@ -145,6 +145,7 @@ func (c *consumer) register2Master(needChange bool) error {
}
continue
}
+ log.Info("register2Master response %s", rsp.String())
c.masterHBRetry = 0
c.processRegisterResponseM2C(rsp)
@@ -228,10 +229,9 @@ func (c *consumer) GetMessage() (*ConsumerResult, error) {
rsp, err := c.client.GetMessageRequestC2B(ctx, m, c.subInfo, c.rmtDataCache)
if err != nil {
log.Infof("[CONSUMER]GetMessage error %s", err.Error())
- err1 := c.rmtDataCache.ReleasePartition(true, isFiltered, confirmContext, false)
- if err1 != nil {
- log.Error("[CONSUMER]GetMessage release partition error %s", err1.Error())
- return nil, err1
+ if err := c.rmtDataCache.ReleasePartition(true, isFiltered, confirmContext, false); err != nil {
+ log.Errorf("[CONSUMER]GetMessage release partition error %s", err.Error())
+ return nil, err
}
return nil, err
}
@@ -633,7 +633,7 @@ func (c *consumer) convertMessages(filtered bool, topic string, rsp *protocol.Ge
payLoadData := m.GetPayLoadData()
dataLen := len(payLoadData)
var properties map[string]string
- if m.GetFlag()&0x01 == 1 {
+ if m.GetFlag()&msgFlagIncProperties == 1 {
if len(payLoadData) < 4 {
continue
}
diff --git a/tubemq-client-twins/tubemq-client-go/flowctrl/handler.go b/tubemq-client-twins/tubemq-client-go/flowctrl/handler.go
index 52bb732..5400b04 100644
--- a/tubemq-client-twins/tubemq-client-go/flowctrl/handler.go
+++ b/tubemq-client-twins/tubemq-client-go/flowctrl/handler.go
@@ -127,7 +127,7 @@ func (h *RuleHandler) initStatisticData() {
if rule.startTime < atomic.LoadInt64(&h.dataLimitStartTime) {
atomic.StoreInt64(&h.dataLimitStartTime, rule.startTime)
}
- if rule.endTime < atomic.LoadInt64(&h.dataLimitEndTime) {
+ if rule.endTime > atomic.LoadInt64(&h.dataLimitEndTime) {
atomic.StoreInt64(&h.dataLimitEndTime, rule.endTime)
}
}
@@ -178,7 +178,7 @@ func (h *RuleHandler) GetCurDataLimit(lastDataDlt int64) *Result {
return nil
}
h.configMu.Lock()
- defer h.configMu.Lock()
+ defer h.configMu.Unlock()
if _, ok := h.flowCtrlRules[0]; !ok {
return nil
}
@@ -273,41 +273,41 @@ func parseFlowCtrlInfo(info string) (map[int32][]*Item, error) {
return flowCtrlMap, nil
}
-func parseRule(doc map[string]interface{}) ([]byte, error) {
+func parseRule(doc map[string]interface{}) ([]interface{}, error) {
if _, ok := doc["rule"]; !ok {
return nil, fmt.Errorf("rule field not existed")
}
- if _, ok := doc["rule"].([]byte); !ok {
- return nil, fmt.Errorf("rule should be a string")
+ if _, ok := doc["rule"].([]interface{}); !ok {
+ return nil, fmt.Errorf("rule should be interface")
}
- v := doc["rule"].([]byte)
+ v := doc["rule"].([]interface{})
return v, nil
}
-func parseDataLimit(b []byte) ([]*Item, error) {
- var rules []map[string]interface{}
- if err := json.Unmarshal(b, &rules); err != nil {
- return nil, err
- }
+func parseDataLimit(rules []interface{}) ([]*Item, error) {
items := make([]*Item, 0, len(rules))
for i, rule := range rules {
- start, err := parseTime(rule, "start")
+ if _, ok := rule.(map[string]interface{}); !ok {
+ return nil, fmt.Errorf("rule should be a map")
+ }
+ v := rule.(map[string]interface{})
+ start, err := parseTime(v, "start")
if err != nil {
return nil, err
}
- end, err := parseTime(rule, "end")
+ end, err := parseTime(v, "end")
if err != nil {
return nil, err
}
if start > end {
return nil, fmt.Errorf("start value must lower than the End value in index(%d) of data limit rule", i)
}
- datadlt, err := parseInt(rule, "dltInM", false, -1)
+ datadlt, err := parseInt(v, "dltInM", false, -1)
if err != nil {
return nil, fmt.Errorf("dltInM key is required in index(%d) of data limit rule", i)
}
datadlt = datadlt * 1024 * 204
- dataSizeLimit, err := parseInt(rule, "limitInM", false, -1)
+ dataSizeLimit, err := parseInt(v, "limitInM", false, -1)
if err != nil {
return nil, fmt.Errorf("limitInM key is required in index(%d) of data limit rule", i)
}
@@ -315,7 +315,7 @@ func parseDataLimit(b []byte) ([]*Item, error) {
return nil, fmt.Errorf("limitInM value must over than equal or bigger than zero in index(%d) of data limit rule", i)
}
dataSizeLimit = dataSizeLimit * 1024 * 1024
- freqMsLimit, err := parseInt(rule, "freqInMs", false, -1)
+ freqMsLimit, err := parseInt(v, "freqInMs", false, -1)
if err != nil {
return nil, fmt.Errorf("freqInMs key is required in index((%d) of data limit rule", i)
}
@@ -334,17 +334,17 @@ func parseDataLimit(b []byte) ([]*Item, error) {
return items, nil
}
-func parseLowFetchLimit(b []byte) ([]*Item, error) {
- var rules []map[string]interface{}
- if err := json.Unmarshal(b, &rules); err != nil {
- return nil, err
- }
+func parseLowFetchLimit(rules []interface{}) ([]*Item, error) {
items := make([]*Item, 0, len(rules))
for _, rule := range rules {
var filterFreqMs int64
var err error
- if _, ok := rule["filterFreqInMs"]; ok {
- filterFreqMs, err = parseInt(rule, "filterFreqInMs", false, -1)
+ if _, ok := rule.(map[string]interface{}); !ok {
+ return nil, fmt.Errorf("rule should be a map")
+ }
+ v := rule.(map[string]interface{})
+ if _, ok := v["filterFreqInMs"]; ok {
+ filterFreqMs, err = parseInt(v, "filterFreqInMs", false, -1)
if err != nil {
return nil, fmt.Errorf("decode failure: %s of filterFreqInMs field in parse low fetch limit", err.Error())
}
@@ -354,8 +354,8 @@ func parseLowFetchLimit(b []byte) ([]*Item, error) {
}
var minFilteFreqMs int64
- if _, ok := rule["minDataFilterFreqInMs"]; ok {
- minFilteFreqMs, err = parseInt(rule, "minDataFilterFreqInMs", false, -1)
+ if _, ok := v["minDataFilterFreqInMs"]; ok {
+ minFilteFreqMs, err = parseInt(v, "minDataFilterFreqInMs", false, -1)
if err != nil {
return nil, fmt.Errorf("decode failure: %s of minDataFilterFreqInMs field in parse low fetch limit", err.Error())
}
@@ -367,8 +367,8 @@ func parseLowFetchLimit(b []byte) ([]*Item, error) {
return nil, fmt.Errorf("decode failure: minDataFilterFreqInMs must lower than filterFreqInMs in index(%d) of low fetch limit rule", filterFreqMs)
}
var normFreqMs int64
- if _, ok := rule["normFreqInMs"]; ok {
- normFreqMs, err = parseInt(rule, "normFreqInMs", false, -1)
+ if _, ok := v["normFreqInMs"]; ok {
+ normFreqMs, err = parseInt(v, "normFreqInMs", false, -1)
if err != nil {
return nil, fmt.Errorf("decode failure: %s of normFreqInMs field in parse low fetch limit", err.Error())
}
@@ -386,18 +386,18 @@ func parseLowFetchLimit(b []byte) ([]*Item, error) {
return items, nil
}
-func parseFreqLimit(b []byte) ([]*Item, error) {
- var rules []map[string]interface{}
- if err := json.Unmarshal(b, &rules); err != nil {
- return nil, err
- }
+func parseFreqLimit(rules []interface{}) ([]*Item, error) {
items := make([]*Item, 0, len(rules))
for _, rule := range rules {
- zeroCnt, err := parseInt(rule, "zeroCnt", false, util.InvalidValue)
+ if _, ok := rule.(map[string]interface{}); !ok {
+ return nil, fmt.Errorf("rule should be a map")
+ }
+ v := rule.(map[string]interface{})
+ zeroCnt, err := parseInt(v, "zeroCnt", false, util.InvalidValue)
if err != nil {
return nil, err
}
- freqLimit, err := parseInt(rule, "freqInMs", false, util.InvalidValue)
+ freqLimit, err := parseInt(v, "freqInMs", false, util.InvalidValue)
if err != nil {
return nil, err
}
@@ -419,14 +419,14 @@ func parseInt(doc map[string]interface{}, key string, compare bool, required int
if _, ok := doc[key]; !ok {
return util.InvalidValue, fmt.Errorf("field not existed")
}
- if _, ok := doc[key].(int64); !ok {
+ if _, ok := doc[key].(float64); !ok {
return util.InvalidValue, fmt.Errorf("illegal value, must be int type")
}
- v := doc[key].(int64)
- if compare && v != required {
+ v := doc[key].(float64)
+ if compare && int64(v) != required {
return util.InvalidValue, fmt.Errorf("illegal value, not required value content")
}
- return v, nil
+ return int64(v), nil
}
func parseTime(doc map[string]interface{}, key string) (int64, error) {