You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by go...@apache.org on 2021/07/22 08:53:33 UTC

[incubator-inlong] branch INLONG-25 updated: [INLONG-780]Fix flow control bug (#575)

This is an automated email from the ASF dual-hosted git repository.

gosonzhang pushed a commit to branch INLONG-25
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git


The following commit(s) were added to refs/heads/INLONG-25 by this push:
     new 8742079  [INLONG-780]Fix flow control bug (#575)
8742079 is described below

commit 87420790c25ffbe64b11a1020dbc5ef6e5594b51
Author: Zijie Lu <ws...@gmail.com>
AuthorDate: Thu Jul 22 16:53:28 2021 +0800

    [INLONG-780]Fix flow control bug (#575)
---
 .../tubemq-client-go/client/consumer_impl.go       | 28 ++++----
 .../tubemq-client-go/flowctrl/handler.go           | 76 +++++++++++-----------
 2 files changed, 52 insertions(+), 52 deletions(-)

diff --git a/tubemq-client-twins/tubemq-client-go/client/consumer_impl.go b/tubemq-client-twins/tubemq-client-go/client/consumer_impl.go
index 4a2255d..11e7620 100644
--- a/tubemq-client-twins/tubemq-client-go/client/consumer_impl.go
+++ b/tubemq-client-twins/tubemq-client-go/client/consumer_impl.go
@@ -48,6 +48,7 @@ const (
 	consumeStatusNormal        = 0
 	consumeStatusFromMax       = 1
 	consumeStatusFromMaxAlways = 2
+	msgFlagIncProperties       = 0x01
 )
 
 type consumer struct {
@@ -123,19 +124,18 @@ func (c *consumer) register2Master(needChange bool) error {
 	retryCount := 0
 	for {
 		rsp, err := c.sendRegRequest2Master()
-		if err != nil {
-			log.Infof("[CONSUMER]register2Master error %s", err.Error())
-			return err
-		}
-
-		log.Info("register2Master response %s", rsp.String())
-		if !rsp.GetSuccess() {
-			if rsp.GetErrCode() == errs.RetConsumeGroupForbidden || rsp.GetErrCode() == errs.RetConsumeContentForbidden {
-				log.Warnf("[CONSUMER] register2master(%s) failure exist register, client=%s, error: %s", c.master.Address, c.clientID, rsp.ErrMsg)
+		if err != nil || !rsp.GetSuccess() {
+			if err != nil {
+				log.Errorf("[CONSUMER]register2Master error %s", err.Error())
+			} else if rsp.GetErrCode() == errs.RetConsumeGroupForbidden || rsp.GetErrCode() == errs.RetConsumeContentForbidden {
+				log.Warnf("[CONSUMER] register2master(%s) failure exist register, client=%s, error: %s", c.master.Address, c.clientID, rsp.GetErrMsg())
 				return errs.New(rsp.GetErrCode(), rsp.GetErrMsg())
 			}
 
 			if !c.master.HasNext {
+				if rsp != nil {
+					log.Errorf("[CONSUMER] register2master(%s) failure exist register, client=%s, error: %s", c.master.Address, c.clientID, rsp.GetErrMsg())
+				}
 				break
 			}
 			retryCount++
@@ -145,6 +145,7 @@ func (c *consumer) register2Master(needChange bool) error {
 			}
 			continue
 		}
+		log.Info("register2Master response %s", rsp.String())
 
 		c.masterHBRetry = 0
 		c.processRegisterResponseM2C(rsp)
@@ -228,10 +229,9 @@ func (c *consumer) GetMessage() (*ConsumerResult, error) {
 	rsp, err := c.client.GetMessageRequestC2B(ctx, m, c.subInfo, c.rmtDataCache)
 	if err != nil {
 		log.Infof("[CONSUMER]GetMessage error %s", err.Error())
-		err1 := c.rmtDataCache.ReleasePartition(true, isFiltered, confirmContext, false)
-		if err1 != nil {
-			log.Error("[CONSUMER]GetMessage release partition error %s", err1.Error())
-			return nil, err1
+		if err := c.rmtDataCache.ReleasePartition(true, isFiltered, confirmContext, false); err != nil {
+			log.Errorf("[CONSUMER]GetMessage release partition error %s", err.Error())
+			return nil, err
 		}
 		return nil, err
 	}
@@ -633,7 +633,7 @@ func (c *consumer) convertMessages(filtered bool, topic string, rsp *protocol.Ge
 		payLoadData := m.GetPayLoadData()
 		dataLen := len(payLoadData)
 		var properties map[string]string
-		if m.GetFlag()&0x01 == 1 {
+		if m.GetFlag()&msgFlagIncProperties == 1 {
 			if len(payLoadData) < 4 {
 				continue
 			}
diff --git a/tubemq-client-twins/tubemq-client-go/flowctrl/handler.go b/tubemq-client-twins/tubemq-client-go/flowctrl/handler.go
index 52bb732..5400b04 100644
--- a/tubemq-client-twins/tubemq-client-go/flowctrl/handler.go
+++ b/tubemq-client-twins/tubemq-client-go/flowctrl/handler.go
@@ -127,7 +127,7 @@ func (h *RuleHandler) initStatisticData() {
 				if rule.startTime < atomic.LoadInt64(&h.dataLimitStartTime) {
 					atomic.StoreInt64(&h.dataLimitStartTime, rule.startTime)
 				}
-				if rule.endTime < atomic.LoadInt64(&h.dataLimitEndTime) {
+				if rule.endTime > atomic.LoadInt64(&h.dataLimitEndTime) {
 					atomic.StoreInt64(&h.dataLimitEndTime, rule.endTime)
 				}
 			}
@@ -178,7 +178,7 @@ func (h *RuleHandler) GetCurDataLimit(lastDataDlt int64) *Result {
 		return nil
 	}
 	h.configMu.Lock()
-	defer h.configMu.Lock()
+	defer h.configMu.Unlock()
 	if _, ok := h.flowCtrlRules[0]; !ok {
 		return nil
 	}
@@ -273,41 +273,41 @@ func parseFlowCtrlInfo(info string) (map[int32][]*Item, error) {
 	return flowCtrlMap, nil
 }
 
-func parseRule(doc map[string]interface{}) ([]byte, error) {
+func parseRule(doc map[string]interface{}) ([]interface{}, error) {
 	if _, ok := doc["rule"]; !ok {
 		return nil, fmt.Errorf("rule field not existed")
 	}
-	if _, ok := doc["rule"].([]byte); !ok {
-		return nil, fmt.Errorf("rule should be a string")
+	if _, ok := doc["rule"].([]interface{}); !ok {
+		return nil, fmt.Errorf("rule should be interface")
 	}
-	v := doc["rule"].([]byte)
+	v := doc["rule"].([]interface{})
 	return v, nil
 }
 
-func parseDataLimit(b []byte) ([]*Item, error) {
-	var rules []map[string]interface{}
-	if err := json.Unmarshal(b, &rules); err != nil {
-		return nil, err
-	}
+func parseDataLimit(rules []interface{}) ([]*Item, error) {
 	items := make([]*Item, 0, len(rules))
 	for i, rule := range rules {
-		start, err := parseTime(rule, "start")
+		if _, ok := rule.(map[string]interface{}); !ok {
+			return nil, fmt.Errorf("rule should be a map")
+		}
+		v := rule.(map[string]interface{})
+		start, err := parseTime(v, "start")
 		if err != nil {
 			return nil, err
 		}
-		end, err := parseTime(rule, "end")
+		end, err := parseTime(v, "end")
 		if err != nil {
 			return nil, err
 		}
 		if start > end {
 			return nil, fmt.Errorf("start value must lower than the End value in index(%d) of data limit rule", i)
 		}
-		datadlt, err := parseInt(rule, "dltInM", false, -1)
+		datadlt, err := parseInt(v, "dltInM", false, -1)
 		if err != nil {
 			return nil, fmt.Errorf("dltInM key is required in index(%d) of data limit rule", i)
 		}
 		datadlt = datadlt * 1024 * 204
-		dataSizeLimit, err := parseInt(rule, "limitInM", false, -1)
+		dataSizeLimit, err := parseInt(v, "limitInM", false, -1)
 		if err != nil {
 			return nil, fmt.Errorf("limitInM key is required in index(%d) of data limit rule", i)
 		}
@@ -315,7 +315,7 @@ func parseDataLimit(b []byte) ([]*Item, error) {
 			return nil, fmt.Errorf("limitInM value must over than equal or bigger than zero in index(%d) of data limit rule", i)
 		}
 		dataSizeLimit = dataSizeLimit * 1024 * 1024
-		freqMsLimit, err := parseInt(rule, "freqInMs", false, -1)
+		freqMsLimit, err := parseInt(v, "freqInMs", false, -1)
 		if err != nil {
 			return nil, fmt.Errorf("freqInMs key is required in index((%d) of data limit rule", i)
 		}
@@ -334,17 +334,17 @@ func parseDataLimit(b []byte) ([]*Item, error) {
 	return items, nil
 }
 
-func parseLowFetchLimit(b []byte) ([]*Item, error) {
-	var rules []map[string]interface{}
-	if err := json.Unmarshal(b, &rules); err != nil {
-		return nil, err
-	}
+func parseLowFetchLimit(rules []interface{}) ([]*Item, error) {
 	items := make([]*Item, 0, len(rules))
 	for _, rule := range rules {
 		var filterFreqMs int64
 		var err error
-		if _, ok := rule["filterFreqInMs"]; ok {
-			filterFreqMs, err = parseInt(rule, "filterFreqInMs", false, -1)
+		if _, ok := rule.(map[string]interface{}); !ok {
+			return nil, fmt.Errorf("rule should be a map")
+		}
+		v := rule.(map[string]interface{})
+		if _, ok := v["filterFreqInMs"]; ok {
+			filterFreqMs, err = parseInt(v, "filterFreqInMs", false, -1)
 			if err != nil {
 				return nil, fmt.Errorf("decode failure: %s of filterFreqInMs field in parse low fetch limit", err.Error())
 			}
@@ -354,8 +354,8 @@ func parseLowFetchLimit(b []byte) ([]*Item, error) {
 		}
 
 		var minFilteFreqMs int64
-		if _, ok := rule["minDataFilterFreqInMs"]; ok {
-			minFilteFreqMs, err = parseInt(rule, "minDataFilterFreqInMs", false, -1)
+		if _, ok := v["minDataFilterFreqInMs"]; ok {
+			minFilteFreqMs, err = parseInt(v, "minDataFilterFreqInMs", false, -1)
 			if err != nil {
 				return nil, fmt.Errorf("decode failure: %s of minDataFilterFreqInMs field in parse low fetch limit", err.Error())
 			}
@@ -367,8 +367,8 @@ func parseLowFetchLimit(b []byte) ([]*Item, error) {
 			return nil, fmt.Errorf("decode failure: minDataFilterFreqInMs must lower than filterFreqInMs in index(%d) of low fetch limit rule", filterFreqMs)
 		}
 		var normFreqMs int64
-		if _, ok := rule["normFreqInMs"]; ok {
-			normFreqMs, err = parseInt(rule, "normFreqInMs", false, -1)
+		if _, ok := v["normFreqInMs"]; ok {
+			normFreqMs, err = parseInt(v, "normFreqInMs", false, -1)
 			if err != nil {
 				return nil, fmt.Errorf("decode failure: %s of normFreqInMs field in parse low fetch limit", err.Error())
 			}
@@ -386,18 +386,18 @@ func parseLowFetchLimit(b []byte) ([]*Item, error) {
 	return items, nil
 }
 
-func parseFreqLimit(b []byte) ([]*Item, error) {
-	var rules []map[string]interface{}
-	if err := json.Unmarshal(b, &rules); err != nil {
-		return nil, err
-	}
+func parseFreqLimit(rules []interface{}) ([]*Item, error) {
 	items := make([]*Item, 0, len(rules))
 	for _, rule := range rules {
-		zeroCnt, err := parseInt(rule, "zeroCnt", false, util.InvalidValue)
+		if _, ok := rule.(map[string]interface{}); !ok {
+			return nil, fmt.Errorf("rule should be a map")
+		}
+		v := rule.(map[string]interface{})
+		zeroCnt, err := parseInt(v, "zeroCnt", false, util.InvalidValue)
 		if err != nil {
 			return nil, err
 		}
-		freqLimit, err := parseInt(rule, "freqInMs", false, util.InvalidValue)
+		freqLimit, err := parseInt(v, "freqInMs", false, util.InvalidValue)
 		if err != nil {
 			return nil, err
 		}
@@ -419,14 +419,14 @@ func parseInt(doc map[string]interface{}, key string, compare bool, required int
 	if _, ok := doc[key]; !ok {
 		return util.InvalidValue, fmt.Errorf("field not existed")
 	}
-	if _, ok := doc[key].(int64); !ok {
+	if _, ok := doc[key].(float64); !ok {
 		return util.InvalidValue, fmt.Errorf("illegal value, must be int type")
 	}
-	v := doc[key].(int64)
-	if compare && v != required {
+	v := doc[key].(float64)
+	if compare && int64(v) != required {
 		return util.InvalidValue, fmt.Errorf("illegal value, not required value content")
 	}
-	return v, nil
+	return int64(v), nil
 }
 
 func parseTime(doc map[string]interface{}, key string) (int64, error) {