Posted to commits@rocketmq.apache.org by du...@apache.org on 2021/06/08 15:40:07 UTC

[rocketmq-client-go] branch master updated: optimizing goroutine of Stat creation

This is an automated email from the ASF dual-hosted git repository.

duhengforever pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new 5168198  optimizing goroutine of Stat creation
     new 817ad27  Merge pull request #677 from wenfengwang/fix-init
5168198 is described below

commit 516819872a23e24f81d863fd658b632e8217007d
Author: wenfeng.wang <sx...@gmail.com>
AuthorDate: Tue Jun 8 15:18:31 2021 +0800

    optimizing goroutine of Stat creation
---
 consumer/consumer.go        |   4 ++
 consumer/push_consumer.go   |  24 ++++----
 consumer/statistics.go      | 138 ++++++++++++++++++++++----------------------
 consumer/statistics_test.go |  63 ++++++++++----------
 4 files changed, 120 insertions(+), 109 deletions(-)
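Summary of the change (not part of the original commit message): before this commit, the five stats collections in consumer/statistics.go were package-level globals created in init(), so their sampling goroutines started as soon as the package was imported, whether or not a consumer ever ran. The commit moves that state into a StatsManager struct: NewStatsManager() is called from defaultConsumer.start(), and ShutDownStat() from defaultConsumer.shutdown(), so the goroutines live only for the lifetime of a consumer. The following is a minimal, self-contained sketch of that pattern, assuming a simplified manager: the count field, Increase, and the one-second ticker are illustrative stand-ins, while the real manager holds five statsItemSet fields whose init() goroutines do the sampling.

    // Sketch: per-instance stats manager instead of package-level init() globals.
    package main

    import (
    	"fmt"
    	"sync"
    	"sync/atomic"
    	"time"
    )

    type StatsManager struct {
    	closeOnce sync.Once
    	closed    chan struct{}
    	count     int64 // stand-in for the real statsItemSet fields
    }

    // NewStatsManager starts the sampling goroutine lazily, i.e. only when a
    // consumer actually starts, not at package import time.
    func NewStatsManager() *StatsManager {
    	mgr := &StatsManager{closed: make(chan struct{})}
    	go func() {
    		ticker := time.NewTicker(time.Second)
    		defer ticker.Stop()
    		for {
    			select {
    			case <-mgr.closed:
    				return
    			case <-ticker.C:
    				// sample counters here (the real code calls samplingInSeconds and friends)
    			}
    		}
    	}()
    	return mgr
    }

    func (mgr *StatsManager) Increase(n int64) { atomic.AddInt64(&mgr.count, n) }

    // ShutDownStat stops the goroutine; sync.Once makes repeated shutdowns safe.
    func (mgr *StatsManager) ShutDownStat() {
    	mgr.closeOnce.Do(func() { close(mgr.closed) })
    }

    func main() {
    	mgr := NewStatsManager() // created in defaultConsumer.start() in the diff below
    	mgr.Increase(1)
    	mgr.ShutDownStat() // called from defaultConsumer.shutdown() in the diff below
    	fmt.Println("count:", atomic.LoadInt64(&mgr.count))
    }
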

diff --git a/consumer/consumer.go b/consumer/consumer.go
index 3504786..584a4b5 100644
--- a/consumer/consumer.go
+++ b/consumer/consumer.go
@@ -271,6 +271,8 @@ type defaultConsumer struct {
 	namesrv internal.Namesrvs
 
 	pullFromWhichNodeTable sync.Map
+
+	stat *StatsManager
 }
 
 func (dc *defaultConsumer) start() error {
@@ -291,6 +293,7 @@ func (dc *defaultConsumer) start() error {
 	dc.client.Start()
 	atomic.StoreInt32(&dc.state, int32(internal.StateRunning))
 	dc.consumerStartTimestamp = time.Now().UnixNano() / int64(time.Millisecond)
+	dc.stat = NewStatsManager()
 	return nil
 }
 
@@ -305,6 +308,7 @@ func (dc *defaultConsumer) shutdown() error {
 		mqs = append(mqs, &k)
 		return true
 	})
+	dc.stat.ShutDownStat()
 	dc.storage.persist(mqs)
 	dc.client.Shutdown()
 	return nil
diff --git a/consumer/push_consumer.go b/consumer/push_consumer.go
index 49342a2..519ba36 100644
--- a/consumer/push_consumer.go
+++ b/consumer/push_consumer.go
@@ -350,7 +350,7 @@ func (pc *pushConsumer) ConsumeMessageDirectly(msg *primitive.MessageExt, broker
 		res.ConsumeResult = internal.ConsumeRetryLater
 	}
 
-	increaseConsumeRT(pc.consumerGroup, mq.Topic, int64(consumeRT/time.Millisecond))
+	pc.stat.increaseConsumeRT(pc.consumerGroup, mq.Topic, int64(consumeRT/time.Millisecond))
 
 	return res
 }
@@ -362,12 +362,12 @@ func (pc *pushConsumer) GetConsumerRunningInfo() *internal.ConsumerRunningInfo {
 		topic := key.(string)
 		info.SubscriptionData[value.(*internal.SubscriptionData)] = true
 		status := internal.ConsumeStatus{
-			PullRT:            getPullRT(topic, pc.consumerGroup).avgpt,
-			PullTPS:           getPullTPS(topic, pc.consumerGroup).tps,
-			ConsumeRT:         getConsumeRT(topic, pc.consumerGroup).avgpt,
-			ConsumeOKTPS:      getConsumeOKTPS(topic, pc.consumerGroup).tps,
-			ConsumeFailedTPS:  getConsumeFailedTPS(topic, pc.consumerGroup).tps,
-			ConsumeFailedMsgs: topicAndGroupConsumeFailedTPS.getStatsDataInHour(topic + "@" + pc.consumerGroup).sum,
+			PullRT:            pc.stat.getPullRT(topic, pc.consumerGroup).avgpt,
+			PullTPS:           pc.stat.getPullTPS(topic, pc.consumerGroup).tps,
+			ConsumeRT:         pc.stat.getConsumeRT(topic, pc.consumerGroup).avgpt,
+			ConsumeOKTPS:      pc.stat.getConsumeOKTPS(topic, pc.consumerGroup).tps,
+			ConsumeFailedTPS:  pc.stat.getConsumeFailedTPS(topic, pc.consumerGroup).tps,
+			ConsumeFailedMsgs: pc.stat.topicAndGroupConsumeFailedTPS.getStatsDataInHour(topic + "@" + pc.consumerGroup).sum,
 		}
 		info.StatusTable[topic] = status
 		return true
@@ -743,7 +743,7 @@ func (pc *pushConsumer) pullMessage(request *PullRequest) {
 			request.nextOffset = result.NextBeginOffset
 
 			rt := time.Now().Sub(beginTime) / time.Millisecond
-			increasePullRT(pc.consumerGroup, request.mq.Topic, int64(rt))
+			pc.stat.increasePullRT(pc.consumerGroup, request.mq.Topic, int64(rt))
 
 			pc.processPullResult(request.mq, result, sd)
 
@@ -751,7 +751,7 @@ func (pc *pushConsumer) pullMessage(request *PullRequest) {
 			firstMsgOffset := int64(math.MaxInt64)
 			if msgFounded != nil && len(msgFounded) != 0 {
 				firstMsgOffset = msgFounded[0].QueueOffset
-				increasePullTPS(pc.consumerGroup, request.mq.Topic, len(msgFounded))
+				pc.stat.increasePullTPS(pc.consumerGroup, request.mq.Topic, len(msgFounded))
 				pq.putMessage(msgFounded...)
 			}
 			if result.NextBeginOffset < prevRequestOffset || firstMsgOffset < prevRequestOffset {
@@ -1007,14 +1007,14 @@ func (pc *pushConsumer) consumeMessageCurrently(pq *processQueue, mq *primitive.
 				msgCtx.Properties[primitive.PropCtxType] = string(primitive.FailedReturn)
 			}
 
-			increaseConsumeRT(pc.consumerGroup, mq.Topic, int64(consumeRT/time.Millisecond))
+			pc.stat.increaseConsumeRT(pc.consumerGroup, mq.Topic, int64(consumeRT/time.Millisecond))
 
 			if !pq.IsDroppd() {
 				msgBackFailed := make([]*primitive.MessageExt, 0)
 				if result == ConsumeSuccess {
-					increaseConsumeOKTPS(pc.consumerGroup, mq.Topic, len(subMsgs))
+					pc.stat.increaseConsumeOKTPS(pc.consumerGroup, mq.Topic, len(subMsgs))
 				} else {
-					increaseConsumeFailedTPS(pc.consumerGroup, mq.Topic, len(subMsgs))
+					pc.stat.increaseConsumeFailedTPS(pc.consumerGroup, mq.Topic, len(subMsgs))
 					if pc.model == BroadCasting {
 						for i := 0; i < len(subMsgs); i++ {
 							rlog.Warning("BROADCASTING, the message consume failed, drop it", map[string]interface{}{
diff --git a/consumer/statistics.go b/consumer/statistics.go
index aae5f89..e9d5d79 100644
--- a/consumer/statistics.go
+++ b/consumer/statistics.go
@@ -28,23 +28,24 @@ import (
 	"github.com/apache/rocketmq-client-go/v2/rlog"
 )
 
-var (
-	csListLock sync.Mutex
-	closeOnce  sync.Once
-
+type StatsManager struct {
+	startOnce                     sync.Once
+	closeOnce                     sync.Once
 	topicAndGroupConsumeOKTPS     *statsItemSet
 	topicAndGroupConsumeRT        *statsItemSet
 	topicAndGroupConsumeFailedTPS *statsItemSet
 	topicAndGroupPullTPS          *statsItemSet
 	topicAndGroupPullRT           *statsItemSet
-)
+}
 
-func init() {
-	topicAndGroupConsumeOKTPS = newStatsItemSet("CONSUME_OK_TPS")
-	topicAndGroupConsumeRT = newStatsItemSet("CONSUME_RT")
-	topicAndGroupConsumeFailedTPS = newStatsItemSet("CONSUME_FAILED_TPS")
-	topicAndGroupPullTPS = newStatsItemSet("PULL_TPS")
-	topicAndGroupPullRT = newStatsItemSet("PULL_RT")
+func NewStatsManager() *StatsManager {
+	mgr := &StatsManager{}
+	mgr.topicAndGroupConsumeOKTPS = newStatsItemSet("CONSUME_OK_TPS")
+	mgr.topicAndGroupConsumeRT = newStatsItemSet("CONSUME_RT")
+	mgr.topicAndGroupConsumeFailedTPS = newStatsItemSet("CONSUME_FAILED_TPS")
+	mgr.topicAndGroupPullTPS = newStatsItemSet("PULL_TPS")
+	mgr.topicAndGroupPullRT = newStatsItemSet("PULL_RT")
+	return mgr
 }
 
 type ConsumeStatus struct {
@@ -56,81 +57,104 @@ type ConsumeStatus struct {
 	ConsumeFailedMsgs int64
 }
 
-func increasePullRT(group, topic string, rt int64) {
-	topicAndGroupPullRT.addValue(topic+"@"+group, rt, 1)
+func (mgr *StatsManager) increasePullRT(group, topic string, rt int64) {
+	mgr.topicAndGroupPullRT.addValue(topic+"@"+group, rt, 1)
 }
 
-func increasePullTPS(group, topic string, msgs int) {
-	topicAndGroupPullTPS.addValue(topic+"@"+group, int64(msgs), 1)
+func (mgr *StatsManager) increasePullTPS(group, topic string, msgs int) {
+	mgr.topicAndGroupPullTPS.addValue(topic+"@"+group, int64(msgs), 1)
 }
 
-func increaseConsumeRT(group, topic string, rt int64) {
-	topicAndGroupConsumeRT.addValue(topic+"@"+group, rt, 1)
+func (mgr *StatsManager) increaseConsumeRT(group, topic string, rt int64) {
+	mgr.topicAndGroupConsumeRT.addValue(topic+"@"+group, rt, 1)
 }
 
-func increaseConsumeOKTPS(group, topic string, msgs int) {
-	topicAndGroupConsumeOKTPS.addValue(topic+"@"+group, int64(msgs), 1)
+func (mgr *StatsManager) increaseConsumeOKTPS(group, topic string, msgs int) {
+	mgr.topicAndGroupConsumeOKTPS.addValue(topic+"@"+group, int64(msgs), 1)
 }
 
-func increaseConsumeFailedTPS(group, topic string, msgs int) {
-	topicAndGroupConsumeFailedTPS.addValue(topic+"@"+group, int64(msgs), 1)
+func (mgr *StatsManager) increaseConsumeFailedTPS(group, topic string, msgs int) {
+	mgr.topicAndGroupConsumeFailedTPS.addValue(topic+"@"+group, int64(msgs), 1)
 }
 
-func GetConsumeStatus(group, topic string) ConsumeStatus {
+func (mgr *StatsManager) GetConsumeStatus(group, topic string) ConsumeStatus {
 	cs := ConsumeStatus{}
-	ss := getPullRT(group, topic)
+	ss := mgr.getPullRT(group, topic)
 	cs.PullTPS = ss.tps
 
-	ss = getPullTPS(group, topic)
+	ss = mgr.getPullTPS(group, topic)
 	cs.PullTPS = ss.tps
 
-	ss = getConsumeRT(group, topic)
+	ss = mgr.getConsumeRT(group, topic)
 	cs.ConsumeRT = ss.avgpt
 
-	ss = getConsumeOKTPS(group, topic)
+	ss = mgr.getConsumeOKTPS(group, topic)
 	cs.ConsumeOKTPS = ss.tps
 
-	ss = getConsumeFailedTPS(group, topic)
+	ss = mgr.getConsumeFailedTPS(group, topic)
 
 	cs.ConsumeFailedTPS = ss.tps
 
-	ss = topicAndGroupConsumeFailedTPS.getStatsDataInHour(topic + "@" + group)
+	ss = mgr.topicAndGroupConsumeFailedTPS.getStatsDataInHour(topic + "@" + group)
 	cs.ConsumeFailedMsgs = ss.sum
 	return cs
 }
 
-func ShutDownStatis() {
-	closeOnce.Do(func() {
-		close(topicAndGroupConsumeOKTPS.closed)
-		close(topicAndGroupConsumeRT.closed)
-		close(topicAndGroupConsumeFailedTPS.closed)
-		close(topicAndGroupPullTPS.closed)
-		close(topicAndGroupPullRT.closed)
+func (mgr *StatsManager) ShutDownStat() {
+	mgr.closeOnce.Do(func() {
+		close(mgr.topicAndGroupConsumeOKTPS.closed)
+		close(mgr.topicAndGroupConsumeRT.closed)
+		close(mgr.topicAndGroupConsumeFailedTPS.closed)
+		close(mgr.topicAndGroupPullTPS.closed)
+		close(mgr.topicAndGroupPullRT.closed)
 	})
 }
 
-func getPullRT(group, topic string) statsSnapshot {
-	return topicAndGroupPullRT.getStatsDataInMinute(topic + "@" + group)
+func (mgr *StatsManager) getPullRT(group, topic string) statsSnapshot {
+	return mgr.topicAndGroupPullRT.getStatsDataInMinute(topic + "@" + group)
 }
 
-func getPullTPS(group, topic string) statsSnapshot {
-	return topicAndGroupPullTPS.getStatsDataInMinute(topic + "@" + group)
+func (mgr *StatsManager) getPullTPS(group, topic string) statsSnapshot {
+	return mgr.topicAndGroupPullTPS.getStatsDataInMinute(topic + "@" + group)
 }
 
-func getConsumeRT(group, topic string) statsSnapshot {
-	ss := topicAndGroupPullRT.getStatsDataInMinute(topic + "@" + group)
+func (mgr *StatsManager) getConsumeRT(group, topic string) statsSnapshot {
+	ss := mgr.topicAndGroupPullRT.getStatsDataInMinute(topic + "@" + group)
 	if ss.sum == 0 {
-		return topicAndGroupConsumeRT.getStatsDataInHour(topic + "@" + group)
+		return mgr.topicAndGroupConsumeRT.getStatsDataInHour(topic + "@" + group)
 	}
 	return ss
 }
 
-func getConsumeOKTPS(group, topic string) statsSnapshot {
-	return topicAndGroupConsumeOKTPS.getStatsDataInMinute(topic + "@" + group)
+func (mgr *StatsManager) getConsumeOKTPS(group, topic string) statsSnapshot {
+	return mgr.topicAndGroupConsumeOKTPS.getStatsDataInMinute(topic + "@" + group)
 }
 
-func getConsumeFailedTPS(group, topic string) statsSnapshot {
-	return topicAndGroupConsumeFailedTPS.getStatsDataInMinute(topic + "@" + group)
+func (mgr *StatsManager) getConsumeFailedTPS(group, topic string) statsSnapshot {
+	return mgr.topicAndGroupConsumeFailedTPS.getStatsDataInMinute(topic + "@" + group)
+}
+
+var csListLock sync.Mutex
+
+func computeStatsData(csList *list.List) statsSnapshot {
+	csListLock.Lock()
+	defer csListLock.Unlock()
+	tps, avgpt, sum := 0.0, 0.0, int64(0)
+	if csList.Len() > 0 {
+		first := csList.Front().Value.(callSnapshot)
+		last := csList.Back().Value.(callSnapshot)
+		sum = last.value - first.value
+		tps = float64(sum*1000.0) / float64(last.timestamp-first.timestamp)
+		timesDiff := last.time - first.time
+		if timesDiff > 0 {
+			avgpt = float64(sum*1.0) / float64(timesDiff)
+		}
+	}
+	return statsSnapshot{
+		tps:   tps,
+		avgpt: avgpt,
+		sum:   sum,
+	}
 }
 
 type statsItemSet struct {
@@ -158,7 +182,6 @@ func (sis *statsItemSet) init() {
 				return
 			case <-ticker.C:
 				sis.samplingInSeconds()
-
 			}
 		}
 	})
@@ -449,27 +472,6 @@ func nextMonthTime() time.Time {
 	return now.AddDate(0, 1, 0)
 }
 
-func computeStatsData(csList *list.List) statsSnapshot {
-	csListLock.Lock()
-	defer csListLock.Unlock()
-	tps, avgpt, sum := 0.0, 0.0, int64(0)
-	if csList.Len() > 0 {
-		first := csList.Front().Value.(callSnapshot)
-		last := csList.Back().Value.(callSnapshot)
-		sum = last.value - first.value
-		tps = float64(sum*1000.0) / float64(last.timestamp-first.timestamp)
-		timesDiff := last.time - first.time
-		if timesDiff > 0 {
-			avgpt = float64(sum*1.0) / float64(timesDiff)
-		}
-	}
-	return statsSnapshot{
-		tps:   tps,
-		avgpt: avgpt,
-		sum:   sum,
-	}
-}
-
 type callSnapshot struct {
 	timestamp int64
 	time      int64
diff --git a/consumer/statistics_test.go b/consumer/statistics_test.go
index f70ed0b..4836141 100644
--- a/consumer/statistics_test.go
+++ b/consumer/statistics_test.go
@@ -51,7 +51,8 @@ func TestNextHourTime(t *testing.T) {
 }
 
 func TestIncreasePullRTGetPullRT(t *testing.T) {
-	ShutDownStatis()
+	mgr := NewStatsManager()
+	mgr.ShutDownStat()
 
 	tests := []struct {
 		RT        int64
@@ -67,9 +68,9 @@ func TestIncreasePullRTGetPullRT(t *testing.T) {
 		{1, 6},
 	}
 	for _, tt := range tests {
-		increasePullRT("rocketmq", "default", tt.RT)
-		topicAndGroupPullRT.samplingInSeconds()
-		snapshot := getPullRT("rocketmq", "default")
+		mgr.increasePullRT("rocketmq", "default", tt.RT)
+		mgr.topicAndGroupPullRT.samplingInSeconds()
+		snapshot := mgr.getPullRT("rocketmq", "default")
 		if snapshot.sum != tt.ExpectSum {
 			t.Errorf("wrong Pull RT sum. want=%d, got=%d", tt.ExpectSum, snapshot.sum)
 		}
@@ -77,7 +78,7 @@ func TestIncreasePullRTGetPullRT(t *testing.T) {
 }
 
 //func TestIncreaseConsumeRTGetConsumeRT(t *testing.T) {
-//	ShutDownStatis()
+//	ShutDownStat()
 //	tests := []struct {
 //		RT        int64
 //		ExpectSum int64
@@ -102,7 +103,8 @@ func TestIncreasePullRTGetPullRT(t *testing.T) {
 //}
 
 func TestIncreasePullTPSGetPullTPS(t *testing.T) {
-	ShutDownStatis()
+	mgr := NewStatsManager()
+	mgr.ShutDownStat()
 	tests := []struct {
 		RT        int
 		ExpectSum int64
@@ -117,9 +119,9 @@ func TestIncreasePullTPSGetPullTPS(t *testing.T) {
 		{1, 6},
 	}
 	for _, tt := range tests {
-		increasePullTPS("rocketmq", "default", tt.RT)
-		topicAndGroupPullTPS.samplingInSeconds()
-		snapshot := getPullTPS("rocketmq", "default")
+		mgr.increasePullTPS("rocketmq", "default", tt.RT)
+		mgr.topicAndGroupPullTPS.samplingInSeconds()
+		snapshot := mgr.getPullTPS("rocketmq", "default")
 		if snapshot.sum != tt.ExpectSum {
 			t.Errorf("wrong Pull TPS sum. want=%d, got=%d", tt.ExpectSum, snapshot.sum)
 		}
@@ -127,7 +129,8 @@ func TestIncreasePullTPSGetPullTPS(t *testing.T) {
 }
 
 func TestIncreaseConsumeOKTPSGetConsumeOKTPS(t *testing.T) {
-	ShutDownStatis()
+	mgr := NewStatsManager()
+	mgr.ShutDownStat()
 	tests := []struct {
 		RT        int
 		ExpectSum int64
@@ -142,9 +145,9 @@ func TestIncreaseConsumeOKTPSGetConsumeOKTPS(t *testing.T) {
 		{1, 6},
 	}
 	for _, tt := range tests {
-		increaseConsumeOKTPS("rocketmq", "default", tt.RT)
-		topicAndGroupConsumeOKTPS.samplingInSeconds()
-		snapshot := getConsumeOKTPS("rocketmq", "default")
+		mgr.increaseConsumeOKTPS("rocketmq", "default", tt.RT)
+		mgr.topicAndGroupConsumeOKTPS.samplingInSeconds()
+		snapshot := mgr.getConsumeOKTPS("rocketmq", "default")
 		if snapshot.sum != tt.ExpectSum {
 			t.Errorf("wrong Consume OK TPS sum. want=%d, got=%d", tt.ExpectSum, snapshot.sum)
 		}
@@ -152,7 +155,8 @@ func TestIncreaseConsumeOKTPSGetConsumeOKTPS(t *testing.T) {
 }
 
 func TestIncreaseConsumeFailedTPSGetConsumeFailedTPS(t *testing.T) {
-	ShutDownStatis()
+	mgr := NewStatsManager()
+	mgr.ShutDownStat()
 	tests := []struct {
 		RT        int
 		ExpectSum int64
@@ -167,9 +171,9 @@ func TestIncreaseConsumeFailedTPSGetConsumeFailedTPS(t *testing.T) {
 		{1, 6},
 	}
 	for _, tt := range tests {
-		increaseConsumeFailedTPS("rocketmq", "default", tt.RT)
-		topicAndGroupConsumeFailedTPS.samplingInSeconds()
-		snapshot := getConsumeFailedTPS("rocketmq", "default")
+		mgr.increaseConsumeFailedTPS("rocketmq", "default", tt.RT)
+		mgr.topicAndGroupConsumeFailedTPS.samplingInSeconds()
+		snapshot := mgr.getConsumeFailedTPS("rocketmq", "default")
 		if snapshot.sum != tt.ExpectSum {
 			t.Errorf("wrong Consume Failed TPS sum. want=%d, got=%d", tt.ExpectSum, snapshot.sum)
 		}
@@ -177,7 +181,8 @@ func TestIncreaseConsumeFailedTPSGetConsumeFailedTPS(t *testing.T) {
 }
 
 func TestGetConsumeStatus(t *testing.T) {
-	ShutDownStatis()
+	mgr := NewStatsManager()
+	mgr.ShutDownStat()
 	group, topic := "rocketmq", "default"
 
 	tests := []struct {
@@ -191,17 +196,17 @@ func TestGetConsumeStatus(t *testing.T) {
 		{1, 4},
 	}
 	for _, tt := range tests {
-		increasePullRT(group, topic, int64(tt.RT))
-		increasePullTPS(group, topic, tt.RT)
-		increaseConsumeRT(group, topic, int64(tt.RT))
-		increaseConsumeOKTPS(group, topic, tt.RT)
-		increaseConsumeFailedTPS(group, topic, tt.RT)
-		topicAndGroupPullRT.samplingInSeconds()
-		topicAndGroupPullTPS.samplingInSeconds()
-		topicAndGroupConsumeRT.samplingInMinutes()
-		topicAndGroupConsumeOKTPS.samplingInSeconds()
-		topicAndGroupConsumeFailedTPS.samplingInMinutes()
-		status := GetConsumeStatus(group, topic)
+		mgr.increasePullRT(group, topic, int64(tt.RT))
+		mgr.increasePullTPS(group, topic, tt.RT)
+		mgr.increaseConsumeRT(group, topic, int64(tt.RT))
+		mgr.increaseConsumeOKTPS(group, topic, tt.RT)
+		mgr.increaseConsumeFailedTPS(group, topic, tt.RT)
+		mgr.topicAndGroupPullRT.samplingInSeconds()
+		mgr.topicAndGroupPullTPS.samplingInSeconds()
+		mgr.topicAndGroupConsumeRT.samplingInMinutes()
+		mgr.topicAndGroupConsumeOKTPS.samplingInSeconds()
+		mgr.topicAndGroupConsumeFailedTPS.samplingInMinutes()
+		status := mgr.GetConsumeStatus(group, topic)
 		if status.ConsumeFailedMsgs != tt.ExpectFailMessage {
 			t.Errorf("wrong ConsumeFailedMsg. want=0, got=%d", status.ConsumeFailedMsgs)
 		}