You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by go...@apache.org on 2021/09/11 10:28:32 UTC

[incubator-inlong] branch INLONG-25 updated: [INLONG-1535]Go SDK should be closed before stopping the event processing goroutine (#1536)

This is an automated email from the ASF dual-hosted git repository.

gosonzhang pushed a commit to branch INLONG-25
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git


The following commit(s) were added to refs/heads/INLONG-25 by this push:
     new f78fc87  [INLONG-1535]Go SDK should be closed before stopping the event processing goroutine (#1536)
f78fc87 is described below

commit f78fc87e1bf9561069bc6e66a570578806e616db
Author: Zijie Lu <ws...@gmail.com>
AuthorDate: Sat Sep 11 18:28:28 2021 +0800

    [INLONG-1535]Go SDK should be closed before stopping the event processing goroutine (#1536)
    
    Signed-off-by: Zijie Lu <ws...@gmail.com>
---
 tubemq-client-twins/tubemq-client-go/client/consumer_impl.go | 10 +++++-----
 tubemq-client-twins/tubemq-client-go/flowctrl/handler.go     | 11 +++++------
 tubemq-client-twins/tubemq-client-go/remote/remote.go        |  2 +-
 3 files changed, 11 insertions(+), 12 deletions(-)

diff --git a/tubemq-client-twins/tubemq-client-go/client/consumer_impl.go b/tubemq-client-twins/tubemq-client-go/client/consumer_impl.go
index 3dd9111..8f48513 100644
--- a/tubemq-client-twins/tubemq-client-go/client/consumer_impl.go
+++ b/tubemq-client-twins/tubemq-client-go/client/consumer_impl.go
@@ -182,7 +182,7 @@ func (c *consumer) processRegisterResponseM2C(rsp *protocol.RegisterResponseM2C)
 	if rsp.GetNotAllocated() {
 		c.subInfo.SetIsNotAllocated(rsp.GetNotAllocated())
 	}
-	if rsp.GetDefFlowCheckId() != 0 || rsp.GetDefFlowCheckId() != 0 {
+	if rsp.GetDefFlowCheckId() != 0 || rsp.GetGroupFlowCheckId() != 0 {
 		if rsp.GetDefFlowCheckId() != 0 {
 			c.rmtDataCache.UpdateDefFlowCtrlInfo(rsp.GetDefFlowCheckId(), rsp.GetDefFlowControlInfo())
 		}
@@ -384,10 +384,10 @@ func (c *consumer) processRebalanceEvent() {
 			}
 		case <-c.done:
 			log.Infof("[CONSUMER]Rebalance done, client=%s", c.clientID)
-			break
+			log.Info("[CONSUMER] Rebalance event Handler stopped!")
+			return
 		}
 	}
-	log.Info("[CONSUMER] Rebalance event Handler stopped!")
 }
 
 func (c *consumer) disconnect2Broker(event *metadata.ConsumerEvent) {
@@ -542,10 +542,10 @@ func (c *consumer) getConsumeReadStatus(isFirstReg bool) int32 {
 	if isFirstReg {
 		if c.config.Consumer.ConsumePosition == 0 {
 			readStatus = consumeStatusFromMax
-			log.Infof("[Consumer From Max Offset], client=", c.clientID)
+			log.Infof("[Consumer From Max Offset], client=%s", c.clientID)
 		} else if c.config.Consumer.ConsumePosition > 0 {
 			readStatus = consumeStatusFromMaxAlways
-			log.Infof("[Consumer From Max Offset Always], client=", c.clientID)
+			log.Infof("[Consumer From Max Offset Always], client=%s", c.clientID)
 		}
 	}
 	return int32(readStatus)
diff --git a/tubemq-client-twins/tubemq-client-go/flowctrl/handler.go b/tubemq-client-twins/tubemq-client-go/flowctrl/handler.go
index 8f820b4..a454e60 100644
--- a/tubemq-client-twins/tubemq-client-go/flowctrl/handler.go
+++ b/tubemq-client-twins/tubemq-client-go/flowctrl/handler.go
@@ -79,10 +79,10 @@ func (h *RuleHandler) SetQryPriorityID(qryPriorityID int64) {
 
 // UpdateDefFlowCtrlInfo updates the flow control information.
 func (h *RuleHandler) UpdateDefFlowCtrlInfo(isDefault bool, qrypriorityID int64, flowCtrlID int64, info string) error {
-	if atomic.LoadInt64(&h.flowCtrlID) == flowCtrlID {
+	curFlowCtrlID := atomic.LoadInt64(&h.flowCtrlID)
+	if curFlowCtrlID == flowCtrlID {
 		return nil
 	}
-	//curFlowCtrlID := atomic.LoadInt64(&h.flowCtrlID)
 	var flowCtrlItems map[int32][]*Item
 	var err error
 	if len(info) > 0 {
@@ -94,6 +94,7 @@ func (h *RuleHandler) UpdateDefFlowCtrlInfo(isDefault bool, qrypriorityID int64,
 	h.configMu.Lock()
 	defer h.configMu.Unlock()
 	h.clearStatisticData()
+	atomic.StoreInt64(&h.flowCtrlID, flowCtrlID)
 	atomic.StoreInt64(&h.qrypriorityID, qrypriorityID)
 	if len(flowCtrlItems) == 0 {
 		h.flowCtrlRules = make(map[int32][]*Item)
@@ -105,11 +106,9 @@ func (h *RuleHandler) UpdateDefFlowCtrlInfo(isDefault bool, qrypriorityID int64,
 	}
 	h.lastUpdate = time.Now().UnixNano() / int64(time.Millisecond)
 	if isDefault {
-		log.Infof("[Flow Ctrl] Default FlowCtrl's flow ctrl id from %d to %d", atomic.LoadInt64(&h.flowCtrlID),
-			flowCtrlID)
+		log.Infof("[Flow Ctrl] Default FlowCtrl's flow ctrl id from %d to %d", curFlowCtrlID, flowCtrlID)
 	} else {
-		log.Infof("[Flow Ctrl] Group FlowCtrl's flow ctrl id from %d to %d", atomic.LoadInt64(&h.flowCtrlID),
-			flowCtrlID)
+		log.Infof("[Flow Ctrl] Group FlowCtrl's flow ctrl id from %d to %d", curFlowCtrlID, flowCtrlID)
 	}
 	return nil
 }
diff --git a/tubemq-client-twins/tubemq-client-go/remote/remote.go b/tubemq-client-twins/tubemq-client-go/remote/remote.go
index 5b26b0e..01d96ca 100644
--- a/tubemq-client-twins/tubemq-client-go/remote/remote.go
+++ b/tubemq-client-twins/tubemq-client-go/remote/remote.go
@@ -167,7 +167,7 @@ func (r *RmtDataCache) UpdateDefFlowCtrlInfo(flowCtrlID int64, flowCtrlInfo stri
 
 // UpdateGroupFlowCtrlInfo updates the groupFlowCtrlInfo.
 func (r *RmtDataCache) UpdateGroupFlowCtrlInfo(qryPriorityID int32, flowCtrlID int64, flowCtrlInfo string) {
-	if flowCtrlID != r.defHandler.GetFlowCtrID() {
+	if flowCtrlID != r.groupHandler.GetFlowCtrID() {
 		r.groupHandler.UpdateDefFlowCtrlInfo(false, int64(qryPriorityID), flowCtrlID, flowCtrlInfo)
 	}
 	if int64(qryPriorityID) != r.groupHandler.GetQryPriorityID() {