You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by go...@apache.org on 2021/07/22 12:48:46 UTC

[incubator-inlong] branch INLONG-25 updated: [INLONG-784]Fix Go SDK Heartbeat Bug (#579)

This is an automated email from the ASF dual-hosted git repository.

gosonzhang pushed a commit to branch INLONG-25
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git


The following commit(s) were added to refs/heads/INLONG-25 by this push:
     new b17639c  [INLONG-784]Fix Go SDK Heartbeat Bug (#579)
b17639c is described below

commit b17639c3c4e05c96895b8a0357ce64a76a210417
Author: Zijie Lu <ws...@gmail.com>
AuthorDate: Thu Jul 22 20:48:37 2021 +0800

    [INLONG-784]Fix Go SDK Heartbeat Bug (#579)
---
 .../tubemq-client-go/client/consumer_impl.go         |  2 +-
 .../tubemq-client-go/client/heartbeat.go             | 20 +++++++++-----------
 2 files changed, 10 insertions(+), 12 deletions(-)

diff --git a/tubemq-client-twins/tubemq-client-go/client/consumer_impl.go b/tubemq-client-twins/tubemq-client-go/client/consumer_impl.go
index 11e7620..543868a 100644
--- a/tubemq-client-twins/tubemq-client-go/client/consumer_impl.go
+++ b/tubemq-client-twins/tubemq-client-go/client/consumer_impl.go
@@ -659,7 +659,7 @@ func (c *consumer) convertMessages(filtered bool, topic string, rsp *protocol.Ge
 								break
 							}
 						}
-						if found {
+						if !found {
 							continue
 						}
 					}
diff --git a/tubemq-client-twins/tubemq-client-go/client/heartbeat.go b/tubemq-client-twins/tubemq-client-go/client/heartbeat.go
index fc39f12..50b140d 100644
--- a/tubemq-client-twins/tubemq-client-go/client/heartbeat.go
+++ b/tubemq-client-twins/tubemq-client-go/client/heartbeat.go
@@ -94,13 +94,11 @@ func (h *heartbeatManager) consumerHB2Master() {
 		h.consumer.unreportedTimes = 0
 	}
 
-	retry := 0
-	for retry < h.consumer.config.Heartbeat.MaxRetryTimes {
-		rsp, err := h.sendHeartbeatC2M(m)
-		if err != nil {
-			continue
-		}
-
+	rsp, err := h.sendHeartbeatC2M(m)
+	if err != nil {
+		log.Errorf("consumer hb err %s", err.Error())
+		h.consumer.masterHBRetry++
+	} else {
 		if !rsp.GetSuccess() {
 			h.consumer.masterHBRetry++
 			if rsp.GetErrCode() == errs.RetErrHBNoNode || strings.Index(rsp.GetErrMsg(), "StandbyException") != -1 {
@@ -117,14 +115,13 @@ func (h *heartbeatManager) consumerHB2Master() {
 					}
 					return
 				}
-				log.Warnf("[CONSUMER] heartBeat2Master failure to (%s) : %s, client=%s", h.consumer.master.Address, rsp.ErrMsg, h.consumer.clientID)
+				log.Warnf("[CONSUMER] heartBeat2Master failure to (%s) : %s, client=%s", h.consumer.master.Address, rsp.GetErrMsg(), h.consumer.clientID)
 				return
 			}
 		}
-		h.consumer.masterHBRetry = 0
-		h.processHBResponseM2C(rsp)
-		break
 	}
+	h.consumer.masterHBRetry = 0
+	h.processHBResponseM2C(rsp)
 	h.mu.Lock()
 	defer h.mu.Unlock()
 	hm := h.heartbeats[h.consumer.master.Address]
@@ -193,6 +190,7 @@ func (h *heartbeatManager) consumerHB2Broker(broker *metadata.Node) {
 	rsp, err := h.sendHeartbeatC2B(broker)
 	if err != nil {
 		log.Warnf("[Heartbeat2Broker] request network to failure %s", err.Error())
+		h.resetBrokerTimer(broker)
 		return
 	}
 	partitionKeys := make([]string, 0, len(partitions))