You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by go...@apache.org on 2021/09/07 07:37:13 UTC
[incubator-inlong] branch INLONG-25 updated: [INLONG-1529]Go SDK should reset heartbeat if register to master successfully (#1530)
This is an automated email from the ASF dual-hosted git repository.
gosonzhang pushed a commit to branch INLONG-25
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git
The following commit(s) were added to refs/heads/INLONG-25 by this push:
new 7ca091d [INLONG-1529]Go SDK should reset heartbeat if register to master successfully (#1530)
7ca091d is described below
commit 7ca091df453d3d22bfbe13fc3be553d20ee296aa
Author: Zijie Lu <ws...@gmail.com>
AuthorDate: Tue Sep 7 15:37:07 2021 +0800
[INLONG-1529]Go SDK should reset heartbeat if register to master successfully (#1530)
Signed-off-by: Zijie Lu <ws...@gmail.com>
---
.../tubemq-client-go/client/consumer_impl.go | 1 +
tubemq-client-twins/tubemq-client-go/client/heartbeat.go | 16 +++++++++++++---
tubemq-client-twins/tubemq-client-go/client/version.go | 2 +-
3 files changed, 15 insertions(+), 4 deletions(-)
diff --git a/tubemq-client-twins/tubemq-client-go/client/consumer_impl.go b/tubemq-client-twins/tubemq-client-go/client/consumer_impl.go
index 537aa3a..3dd9111 100644
--- a/tubemq-client-twins/tubemq-client-go/client/consumer_impl.go
+++ b/tubemq-client-twins/tubemq-client-go/client/consumer_impl.go
@@ -494,6 +494,7 @@ func newClient(group string) string {
strconv.Itoa(os.Getpid()) + "-" +
strconv.Itoa(int(time.Now().Unix()*1000)) + "-" +
strconv.Itoa(int(atomic.AddUint64(&clientID, 1))) + "-" +
+ "go-" +
tubeMQClientVersion
}
diff --git a/tubemq-client-twins/tubemq-client-go/client/heartbeat.go b/tubemq-client-twins/tubemq-client-go/client/heartbeat.go
index d74b320..281cd4c 100644
--- a/tubemq-client-twins/tubemq-client-go/client/heartbeat.go
+++ b/tubemq-client-twins/tubemq-client-go/client/heartbeat.go
@@ -108,14 +108,20 @@ func (h *heartbeatManager) consumerHB2Master() {
if rsp.GetErrCode() == errs.RetErrHBNoNode || strings.Index(rsp.GetErrMsg(), "StandbyException") != -1 {
log.Warnf("[CONSUMER] hb2master found no-node or standby, re-register, client=%s", h.consumer.clientID)
address := h.consumer.master.Address
- go h.consumer.register2Master(rsp.GetErrCode() != errs.RetErrHBNoNode)
+ go func() {
+ err := h.consumer.register2Master(rsp.GetErrCode() != errs.RetErrHBNoNode)
+ if err != nil {
+ return
+ }
+ h.resetMasterHeartbeat()
+ }()
if rsp.GetErrCode() != errs.RetErrHBNoNode {
+ h.mu.Lock()
+ defer h.mu.Unlock()
hm := h.heartbeats[address]
hm.numConnections--
if hm.numConnections == 0 {
- h.mu.Lock()
delete(h.heartbeats, address)
- h.mu.Unlock()
}
return
}
@@ -126,6 +132,10 @@ func (h *heartbeatManager) consumerHB2Master() {
}
h.consumer.masterHBRetry = 0
h.processHBResponseM2C(rsp)
+ h.resetMasterHeartbeat()
+}
+
+func (h *heartbeatManager) resetMasterHeartbeat() {
h.mu.Lock()
defer h.mu.Unlock()
hm := h.heartbeats[h.consumer.master.Address]
diff --git a/tubemq-client-twins/tubemq-client-go/client/version.go b/tubemq-client-twins/tubemq-client-go/client/version.go
index 44de222..fad542e 100644
--- a/tubemq-client-twins/tubemq-client-go/client/version.go
+++ b/tubemq-client-twins/tubemq-client-go/client/version.go
@@ -18,5 +18,5 @@
package client
const (
- tubeMQClientVersion = "go_0.1.0"
+ tubeMQClientVersion = "0.1.0"
)