You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by di...@apache.org on 2022/07/25 08:20:44 UTC

[rocketmq-client-go] branch master updated: [ISSUE #790] use uber atomic lib to avoid atomic value data race. (#866)

This is an automated email from the ASF dual-hosted git repository.

dinglei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new b6d6f07  [ISSUE #790] use uber atomic lib to avoid atomic value data race. (#866)
b6d6f07 is described below

commit b6d6f071df05a17ab5a26e4fcd307e9379ce35de
Author: WJL3333 <wa...@bytedance.com>
AuthorDate: Mon Jul 25 16:20:39 2022 +0800

    [ISSUE #790] use uber atomic lib to avoid atomic value data race. (#866)
    
    * Use the go.uber.org/atomic library for the cached message counters to avoid data races.
    
    * Fix the incorrect value passed to Sub.
    
    Co-authored-by: dinglei <li...@163.com>
---
 consumer/process_queue.go | 48 +++++++++++++++++++++++++++--------------------
 consumer/push_consumer.go |  6 +++---
 2 files changed, 31 insertions(+), 23 deletions(-)

diff --git a/consumer/process_queue.go b/consumer/process_queue.go
index ded8d08..c07cf7e 100644
--- a/consumer/process_queue.go
+++ b/consumer/process_queue.go
@@ -20,13 +20,13 @@ package consumer
 import (
 	"strconv"
 	"sync"
-	"sync/atomic"
+
 	"time"
 
 	"github.com/emirpasic/gods/maps/treemap"
 	"github.com/emirpasic/gods/utils"
 	gods_util "github.com/emirpasic/gods/utils"
-	uatomic "go.uber.org/atomic"
+	"go.uber.org/atomic"
 
 	"github.com/apache/rocketmq-client-go/v2/internal"
 	"github.com/apache/rocketmq-client-go/v2/primitive"
@@ -40,8 +40,8 @@ const (
 )
 
 type processQueue struct {
-	cachedMsgCount             int64
-	cachedMsgSize              int64
+	cachedMsgCount             *atomic.Int64
+	cachedMsgSize              *atomic.Int64
 	tryUnlockTimes             int64
 	queueOffsetMax             int64
 	msgAccCnt                  int64
@@ -49,10 +49,10 @@ type processQueue struct {
 	mutex                      sync.RWMutex
 	consumeLock                sync.Mutex
 	consumingMsgOrderlyTreeMap *treemap.Map
-	dropped                    *uatomic.Bool
+	dropped                    *atomic.Bool
 	lastPullTime               atomic.Value
 	lastConsumeTime            atomic.Value
-	locked                     *uatomic.Bool
+	locked                     *atomic.Bool
 	lastLockTime               atomic.Value
 	consuming                  bool
 	lockConsume                sync.Mutex
@@ -75,6 +75,8 @@ func newProcessQueue(order bool) *processQueue {
 	lastPullTime.Store(time.Now())
 
 	pq := &processQueue{
+		cachedMsgCount:             atomic.NewInt64(0),
+		cachedMsgSize:              atomic.NewInt64(0),
 		msgCache:                   treemap.NewWith(utils.Int64Comparator),
 		lastPullTime:               lastPullTime,
 		lastConsumeTime:            lastConsumeTime,
@@ -84,8 +86,8 @@ func newProcessQueue(order bool) *processQueue {
 		order:                      order,
 		closeChanOnce:              &sync.Once{},
 		closeChan:                  make(chan struct{}),
-		locked:                     uatomic.NewBool(false),
-		dropped:                    uatomic.NewBool(false),
+		locked:                     atomic.NewBool(false),
+		dropped:                    atomic.NewBool(false),
 	}
 	return pq
 }
@@ -120,13 +122,14 @@ func (pq *processQueue) putMessage(messages ...*primitive.MessageExt) {
 		pq.msgCache.Put(msg.QueueOffset, msg)
 		validMessageCount++
 		pq.queueOffsetMax = msg.QueueOffset
-		atomic.AddInt64(&pq.cachedMsgSize, int64(len(msg.Body)))
+
+		pq.cachedMsgSize.Add(int64(len(msg.Body)))
 	}
-	pq.mutex.Unlock()
 
-	atomic.AddInt64(&pq.cachedMsgCount, int64(validMessageCount))
+	pq.cachedMsgCount.Add(int64(validMessageCount))
+	pq.mutex.Unlock()
 
-	if pq.msgCache.Size() > 0 && !pq.consuming {
+	if pq.cachedMsgCount.Load() > 0 && !pq.consuming {
 		pq.consuming = true
 	}
 
@@ -206,11 +209,14 @@ func (pq *processQueue) removeMessage(messages ...*primitive.MessageExt) int64 {
 			if !found {
 				continue
 			}
+
 			pq.msgCache.Remove(msg.QueueOffset)
 			removedCount++
-			atomic.AddInt64(&pq.cachedMsgSize, int64(-len(msg.Body)))
+
+			pq.cachedMsgSize.Sub(int64(len(msg.Body)))
 		}
-		atomic.AddInt64(&pq.cachedMsgCount, int64(-removedCount))
+
+		pq.cachedMsgCount.Sub(int64(removedCount))
 	}
 	if !pq.msgCache.Empty() {
 		first, _ := pq.msgCache.Min()
@@ -228,7 +234,7 @@ func (pq *processQueue) isPullExpired() bool {
 	return time.Now().Sub(pq.LastPullTime()) > _PullMaxIdleTime
 }
 
-func (pq *processQueue) cleanExpiredMsg(consumer defaultConsumer) {
+func (pq *processQueue) cleanExpiredMsg(consumer *defaultConsumer) {
 	if consumer.option.ConsumeOrderly {
 		return
 	}
@@ -366,8 +372,8 @@ func (pq *processQueue) clear() {
 	pq.mutex.Lock()
 	defer pq.mutex.Unlock()
 	pq.msgCache.Clear()
-	pq.cachedMsgCount = 0
-	pq.cachedMsgSize = 0
+	pq.cachedMsgCount.Store(0)
+	pq.cachedMsgSize.Store(0)
 	pq.queueOffsetMax = 0
 }
 
@@ -380,11 +386,13 @@ func (pq *processQueue) commit() int64 {
 	if iter != nil {
 		offset = iter.(int64)
 	}
-	pq.cachedMsgCount -= int64(pq.consumingMsgOrderlyTreeMap.Size())
+	pq.cachedMsgCount.Sub(int64(pq.consumingMsgOrderlyTreeMap.Size()))
+
 	pq.consumingMsgOrderlyTreeMap.Each(func(key interface{}, value interface{}) {
 		msg := value.(*primitive.MessageExt)
-		pq.cachedMsgSize -= int64(len(msg.Body))
+		pq.cachedMsgSize.Sub(int64(len(msg.Body)))
 	})
+
 	pq.consumingMsgOrderlyTreeMap.Clear()
 	return offset + 1
 }
@@ -405,7 +413,7 @@ func (pq *processQueue) currentInfo() internal.ProcessQueueInfo {
 		info.CachedMsgMinOffset = pq.Min()
 		info.CachedMsgMaxOffset = pq.Max()
 		info.CachedMsgCount = pq.msgCache.Size()
-		info.CachedMsgSizeInMiB = pq.cachedMsgSize / int64(1024*1024)
+		info.CachedMsgSizeInMiB = pq.cachedMsgSize.Load() / int64(1024*1024)
 	}
 
 	if !pq.consumingMsgOrderlyTreeMap.Empty() {
diff --git a/consumer/push_consumer.go b/consumer/push_consumer.go
index c6e03c5..cb4768b 100644
--- a/consumer/push_consumer.go
+++ b/consumer/push_consumer.go
@@ -604,8 +604,8 @@ func (pc *pushConsumer) pullMessage(request *PullRequest) {
 			goto NEXT
 		}
 
-		cachedMessageSizeInMiB := int(pq.cachedMsgSize / Mb)
-		if pq.cachedMsgCount > pc.option.PullThresholdForQueue {
+		cachedMessageSizeInMiB := int(pq.cachedMsgSize.Load() / Mb)
+		if pq.cachedMsgCount.Load() > pc.option.PullThresholdForQueue {
 			if pc.queueFlowControlTimes%1000 == 0 {
 				rlog.Warning("the cached message count exceeds the threshold, so do flow control", map[string]interface{}{
 					"PullThresholdForQueue": pc.option.PullThresholdForQueue,
@@ -818,7 +818,7 @@ func (pc *pushConsumer) pullMessage(request *PullRequest) {
 }
 
 func (pc *pushConsumer) correctTagsOffset(pr *PullRequest) {
-	if pr.pq.cachedMsgCount <= 0 {
+	if pr.pq.cachedMsgCount.Load() <= 0 {
 		pc.storage.update(pr.mq, pr.nextOffset, true)
 	}
 }