You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by go...@apache.org on 2021/09/17 11:28:42 UTC

[incubator-inlong] branch INLONG-25 updated: [INLONG-1563] Fix the bug where the heartbeat timer can't be stopped, and make sure close is executed only once. (#1564)

This is an automated email from the ASF dual-hosted git repository.

gosonzhang pushed a commit to branch INLONG-25
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git


The following commit(s) were added to refs/heads/INLONG-25 by this push:
     new 21444df  [INLONG-1563] Fix the bug where the heartbeat timer can't be stopped, and make sure close is executed only once. (#1564)
21444df is described below

commit 21444df7bb9085c2cd7b01c63dca56ff0fcc971f
Author: Zijie Lu <ws...@gmail.com>
AuthorDate: Fri Sep 17 19:28:34 2021 +0800

    [INLONG-1563] Fix the bug where the heartbeat timer can't be stopped, and make sure close is executed only once. (#1564)
    
    Signed-off-by: Zijie Lu <ws...@gmail.com>
---
 .../tubemq-client-go/client/consumer.go            |  2 +-
 .../tubemq-client-go/client/consumer_impl.go       | 44 +++++++++++++---------
 .../tubemq-client-go/client/heartbeat.go           |  2 +-
 .../tubemq-client-go/example/consumer.go           |  6 +--
 .../example/multi_routine_consumer.go              |  6 +--
 5 files changed, 30 insertions(+), 30 deletions(-)

diff --git a/tubemq-client-twins/tubemq-client-go/client/consumer.go b/tubemq-client-twins/tubemq-client-go/client/consumer.go
index b805c30..f6c8a51 100644
--- a/tubemq-client-twins/tubemq-client-go/client/consumer.go
+++ b/tubemq-client-twins/tubemq-client-go/client/consumer.go
@@ -42,7 +42,7 @@ type Consumer interface {
 	// GetCurrConsumedInfo returns the consumptions of the consumer.
 	GetCurrConsumedInfo() map[string]*remote.ConsumerOffset
 	// Close closes the consumer client and release the resources.
-	Close() error
+	Close()
 	// GetClientID returns the clientID of the consumer.
 	GetClientID() string
 }
diff --git a/tubemq-client-twins/tubemq-client-go/client/consumer_impl.go b/tubemq-client-twins/tubemq-client-go/client/consumer_impl.go
index db56472..8183ce4 100644
--- a/tubemq-client-twins/tubemq-client-go/client/consumer_impl.go
+++ b/tubemq-client-twins/tubemq-client-go/client/consumer_impl.go
@@ -25,6 +25,7 @@ import (
 	"os"
 	"strconv"
 	"strings"
+	"sync"
 	"sync/atomic"
 	"time"
 
@@ -68,6 +69,7 @@ type consumer struct {
 	heartbeatManager *heartbeatManager
 	unreportedTimes  int
 	done             chan struct{}
+	closeOnce        sync.Once
 }
 
 // NewConsumer returns a consumer which is constructed by a given config.
@@ -353,18 +355,16 @@ func (c *consumer) GetClientID() string {
 }
 
 // Close implementation of TubeMQ consumer.
-func (c *consumer) Close() error {
-	log.Infof("[CONSUMER]Begin to close consumer, client=%s", c.clientID)
-	close(c.done)
-	err := c.close2Master()
-	if err != nil {
-		return err
-	}
-	c.closeAllBrokers()
-	c.heartbeatManager.close()
-	c.client.Close()
-	log.Infof("[CONSUMER]Consumer has been closed successfully, client=%s", c.clientID)
-	return nil
+func (c *consumer) Close() {
+	c.closeOnce.Do(func() {
+		log.Infof("[CONSUMER]Begin to close consumer, client=%s", c.clientID)
+		close(c.done)
+		c.heartbeatManager.close()
+		c.close2Master()
+		c.closeAllBrokers()
+		c.client.Close()
+		log.Infof("[CONSUMER]Consumer has been closed successfully, client=%s", c.clientID)
+	})
 }
 
 func (c *consumer) processRebalanceEvent() {
@@ -436,7 +436,14 @@ func (c *consumer) sendUnregisterReq2Broker(partition *metadata.Partition) {
 	auth := c.genBrokerAuthenticInfo(true)
 	c.subInfo.SetAuthorizedInfo(auth)
 
-	c.client.UnregisterRequestC2B(ctx, m, c.subInfo)
+	rsp, err := c.client.UnregisterRequestC2B(ctx, m, c.subInfo)
+	if err != nil {
+		log.Errorf("[CONSUMER] fail to unregister partition %s, error %s", partition, err.Error())
+		return
+	}
+	if !rsp.GetSuccess() {
+		log.Errorf("[CONSUMER] fail to unregister partition %s, err code: %d, error msg %s", partition, rsp.GetErrCode(), rsp.GetErrMsg())
+	}
 }
 
 func (c *consumer) connect2Broker(event *metadata.ConsumerEvent) {
@@ -455,7 +462,7 @@ func (c *consumer) connect2Broker(event *metadata.ConsumerEvent) {
 					continue
 				}
 				if !rsp.GetSuccess() {
-					log.Warnf("[connect2Broker] err code:%d, err msg: %s", rsp.ErrCode, rsp.ErrMsg)
+					log.Warnf("[connect2Broker] err code:%d, err msg: %s", rsp.GetErrCode(), rsp.GetErrMsg())
 					return
 				}
 
@@ -698,7 +705,7 @@ func (c *consumer) convertMessages(filtered bool, topic string, rsp *protocol.Ge
 	return msgSize, msgs
 }
 
-func (c *consumer) close2Master() error {
+func (c *consumer) close2Master() {
 	log.Infof("[CONSUMER] close2Master begin, client=%s", c.clientID)
 	ctx, cancel := context.WithTimeout(context.Background(), c.config.Net.ReadTimeout)
 	defer cancel()
@@ -719,13 +726,14 @@ func (c *consumer) close2Master() error {
 	c.subInfo.SetMasterCertificateInfo(mci)
 	rsp, err := c.client.CloseRequestC2M(ctx, m, c.subInfo)
 	if err != nil {
-		return err
+		log.Errorf("[CONSUMER] fail to close master, error: %s", err.Error())
+		return
 	}
 	if !rsp.GetSuccess() {
-		return errs.New(rsp.GetErrCode(), rsp.GetErrMsg())
+		log.Errorf("[CONSUMER] fail to close master, error code: %d, error msg: %s", rsp.GetErrCode(), rsp.GetErrMsg())
+		return
 	}
 	log.Infof("[CONSUMER] close2Master finished, client=%s", c.clientID)
-	return nil
 }
 
 func (c *consumer) closeAllBrokers() {
diff --git a/tubemq-client-twins/tubemq-client-go/client/heartbeat.go b/tubemq-client-twins/tubemq-client-go/client/heartbeat.go
index 281cd4c..7e9b814 100644
--- a/tubemq-client-twins/tubemq-client-go/client/heartbeat.go
+++ b/tubemq-client-twins/tubemq-client-go/client/heartbeat.go
@@ -268,7 +268,7 @@ func (h *heartbeatManager) close() {
 		if !heartbeat.timer.Stop() {
 			<-heartbeat.timer.C
 		}
-		heartbeat.timer.Stop()
+		heartbeat.timer = nil
 	}
 	h.heartbeats = nil
 }
diff --git a/tubemq-client-twins/tubemq-client-go/example/consumer.go b/tubemq-client-twins/tubemq-client-go/example/consumer.go
index ca9ccad..eea4b84 100644
--- a/tubemq-client-twins/tubemq-client-go/example/consumer.go
+++ b/tubemq-client-twins/tubemq-client-go/example/consumer.go
@@ -63,9 +63,5 @@ func main() {
 			continue
 		}
 	}
-	err = c.Close()
-	if err != nil {
-		log.Errorf("Close err %s", err.Error())
-		panic(err)
-	}
+	c.Close()
 }
diff --git a/tubemq-client-twins/tubemq-client-go/example/multi_routine_consumer.go b/tubemq-client-twins/tubemq-client-go/example/multi_routine_consumer.go
index dd257a5..63b7bb4 100644
--- a/tubemq-client-twins/tubemq-client-go/example/multi_routine_consumer.go
+++ b/tubemq-client-twins/tubemq-client-go/example/multi_routine_consumer.go
@@ -80,11 +80,7 @@ func main() {
 		}(i)
 	}
 	wg.Wait()
-	err = c.Close()
-	if err != nil {
-		log.Errorf("Close err %s", err.Error())
-		panic(err)
-	}
+	c.Close()
 }
 
 func reportMsg(cnt int64) {