You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by go...@apache.org on 2021/09/17 11:28:42 UTC
[incubator-inlong] branch INLONG-25 updated: [INLONG-1563]Fix the bug which can't stop the heartbeat timer and make close can be called once. (#1564)
This is an automated email from the ASF dual-hosted git repository.
gosonzhang pushed a commit to branch INLONG-25
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git
The following commit(s) were added to refs/heads/INLONG-25 by this push:
new 21444df [INLONG-1563]Fix the bug which can't stop the heartbeat timer and make close can be called once. (#1564)
21444df is described below
commit 21444df7bb9085c2cd7b01c63dca56ff0fcc971f
Author: Zijie Lu <ws...@gmail.com>
AuthorDate: Fri Sep 17 19:28:34 2021 +0800
[INLONG-1563]Fix the bug which can't stop the heartbeat timer and make close can be called once. (#1564)
Signed-off-by: Zijie Lu <ws...@gmail.com>
---
.../tubemq-client-go/client/consumer.go | 2 +-
.../tubemq-client-go/client/consumer_impl.go | 44 +++++++++++++---------
.../tubemq-client-go/client/heartbeat.go | 2 +-
.../tubemq-client-go/example/consumer.go | 6 +--
.../example/multi_routine_consumer.go | 6 +--
5 files changed, 30 insertions(+), 30 deletions(-)
diff --git a/tubemq-client-twins/tubemq-client-go/client/consumer.go b/tubemq-client-twins/tubemq-client-go/client/consumer.go
index b805c30..f6c8a51 100644
--- a/tubemq-client-twins/tubemq-client-go/client/consumer.go
+++ b/tubemq-client-twins/tubemq-client-go/client/consumer.go
@@ -42,7 +42,7 @@ type Consumer interface {
// GetCurrConsumedInfo returns the consumptions of the consumer.
GetCurrConsumedInfo() map[string]*remote.ConsumerOffset
// Close closes the consumer client and release the resources.
- Close() error
+ Close()
// GetClientID returns the clientID of the consumer.
GetClientID() string
}
diff --git a/tubemq-client-twins/tubemq-client-go/client/consumer_impl.go b/tubemq-client-twins/tubemq-client-go/client/consumer_impl.go
index db56472..8183ce4 100644
--- a/tubemq-client-twins/tubemq-client-go/client/consumer_impl.go
+++ b/tubemq-client-twins/tubemq-client-go/client/consumer_impl.go
@@ -25,6 +25,7 @@ import (
"os"
"strconv"
"strings"
+ "sync"
"sync/atomic"
"time"
@@ -68,6 +69,7 @@ type consumer struct {
heartbeatManager *heartbeatManager
unreportedTimes int
done chan struct{}
+ closeOnce sync.Once
}
// NewConsumer returns a consumer which is constructed by a given config.
@@ -353,18 +355,16 @@ func (c *consumer) GetClientID() string {
}
// Close implementation of TubeMQ consumer.
-func (c *consumer) Close() error {
- log.Infof("[CONSUMER]Begin to close consumer, client=%s", c.clientID)
- close(c.done)
- err := c.close2Master()
- if err != nil {
- return err
- }
- c.closeAllBrokers()
- c.heartbeatManager.close()
- c.client.Close()
- log.Infof("[CONSUMER]Consumer has been closed successfully, client=%s", c.clientID)
- return nil
+func (c *consumer) Close() {
+ c.closeOnce.Do(func() {
+ log.Infof("[CONSUMER]Begin to close consumer, client=%s", c.clientID)
+ close(c.done)
+ c.heartbeatManager.close()
+ c.close2Master()
+ c.closeAllBrokers()
+ c.client.Close()
+ log.Infof("[CONSUMER]Consumer has been closed successfully, client=%s", c.clientID)
+ })
}
func (c *consumer) processRebalanceEvent() {
@@ -436,7 +436,14 @@ func (c *consumer) sendUnregisterReq2Broker(partition *metadata.Partition) {
auth := c.genBrokerAuthenticInfo(true)
c.subInfo.SetAuthorizedInfo(auth)
- c.client.UnregisterRequestC2B(ctx, m, c.subInfo)
+ rsp, err := c.client.UnregisterRequestC2B(ctx, m, c.subInfo)
+ if err != nil {
+ log.Errorf("[CONSUMER] fail to unregister partition %s, error %s", partition, err.Error())
+ return
+ }
+ if !rsp.GetSuccess() {
+ log.Errorf("[CONSUMER] fail to unregister partition %s, err code: %d, error msg %s", partition, rsp.GetErrCode(), rsp.GetErrMsg())
+ }
}
func (c *consumer) connect2Broker(event *metadata.ConsumerEvent) {
@@ -455,7 +462,7 @@ func (c *consumer) connect2Broker(event *metadata.ConsumerEvent) {
continue
}
if !rsp.GetSuccess() {
- log.Warnf("[connect2Broker] err code:%d, err msg: %s", rsp.ErrCode, rsp.ErrMsg)
+ log.Warnf("[connect2Broker] err code:%d, err msg: %s", rsp.GetErrCode(), rsp.GetErrMsg())
return
}
@@ -698,7 +705,7 @@ func (c *consumer) convertMessages(filtered bool, topic string, rsp *protocol.Ge
return msgSize, msgs
}
-func (c *consumer) close2Master() error {
+func (c *consumer) close2Master() {
log.Infof("[CONSUMER] close2Master begin, client=%s", c.clientID)
ctx, cancel := context.WithTimeout(context.Background(), c.config.Net.ReadTimeout)
defer cancel()
@@ -719,13 +726,14 @@ func (c *consumer) close2Master() error {
c.subInfo.SetMasterCertificateInfo(mci)
rsp, err := c.client.CloseRequestC2M(ctx, m, c.subInfo)
if err != nil {
- return err
+ log.Errorf("[CONSUMER] fail to close master, error: %s", err.Error())
+ return
}
if !rsp.GetSuccess() {
- return errs.New(rsp.GetErrCode(), rsp.GetErrMsg())
+ log.Errorf("[CONSUMER] fail to close master, error code: %d, error msg: %s", rsp.GetErrCode(), rsp.GetErrMsg())
+ return
}
log.Infof("[CONSUMER] close2Master finished, client=%s", c.clientID)
- return nil
}
func (c *consumer) closeAllBrokers() {
diff --git a/tubemq-client-twins/tubemq-client-go/client/heartbeat.go b/tubemq-client-twins/tubemq-client-go/client/heartbeat.go
index 281cd4c..7e9b814 100644
--- a/tubemq-client-twins/tubemq-client-go/client/heartbeat.go
+++ b/tubemq-client-twins/tubemq-client-go/client/heartbeat.go
@@ -268,7 +268,7 @@ func (h *heartbeatManager) close() {
if !heartbeat.timer.Stop() {
<-heartbeat.timer.C
}
- heartbeat.timer.Stop()
+ heartbeat.timer = nil
}
h.heartbeats = nil
}
diff --git a/tubemq-client-twins/tubemq-client-go/example/consumer.go b/tubemq-client-twins/tubemq-client-go/example/consumer.go
index ca9ccad..eea4b84 100644
--- a/tubemq-client-twins/tubemq-client-go/example/consumer.go
+++ b/tubemq-client-twins/tubemq-client-go/example/consumer.go
@@ -63,9 +63,5 @@ func main() {
continue
}
}
- err = c.Close()
- if err != nil {
- log.Errorf("Close err %s", err.Error())
- panic(err)
- }
+ c.Close()
}
diff --git a/tubemq-client-twins/tubemq-client-go/example/multi_routine_consumer.go b/tubemq-client-twins/tubemq-client-go/example/multi_routine_consumer.go
index dd257a5..63b7bb4 100644
--- a/tubemq-client-twins/tubemq-client-go/example/multi_routine_consumer.go
+++ b/tubemq-client-twins/tubemq-client-go/example/multi_routine_consumer.go
@@ -80,11 +80,7 @@ func main() {
}(i)
}
wg.Wait()
- err = c.Close()
- if err != nil {
- log.Errorf("Close err %s", err.Error())
- panic(err)
- }
+ c.Close()
}
func reportMsg(cnt int64) {