You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by go...@apache.org on 2021/06/15 01:59:59 UTC

[incubator-inlong] 06/09: Address review comments

This is an automated email from the ASF dual-hosted git repository.

gosonzhang pushed a commit to branch INLONG-25
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git

commit 2d64bb530f302ab97bbc2930197909b10e8152df
Author: Zijie Lu <ws...@gmail.com>
AuthorDate: Tue Jun 8 17:40:17 2021 +0800

    Address review comments
    
    Signed-off-by: Zijie Lu <ws...@gmail.com>
---
 .../tubemq-client-go/client/consumer.go            |   6 +-
 .../tubemq-client-go/client/consumer_impl.go       | 159 ++++++++++++---------
 .../tubemq-client-go/client/heartbeat.go           |  35 +++--
 .../tubemq-client-go/client/version.go             |   5 +
 .../tubemq-client-go/metadata/consumer_event.go    |   7 +
 .../tubemq-client-go/remote/remote.go              |  23 +--
 6 files changed, 130 insertions(+), 105 deletions(-)

diff --git a/tubemq-client-twins/tubemq-client-go/client/consumer.go b/tubemq-client-twins/tubemq-client-go/client/consumer.go
index 27a8536..a74dfa5 100644
--- a/tubemq-client-twins/tubemq-client-go/client/consumer.go
+++ b/tubemq-client-twins/tubemq-client-go/client/consumer.go
@@ -19,10 +19,6 @@
 // which can be exposed to user.
 package client
 
-const (
-	tubeMQClientVersion = "0.1.0"
-)
-
 // ConsumerResult of a consumption.
 type ConsumerResult struct {
 }
@@ -31,7 +27,7 @@ type ConsumerResult struct {
 type ConsumerOffset struct {
 }
 
-var clientIndex uint64
+var clientID uint64
 
 // Consumer is an interface that abstracts behavior of TubeMQ's consumer
 type Consumer interface {
diff --git a/tubemq-client-twins/tubemq-client-go/client/consumer_impl.go b/tubemq-client-twins/tubemq-client-go/client/consumer_impl.go
index b7312bb..674f810 100644
--- a/tubemq-client-twins/tubemq-client-go/client/consumer_impl.go
+++ b/tubemq-client-twins/tubemq-client-go/client/consumer_impl.go
@@ -61,6 +61,7 @@ type consumer struct {
 	masterHBRetry    int
 	heartbeatManager *heartbeatManager
 	unreportedTimes  int
+	done             chan struct{}
 }
 
 // NewConsumer returns a consumer which is constructed by a given config.
@@ -70,7 +71,7 @@ func NewConsumer(config *config.Config) (Consumer, error) {
 		return nil, err
 	}
 
-	clientID := newClientID(config.Consumer.Group)
+	clientID := newClient(config.Consumer.Group)
 	pool := multiplexing.NewPool()
 	opts := &transport.Options{}
 	if config.Net.TLS.Enable {
@@ -118,40 +119,18 @@ func (c *consumer) register2Master(needChange bool) error {
 		c.master = node
 	}
 	for c.master.HasNext {
-		ctx, cancel := context.WithTimeout(context.Background(), c.config.Net.ReadTimeout)
-
-		m := &metadata.Metadata{}
-		node := &metadata.Node{}
-		node.SetHost(util.GetLocalHost())
-		node.SetAddress(c.master.Address)
-		m.SetNode(node)
-		sub := &metadata.SubscribeInfo{}
-		sub.SetGroup(c.config.Consumer.Group)
-		m.SetSubscribeInfo(sub)
-
-		auth := &protocol.AuthenticateInfo{}
-		c.genMasterAuthenticateToken(auth, true)
-		mci := &protocol.MasterCertificateInfo{
-			AuthInfo: auth,
-		}
-		c.subInfo.SetMasterCertificateInfo(mci)
-
-		rsp, err := c.client.RegisterRequestC2M(ctx, m, c.subInfo, c.rmtDataCache)
+		rsp, err := c.sendRegRequest2Master()
 		if err != nil {
-			cancel()
 			return err
 		}
 		if rsp.GetSuccess() {
 			c.masterHBRetry = 0
 			c.processRegisterResponseM2C(rsp)
-			cancel()
 			return nil
 		} else if rsp.GetErrCode() == errs.RetConsumeGroupForbidden || rsp.GetErrCode() == errs.RetConsumeContentForbidden {
-			cancel()
 			return nil
 		} else {
 			c.master, err = c.selector.Select(c.config.Consumer.Masters)
-			cancel()
 			if err != nil {
 				return err
 			}
@@ -160,6 +139,30 @@ func (c *consumer) register2Master(needChange bool) error {
 	return nil
 }
 
+func (c *consumer) sendRegRequest2Master() (*protocol.RegisterResponseM2C, error) {
+	ctx, cancel := context.WithTimeout(context.Background(), c.config.Net.ReadTimeout)
+	defer cancel()
+
+	m := &metadata.Metadata{}
+	node := &metadata.Node{}
+	node.SetHost(util.GetLocalHost())
+	node.SetAddress(c.master.Address)
+	m.SetNode(node)
+	sub := &metadata.SubscribeInfo{}
+	sub.SetGroup(c.config.Consumer.Group)
+	m.SetSubscribeInfo(sub)
+
+	auth := &protocol.AuthenticateInfo{}
+	c.genMasterAuthenticateToken(auth, true)
+	mci := &protocol.MasterCertificateInfo{
+		AuthInfo: auth,
+	}
+	c.subInfo.SetMasterCertificateInfo(mci)
+
+	rsp, err := c.client.RegisterRequestC2M(ctx, m, c.subInfo, c.rmtDataCache)
+	return rsp, err
+}
+
 func (c *consumer) processRegisterResponseM2C(rsp *protocol.RegisterResponseM2C) {
 	if rsp.GetNotAllocated() {
 		c.subInfo.SetIsNotAllocated(rsp.GetNotAllocated())
@@ -202,19 +205,25 @@ func (c *consumer) GetCurrConsumedInfo() (map[string]*ConsumerOffset, error) {
 
 func (c *consumer) processRebalanceEvent() {
 	for {
-		event := c.rmtDataCache.TakeEvent()
-		if event.GetEventStatus() == int32(util.InvalidValue) && event.GetRebalanceID() == util.InvalidValue {
+		select {
+		case event, ok := <-c.rmtDataCache.EventCh:
+			if ok {
+				if event.GetEventStatus() == int32(util.InvalidValue) && event.GetRebalanceID() == util.InvalidValue {
+					break
+				}
+				c.rmtDataCache.ClearEvent()
+				switch event.GetEventType() {
+				case metadata.Disconnect, metadata.OnlyDisconnect:
+					c.disconnect2Broker(event)
+					c.rmtDataCache.OfferEventResult(event)
+				case metadata.Connect, metadata.OnlyConnect:
+					c.connect2Broker(event)
+					c.rmtDataCache.OfferEventResult(event)
+				}
+			}
+		case <-c.done:
 			break
 		}
-		c.rmtDataCache.ClearEvent()
-		switch event.GetEventType() {
-		case 2, 20:
-			c.disconnect2Broker(event)
-			c.rmtDataCache.OfferEventResult(event)
-		case 1, 10:
-			c.connect2Broker(event)
-			c.rmtDataCache.OfferEventResult(event)
-		}
 	}
 }
 
@@ -237,48 +246,41 @@ func (c *consumer) unregister2Broker(unRegPartitions map[*metadata.Node][]*metad
 
 	for _, partitions := range unRegPartitions {
 		for _, partition := range partitions {
-			ctx, cancel := context.WithTimeout(context.Background(), c.config.Net.ReadTimeout)
-
-			m := &metadata.Metadata{}
-			node := &metadata.Node{}
-			node.SetHost(util.GetLocalHost())
-			node.SetAddress(partition.GetBroker().GetAddress())
-			m.SetNode(node)
-			m.SetReadStatus(1)
-			sub := &metadata.SubscribeInfo{}
-			sub.SetGroup(c.config.Consumer.Group)
-			sub.SetConsumerID(c.clientID)
-			sub.SetPartition(partition)
-			m.SetSubscribeInfo(sub)
-
-			c.client.UnregisterRequestC2B(ctx, m, c.subInfo)
-			cancel()
+			c.sendUnregisterReq2Broker(partition)
 		}
 	}
 }
 
+func (c *consumer) sendUnregisterReq2Broker(partition *metadata.Partition) {
+	ctx, cancel := context.WithTimeout(context.Background(), c.config.Net.ReadTimeout)
+	defer cancel()
+
+	m := &metadata.Metadata{}
+	node := &metadata.Node{}
+	node.SetHost(util.GetLocalHost())
+	node.SetAddress(partition.GetBroker().GetAddress())
+	m.SetNode(node)
+	m.SetReadStatus(1)
+	sub := &metadata.SubscribeInfo{}
+	sub.SetGroup(c.config.Consumer.Group)
+	sub.SetConsumerID(c.clientID)
+	sub.SetPartition(partition)
+	m.SetSubscribeInfo(sub)
+
+	c.client.UnregisterRequestC2B(ctx, m, c.subInfo)
+}
+
 func (c *consumer) connect2Broker(event *metadata.ConsumerEvent) {
 	if len(event.GetSubscribeInfo()) > 0 {
 		unsubPartitions := c.rmtDataCache.FilterPartitions(event.GetSubscribeInfo())
 		if len(unsubPartitions) > 0 {
 			for _, partition := range unsubPartitions {
-				m := &metadata.Metadata{}
+
 				node := &metadata.Node{}
 				node.SetHost(util.GetLocalHost())
 				node.SetAddress(partition.GetBroker().GetAddress())
-				m.SetNode(node)
-				sub := &metadata.SubscribeInfo{}
-				sub.SetGroup(c.config.Consumer.Group)
-				sub.SetConsumerID(c.clientID)
-				sub.SetPartition(partition)
-				m.SetSubscribeInfo(sub)
-				isFirstRegister := c.rmtDataCache.IsFirstRegister(partition.GetPartitionKey())
-				m.SetReadStatus(c.getConsumeReadStatus(isFirstRegister))
-				auth := c.genBrokerAuthenticInfo(true)
-				c.subInfo.SetAuthorizedInfo(auth)
-
-				ctx, cancel := context.WithTimeout(context.Background(), c.config.Net.ReadTimeout)
-				rsp, err := c.client.RegisterRequestC2B(ctx, m, c.subInfo, c.rmtDataCache)
+
+				rsp, err := c.sendRegisterReq2Broker(partition, node)
 				if err != nil {
 					//todo add log
 				}
@@ -286,24 +288,39 @@ func (c *consumer) connect2Broker(event *metadata.ConsumerEvent) {
 					c.rmtDataCache.AddNewPartition(partition)
 					c.heartbeatManager.registerBroker(node)
 				}
-				cancel()
 			}
 		}
 	}
 	c.subInfo.FirstRegistered()
-	event.SetEventStatus(2)
+	event.SetEventStatus(metadata.Disconnect)
 }
 
-func newClientIndex() uint64 {
-	return atomic.AddUint64(&clientIndex, 1)
+func (c *consumer) sendRegisterReq2Broker(partition *metadata.Partition, node *metadata.Node) (*protocol.RegisterResponseB2C, error) {
+	ctx, cancel := context.WithTimeout(context.Background(), c.config.Net.ReadTimeout)
+	defer cancel()
+
+	m := &metadata.Metadata{}
+	m.SetNode(node)
+	sub := &metadata.SubscribeInfo{}
+	sub.SetGroup(c.config.Consumer.Group)
+	sub.SetConsumerID(c.clientID)
+	sub.SetPartition(partition)
+	m.SetSubscribeInfo(sub)
+	isFirstRegister := c.rmtDataCache.IsFirstRegister(partition.GetPartitionKey())
+	m.SetReadStatus(c.getConsumeReadStatus(isFirstRegister))
+	auth := c.genBrokerAuthenticInfo(true)
+	c.subInfo.SetAuthorizedInfo(auth)
+
+	rsp, err := c.client.RegisterRequestC2B(ctx, m, c.subInfo, c.rmtDataCache)
+	return rsp, err
 }
 
-func newClientID(group string) string {
+func newClient(group string) string {
 	return group + "_" +
 		util.GetLocalHost() + "_" +
 		strconv.Itoa(os.Getpid()) + "_" +
 		strconv.Itoa(int(time.Now().Unix()*1000)) + "_" +
-		strconv.Itoa(int(newClientIndex())) + "_" +
+		strconv.Itoa(int(atomic.AddUint64(&clientID, 1))) + "_" +
 		tubeMQClientVersion
 }
 
diff --git a/tubemq-client-twins/tubemq-client-go/client/heartbeat.go b/tubemq-client-twins/tubemq-client-go/client/heartbeat.go
index 9c29854..5ec0b5c 100644
--- a/tubemq-client-twins/tubemq-client-go/client/heartbeat.go
+++ b/tubemq-client-twins/tubemq-client-go/client/heartbeat.go
@@ -95,17 +95,14 @@ func (h *heartbeatManager) consumerHB2Master() {
 
 	retry := 0
 	for retry < h.consumer.config.Heartbeat.MaxRetryTimes {
-		ctx, cancel := context.WithTimeout(context.Background(), h.consumer.config.Net.ReadTimeout)
-		rsp, err := h.consumer.client.HeartRequestC2M(ctx, m, h.consumer.subInfo, h.consumer.rmtDataCache)
+		rsp, err := h.sendHeartbeatC2M(m)
 		if err != nil {
-			cancel()
+			continue
 		}
 		if rsp.GetSuccess() {
-			cancel()
 			h.processHBResponseM2C(rsp)
 			break
 		} else if rsp.GetErrCode() == errs.RetErrHBNoNode || strings.Index(rsp.GetErrMsg(), "StandbyException") != -1 {
-			cancel()
 			h.consumer.masterHBRetry++
 			address := h.consumer.master.Address
 			go h.consumer.register2Master(rsp.GetErrCode() != errs.RetErrHBNoNode)
@@ -120,7 +117,6 @@ func (h *heartbeatManager) consumerHB2Master() {
 			}
 			return
 		}
-		cancel()
 	}
 	h.mu.Lock()
 	defer h.mu.Unlock()
@@ -128,6 +124,13 @@ func (h *heartbeatManager) consumerHB2Master() {
 	hm.timer.Reset(h.nextHeartbeatInterval())
 }
 
+func (h *heartbeatManager) sendHeartbeatC2M(m *metadata.Metadata) (*protocol.HeartResponseM2C, error) {
+	ctx, cancel := context.WithTimeout(context.Background(), h.consumer.config.Net.ReadTimeout)
+	defer cancel()
+	rsp, err := h.consumer.client.HeartRequestC2M(ctx, m, h.consumer.subInfo, h.consumer.rmtDataCache)
+	return rsp, err
+}
+
 func (h *heartbeatManager) processHBResponseM2C(rsp *protocol.HeartResponseM2C) {
 	h.consumer.masterHBRetry = 0
 	h.consumer.subInfo.SetIsNotAllocated(rsp.GetNotAllocated())
@@ -158,7 +161,7 @@ func (h *heartbeatManager) processHBResponseM2C(rsp *protocol.HeartResponseM2C)
 			subscribeInfo = append(subscribeInfo, s)
 		}
 		e := metadata.NewEvent(event.GetRebalanceId(), event.GetOpType(), subscribeInfo)
-		h.consumer.rmtDataCache.OfferEvent(e)
+		h.consumer.rmtDataCache.OfferEventAndNotify(e)
 	}
 }
 
@@ -179,16 +182,12 @@ func (h *heartbeatManager) consumerHB2Broker(broker *metadata.Node) {
 		h.resetBrokerTimer(broker)
 		return
 	}
-	m := &metadata.Metadata{}
-	m.SetReadStatus(h.consumer.getConsumeReadStatus(false))
-	m.SetNode(broker)
-	ctx, cancel := context.WithTimeout(context.Background(), h.consumer.config.Net.ReadTimeout)
-	defer cancel()
 
-	rsp, err := h.consumer.client.HeartbeatRequestC2B(ctx, m, h.consumer.subInfo, h.consumer.rmtDataCache)
+	rsp, err := h.sendHeartbeatC2B(broker)
 	if err != nil {
 		return
 	}
+
 	if rsp.GetSuccess() {
 		if rsp.GetHasPartFailure() {
 			partitionKeys := make([]string, 0, len(rsp.GetFailureInfo()))
@@ -217,6 +216,16 @@ func (h *heartbeatManager) consumerHB2Broker(broker *metadata.Node) {
 	h.resetBrokerTimer(broker)
 }
 
+func (h *heartbeatManager) sendHeartbeatC2B(broker *metadata.Node) (*protocol.HeartBeatResponseB2C, error) {
+	m := &metadata.Metadata{}
+	m.SetReadStatus(h.consumer.getConsumeReadStatus(false))
+	m.SetNode(broker)
+	ctx, cancel := context.WithTimeout(context.Background(), h.consumer.config.Net.ReadTimeout)
+	defer cancel()
+	rsp, err := h.consumer.client.HeartbeatRequestC2B(ctx, m, h.consumer.subInfo, h.consumer.rmtDataCache)
+	return rsp, err
+}
+
 func (h *heartbeatManager) resetBrokerTimer(broker *metadata.Node) {
 	interval := h.consumer.config.Heartbeat.Interval
 	partitions := h.consumer.rmtDataCache.GetPartitionByBroker(broker)
diff --git a/tubemq-client-twins/tubemq-client-go/client/version.go b/tubemq-client-twins/tubemq-client-go/client/version.go
new file mode 100644
index 0000000..1828fd7
--- /dev/null
+++ b/tubemq-client-twins/tubemq-client-go/client/version.go
@@ -0,0 +1,5 @@
+package client
+
+const (
+	tubeMQClientVersion = "0.1.0"
+)
diff --git a/tubemq-client-twins/tubemq-client-go/metadata/consumer_event.go b/tubemq-client-twins/tubemq-client-go/metadata/consumer_event.go
index 6ce2915..08295c5 100644
--- a/tubemq-client-twins/tubemq-client-go/metadata/consumer_event.go
+++ b/tubemq-client-twins/tubemq-client-go/metadata/consumer_event.go
@@ -17,6 +17,13 @@
 
 package metadata
 
+const (
+	Disconnect     = 2
+	OnlyDisconnect = 20
+	Connect        = 1
+	OnlyConnect    = 10
+)
+
 // ConsumerEvent represents the metadata of a consumer event
 type ConsumerEvent struct {
 	rebalanceID   int64
diff --git a/tubemq-client-twins/tubemq-client-go/remote/remote.go b/tubemq-client-twins/tubemq-client-go/remote/remote.go
index 355e09b..bfc63b7 100644
--- a/tubemq-client-twins/tubemq-client-go/remote/remote.go
+++ b/tubemq-client-twins/tubemq-client-go/remote/remote.go
@@ -39,7 +39,6 @@ type RmtDataCache struct {
 	eventReadMu        sync.Mutex
 	metaMu             sync.Mutex
 	dataBookMu         sync.Mutex
-	eventReadCond      *sync.Cond
 	brokerPartitions   map[*metadata.Node]map[string]bool
 	qryPriorityID      int32
 	partitions         map[string]*metadata.Partition
@@ -48,6 +47,8 @@ type RmtDataCache struct {
 	partitionTimeouts  map[string]*time.Timer
 	topicPartitions    map[string]map[string]bool
 	partitionRegBooked map[string]bool
+	// EventCh is the channel for consumer to consume
+	EventCh chan *metadata.ConsumerEvent
 }
 
 // NewRmtDataCache returns a default rmtDataCache.
@@ -65,8 +66,8 @@ func NewRmtDataCache() *RmtDataCache {
 		partitionTimeouts:  make(map[string]*time.Timer),
 		topicPartitions:    make(map[string]map[string]bool),
 		partitionRegBooked: make(map[string]bool),
+		EventCh:            make(chan *metadata.ConsumerEvent, 1),
 	}
-	r.eventReadCond = sync.NewCond(&r.eventReadMu)
 	return r
 }
 
@@ -149,24 +150,14 @@ func (r *RmtDataCache) UpdateGroupFlowCtrlInfo(qryPriorityID int32, flowCtrlID i
 
 }
 
-// OfferEvent offers an consumer event and notifies the consumer method.
-func (r *RmtDataCache) OfferEvent(event *metadata.ConsumerEvent) {
+// OfferEventAndNotify offers an consumer event and notifies the consumer method and notify the consumer to consume.
+func (r *RmtDataCache) OfferEventAndNotify(event *metadata.ConsumerEvent) {
 	r.eventReadMu.Lock()
 	defer r.eventReadMu.Unlock()
 	r.rebalanceResults = append(r.rebalanceResults, event)
-	r.eventReadCond.Broadcast()
-}
-
-// TakeEvent takes an event from the rebalanceResults.
-func (r *RmtDataCache) TakeEvent() *metadata.ConsumerEvent {
-	r.eventReadMu.Lock()
-	defer r.eventReadMu.Unlock()
-	for len(r.rebalanceResults) == 0 {
-		r.eventReadCond.Wait()
-	}
-	event := r.rebalanceResults[0]
+	e := r.rebalanceResults[0]
 	r.rebalanceResults = r.rebalanceResults[1:]
-	return event
+	r.EventCh <- e
 }
 
 // ClearEvent clears all the events.