You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by go...@apache.org on 2021/09/11 10:28:32 UTC
[incubator-inlong] branch INLONG-25 updated: [INLONG-1535]Go SDK
should be closed before stopping the event processing goroutine (#1536)
This is an automated email from the ASF dual-hosted git repository.
gosonzhang pushed a commit to branch INLONG-25
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git
The following commit(s) were added to refs/heads/INLONG-25 by this push:
new f78fc87 [INLONG-1535]Go SDK should be closed before stopping the event processing goroutine (#1536)
f78fc87 is described below
commit f78fc87e1bf9561069bc6e66a570578806e616db
Author: Zijie Lu <ws...@gmail.com>
AuthorDate: Sat Sep 11 18:28:28 2021 +0800
[INLONG-1535]Go SDK should be closed before stopping the event processing goroutine (#1536)
Signed-off-by: Zijie Lu <ws...@gmail.com>
---
tubemq-client-twins/tubemq-client-go/client/consumer_impl.go | 10 +++++-----
tubemq-client-twins/tubemq-client-go/flowctrl/handler.go | 11 +++++------
tubemq-client-twins/tubemq-client-go/remote/remote.go | 2 +-
3 files changed, 11 insertions(+), 12 deletions(-)
diff --git a/tubemq-client-twins/tubemq-client-go/client/consumer_impl.go b/tubemq-client-twins/tubemq-client-go/client/consumer_impl.go
index 3dd9111..8f48513 100644
--- a/tubemq-client-twins/tubemq-client-go/client/consumer_impl.go
+++ b/tubemq-client-twins/tubemq-client-go/client/consumer_impl.go
@@ -182,7 +182,7 @@ func (c *consumer) processRegisterResponseM2C(rsp *protocol.RegisterResponseM2C)
if rsp.GetNotAllocated() {
c.subInfo.SetIsNotAllocated(rsp.GetNotAllocated())
}
- if rsp.GetDefFlowCheckId() != 0 || rsp.GetDefFlowCheckId() != 0 {
+ if rsp.GetDefFlowCheckId() != 0 || rsp.GetGroupFlowCheckId() != 0 {
if rsp.GetDefFlowCheckId() != 0 {
c.rmtDataCache.UpdateDefFlowCtrlInfo(rsp.GetDefFlowCheckId(), rsp.GetDefFlowControlInfo())
}
@@ -384,10 +384,10 @@ func (c *consumer) processRebalanceEvent() {
}
case <-c.done:
log.Infof("[CONSUMER]Rebalance done, client=%s", c.clientID)
- break
+ log.Info("[CONSUMER] Rebalance event Handler stopped!")
+ return
}
}
- log.Info("[CONSUMER] Rebalance event Handler stopped!")
}
func (c *consumer) disconnect2Broker(event *metadata.ConsumerEvent) {
@@ -542,10 +542,10 @@ func (c *consumer) getConsumeReadStatus(isFirstReg bool) int32 {
if isFirstReg {
if c.config.Consumer.ConsumePosition == 0 {
readStatus = consumeStatusFromMax
- log.Infof("[Consumer From Max Offset], client=", c.clientID)
+ log.Infof("[Consumer From Max Offset], client=%s", c.clientID)
} else if c.config.Consumer.ConsumePosition > 0 {
readStatus = consumeStatusFromMaxAlways
- log.Infof("[Consumer From Max Offset Always], client=", c.clientID)
+ log.Infof("[Consumer From Max Offset Always], client=%s", c.clientID)
}
}
return int32(readStatus)
diff --git a/tubemq-client-twins/tubemq-client-go/flowctrl/handler.go b/tubemq-client-twins/tubemq-client-go/flowctrl/handler.go
index 8f820b4..a454e60 100644
--- a/tubemq-client-twins/tubemq-client-go/flowctrl/handler.go
+++ b/tubemq-client-twins/tubemq-client-go/flowctrl/handler.go
@@ -79,10 +79,10 @@ func (h *RuleHandler) SetQryPriorityID(qryPriorityID int64) {
// UpdateDefFlowCtrlInfo updates the flow control information.
func (h *RuleHandler) UpdateDefFlowCtrlInfo(isDefault bool, qrypriorityID int64, flowCtrlID int64, info string) error {
- if atomic.LoadInt64(&h.flowCtrlID) == flowCtrlID {
+ curFlowCtrlID := atomic.LoadInt64(&h.flowCtrlID)
+ if curFlowCtrlID == flowCtrlID {
return nil
}
- //curFlowCtrlID := atomic.LoadInt64(&h.flowCtrlID)
var flowCtrlItems map[int32][]*Item
var err error
if len(info) > 0 {
@@ -94,6 +94,7 @@ func (h *RuleHandler) UpdateDefFlowCtrlInfo(isDefault bool, qrypriorityID int64,
h.configMu.Lock()
defer h.configMu.Unlock()
h.clearStatisticData()
+ atomic.StoreInt64(&h.flowCtrlID, flowCtrlID)
atomic.StoreInt64(&h.qrypriorityID, qrypriorityID)
if len(flowCtrlItems) == 0 {
h.flowCtrlRules = make(map[int32][]*Item)
@@ -105,11 +106,9 @@ func (h *RuleHandler) UpdateDefFlowCtrlInfo(isDefault bool, qrypriorityID int64,
}
h.lastUpdate = time.Now().UnixNano() / int64(time.Millisecond)
if isDefault {
- log.Infof("[Flow Ctrl] Default FlowCtrl's flow ctrl id from %d to %d", atomic.LoadInt64(&h.flowCtrlID),
- flowCtrlID)
+ log.Infof("[Flow Ctrl] Default FlowCtrl's flow ctrl id from %d to %d", curFlowCtrlID, flowCtrlID)
} else {
- log.Infof("[Flow Ctrl] Group FlowCtrl's flow ctrl id from %d to %d", atomic.LoadInt64(&h.flowCtrlID),
- flowCtrlID)
+ log.Infof("[Flow Ctrl] Group FlowCtrl's flow ctrl id from %d to %d", curFlowCtrlID, flowCtrlID)
}
return nil
}
diff --git a/tubemq-client-twins/tubemq-client-go/remote/remote.go b/tubemq-client-twins/tubemq-client-go/remote/remote.go
index 5b26b0e..01d96ca 100644
--- a/tubemq-client-twins/tubemq-client-go/remote/remote.go
+++ b/tubemq-client-twins/tubemq-client-go/remote/remote.go
@@ -167,7 +167,7 @@ func (r *RmtDataCache) UpdateDefFlowCtrlInfo(flowCtrlID int64, flowCtrlInfo stri
// UpdateGroupFlowCtrlInfo updates the groupFlowCtrlInfo.
func (r *RmtDataCache) UpdateGroupFlowCtrlInfo(qryPriorityID int32, flowCtrlID int64, flowCtrlInfo string) {
- if flowCtrlID != r.defHandler.GetFlowCtrID() {
+ if flowCtrlID != r.groupHandler.GetFlowCtrID() {
r.groupHandler.UpdateDefFlowCtrlInfo(false, int64(qryPriorityID), flowCtrlID, flowCtrlInfo)
}
if int64(qryPriorityID) != r.groupHandler.GetQryPriorityID() {