You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by go...@apache.org on 2021/06/15 01:59:59 UTC
[incubator-inlong] 06/09: Address review comments
This is an automated email from the ASF dual-hosted git repository.
gosonzhang pushed a commit to branch INLONG-25
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git
commit 2d64bb530f302ab97bbc2930197909b10e8152df
Author: Zijie Lu <ws...@gmail.com>
AuthorDate: Tue Jun 8 17:40:17 2021 +0800
Address review comments
Signed-off-by: Zijie Lu <ws...@gmail.com>
---
.../tubemq-client-go/client/consumer.go | 6 +-
.../tubemq-client-go/client/consumer_impl.go | 159 ++++++++++++---------
.../tubemq-client-go/client/heartbeat.go | 35 +++--
.../tubemq-client-go/client/version.go | 5 +
.../tubemq-client-go/metadata/consumer_event.go | 7 +
.../tubemq-client-go/remote/remote.go | 23 +--
6 files changed, 130 insertions(+), 105 deletions(-)
diff --git a/tubemq-client-twins/tubemq-client-go/client/consumer.go b/tubemq-client-twins/tubemq-client-go/client/consumer.go
index 27a8536..a74dfa5 100644
--- a/tubemq-client-twins/tubemq-client-go/client/consumer.go
+++ b/tubemq-client-twins/tubemq-client-go/client/consumer.go
@@ -19,10 +19,6 @@
// which can be exposed to user.
package client
-const (
- tubeMQClientVersion = "0.1.0"
-)
-
// ConsumerResult of a consumption.
type ConsumerResult struct {
}
@@ -31,7 +27,7 @@ type ConsumerResult struct {
type ConsumerOffset struct {
}
-var clientIndex uint64
+var clientID uint64
// Consumer is an interface that abstracts behavior of TubeMQ's consumer
type Consumer interface {
diff --git a/tubemq-client-twins/tubemq-client-go/client/consumer_impl.go b/tubemq-client-twins/tubemq-client-go/client/consumer_impl.go
index b7312bb..674f810 100644
--- a/tubemq-client-twins/tubemq-client-go/client/consumer_impl.go
+++ b/tubemq-client-twins/tubemq-client-go/client/consumer_impl.go
@@ -61,6 +61,7 @@ type consumer struct {
masterHBRetry int
heartbeatManager *heartbeatManager
unreportedTimes int
+ done chan struct{}
}
// NewConsumer returns a consumer which is constructed by a given config.
@@ -70,7 +71,7 @@ func NewConsumer(config *config.Config) (Consumer, error) {
return nil, err
}
- clientID := newClientID(config.Consumer.Group)
+ clientID := newClient(config.Consumer.Group)
pool := multiplexing.NewPool()
opts := &transport.Options{}
if config.Net.TLS.Enable {
@@ -118,40 +119,18 @@ func (c *consumer) register2Master(needChange bool) error {
c.master = node
}
for c.master.HasNext {
- ctx, cancel := context.WithTimeout(context.Background(), c.config.Net.ReadTimeout)
-
- m := &metadata.Metadata{}
- node := &metadata.Node{}
- node.SetHost(util.GetLocalHost())
- node.SetAddress(c.master.Address)
- m.SetNode(node)
- sub := &metadata.SubscribeInfo{}
- sub.SetGroup(c.config.Consumer.Group)
- m.SetSubscribeInfo(sub)
-
- auth := &protocol.AuthenticateInfo{}
- c.genMasterAuthenticateToken(auth, true)
- mci := &protocol.MasterCertificateInfo{
- AuthInfo: auth,
- }
- c.subInfo.SetMasterCertificateInfo(mci)
-
- rsp, err := c.client.RegisterRequestC2M(ctx, m, c.subInfo, c.rmtDataCache)
+ rsp, err := c.sendRegRequest2Master()
if err != nil {
- cancel()
return err
}
if rsp.GetSuccess() {
c.masterHBRetry = 0
c.processRegisterResponseM2C(rsp)
- cancel()
return nil
} else if rsp.GetErrCode() == errs.RetConsumeGroupForbidden || rsp.GetErrCode() == errs.RetConsumeContentForbidden {
- cancel()
return nil
} else {
c.master, err = c.selector.Select(c.config.Consumer.Masters)
- cancel()
if err != nil {
return err
}
@@ -160,6 +139,30 @@ func (c *consumer) register2Master(needChange bool) error {
return nil
}
+func (c *consumer) sendRegRequest2Master() (*protocol.RegisterResponseM2C, error) {
+ ctx, cancel := context.WithTimeout(context.Background(), c.config.Net.ReadTimeout)
+ defer cancel()
+
+ m := &metadata.Metadata{}
+ node := &metadata.Node{}
+ node.SetHost(util.GetLocalHost())
+ node.SetAddress(c.master.Address)
+ m.SetNode(node)
+ sub := &metadata.SubscribeInfo{}
+ sub.SetGroup(c.config.Consumer.Group)
+ m.SetSubscribeInfo(sub)
+
+ auth := &protocol.AuthenticateInfo{}
+ c.genMasterAuthenticateToken(auth, true)
+ mci := &protocol.MasterCertificateInfo{
+ AuthInfo: auth,
+ }
+ c.subInfo.SetMasterCertificateInfo(mci)
+
+ rsp, err := c.client.RegisterRequestC2M(ctx, m, c.subInfo, c.rmtDataCache)
+ return rsp, err
+}
+
func (c *consumer) processRegisterResponseM2C(rsp *protocol.RegisterResponseM2C) {
if rsp.GetNotAllocated() {
c.subInfo.SetIsNotAllocated(rsp.GetNotAllocated())
@@ -202,19 +205,25 @@ func (c *consumer) GetCurrConsumedInfo() (map[string]*ConsumerOffset, error) {
func (c *consumer) processRebalanceEvent() {
for {
- event := c.rmtDataCache.TakeEvent()
- if event.GetEventStatus() == int32(util.InvalidValue) && event.GetRebalanceID() == util.InvalidValue {
+ select {
+ case event, ok := <-c.rmtDataCache.EventCh:
+ if ok {
+ if event.GetEventStatus() == int32(util.InvalidValue) && event.GetRebalanceID() == util.InvalidValue {
+ break
+ }
+ c.rmtDataCache.ClearEvent()
+ switch event.GetEventType() {
+ case metadata.Disconnect, metadata.OnlyDisconnect:
+ c.disconnect2Broker(event)
+ c.rmtDataCache.OfferEventResult(event)
+ case metadata.Connect, metadata.OnlyConnect:
+ c.connect2Broker(event)
+ c.rmtDataCache.OfferEventResult(event)
+ }
+ }
+ case <-c.done:
break
}
- c.rmtDataCache.ClearEvent()
- switch event.GetEventType() {
- case 2, 20:
- c.disconnect2Broker(event)
- c.rmtDataCache.OfferEventResult(event)
- case 1, 10:
- c.connect2Broker(event)
- c.rmtDataCache.OfferEventResult(event)
- }
}
}
@@ -237,48 +246,41 @@ func (c *consumer) unregister2Broker(unRegPartitions map[*metadata.Node][]*metad
for _, partitions := range unRegPartitions {
for _, partition := range partitions {
- ctx, cancel := context.WithTimeout(context.Background(), c.config.Net.ReadTimeout)
-
- m := &metadata.Metadata{}
- node := &metadata.Node{}
- node.SetHost(util.GetLocalHost())
- node.SetAddress(partition.GetBroker().GetAddress())
- m.SetNode(node)
- m.SetReadStatus(1)
- sub := &metadata.SubscribeInfo{}
- sub.SetGroup(c.config.Consumer.Group)
- sub.SetConsumerID(c.clientID)
- sub.SetPartition(partition)
- m.SetSubscribeInfo(sub)
-
- c.client.UnregisterRequestC2B(ctx, m, c.subInfo)
- cancel()
+ c.sendUnregisterReq2Broker(partition)
}
}
}
+func (c *consumer) sendUnregisterReq2Broker(partition *metadata.Partition) {
+ ctx, cancel := context.WithTimeout(context.Background(), c.config.Net.ReadTimeout)
+ defer cancel()
+
+ m := &metadata.Metadata{}
+ node := &metadata.Node{}
+ node.SetHost(util.GetLocalHost())
+ node.SetAddress(partition.GetBroker().GetAddress())
+ m.SetNode(node)
+ m.SetReadStatus(1)
+ sub := &metadata.SubscribeInfo{}
+ sub.SetGroup(c.config.Consumer.Group)
+ sub.SetConsumerID(c.clientID)
+ sub.SetPartition(partition)
+ m.SetSubscribeInfo(sub)
+
+ c.client.UnregisterRequestC2B(ctx, m, c.subInfo)
+}
+
func (c *consumer) connect2Broker(event *metadata.ConsumerEvent) {
if len(event.GetSubscribeInfo()) > 0 {
unsubPartitions := c.rmtDataCache.FilterPartitions(event.GetSubscribeInfo())
if len(unsubPartitions) > 0 {
for _, partition := range unsubPartitions {
- m := &metadata.Metadata{}
+
node := &metadata.Node{}
node.SetHost(util.GetLocalHost())
node.SetAddress(partition.GetBroker().GetAddress())
- m.SetNode(node)
- sub := &metadata.SubscribeInfo{}
- sub.SetGroup(c.config.Consumer.Group)
- sub.SetConsumerID(c.clientID)
- sub.SetPartition(partition)
- m.SetSubscribeInfo(sub)
- isFirstRegister := c.rmtDataCache.IsFirstRegister(partition.GetPartitionKey())
- m.SetReadStatus(c.getConsumeReadStatus(isFirstRegister))
- auth := c.genBrokerAuthenticInfo(true)
- c.subInfo.SetAuthorizedInfo(auth)
-
- ctx, cancel := context.WithTimeout(context.Background(), c.config.Net.ReadTimeout)
- rsp, err := c.client.RegisterRequestC2B(ctx, m, c.subInfo, c.rmtDataCache)
+
+ rsp, err := c.sendRegisterReq2Broker(partition, node)
if err != nil {
//todo add log
}
@@ -286,24 +288,39 @@ func (c *consumer) connect2Broker(event *metadata.ConsumerEvent) {
c.rmtDataCache.AddNewPartition(partition)
c.heartbeatManager.registerBroker(node)
}
- cancel()
}
}
}
c.subInfo.FirstRegistered()
- event.SetEventStatus(2)
+ event.SetEventStatus(metadata.Disconnect)
}
-func newClientIndex() uint64 {
- return atomic.AddUint64(&clientIndex, 1)
+func (c *consumer) sendRegisterReq2Broker(partition *metadata.Partition, node *metadata.Node) (*protocol.RegisterResponseB2C, error) {
+ ctx, cancel := context.WithTimeout(context.Background(), c.config.Net.ReadTimeout)
+ defer cancel()
+
+ m := &metadata.Metadata{}
+ m.SetNode(node)
+ sub := &metadata.SubscribeInfo{}
+ sub.SetGroup(c.config.Consumer.Group)
+ sub.SetConsumerID(c.clientID)
+ sub.SetPartition(partition)
+ m.SetSubscribeInfo(sub)
+ isFirstRegister := c.rmtDataCache.IsFirstRegister(partition.GetPartitionKey())
+ m.SetReadStatus(c.getConsumeReadStatus(isFirstRegister))
+ auth := c.genBrokerAuthenticInfo(true)
+ c.subInfo.SetAuthorizedInfo(auth)
+
+ rsp, err := c.client.RegisterRequestC2B(ctx, m, c.subInfo, c.rmtDataCache)
+ return rsp, err
}
-func newClientID(group string) string {
+func newClient(group string) string {
return group + "_" +
util.GetLocalHost() + "_" +
strconv.Itoa(os.Getpid()) + "_" +
strconv.Itoa(int(time.Now().Unix()*1000)) + "_" +
- strconv.Itoa(int(newClientIndex())) + "_" +
+ strconv.Itoa(int(atomic.AddUint64(&clientID, 1))) + "_" +
tubeMQClientVersion
}
diff --git a/tubemq-client-twins/tubemq-client-go/client/heartbeat.go b/tubemq-client-twins/tubemq-client-go/client/heartbeat.go
index 9c29854..5ec0b5c 100644
--- a/tubemq-client-twins/tubemq-client-go/client/heartbeat.go
+++ b/tubemq-client-twins/tubemq-client-go/client/heartbeat.go
@@ -95,17 +95,14 @@ func (h *heartbeatManager) consumerHB2Master() {
retry := 0
for retry < h.consumer.config.Heartbeat.MaxRetryTimes {
- ctx, cancel := context.WithTimeout(context.Background(), h.consumer.config.Net.ReadTimeout)
- rsp, err := h.consumer.client.HeartRequestC2M(ctx, m, h.consumer.subInfo, h.consumer.rmtDataCache)
+ rsp, err := h.sendHeartbeatC2M(m)
if err != nil {
- cancel()
+ continue
}
if rsp.GetSuccess() {
- cancel()
h.processHBResponseM2C(rsp)
break
} else if rsp.GetErrCode() == errs.RetErrHBNoNode || strings.Index(rsp.GetErrMsg(), "StandbyException") != -1 {
- cancel()
h.consumer.masterHBRetry++
address := h.consumer.master.Address
go h.consumer.register2Master(rsp.GetErrCode() != errs.RetErrHBNoNode)
@@ -120,7 +117,6 @@ func (h *heartbeatManager) consumerHB2Master() {
}
return
}
- cancel()
}
h.mu.Lock()
defer h.mu.Unlock()
@@ -128,6 +124,13 @@ func (h *heartbeatManager) consumerHB2Master() {
hm.timer.Reset(h.nextHeartbeatInterval())
}
+func (h *heartbeatManager) sendHeartbeatC2M(m *metadata.Metadata) (*protocol.HeartResponseM2C, error) {
+ ctx, cancel := context.WithTimeout(context.Background(), h.consumer.config.Net.ReadTimeout)
+ defer cancel()
+ rsp, err := h.consumer.client.HeartRequestC2M(ctx, m, h.consumer.subInfo, h.consumer.rmtDataCache)
+ return rsp, err
+}
+
func (h *heartbeatManager) processHBResponseM2C(rsp *protocol.HeartResponseM2C) {
h.consumer.masterHBRetry = 0
h.consumer.subInfo.SetIsNotAllocated(rsp.GetNotAllocated())
@@ -158,7 +161,7 @@ func (h *heartbeatManager) processHBResponseM2C(rsp *protocol.HeartResponseM2C)
subscribeInfo = append(subscribeInfo, s)
}
e := metadata.NewEvent(event.GetRebalanceId(), event.GetOpType(), subscribeInfo)
- h.consumer.rmtDataCache.OfferEvent(e)
+ h.consumer.rmtDataCache.OfferEventAndNotify(e)
}
}
@@ -179,16 +182,12 @@ func (h *heartbeatManager) consumerHB2Broker(broker *metadata.Node) {
h.resetBrokerTimer(broker)
return
}
- m := &metadata.Metadata{}
- m.SetReadStatus(h.consumer.getConsumeReadStatus(false))
- m.SetNode(broker)
- ctx, cancel := context.WithTimeout(context.Background(), h.consumer.config.Net.ReadTimeout)
- defer cancel()
- rsp, err := h.consumer.client.HeartbeatRequestC2B(ctx, m, h.consumer.subInfo, h.consumer.rmtDataCache)
+ rsp, err := h.sendHeartbeatC2B(broker)
if err != nil {
return
}
+
if rsp.GetSuccess() {
if rsp.GetHasPartFailure() {
partitionKeys := make([]string, 0, len(rsp.GetFailureInfo()))
@@ -217,6 +216,16 @@ func (h *heartbeatManager) consumerHB2Broker(broker *metadata.Node) {
h.resetBrokerTimer(broker)
}
+func (h *heartbeatManager) sendHeartbeatC2B(broker *metadata.Node) (*protocol.HeartBeatResponseB2C, error) {
+ m := &metadata.Metadata{}
+ m.SetReadStatus(h.consumer.getConsumeReadStatus(false))
+ m.SetNode(broker)
+ ctx, cancel := context.WithTimeout(context.Background(), h.consumer.config.Net.ReadTimeout)
+ defer cancel()
+ rsp, err := h.consumer.client.HeartbeatRequestC2B(ctx, m, h.consumer.subInfo, h.consumer.rmtDataCache)
+ return rsp, err
+}
+
func (h *heartbeatManager) resetBrokerTimer(broker *metadata.Node) {
interval := h.consumer.config.Heartbeat.Interval
partitions := h.consumer.rmtDataCache.GetPartitionByBroker(broker)
diff --git a/tubemq-client-twins/tubemq-client-go/client/version.go b/tubemq-client-twins/tubemq-client-go/client/version.go
new file mode 100644
index 0000000..1828fd7
--- /dev/null
+++ b/tubemq-client-twins/tubemq-client-go/client/version.go
@@ -0,0 +1,5 @@
+package client
+
+const (
+ tubeMQClientVersion = "0.1.0"
+)
diff --git a/tubemq-client-twins/tubemq-client-go/metadata/consumer_event.go b/tubemq-client-twins/tubemq-client-go/metadata/consumer_event.go
index 6ce2915..08295c5 100644
--- a/tubemq-client-twins/tubemq-client-go/metadata/consumer_event.go
+++ b/tubemq-client-twins/tubemq-client-go/metadata/consumer_event.go
@@ -17,6 +17,13 @@
package metadata
+const (
+ Disconnect = 2
+ OnlyDisconnect = 20
+ Connect = 1
+ OnlyConnect = 10
+)
+
// ConsumerEvent represents the metadata of a consumer event
type ConsumerEvent struct {
rebalanceID int64
diff --git a/tubemq-client-twins/tubemq-client-go/remote/remote.go b/tubemq-client-twins/tubemq-client-go/remote/remote.go
index 355e09b..bfc63b7 100644
--- a/tubemq-client-twins/tubemq-client-go/remote/remote.go
+++ b/tubemq-client-twins/tubemq-client-go/remote/remote.go
@@ -39,7 +39,6 @@ type RmtDataCache struct {
eventReadMu sync.Mutex
metaMu sync.Mutex
dataBookMu sync.Mutex
- eventReadCond *sync.Cond
brokerPartitions map[*metadata.Node]map[string]bool
qryPriorityID int32
partitions map[string]*metadata.Partition
@@ -48,6 +47,8 @@ type RmtDataCache struct {
partitionTimeouts map[string]*time.Timer
topicPartitions map[string]map[string]bool
partitionRegBooked map[string]bool
+ // EventCh is the channel for consumer to consume
+ EventCh chan *metadata.ConsumerEvent
}
// NewRmtDataCache returns a default rmtDataCache.
@@ -65,8 +66,8 @@ func NewRmtDataCache() *RmtDataCache {
partitionTimeouts: make(map[string]*time.Timer),
topicPartitions: make(map[string]map[string]bool),
partitionRegBooked: make(map[string]bool),
+ EventCh: make(chan *metadata.ConsumerEvent, 1),
}
- r.eventReadCond = sync.NewCond(&r.eventReadMu)
return r
}
@@ -149,24 +150,14 @@ func (r *RmtDataCache) UpdateGroupFlowCtrlInfo(qryPriorityID int32, flowCtrlID i
}
-// OfferEvent offers an consumer event and notifies the consumer method.
-func (r *RmtDataCache) OfferEvent(event *metadata.ConsumerEvent) {
+// OfferEventAndNotify offers an consumer event and notifies the consumer method and notify the consumer to consume.
+func (r *RmtDataCache) OfferEventAndNotify(event *metadata.ConsumerEvent) {
r.eventReadMu.Lock()
defer r.eventReadMu.Unlock()
r.rebalanceResults = append(r.rebalanceResults, event)
- r.eventReadCond.Broadcast()
-}
-
-// TakeEvent takes an event from the rebalanceResults.
-func (r *RmtDataCache) TakeEvent() *metadata.ConsumerEvent {
- r.eventReadMu.Lock()
- defer r.eventReadMu.Unlock()
- for len(r.rebalanceResults) == 0 {
- r.eventReadCond.Wait()
- }
- event := r.rebalanceResults[0]
+ e := r.rebalanceResults[0]
r.rebalanceResults = r.rebalanceResults[1:]
- return event
+ r.EventCh <- e
}
// ClearEvent clears all the events.