You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by go...@apache.org on 2021/09/07 08:52:24 UTC

[incubator-inlong] branch INLONG-25 updated: [INLONG-1531]Go SDK should init the flow control item of the partition (#1532)

This is an automated email from the ASF dual-hosted git repository.

gosonzhang pushed a commit to branch INLONG-25
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git


The following commit(s) were added to refs/heads/INLONG-25 by this push:
     new eb9bd8d  [INLONG-1531]Go SDK should init the flow control item of the partition (#1532)
eb9bd8d is described below

commit eb9bd8da85b88d61d92f356ec972b9fa8bf9544d
Author: Zijie Lu <ws...@gmail.com>
AuthorDate: Tue Sep 7 16:52:21 2021 +0800

    [INLONG-1531]Go SDK should init the flow control item of the partition (#1532)
    
    Signed-off-by: Zijie Lu <ws...@gmail.com>
---
 tubemq-client-twins/tubemq-client-go/flowctrl/handler.go   | 8 ++++----
 tubemq-client-twins/tubemq-client-go/flowctrl/item.go      | 9 ++++++++-
 tubemq-client-twins/tubemq-client-go/metadata/partition.go | 3 +++
 3 files changed, 15 insertions(+), 5 deletions(-)

diff --git a/tubemq-client-twins/tubemq-client-go/flowctrl/handler.go b/tubemq-client-twins/tubemq-client-go/flowctrl/handler.go
index eb05482..8f820b4 100644
--- a/tubemq-client-twins/tubemq-client-go/flowctrl/handler.go
+++ b/tubemq-client-twins/tubemq-client-go/flowctrl/handler.go
@@ -148,7 +148,7 @@ func (h *RuleHandler) initStatisticData() {
 				if rule.tp != 3 {
 					continue
 				}
-				h.flowCtrlItem.SetTp(3)
+				h.flowCtrlItem.SetTp(RequestFrequencyControl)
 				h.flowCtrlItem.SetDataSizeLimit(rule.dataSizeLimit)
 				h.flowCtrlItem.SetFreqLimit(rule.freqMsLimit)
 				h.flowCtrlItem.SetZeroCnt(rule.zeroCnt)
@@ -324,7 +324,7 @@ func parseDataLimit(rules []interface{}) ([]*Item, error) {
 			return nil, fmt.Errorf("freqInMs value must over than equal or bigger than 200 in index(%d) of data limit rule", i)
 		}
 		item := NewItem()
-		item.SetTp(0)
+		item.SetTp(CurrentLimit)
 		item.SetStartTime(start)
 		item.SetEndTime(end)
 		item.SetDatadlt(datadlt)
@@ -378,7 +378,7 @@ func parseLowFetchLimit(rules []interface{}) ([]*Item, error) {
 			}
 		}
 		item := NewItem()
-		item.SetTp(3)
+		item.SetTp(RequestFrequencyControl)
 		item.SetDataSizeLimit(normFreqMs)
 		item.SetFreqLimit(filterFreqMs)
 		item.SetZeroCnt(minFilteFreqMs)
@@ -403,7 +403,7 @@ func parseFreqLimit(rules []interface{}) ([]*Item, error) {
 			return nil, err
 		}
 		item := NewItem()
-		item.SetTp(1)
+		item.SetTp(FrequencyLimit)
 		item.SetZeroCnt(zeroCnt)
 		item.SetFreqLimit(freqLimit)
 		items = append(items, item)
diff --git a/tubemq-client-twins/tubemq-client-go/flowctrl/item.go b/tubemq-client-twins/tubemq-client-go/flowctrl/item.go
index 9983321..63c9162 100644
--- a/tubemq-client-twins/tubemq-client-go/flowctrl/item.go
+++ b/tubemq-client-twins/tubemq-client-go/flowctrl/item.go
@@ -21,6 +21,13 @@ import (
 	"github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/util"
 )
 
+const (
+	CurrentLimit = iota
+	FrequencyLimit
+	SSDTransfer
+	RequestFrequencyControl
+)
+
 // Item defines a flow control item.
 type Item struct {
 	tp            int32
@@ -59,7 +66,7 @@ func (i *Item) SetEndTime(endTime int64) {
 	i.endTime = endTime
 }
 
-// SetDataDlt sets the datadlt.
+// SetDatadlt sets the datadlt.
 func (i *Item) SetDatadlt(datadlt int64) {
 	i.datadlt = datadlt
 }
diff --git a/tubemq-client-twins/tubemq-client-go/metadata/partition.go b/tubemq-client-twins/tubemq-client-go/metadata/partition.go
index 8314502..c256f70 100644
--- a/tubemq-client-twins/tubemq-client-go/metadata/partition.go
+++ b/tubemq-client-twins/tubemq-client-go/metadata/partition.go
@@ -94,6 +94,8 @@ func NewPartition(partition string) (*Partition, error) {
 		return nil, err
 	}
 	partitionKey := strconv.Itoa(int(b.id)) + ":" + topic + ":" + strconv.Itoa(partitionID)
+	item := flowctrl.NewItem()
+	item.SetTp(flowctrl.RequestFrequencyControl)
 	return &Partition{
 		topic:        topic,
 		broker:       b,
@@ -103,6 +105,7 @@ func NewPartition(partition string) (*Partition, error) {
 		consumeData: &ConsumeData{
 			curDataDlt: util.InvalidValue,
 		},
+		freqCtrl: item,
 	}, nil
 }