You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by go...@apache.org on 2021/09/07 08:52:24 UTC
[incubator-inlong] branch INLONG-25 updated: [INLONG-1531]Go SDK should init the flow control item of the partition (#1532)
This is an automated email from the ASF dual-hosted git repository.
gosonzhang pushed a commit to branch INLONG-25
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git
The following commit(s) were added to refs/heads/INLONG-25 by this push:
new eb9bd8d [INLONG-1531]Go SDK should init the flow control item of the partition (#1532)
eb9bd8d is described below
commit eb9bd8da85b88d61d92f356ec972b9fa8bf9544d
Author: Zijie Lu <ws...@gmail.com>
AuthorDate: Tue Sep 7 16:52:21 2021 +0800
[INLONG-1531]Go SDK should init the flow control item of the partition (#1532)
Signed-off-by: Zijie Lu <ws...@gmail.com>
---
tubemq-client-twins/tubemq-client-go/flowctrl/handler.go | 8 ++++----
tubemq-client-twins/tubemq-client-go/flowctrl/item.go | 9 ++++++++-
tubemq-client-twins/tubemq-client-go/metadata/partition.go | 3 +++
3 files changed, 15 insertions(+), 5 deletions(-)
diff --git a/tubemq-client-twins/tubemq-client-go/flowctrl/handler.go b/tubemq-client-twins/tubemq-client-go/flowctrl/handler.go
index eb05482..8f820b4 100644
--- a/tubemq-client-twins/tubemq-client-go/flowctrl/handler.go
+++ b/tubemq-client-twins/tubemq-client-go/flowctrl/handler.go
@@ -148,7 +148,7 @@ func (h *RuleHandler) initStatisticData() {
if rule.tp != 3 {
continue
}
- h.flowCtrlItem.SetTp(3)
+ h.flowCtrlItem.SetTp(RequestFrequencyControl)
h.flowCtrlItem.SetDataSizeLimit(rule.dataSizeLimit)
h.flowCtrlItem.SetFreqLimit(rule.freqMsLimit)
h.flowCtrlItem.SetZeroCnt(rule.zeroCnt)
@@ -324,7 +324,7 @@ func parseDataLimit(rules []interface{}) ([]*Item, error) {
return nil, fmt.Errorf("freqInMs value must over than equal or bigger than 200 in index(%d) of data limit rule", i)
}
item := NewItem()
- item.SetTp(0)
+ item.SetTp(CurrentLimit)
item.SetStartTime(start)
item.SetEndTime(end)
item.SetDatadlt(datadlt)
@@ -378,7 +378,7 @@ func parseLowFetchLimit(rules []interface{}) ([]*Item, error) {
}
}
item := NewItem()
- item.SetTp(3)
+ item.SetTp(RequestFrequencyControl)
item.SetDataSizeLimit(normFreqMs)
item.SetFreqLimit(filterFreqMs)
item.SetZeroCnt(minFilteFreqMs)
@@ -403,7 +403,7 @@ func parseFreqLimit(rules []interface{}) ([]*Item, error) {
return nil, err
}
item := NewItem()
- item.SetTp(1)
+ item.SetTp(FrequencyLimit)
item.SetZeroCnt(zeroCnt)
item.SetFreqLimit(freqLimit)
items = append(items, item)
diff --git a/tubemq-client-twins/tubemq-client-go/flowctrl/item.go b/tubemq-client-twins/tubemq-client-go/flowctrl/item.go
index 9983321..63c9162 100644
--- a/tubemq-client-twins/tubemq-client-go/flowctrl/item.go
+++ b/tubemq-client-twins/tubemq-client-go/flowctrl/item.go
@@ -21,6 +21,13 @@ import (
"github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/util"
)
+const (
+ CurrentLimit = iota
+ FrequencyLimit
+ SSDTransfer
+ RequestFrequencyControl
+)
+
// Item defines a flow control item.
type Item struct {
tp int32
@@ -59,7 +66,7 @@ func (i *Item) SetEndTime(endTime int64) {
i.endTime = endTime
}
-// SetDataDlt sets the datadlt.
+// SetDatadlt sets the datadlt.
func (i *Item) SetDatadlt(datadlt int64) {
i.datadlt = datadlt
}
diff --git a/tubemq-client-twins/tubemq-client-go/metadata/partition.go b/tubemq-client-twins/tubemq-client-go/metadata/partition.go
index 8314502..c256f70 100644
--- a/tubemq-client-twins/tubemq-client-go/metadata/partition.go
+++ b/tubemq-client-twins/tubemq-client-go/metadata/partition.go
@@ -94,6 +94,8 @@ func NewPartition(partition string) (*Partition, error) {
return nil, err
}
partitionKey := strconv.Itoa(int(b.id)) + ":" + topic + ":" + strconv.Itoa(partitionID)
+ item := flowctrl.NewItem()
+ item.SetTp(flowctrl.RequestFrequencyControl)
return &Partition{
topic: topic,
broker: b,
@@ -103,6 +105,7 @@ func NewPartition(partition string) (*Partition, error) {
consumeData: &ConsumeData{
curDataDlt: util.InvalidValue,
},
+ freqCtrl: item,
}, nil
}