You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by go...@apache.org on 2021/07/22 12:48:46 UTC
[incubator-inlong] branch INLONG-25 updated: [INLONG-784]Fix Go SDK Heartbeat Bug (#579)
This is an automated email from the ASF dual-hosted git repository.
gosonzhang pushed a commit to branch INLONG-25
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git
The following commit(s) were added to refs/heads/INLONG-25 by this push:
new b17639c [INLONG-784]Fix Go SDK Heartbeat Bug (#579)
b17639c is described below
commit b17639c3c4e05c96895b8a0357ce64a76a210417
Author: Zijie Lu <ws...@gmail.com>
AuthorDate: Thu Jul 22 20:48:37 2021 +0800
[INLONG-784]Fix Go SDK Heartbeat Bug (#579)
---
.../tubemq-client-go/client/consumer_impl.go | 2 +-
.../tubemq-client-go/client/heartbeat.go | 20 +++++++++-----------
2 files changed, 10 insertions(+), 12 deletions(-)
diff --git a/tubemq-client-twins/tubemq-client-go/client/consumer_impl.go b/tubemq-client-twins/tubemq-client-go/client/consumer_impl.go
index 11e7620..543868a 100644
--- a/tubemq-client-twins/tubemq-client-go/client/consumer_impl.go
+++ b/tubemq-client-twins/tubemq-client-go/client/consumer_impl.go
@@ -659,7 +659,7 @@ func (c *consumer) convertMessages(filtered bool, topic string, rsp *protocol.Ge
break
}
}
- if found {
+ if !found {
continue
}
}
diff --git a/tubemq-client-twins/tubemq-client-go/client/heartbeat.go b/tubemq-client-twins/tubemq-client-go/client/heartbeat.go
index fc39f12..50b140d 100644
--- a/tubemq-client-twins/tubemq-client-go/client/heartbeat.go
+++ b/tubemq-client-twins/tubemq-client-go/client/heartbeat.go
@@ -94,13 +94,11 @@ func (h *heartbeatManager) consumerHB2Master() {
h.consumer.unreportedTimes = 0
}
- retry := 0
- for retry < h.consumer.config.Heartbeat.MaxRetryTimes {
- rsp, err := h.sendHeartbeatC2M(m)
- if err != nil {
- continue
- }
-
+ rsp, err := h.sendHeartbeatC2M(m)
+ if err != nil {
+ log.Errorf("consumer hb err %s", err.Error())
+ h.consumer.masterHBRetry++
+ } else {
if !rsp.GetSuccess() {
h.consumer.masterHBRetry++
if rsp.GetErrCode() == errs.RetErrHBNoNode || strings.Index(rsp.GetErrMsg(), "StandbyException") != -1 {
@@ -117,14 +115,13 @@ func (h *heartbeatManager) consumerHB2Master() {
}
return
}
- log.Warnf("[CONSUMER] heartBeat2Master failure to (%s) : %s, client=%s", h.consumer.master.Address, rsp.ErrMsg, h.consumer.clientID)
+ log.Warnf("[CONSUMER] heartBeat2Master failure to (%s) : %s, client=%s", h.consumer.master.Address, rsp.GetErrMsg(), h.consumer.clientID)
return
}
}
- h.consumer.masterHBRetry = 0
- h.processHBResponseM2C(rsp)
- break
}
+ h.consumer.masterHBRetry = 0
+ h.processHBResponseM2C(rsp)
h.mu.Lock()
defer h.mu.Unlock()
hm := h.heartbeats[h.consumer.master.Address]
@@ -193,6 +190,7 @@ func (h *heartbeatManager) consumerHB2Broker(broker *metadata.Node) {
rsp, err := h.sendHeartbeatC2B(broker)
if err != nil {
log.Warnf("[Heartbeat2Broker] request network to failure %s", err.Error())
+ h.resetBrokerTimer(broker)
return
}
partitionKeys := make([]string, 0, len(partitions))