Posted to commits@rocketmq.apache.org by di...@apache.org on 2022/07/25 08:20:44 UTC
[rocketmq-client-go] branch master updated: [ISSUE #790] use uber atomic lib to avoid atomic value data race. (#866)
This is an automated email from the ASF dual-hosted git repository.
dinglei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-go.git
The following commit(s) were added to refs/heads/master by this push:
new b6d6f07 [ISSUE #790] use uber atomic lib to avoid atomic value data race. (#866)
b6d6f07 is described below
commit b6d6f071df05a17ab5a26e4fcd307e9379ce35de
Author: WJL3333 <wa...@bytedance.com>
AuthorDate: Mon Jul 25 16:20:39 2022 +0800
[ISSUE #790] use uber atomic lib to avoid atomic value data race. (#866)
* use uber atomic lib to avoid atomic value data race.
* change wrong sub value
Co-authored-by: dinglei <li...@163.com>
---
consumer/process_queue.go | 48 +++++++++++++++++++++++++++--------------------
consumer/push_consumer.go | 6 +++---
2 files changed, 31 insertions(+), 23 deletions(-)
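The race this patch fixes: the old code mixed sync/atomic writes, e.g. atomic.AddInt64(&pq.cachedMsgCount, n), with plain, unsynchronized reads such as pq.cachedMsgCount > pc.option.PullThresholdForQueue, which Go's race detector reports as a data race. go.uber.org/atomic wraps each counter in a typed struct, so every access has to go through Load/Add/Sub/Store. A minimal runnable sketch of the pattern (the queue type and counter field here are illustrative stand-ins, not from the patch):

    package main

    import (
        "fmt"

        "go.uber.org/atomic"
    )

    // queue is a stand-in for processQueue; counter mirrors cachedMsgCount.
    type queue struct {
        counter *atomic.Int64 // typed wrapper: no way to read it non-atomically
    }

    func main() {
        q := &queue{counter: atomic.NewInt64(0)}

        q.counter.Add(3) // was: atomic.AddInt64(&q.counter, 3)
        q.counter.Sub(1) // was: atomic.AddInt64(&q.counter, -1)

        if q.counter.Load() > 1 { // was the racy plain read: q.counter > 1
            fmt.Println("cached count:", q.counter.Load())
        }
    }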
diff --git a/consumer/process_queue.go b/consumer/process_queue.go
index ded8d08..c07cf7e 100644
--- a/consumer/process_queue.go
+++ b/consumer/process_queue.go
@@ -20,13 +20,13 @@ package consumer
import (
"strconv"
"sync"
- "sync/atomic"
+
"time"
"github.com/emirpasic/gods/maps/treemap"
"github.com/emirpasic/gods/utils"
gods_util "github.com/emirpasic/gods/utils"
- uatomic "go.uber.org/atomic"
+ "go.uber.org/atomic"
"github.com/apache/rocketmq-client-go/v2/internal"
"github.com/apache/rocketmq-client-go/v2/primitive"
@@ -40,8 +40,8 @@ const (
)
type processQueue struct {
- cachedMsgCount int64
- cachedMsgSize int64
+ cachedMsgCount *atomic.Int64
+ cachedMsgSize *atomic.Int64
tryUnlockTimes int64
queueOffsetMax int64
msgAccCnt int64
@@ -49,10 +49,10 @@ type processQueue struct {
mutex sync.RWMutex
consumeLock sync.Mutex
consumingMsgOrderlyTreeMap *treemap.Map
- dropped *uatomic.Bool
+ dropped *atomic.Bool
lastPullTime atomic.Value
lastConsumeTime atomic.Value
- locked *uatomic.Bool
+ locked *atomic.Bool
lastLockTime atomic.Value
consuming bool
lockConsume sync.Mutex
@@ -75,6 +75,8 @@ func newProcessQueue(order bool) *processQueue {
lastPullTime.Store(time.Now())
pq := &processQueue{
+ cachedMsgCount: atomic.NewInt64(0),
+ cachedMsgSize: atomic.NewInt64(0),
msgCache: treemap.NewWith(utils.Int64Comparator),
lastPullTime: lastPullTime,
lastConsumeTime: lastConsumeTime,
@@ -84,8 +86,8 @@ func newProcessQueue(order bool) *processQueue {
order: order,
closeChanOnce: &sync.Once{},
closeChan: make(chan struct{}),
- locked: uatomic.NewBool(false),
- dropped: uatomic.NewBool(false),
+ locked: atomic.NewBool(false),
+ dropped: atomic.NewBool(false),
}
return pq
}
@@ -120,13 +122,14 @@ func (pq *processQueue) putMessage(messages ...*primitive.MessageExt) {
pq.msgCache.Put(msg.QueueOffset, msg)
validMessageCount++
pq.queueOffsetMax = msg.QueueOffset
- atomic.AddInt64(&pq.cachedMsgSize, int64(len(msg.Body)))
+
+ pq.cachedMsgSize.Add(int64(len(msg.Body)))
}
- pq.mutex.Unlock()
- atomic.AddInt64(&pq.cachedMsgCount, int64(validMessageCount))
+ pq.cachedMsgCount.Add(int64(validMessageCount))
+ pq.mutex.Unlock()
- if pq.msgCache.Size() > 0 && !pq.consuming {
+ if pq.cachedMsgCount.Load() > 0 && !pq.consuming {
pq.consuming = true
}
@@ -206,11 +209,14 @@ func (pq *processQueue) removeMessage(messages ...*primitive.MessageExt) int64 {
if !found {
continue
}
+
pq.msgCache.Remove(msg.QueueOffset)
removedCount++
- atomic.AddInt64(&pq.cachedMsgSize, int64(-len(msg.Body)))
+
+ pq.cachedMsgSize.Sub(int64(len(msg.Body)))
}
- atomic.AddInt64(&pq.cachedMsgCount, int64(-removedCount))
+
+ pq.cachedMsgCount.Sub(int64(removedCount))
}
if !pq.msgCache.Empty() {
first, _ := pq.msgCache.Min()
@@ -228,7 +234,7 @@ func (pq *processQueue) isPullExpired() bool {
return time.Now().Sub(pq.LastPullTime()) > _PullMaxIdleTime
}
-func (pq *processQueue) cleanExpiredMsg(consumer defaultConsumer) {
+func (pq *processQueue) cleanExpiredMsg(consumer *defaultConsumer) {
if consumer.option.ConsumeOrderly {
return
}
@@ -366,8 +372,8 @@ func (pq *processQueue) clear() {
pq.mutex.Lock()
defer pq.mutex.Unlock()
pq.msgCache.Clear()
- pq.cachedMsgCount = 0
- pq.cachedMsgSize = 0
+ pq.cachedMsgCount.Store(0)
+ pq.cachedMsgSize.Store(0)
pq.queueOffsetMax = 0
}
@@ -380,11 +386,13 @@ func (pq *processQueue) commit() int64 {
if iter != nil {
offset = iter.(int64)
}
- pq.cachedMsgCount -= int64(pq.consumingMsgOrderlyTreeMap.Size())
+ pq.cachedMsgCount.Sub(int64(pq.consumingMsgOrderlyTreeMap.Size()))
+
pq.consumingMsgOrderlyTreeMap.Each(func(key interface{}, value interface{}) {
msg := value.(*primitive.MessageExt)
- pq.cachedMsgSize -= int64(len(msg.Body))
+ pq.cachedMsgSize.Sub(int64(len(msg.Body)))
})
+
pq.consumingMsgOrderlyTreeMap.Clear()
return offset + 1
}
@@ -405,7 +413,7 @@ func (pq *processQueue) currentInfo() internal.ProcessQueueInfo {
info.CachedMsgMinOffset = pq.Min()
info.CachedMsgMaxOffset = pq.Max()
info.CachedMsgCount = pq.msgCache.Size()
- info.CachedMsgSizeInMiB = pq.cachedMsgSize / int64(1024*1024)
+ info.CachedMsgSizeInMiB = pq.cachedMsgSize.Load() / int64(1024*1024)
}
if !pq.consumingMsgOrderlyTreeMap.Empty() {
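Note the decrement sites in removeMessage and commit above: with the typed API the sign moves out of the argument, so atomic.AddInt64(&x, int64(-len(msg.Body))) becomes x.Sub(int64(len(msg.Body))). A short sketch of the two accounting sides (variable names mirror the patch; the driver program is illustrative):

    package main

    import (
        "fmt"

        "go.uber.org/atomic"
    )

    func main() {
        cachedMsgSize := atomic.NewInt64(0)
        body := []byte("message body")

        cachedMsgSize.Add(int64(len(body))) // putMessage side: grow by body size
        cachedMsgSize.Sub(int64(len(body))) // removeMessage side: shrink by body size

        fmt.Println(cachedMsgSize.Load()) // 0: the two sides balance
    }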
diff --git a/consumer/push_consumer.go b/consumer/push_consumer.go
index c6e03c5..cb4768b 100644
--- a/consumer/push_consumer.go
+++ b/consumer/push_consumer.go
@@ -604,8 +604,8 @@ func (pc *pushConsumer) pullMessage(request *PullRequest) {
goto NEXT
}
- cachedMessageSizeInMiB := int(pq.cachedMsgSize / Mb)
- if pq.cachedMsgCount > pc.option.PullThresholdForQueue {
+ cachedMessageSizeInMiB := int(pq.cachedMsgSize.Load() / Mb)
+ if pq.cachedMsgCount.Load() > pc.option.PullThresholdForQueue {
if pc.queueFlowControlTimes%1000 == 0 {
rlog.Warning("the cached message count exceeds the threshold, so do flow control", map[string]interface{}{
"PullThresholdForQueue": pc.option.PullThresholdForQueue,
@@ -818,7 +818,7 @@ func (pc *pushConsumer) pullMessage(request *PullRequest) {
}
func (pc *pushConsumer) correctTagsOffset(pr *PullRequest) {
- if pr.pq.cachedMsgCount <= 0 {
+ if pr.pq.cachedMsgCount.Load() <= 0 {
pc.storage.update(pr.mq, pr.nextOffset, true)
}
}
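The push_consumer.go hunks are the read half of the fix: the flow-control and offset checks now call Load(), pairing correctly with the Add/Sub writers in process_queue.go. The mixed access the patch removes is exactly what the race detector flags; a minimal reproduction of that class of race (illustrative, not taken from the repo), which go run -race reports:

    package main

    import (
        "sync"
        "sync/atomic"
    )

    func main() {
        var cachedMsgCount int64
        var wg sync.WaitGroup
        wg.Add(2)

        go func() { // writer: atomic, like the old putMessage path
            defer wg.Done()
            atomic.AddInt64(&cachedMsgCount, 1)
        }()
        go func() { // reader: plain load, like the old threshold check
            defer wg.Done()
            _ = cachedMsgCount > 0 // -race flags this unsynchronized read
        }()

        wg.Wait()
    }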