You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by go...@apache.org on 2021/06/15 01:59:56 UTC
[incubator-inlong] 03/09: Implement offerEventResult
This is an automated email from the ASF dual-hosted git repository.
gosonzhang pushed a commit to branch INLONG-25
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git
commit a2af6f05690e750b899cb5e5257010459d693959
Author: Zijie Lu <ws...@gmail.com>
AuthorDate: Fri Jun 4 10:20:54 2021 +0800
Implement offerEventResult
Signed-off-by: Zijie Lu <ws...@gmail.com>
---
.../tubemq-client-go/client/consumer_impl.go | 7 ++--
.../tubemq-client-go/remote/remote.go | 39 ++++++++++++++--------
2 files changed, 28 insertions(+), 18 deletions(-)
diff --git a/tubemq-client-twins/tubemq-client-go/client/consumer_impl.go b/tubemq-client-twins/tubemq-client-go/client/consumer_impl.go
index c0f7e9a..b7312bb 100644
--- a/tubemq-client-twins/tubemq-client-go/client/consumer_impl.go
+++ b/tubemq-client-twins/tubemq-client-go/client/consumer_impl.go
@@ -203,9 +203,6 @@ func (c *consumer) GetCurrConsumedInfo() (map[string]*ConsumerOffset, error) {
func (c *consumer) processRebalanceEvent() {
for {
event := c.rmtDataCache.TakeEvent()
- if event == nil {
- continue
- }
if event.GetEventStatus() == int32(util.InvalidValue) && event.GetRebalanceID() == util.InvalidValue {
break
}
@@ -213,10 +210,10 @@ func (c *consumer) processRebalanceEvent() {
switch event.GetEventType() {
case 2, 20:
c.disconnect2Broker(event)
- c.rmtDataCache.OfferEvent(event)
+ c.rmtDataCache.OfferEventResult(event)
case 1, 10:
c.connect2Broker(event)
- c.rmtDataCache.OfferEvent(event)
+ c.rmtDataCache.OfferEventResult(event)
}
}
}
diff --git a/tubemq-client-twins/tubemq-client-go/remote/remote.go b/tubemq-client-twins/tubemq-client-go/remote/remote.go
index aa9a2d1..1d82c89 100644
--- a/tubemq-client-twins/tubemq-client-go/remote/remote.go
+++ b/tubemq-client-twins/tubemq-client-go/remote/remote.go
@@ -35,9 +35,11 @@ type RmtDataCache struct {
groupFlowCtrlID int64
partitionSubInfo map[string]*metadata.SubscribeInfo
rebalanceResults []*metadata.ConsumerEvent
- eventMu sync.Mutex
+ eventWriteMu sync.Mutex
+ eventReadMu sync.Mutex
metaMu sync.Mutex
dataBookMu sync.Mutex
+ eventReadCond *sync.Cond
brokerPartitions map[*metadata.Node]map[string]bool
qryPriorityID int32
partitions map[string]*metadata.Partition
@@ -50,7 +52,7 @@ type RmtDataCache struct {
// NewRmtDataCache returns a default rmtDataCache.
func NewRmtDataCache() *RmtDataCache {
- return &RmtDataCache{
+ r := &RmtDataCache{
defFlowCtrlID: util.InvalidValue,
groupFlowCtrlID: util.InvalidValue,
qryPriorityID: int32(util.InvalidValue),
@@ -64,6 +66,8 @@ func NewRmtDataCache() *RmtDataCache {
topicPartitions: make(map[string]map[string]bool),
partitionRegBooked: make(map[string]bool),
}
+ r.eventReadCond = sync.NewCond(&r.eventReadMu)
+ return r
}
// GetUnderGroupCtrl returns the underGroupCtrl.
@@ -104,8 +108,8 @@ func (r *RmtDataCache) GetQryPriorityID() int32 {
// PollEventResult polls the first event result from the rebalanceResults.
func (r *RmtDataCache) PollEventResult() *metadata.ConsumerEvent {
- r.eventMu.Lock()
- defer r.eventMu.Unlock()
+ r.eventWriteMu.Lock()
+ defer r.eventWriteMu.Unlock()
if len(r.rebalanceResults) > 0 {
event := r.rebalanceResults[0]
r.rebalanceResults = r.rebalanceResults[1:]
@@ -145,19 +149,20 @@ func (r *RmtDataCache) UpdateGroupFlowCtrlInfo(qryPriorityID int32, flowCtrlID i
}
-// OfferEvent offers an consumer event.
+// OfferEvent offers a consumer event and wakes any goroutine waiting in TakeEvent.
func (r *RmtDataCache) OfferEvent(event *metadata.ConsumerEvent) {
- r.eventMu.Lock()
- defer r.eventMu.Unlock()
+ r.eventReadMu.Lock()
+ defer r.eventReadMu.Unlock()
r.rebalanceResults = append(r.rebalanceResults, event)
+ r.eventReadCond.Broadcast()
}
// TakeEvent takes an event from the rebalanceResults.
func (r *RmtDataCache) TakeEvent() *metadata.ConsumerEvent {
- r.eventMu.Lock()
- defer r.eventMu.Unlock()
- if len(r.rebalanceResults) == 0 {
- return nil
+ r.eventReadMu.Lock()
+ defer r.eventReadMu.Unlock()
+ for len(r.rebalanceResults) == 0 {
+ r.eventReadCond.Wait()
}
event := r.rebalanceResults[0]
r.rebalanceResults = r.rebalanceResults[1:]
@@ -166,11 +171,19 @@ func (r *RmtDataCache) TakeEvent() *metadata.ConsumerEvent {
// ClearEvent clears all the events.
func (r *RmtDataCache) ClearEvent() {
- r.eventMu.Lock()
- defer r.eventMu.Unlock()
+ r.eventWriteMu.Lock()
+ defer r.eventWriteMu.Unlock()
r.rebalanceResults = r.rebalanceResults[:0]
}
+// OfferEventResult offers a consumer event result to the rebalanceResults.
+func (r *RmtDataCache) OfferEventResult(event *metadata.ConsumerEvent) {
+ r.eventWriteMu.Lock()
+ defer r.eventWriteMu.Unlock()
+
+ r.rebalanceResults = append(r.rebalanceResults, event)
+}
+
// RemoveAndGetPartition removes the given partitions.
func (r *RmtDataCache) RemoveAndGetPartition(subscribeInfos []*metadata.SubscribeInfo, processingRollback bool, partitions map[*metadata.Node][]*metadata.Partition) {
if len(subscribeInfos) == 0 {