You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by go...@apache.org on 2021/06/15 01:59:56 UTC

[incubator-inlong] 03/09: Implement offerEventResult

This is an automated email from the ASF dual-hosted git repository.

gosonzhang pushed a commit to branch INLONG-25
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git

commit a2af6f05690e750b899cb5e5257010459d693959
Author: Zijie Lu <ws...@gmail.com>
AuthorDate: Fri Jun 4 10:20:54 2021 +0800

    Implement offerEventResult
    
    Signed-off-by: Zijie Lu <ws...@gmail.com>
---
 .../tubemq-client-go/client/consumer_impl.go       |  7 ++--
 .../tubemq-client-go/remote/remote.go              | 39 ++++++++++++++--------
 2 files changed, 28 insertions(+), 18 deletions(-)

diff --git a/tubemq-client-twins/tubemq-client-go/client/consumer_impl.go b/tubemq-client-twins/tubemq-client-go/client/consumer_impl.go
index c0f7e9a..b7312bb 100644
--- a/tubemq-client-twins/tubemq-client-go/client/consumer_impl.go
+++ b/tubemq-client-twins/tubemq-client-go/client/consumer_impl.go
@@ -203,9 +203,6 @@ func (c *consumer) GetCurrConsumedInfo() (map[string]*ConsumerOffset, error) {
 func (c *consumer) processRebalanceEvent() {
 	for {
 		event := c.rmtDataCache.TakeEvent()
-		if event == nil {
-			continue
-		}
 		if event.GetEventStatus() == int32(util.InvalidValue) && event.GetRebalanceID() == util.InvalidValue {
 			break
 		}
@@ -213,10 +210,10 @@ func (c *consumer) processRebalanceEvent() {
 		switch event.GetEventType() {
 		case 2, 20:
 			c.disconnect2Broker(event)
-			c.rmtDataCache.OfferEvent(event)
+			c.rmtDataCache.OfferEventResult(event)
 		case 1, 10:
 			c.connect2Broker(event)
-			c.rmtDataCache.OfferEvent(event)
+			c.rmtDataCache.OfferEventResult(event)
 		}
 	}
 }
diff --git a/tubemq-client-twins/tubemq-client-go/remote/remote.go b/tubemq-client-twins/tubemq-client-go/remote/remote.go
index aa9a2d1..1d82c89 100644
--- a/tubemq-client-twins/tubemq-client-go/remote/remote.go
+++ b/tubemq-client-twins/tubemq-client-go/remote/remote.go
@@ -35,9 +35,11 @@ type RmtDataCache struct {
 	groupFlowCtrlID    int64
 	partitionSubInfo   map[string]*metadata.SubscribeInfo
 	rebalanceResults   []*metadata.ConsumerEvent
-	eventMu            sync.Mutex
+	eventWriteMu       sync.Mutex
+	eventReadMu        sync.Mutex
 	metaMu             sync.Mutex
 	dataBookMu         sync.Mutex
+	eventReadCond      *sync.Cond
 	brokerPartitions   map[*metadata.Node]map[string]bool
 	qryPriorityID      int32
 	partitions         map[string]*metadata.Partition
@@ -50,7 +52,7 @@ type RmtDataCache struct {
 
 // NewRmtDataCache returns a default rmtDataCache.
 func NewRmtDataCache() *RmtDataCache {
-	return &RmtDataCache{
+	r := &RmtDataCache{
 		defFlowCtrlID:      util.InvalidValue,
 		groupFlowCtrlID:    util.InvalidValue,
 		qryPriorityID:      int32(util.InvalidValue),
@@ -64,6 +66,8 @@ func NewRmtDataCache() *RmtDataCache {
 		topicPartitions:    make(map[string]map[string]bool),
 		partitionRegBooked: make(map[string]bool),
 	}
+	r.eventReadCond = sync.NewCond(&r.eventReadMu)
+	return r
 }
 
 // GetUnderGroupCtrl returns the underGroupCtrl.
@@ -104,8 +108,8 @@ func (r *RmtDataCache) GetQryPriorityID() int32 {
 
 // PollEventResult polls the first event result from the rebalanceResults.
 func (r *RmtDataCache) PollEventResult() *metadata.ConsumerEvent {
-	r.eventMu.Lock()
-	defer r.eventMu.Unlock()
+	r.eventWriteMu.Lock()
+	defer r.eventWriteMu.Unlock()
 	if len(r.rebalanceResults) > 0 {
 		event := r.rebalanceResults[0]
 		r.rebalanceResults = r.rebalanceResults[1:]
@@ -145,19 +149,20 @@ func (r *RmtDataCache) UpdateGroupFlowCtrlInfo(qryPriorityID int32, flowCtrlID i
 
 }
 
-// OfferEvent offers an consumer event.
+// OfferEvent offers an consumer event and notifies the consumer method.
 func (r *RmtDataCache) OfferEvent(event *metadata.ConsumerEvent) {
-	r.eventMu.Lock()
-	defer r.eventMu.Unlock()
+	r.eventReadMu.Lock()
+	defer r.eventReadMu.Unlock()
 	r.rebalanceResults = append(r.rebalanceResults, event)
+	r.eventReadCond.Broadcast()
 }
 
 // TakeEvent takes an event from the rebalanceResults.
 func (r *RmtDataCache) TakeEvent() *metadata.ConsumerEvent {
-	r.eventMu.Lock()
-	defer r.eventMu.Unlock()
-	if len(r.rebalanceResults) == 0 {
-		return nil
+	r.eventReadMu.Lock()
+	defer r.eventReadMu.Unlock()
+	for len(r.rebalanceResults) == 0 {
+		r.eventReadCond.Wait()
 	}
 	event := r.rebalanceResults[0]
 	r.rebalanceResults = r.rebalanceResults[1:]
@@ -166,11 +171,19 @@ func (r *RmtDataCache) TakeEvent() *metadata.ConsumerEvent {
 
 // ClearEvent clears all the events.
 func (r *RmtDataCache) ClearEvent() {
-	r.eventMu.Lock()
-	defer r.eventMu.Unlock()
+	r.eventWriteMu.Lock()
+	defer r.eventWriteMu.Unlock()
 	r.rebalanceResults = r.rebalanceResults[:0]
 }
 
+// OfferEventResult offers an consumer event.
+func (r *RmtDataCache) OfferEventResult(event *metadata.ConsumerEvent) {
+	r.eventWriteMu.Lock()
+	defer r.eventWriteMu.Unlock()
+
+	r.rebalanceResults = append(r.rebalanceResults, event)
+}
+
 // RemoveAndGetPartition removes the given partitions.
 func (r *RmtDataCache) RemoveAndGetPartition(subscribeInfos []*metadata.SubscribeInfo, processingRollback bool, partitions map[*metadata.Node][]*metadata.Partition) {
 	if len(subscribeInfos) == 0 {