You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by go...@apache.org on 2021/09/07 07:37:13 UTC

[incubator-inlong] branch INLONG-25 updated: [INLONG-1529]Go SDK should reset heartbeat if register to master successfully (#1530)

This is an automated email from the ASF dual-hosted git repository.

gosonzhang pushed a commit to branch INLONG-25
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git


The following commit(s) were added to refs/heads/INLONG-25 by this push:
     new 7ca091d  [INLONG-1529]Go SDK should reset heartbeat if register to master successfully (#1530)
7ca091d is described below

commit 7ca091df453d3d22bfbe13fc3be553d20ee296aa
Author: Zijie Lu <ws...@gmail.com>
AuthorDate: Tue Sep 7 15:37:07 2021 +0800

    [INLONG-1529]Go SDK should reset heartbeat if register to master successfully (#1530)
    
    Signed-off-by: Zijie Lu <ws...@gmail.com>
---
 .../tubemq-client-go/client/consumer_impl.go             |  1 +
 tubemq-client-twins/tubemq-client-go/client/heartbeat.go | 16 +++++++++++++---
 tubemq-client-twins/tubemq-client-go/client/version.go   |  2 +-
 3 files changed, 15 insertions(+), 4 deletions(-)

diff --git a/tubemq-client-twins/tubemq-client-go/client/consumer_impl.go b/tubemq-client-twins/tubemq-client-go/client/consumer_impl.go
index 537aa3a..3dd9111 100644
--- a/tubemq-client-twins/tubemq-client-go/client/consumer_impl.go
+++ b/tubemq-client-twins/tubemq-client-go/client/consumer_impl.go
@@ -494,6 +494,7 @@ func newClient(group string) string {
 		strconv.Itoa(os.Getpid()) + "-" +
 		strconv.Itoa(int(time.Now().Unix()*1000)) + "-" +
 		strconv.Itoa(int(atomic.AddUint64(&clientID, 1))) + "-" +
+		"go-" +
 		tubeMQClientVersion
 }
 
diff --git a/tubemq-client-twins/tubemq-client-go/client/heartbeat.go b/tubemq-client-twins/tubemq-client-go/client/heartbeat.go
index d74b320..281cd4c 100644
--- a/tubemq-client-twins/tubemq-client-go/client/heartbeat.go
+++ b/tubemq-client-twins/tubemq-client-go/client/heartbeat.go
@@ -108,14 +108,20 @@ func (h *heartbeatManager) consumerHB2Master() {
 			if rsp.GetErrCode() == errs.RetErrHBNoNode || strings.Index(rsp.GetErrMsg(), "StandbyException") != -1 {
 				log.Warnf("[CONSUMER] hb2master found no-node or standby, re-register, client=%s", h.consumer.clientID)
 				address := h.consumer.master.Address
-				go h.consumer.register2Master(rsp.GetErrCode() != errs.RetErrHBNoNode)
+				go func() {
+					err := h.consumer.register2Master(rsp.GetErrCode() != errs.RetErrHBNoNode)
+					if err != nil {
+						return
+					}
+					h.resetMasterHeartbeat()
+				}()
 				if rsp.GetErrCode() != errs.RetErrHBNoNode {
+					h.mu.Lock()
+					defer h.mu.Unlock()
 					hm := h.heartbeats[address]
 					hm.numConnections--
 					if hm.numConnections == 0 {
-						h.mu.Lock()
 						delete(h.heartbeats, address)
-						h.mu.Unlock()
 					}
 					return
 				}
@@ -126,6 +132,10 @@ func (h *heartbeatManager) consumerHB2Master() {
 	}
 	h.consumer.masterHBRetry = 0
 	h.processHBResponseM2C(rsp)
+	h.resetMasterHeartbeat()
+}
+
+func (h *heartbeatManager) resetMasterHeartbeat() {
 	h.mu.Lock()
 	defer h.mu.Unlock()
 	hm := h.heartbeats[h.consumer.master.Address]
diff --git a/tubemq-client-twins/tubemq-client-go/client/version.go b/tubemq-client-twins/tubemq-client-go/client/version.go
index 44de222..fad542e 100644
--- a/tubemq-client-twins/tubemq-client-go/client/version.go
+++ b/tubemq-client-twins/tubemq-client-go/client/version.go
@@ -18,5 +18,5 @@
 package client
 
 const (
-	tubeMQClientVersion = "go_0.1.0"
+	tubeMQClientVersion = "0.1.0"
 )