You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by hu...@apache.org on 2019/07/15 01:04:51 UTC

[rocketmq-client-go] branch native updated: [ISSUE #75] Support Statistics of client (#113)

This is an automated email from the ASF dual-hosted git repository.

huzongtang pushed a commit to branch native
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-go.git


The following commit(s) were added to refs/heads/native by this push:
     new 0aa7eaf  [ISSUE #75] Support Statistics of client (#113)
0aa7eaf is described below

commit 0aa7eaf64d09195f7da2c314dee6512aad3c9bea
Author: 高峰 <ga...@foxmail.com>
AuthorDate: Mon Jul 15 09:04:47 2019 +0800

    [ISSUE #75] Support Statistics of client (#113)
    
    * complete client statistic feature
    
    * fix statistic caller
    
    * fix typo
---
 consumer/push_consumer.go |   6 +-
 consumer/statistics.go    | 393 ++++++++++++++++++++++++++++++++++++++++++++--
 2 files changed, 382 insertions(+), 17 deletions(-)

diff --git a/consumer/push_consumer.go b/consumer/push_consumer.go
index 9000651..477c3e6 100644
--- a/consumer/push_consumer.go
+++ b/consumer/push_consumer.go
@@ -455,8 +455,8 @@ func (pc *pushConsumer) pullMessage(request *PullRequest) {
 			prevRequestOffset := request.nextOffset
 			request.nextOffset = result.NextBeginOffset
 
-			rt := time.Now().Sub(beginTime)
-			increasePullRT(pc.consumerGroup, request.mq.Topic, rt)
+			rt := time.Now().Sub(beginTime) / time.Millisecond
+			increasePullRT(pc.consumerGroup, request.mq.Topic, int64(rt))
 
 			result.SetMessageExts(primitive.DecodeMessage(result.GetBody()))
 
@@ -658,7 +658,7 @@ func (pc *pushConsumer) consumeMessageCurrently(pq *processQueue, mq *primitive.
 			}
 
 			// TODO hook
-			increaseConsumeRT(pc.consumerGroup, mq.Topic, consumeRT)
+			increaseConsumeRT(pc.consumerGroup, mq.Topic, int64(consumeRT/time.Millisecond))
 
 			if !pq.dropped {
 				msgBackFailed := make([]*primitive.MessageExt, 0)
diff --git a/consumer/statistics.go b/consumer/statistics.go
index 29045a0..adc7b38 100644
--- a/consumer/statistics.go
+++ b/consumer/statistics.go
@@ -17,44 +17,409 @@ limitations under the License.
 
 package consumer
 
-import "time"
+import (
+	"container/list"
+	"github.com/apache/rocketmq-client-go/rlog"
+	"sync"
+	"sync/atomic"
+	"time"
+)
 
 var (
-	topicAndGroupConsumeOKTPS     = &statsItemSet{statsName: "CONSUME_OK_TPS"}
-	topicAndGroupConsumeRT        = &statsItemSet{statsName: "CONSUME_FAILED_TPS"}
-	topicAndGroupConsumeFailedTPS = &statsItemSet{statsName: "CONSUME_RT"}
-	topicAndGroupPullTPS          = &statsItemSet{statsName: "PULL_TPS"}
-	topicAndGroupPullRT           = &statsItemSet{statsName: "PULL_RT"}
+	csListLock sync.Mutex
+
+	topicAndGroupConsumeOKTPS     *statsItemSet
+	topicAndGroupConsumeRT        *statsItemSet
+	topicAndGroupConsumeFailedTPS *statsItemSet
+	topicAndGroupPullTPS          *statsItemSet
+	topicAndGroupPullRT           *statsItemSet
 )
 
-type statsItem struct {
+// init creates the package-level statistics sets. Each newStatsItemSet call
+// also starts that set's background sampling and printing goroutines.
+func init() {
+	topicAndGroupConsumeOKTPS = newStatsItemSet("CONSUME_OK_TPS")
+	// The stats name is what shows up in the periodic log output, so it has
+	// to match the metric the variable actually holds.
+	topicAndGroupConsumeRT = newStatsItemSet("CONSUME_RT")
+	topicAndGroupConsumeFailedTPS = newStatsItemSet("CONSUME_FAILED_TPS")
+	topicAndGroupPullTPS = newStatsItemSet("PULL_TPS")
+	topicAndGroupPullRT = newStatsItemSet("PULL_RT")
+}
+
+type ConsumeStatus struct {
+	PullRT            float64
+	PullTPS           float64
+	ConsumeRT         float64
+	ConsumeOKTPS      float64
+	ConsumeFailedTPS  float64
+	ConsumeFailedMsgs int64
+}
+
+func increasePullRT(group, topic string, rt int64) {
+	topicAndGroupPullRT.addValue(topic+"@"+group, rt, 1)
+}
+
+func increasePullTPS(group, topic string, msgs int) {
+	topicAndGroupPullTPS.addValue(topic+"@"+group, int64(msgs), 1)
+}
+
+func increaseConsumeRT(group, topic string, rt int64) {
+	topicAndGroupConsumeRT.addValue(topic+"@"+group, rt, 1)
+}
+
+func increaseConsumeOKTPS(group, topic string, msgs int) {
+	topicAndGroupConsumeOKTPS.addValue(topic+"@"+group, int64(msgs), 1)
+}
+
+func increaseConsumeFailedTPS(group, topic string, msgs int) {
+	topicAndGroupConsumeFailedTPS.addValue(topic+"@"+group, int64(msgs), 1)
+}
+
+// GetConsumeStatus returns a snapshot of the pull and consume statistics
+// recorded for the given consumer group and topic.
+//
+// The RT fields carry the average value per recorded call (avgpt) and the
+// TPS fields carry per-second rates. ConsumeFailedMsgs is the failed-message
+// sum over the per-hour window. A group/topic pair that has never been
+// recorded yields a zero-valued ConsumeStatus.
+func GetConsumeStatus(group, topic string) ConsumeStatus {
+	cs := ConsumeStatus{}
+	ss := getPullRT(group, topic)
+	// Pull RT is an average latency, so it comes from avgpt and belongs in
+	// PullRT. Writing ss.tps into PullTPS here left PullRT permanently zero.
+	cs.PullRT = ss.avgpt
+
+	ss = getPullTPS(group, topic)
+	cs.PullTPS = ss.tps
+
+	ss = getConsumeRT(group, topic)
+	cs.ConsumeRT = ss.avgpt
+
+	ss = getConsumeOKTPS(group, topic)
+	cs.ConsumeOKTPS = ss.tps
+
+	ss = getConsumeFailedTPS(group, topic)
+
+	cs.ConsumeFailedTPS = ss.tps
+
+	// The failed-message count is reported over the longer, per-hour window.
+	ss = topicAndGroupConsumeFailedTPS.getStatsDataInHour(topic + "@" + group)
+	cs.ConsumeFailedMsgs = ss.sum
+	return cs
+}
+
+// ShutDownStatis flags every package-level statistics set as closed. The
+// background loops of a set check this flag before each iteration.
+func ShutDownStatis() {
+	sets := []*statsItemSet{
+		topicAndGroupConsumeOKTPS,
+		topicAndGroupConsumeRT,
+		topicAndGroupConsumeFailedTPS,
+		topicAndGroupPullTPS,
+		topicAndGroupPullRT,
+	}
+	for _, set := range sets {
+		set.closed = true
+	}
+}
+
+func getPullRT(group, topic string) statsSnapshot {
+	return topicAndGroupPullRT.getStatsDataInMinute(topic + "@" + group)
+}
+
+func getPullTPS(group, topic string) statsSnapshot {
+	return topicAndGroupPullTPS.getStatsDataInMinute(topic + "@" + group)
+}
+
+// getConsumeRT returns the consume response-time statistics for topic@group.
+// It prefers the per-minute window and falls back to the per-hour window when
+// the per-minute sum is zero.
+func getConsumeRT(group, topic string) statsSnapshot {
+	// Read from the consume RT set; the pull RT set tracks a different metric
+	// and would make ConsumeRT mirror the pull latency.
+	ss := topicAndGroupConsumeRT.getStatsDataInMinute(topic + "@" + group)
+	if ss.sum == 0 {
+		return topicAndGroupConsumeRT.getStatsDataInHour(topic + "@" + group)
+	}
+	return ss
+}
+
+func getConsumeOKTPS(group, topic string) statsSnapshot {
+	return topicAndGroupConsumeOKTPS.getStatsDataInMinute(topic + "@" + group)
+}
+
+func getConsumeFailedTPS(group, topic string) statsSnapshot {
+	return topicAndGroupConsumeFailedTPS.getStatsDataInMinute(topic + "@" + group)
 }
 
 type statsItemSet struct {
 	statsName      string
-	statsItemTable map[string]statsItem
+	statsItemTable sync.Map
+	closed         bool
+}
+
+func newStatsItemSet(statsName string) *statsItemSet {
+	sis := &statsItemSet{
+		statsName: statsName,
+	}
+	sis.init()
+	return sis
+}
+
+// init starts the background goroutines of the set: three samplers that
+// record counter snapshots every 10 seconds, every 10 minutes and every hour,
+// and three printers that log the per-minute, per-hour and per-day
+// statistics. Each loop re-checks sis.closed before every iteration.
+//
+// NOTE(review): sis.closed is a plain bool read here without synchronization
+// while it is written from another goroutine; consider an atomic flag or a
+// done channel.
+func (sis *statsItemSet) init() {
+	go func() {
+		for !sis.closed {
+			sis.samplingInSeconds()
+			time.Sleep(10 * time.Second)
+		}
+	}()
+
+	go func() {
+		for !sis.closed {
+			sis.samplingInMinutes()
+			time.Sleep(10 * time.Minute)
+		}
+	}()
+
+	go func() {
+		for !sis.closed {
+			sis.samplingInHour()
+			time.Sleep(time.Hour)
+		}
+	}()
+
+	go func() {
+		time.Sleep(nextMinutesTime().Sub(time.Now()))
+		for !sis.closed {
+			sis.printAtMinutes()
+			time.Sleep(time.Minute)
+		}
+	}()
+
+	go func() {
+		time.Sleep(nextHourTime().Sub(time.Now()))
+		for !sis.closed {
+			sis.printAtHour()
+			time.Sleep(time.Hour)
+		}
+	}()
+
+	go func() {
+		// Start the daily report at the next local midnight so it lines up
+		// with the 24h period below. Sleeping until nextMonthTime() delayed
+		// the first daily report by a whole month.
+		now := time.Now()
+		nextDay := time.Date(now.Year(), now.Month(), now.Day()+1, 0, 0, 0, 0, now.Location())
+		time.Sleep(nextDay.Sub(now))
+		for !sis.closed {
+			sis.printAtDay()
+			time.Sleep(24 * time.Hour)
+		}
+	}()
+}
+
+
+func (sis *statsItemSet) samplingInSeconds() {
+	sis.statsItemTable.Range(func(key, value interface{}) bool {
+		si := value.(*statsItem)
+		si.samplingInSeconds()
+		return true
+	})
+}
+
+func (sis *statsItemSet) samplingInMinutes() {
+	sis.statsItemTable.Range(func(key, value interface{}) bool {
+		si := value.(*statsItem)
+		si.samplingInMinutes()
+		return true
+	})
+}
+
+func (sis *statsItemSet) samplingInHour() {
+	sis.statsItemTable.Range(func(key, value interface{}) bool {
+		si := value.(*statsItem)
+		si.samplingInHour()
+		return true
+	})
+}
+
+func (sis *statsItemSet) printAtMinutes() {
+	sis.statsItemTable.Range(func(key, value interface{}) bool {
+		si := value.(*statsItem)
+		si.printAtMinutes()
+		return true
+	})
+}
+
+func (sis *statsItemSet) printAtHour() {
+	sis.statsItemTable.Range(func(key, value interface{}) bool {
+		si := value.(*statsItem)
+		si.printAtHour()
+		return true
+	})
 }
 
-func (set *statsItemSet) addValue(key string, incValue, incTimes int) {
+func (sis *statsItemSet) printAtDay() {
+	sis.statsItemTable.Range(func(key, value interface{}) bool {
+		si := value.(*statsItem)
+		si.printAtDay()
+		return true
+	})
+}
 
+func (sis *statsItemSet) addValue(key string, incValue, incTimes int64) {
+	si := sis.getAndCreateStateItem(key)
+	atomic.AddInt64(&si.value, incValue)
+	atomic.AddInt64(&si.times, incTimes)
 }
 
-func increasePullRT(group, topic string, rt time.Duration) {
+// getAndCreateStateItem returns the statsItem registered under key, creating
+// and registering a new one if none exists yet.
+func (sis *statsItemSet) getAndCreateStateItem(key string) *statsItem {
+	// Fast path: the item already exists, so nothing has to be allocated.
+	if val, ok := sis.statsItemTable.Load(key); ok {
+		return val.(*statsItem)
+	}
+	// LoadOrStore makes the insert atomic: when two goroutines race on the
+	// first use of a key they both end up with the same item, instead of one
+	// Store overwriting the other and dropping its counts.
+	val, _ := sis.statsItemTable.LoadOrStore(key, newStatsItem(sis.statsName, key))
+	return val.(*statsItem)
+}
 
+// getStatsDataInMinute looks up the item stored under key and returns its
+// per-minute statistics, or a zero statsSnapshot when the key is unknown.
+func (sis *statsItemSet) getStatsDataInMinute(key string) statsSnapshot {
+	entry, found := sis.statsItemTable.Load(key)
+	if !found {
+		return statsSnapshot{}
+	}
+	return entry.(*statsItem).getStatsDataInMinute()
 }
 
-func increaseConsumeRT(group, topic string, rt time.Duration) {
+// getStatsDataInHour looks up the item stored under key and returns its
+// per-hour statistics, or a zero statsSnapshot when the key is unknown.
+func (sis *statsItemSet) getStatsDataInHour(key string) statsSnapshot {
+	entry, found := sis.statsItemTable.Load(key)
+	if !found {
+		return statsSnapshot{}
+	}
+	return entry.(*statsItem).getStatsDataInHour()
+}
 
+// getStatsDataInDay looks up the item stored under key and returns its
+// per-day statistics, or a zero statsSnapshot when the key is unknown.
+func (sis *statsItemSet) getStatsDataInDay(key string) statsSnapshot {
+	entry, found := sis.statsItemTable.Load(key)
+	if !found {
+		return statsSnapshot{}
+	}
+	return entry.(*statsItem).getStatsDataInDay()
 }
 
-func increasePullTPS(group, topic string, msgNumber int) {
+// getStatsItem returns the statsItem registered under key, or nil if the key
+// has never been recorded.
+func (sis *statsItemSet) getStatsItem(key string) *statsItem {
+	val, ok := sis.statsItemTable.Load(key)
+	if !ok {
+		// Load returns a nil value for a missing key; an unchecked type
+		// assertion on it would panic.
+		return nil
+	}
+	return val.(*statsItem)
+}
 
+type statsItem struct {
+	value            int64
+	times            int64
+	csListMinute     *list.List
+	csListHour       *list.List
+	csListDay        *list.List
+	statsName        string
+	statsKey         string
+	csListMinuteLock sync.Mutex
+	csListHourLock   sync.Mutex
+	csListDayLock    sync.Mutex
 }
 
-func increaseConsumeOKTPS(group, topic string, msgNumber int) {
+func (si *statsItem) getStatsDataInMinute() statsSnapshot {
+	return computeStatsData(si.csListMinute)
+}
+
+func (si *statsItem) getStatsDataInHour() statsSnapshot {
+	return computeStatsData(si.csListHour)
+}
+
+func (si *statsItem) getStatsDataInDay() statsSnapshot {
+	return computeStatsData(si.csListDay)
+}
 
+func newStatsItem(statsName, statsKey string) *statsItem {
+	return &statsItem{
+		statsName:    statsName,
+		statsKey:     statsKey,
+		csListMinute: list.New(),
+		csListHour:   list.New(),
+		csListDay:    list.New(),
+	}
 }
 
-func increaseConsumeFailedTPS(group, topic string, msgNumber int) {
+// samplingInSeconds records the current counters as a new snapshot at the
+// back of the per-minute list and, once the list holds more than 7 entries,
+// drops the oldest one.
+func (si *statsItem) samplingInSeconds() {
+	si.csListMinuteLock.Lock()
+	defer si.csListMinuteLock.Unlock()
+
+	snapshot := callSnapshot{
+		timestamp: time.Now().Unix() * 1000,
+		time:      atomic.LoadInt64(&si.times),
+		value:     atomic.LoadInt64(&si.value),
+	}
+	si.csListMinute.PushBack(snapshot)
+	if si.csListMinute.Len() <= 7 {
+		return
+	}
+	si.csListMinute.Remove(si.csListMinute.Front())
+}
+
+// samplingInMinutes records the current counters as a new snapshot at the
+// back of the per-hour list and, once the list holds more than 7 entries,
+// drops the oldest one.
+func (si *statsItem) samplingInMinutes() {
+	si.csListHourLock.Lock()
+	defer si.csListHourLock.Unlock()
+
+	snapshot := callSnapshot{
+		timestamp: time.Now().Unix() * 1000,
+		time:      atomic.LoadInt64(&si.times),
+		value:     atomic.LoadInt64(&si.value),
+	}
+	si.csListHour.PushBack(snapshot)
+	if si.csListHour.Len() <= 7 {
+		return
+	}
+	si.csListHour.Remove(si.csListHour.Front())
+}
+
+// samplingInHour appends a snapshot of the current counters to the per-day
+// list and, once the list holds more than 25 entries, drops the oldest one.
+func (si *statsItem) samplingInHour() {
+	si.csListDayLock.Lock()
+	defer si.csListDayLock.Unlock()
+	si.csListDay.PushBack(callSnapshot{
+		timestamp: time.Now().Unix() * 1000,
+		time:      atomic.LoadInt64(&si.times),
+		value:     atomic.LoadInt64(&si.value),
+	})
+	if si.csListDay.Len() > 25 {
+		// Remove only acts on elements owned by the receiving list. Calling
+		// it on csListHour was a no-op and let csListDay grow without bound.
+		si.csListDay.Remove(si.csListDay.Front())
+	}
+}
+
+func (si *statsItem) printAtMinutes() {
+	ss := computeStatsData(si.csListMinute)
+	rlog.Infof("[%s] [%s] Stats In One Minute, SUM: %d TPS: %.2f AVGPT: %.2f",
+		si.statsName, si.statsKey, ss.sum, ss.tps, ss.avgpt)
+}
+
+func (si *statsItem) printAtHour() {
+	ss := computeStatsData(si.csListHour)
+	rlog.Infof("[%s] [%s] Stats In One Hour, SUM: %d TPS: %.2f AVGPT: %.2f",
+		si.statsName, si.statsKey, ss.sum, ss.tps, ss.avgpt)
+}
+
+func (si *statsItem) printAtDay() {
+	ss := computeStatsData(si.csListDay)
+	rlog.Infof("[%s] [%s] Stats In One Day, SUM: %d TPS: %.2f AVGPT: %.2f",
+		si.statsName, si.statsKey, ss.sum, ss.tps, ss.avgpt)
+}
+
+func nextMinutesTime() time.Time {
+	now := time.Now()
+	m, _ := time.ParseDuration("1m")
+	return now.Add(m)
+}
+
+func nextHourTime() time.Time {
+	now := time.Now()
+	m, _ := time.ParseDuration("1h")
+	return now.Add(m)
+}
+
+func nextMonthTime() time.Time {
+	now := time.Now()
+	return now.AddDate(0, 1, 0)
+}
+
+// computeStatsData derives a statsSnapshot from the oldest and the newest
+// callSnapshot in csList.
+//
+// sum is the growth of value across the window, tps is sum per second
+// (snapshot timestamps are in milliseconds) and avgpt is sum divided by the
+// growth of the call counter. An empty list, or a window with no elapsed
+// time, yields a zero tps.
+//
+// NOTE(review): csListLock is not the mutex the sampling methods hold while
+// mutating these lists (they use the per-item csList*Lock fields), so this
+// lock does not serialize reads against list updates.
+func computeStatsData(csList *list.List) statsSnapshot {
+	csListLock.Lock()
+	defer csListLock.Unlock()
+	tps, avgpt, sum := 0.0, 0.0, int64(0)
+	if csList.Len() > 0 {
+		first := csList.Front().Value.(callSnapshot)
+		last := csList.Back().Value.(callSnapshot)
+		sum = last.value - first.value
+		// With a single snapshot (or two taken within the same second) the
+		// elapsed time is zero; dividing by it would produce NaN or Inf.
+		if elapsed := last.timestamp - first.timestamp; elapsed > 0 {
+			tps = float64(sum) * 1000.0 / float64(elapsed)
+		}
+		timesDiff := last.time - first.time
+		if timesDiff > 0 {
+			avgpt = float64(sum) / float64(timesDiff)
+		}
+	}
+	return statsSnapshot{
+		tps:   tps,
+		avgpt: avgpt,
+		sum:   sum,
+	}
+}
+
+type callSnapshot struct {
+	timestamp int64
+	time      int64
+	value     int64
+}
 
+type statsSnapshot struct {
+	sum   int64
+	tps   float64
+	avgpt float64
 }