You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by hu...@apache.org on 2019/07/15 01:04:51 UTC
[rocketmq-client-go] branch native updated: [ISSUE #75] Support
Statistics of client (#113)
This is an automated email from the ASF dual-hosted git repository.
huzongtang pushed a commit to branch native
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-go.git
The following commit(s) were added to refs/heads/native by this push:
new 0aa7eaf [ISSUE #75] Support Statistics of client (#113)
0aa7eaf is described below
commit 0aa7eaf64d09195f7da2c314dee6512aad3c9bea
Author: 高峰 <ga...@foxmail.com>
AuthorDate: Mon Jul 15 09:04:47 2019 +0800
[ISSUE #75] Support Statistics of client (#113)
* complete client statistic feature
* fix statistic caller
* fix typo
---
consumer/push_consumer.go | 6 +-
consumer/statistics.go | 393 ++++++++++++++++++++++++++++++++++++++++++++--
2 files changed, 382 insertions(+), 17 deletions(-)
diff --git a/consumer/push_consumer.go b/consumer/push_consumer.go
index 9000651..477c3e6 100644
--- a/consumer/push_consumer.go
+++ b/consumer/push_consumer.go
@@ -455,8 +455,8 @@ func (pc *pushConsumer) pullMessage(request *PullRequest) {
prevRequestOffset := request.nextOffset
request.nextOffset = result.NextBeginOffset
- rt := time.Now().Sub(beginTime)
- increasePullRT(pc.consumerGroup, request.mq.Topic, rt)
+ rt := time.Now().Sub(beginTime) / time.Millisecond
+ increasePullRT(pc.consumerGroup, request.mq.Topic, int64(rt))
result.SetMessageExts(primitive.DecodeMessage(result.GetBody()))
@@ -658,7 +658,7 @@ func (pc *pushConsumer) consumeMessageCurrently(pq *processQueue, mq *primitive.
}
// TODO hook
- increaseConsumeRT(pc.consumerGroup, mq.Topic, consumeRT)
+ increaseConsumeRT(pc.consumerGroup, mq.Topic, int64(consumeRT/time.Millisecond))
if !pq.dropped {
msgBackFailed := make([]*primitive.MessageExt, 0)
diff --git a/consumer/statistics.go b/consumer/statistics.go
index 29045a0..adc7b38 100644
--- a/consumer/statistics.go
+++ b/consumer/statistics.go
@@ -17,44 +17,409 @@ limitations under the License.
package consumer
-import "time"
+import (
+ "container/list"
+ "github.com/apache/rocketmq-client-go/rlog"
+ "sync"
+ "sync/atomic"
+ "time"
+)
var (
- topicAndGroupConsumeOKTPS = &statsItemSet{statsName: "CONSUME_OK_TPS"}
- topicAndGroupConsumeRT = &statsItemSet{statsName: "CONSUME_FAILED_TPS"}
- topicAndGroupConsumeFailedTPS = &statsItemSet{statsName: "CONSUME_RT"}
- topicAndGroupPullTPS = &statsItemSet{statsName: "PULL_TPS"}
- topicAndGroupPullRT = &statsItemSet{statsName: "PULL_RT"}
+ // csListLock is held by computeStatsData while it reads a snapshot list.
+ csListLock sync.Mutex
+
+ // Package-level statistics sets, created in init and keyed by "topic@group".
+ topicAndGroupConsumeOKTPS *statsItemSet
+ topicAndGroupConsumeRT *statsItemSet
+ topicAndGroupConsumeFailedTPS *statsItemSet
+ topicAndGroupPullTPS *statsItemSet
+ topicAndGroupPullRT *statsItemSet
)
-type statsItem struct {
+// init creates the package-level statistics sets. Each set is given the
+// statistic name that matches its variable, so log lines printed by the
+// background loops are labelled with the metric they actually describe.
+func init() {
+	topicAndGroupConsumeOKTPS = newStatsItemSet("CONSUME_OK_TPS")
+	topicAndGroupConsumeRT = newStatsItemSet("CONSUME_RT")
+	topicAndGroupConsumeFailedTPS = newStatsItemSet("CONSUME_FAILED_TPS")
+	topicAndGroupPullTPS = newStatsItemSet("PULL_TPS")
+	topicAndGroupPullRT = newStatsItemSet("PULL_RT")
+}
+
+// ConsumeStatus is the statistics summary returned by GetConsumeStatus for
+// one consumer group on one topic.
+type ConsumeStatus struct {
+ // PullRT is meant to carry the pull response time; callers of
+ // increasePullRT report it in milliseconds.
+ PullRT float64
+ // PullTPS is the pull rate taken from the one-minute window.
+ PullTPS float64
+ // ConsumeRT is the average value per call reported by getConsumeRT.
+ ConsumeRT float64
+ // ConsumeOKTPS is the rate from the one-minute consume-OK window.
+ ConsumeOKTPS float64
+ // ConsumeFailedTPS is the rate from the one-minute consume-failed window.
+ ConsumeFailedTPS float64
+ // ConsumeFailedMsgs is the sum of the one-hour consume-failed window.
+ ConsumeFailedMsgs int64
+}
+
+// increasePullRT records one pull call that took rt for topic@group.
+func increasePullRT(group, topic string, rt int64) {
+	key := topic + "@" + group
+	topicAndGroupPullRT.addValue(key, rt, 1)
+}
+
+// increasePullTPS records one pull call that returned msgs messages for
+// topic@group.
+func increasePullTPS(group, topic string, msgs int) {
+	key := topic + "@" + group
+	count := int64(msgs)
+	topicAndGroupPullTPS.addValue(key, count, 1)
+}
+
+// increaseConsumeRT records one consume call that took rt for topic@group.
+func increaseConsumeRT(group, topic string, rt int64) {
+	key := topic + "@" + group
+	topicAndGroupConsumeRT.addValue(key, rt, 1)
+}
+
+// increaseConsumeOKTPS records one consume call that succeeded for msgs
+// messages of topic@group.
+func increaseConsumeOKTPS(group, topic string, msgs int) {
+	key := topic + "@" + group
+	count := int64(msgs)
+	topicAndGroupConsumeOKTPS.addValue(key, count, 1)
+}
+
+// increaseConsumeFailedTPS records one consume call that failed for msgs
+// messages of topic@group.
+func increaseConsumeFailedTPS(group, topic string, msgs int) {
+	key := topic + "@" + group
+	count := int64(msgs)
+	topicAndGroupConsumeFailedTPS.addValue(key, count, 1)
+}
+
+// GetConsumeStatus returns the current statistics of consumer group `group`
+// on `topic`.
+//
+// Response times are reported as the average value per call (avgpt) and
+// rates as tps, both taken from the snapshots returned by the get* helpers.
+// ConsumeFailedMsgs is the sum of the one-hour consume-failed window.
+func GetConsumeStatus(group, topic string) ConsumeStatus {
+	cs := ConsumeStatus{}
+
+	// Pull response time is an average, so it comes from avgpt and belongs
+	// in PullRT (not PullTPS).
+	ss := getPullRT(group, topic)
+	cs.PullRT = ss.avgpt
+
+	ss = getPullTPS(group, topic)
+	cs.PullTPS = ss.tps
+
+	ss = getConsumeRT(group, topic)
+	cs.ConsumeRT = ss.avgpt
+
+	ss = getConsumeOKTPS(group, topic)
+	cs.ConsumeOKTPS = ss.tps
+
+	ss = getConsumeFailedTPS(group, topic)
+	cs.ConsumeFailedTPS = ss.tps
+
+	ss = topicAndGroupConsumeFailedTPS.getStatsDataInHour(topic + "@" + group)
+	cs.ConsumeFailedMsgs = ss.sum
+	return cs
+}
+
+// ShutDownStatis marks every package-level statistics set as closed, which
+// is the flag the background loops of each set poll before their next run.
+func ShutDownStatis() {
+	sets := []*statsItemSet{
+		topicAndGroupConsumeOKTPS,
+		topicAndGroupConsumeRT,
+		topicAndGroupConsumeFailedTPS,
+		topicAndGroupPullTPS,
+		topicAndGroupPullRT,
+	}
+	for _, set := range sets {
+		set.closed = true
+	}
+}
+
+// getPullRT returns the one-minute pull response-time snapshot of topic@group.
+func getPullRT(group, topic string) statsSnapshot {
+	key := topic + "@" + group
+	return topicAndGroupPullRT.getStatsDataInMinute(key)
+}
+
+// getPullTPS returns the one-minute pull-rate snapshot of topic@group.
+func getPullTPS(group, topic string) statsSnapshot {
+	key := topic + "@" + group
+	return topicAndGroupPullTPS.getStatsDataInMinute(key)
+}
+
+// getConsumeRT returns the consume response-time snapshot of topic@group.
+// It prefers the one-minute window and falls back to the one-hour window
+// when the minute window recorded nothing (sum == 0).
+func getConsumeRT(group, topic string) statsSnapshot {
+	key := topic + "@" + group
+	// Both windows must be read from the consume-RT set; reading the pull-RT
+	// set here would report pull latency as consume latency.
+	ss := topicAndGroupConsumeRT.getStatsDataInMinute(key)
+	if ss.sum == 0 {
+		return topicAndGroupConsumeRT.getStatsDataInHour(key)
+	}
+	return ss
+}
+
+// getConsumeOKTPS returns the one-minute consume-OK snapshot of topic@group.
+func getConsumeOKTPS(group, topic string) statsSnapshot {
+	key := topic + "@" + group
+	return topicAndGroupConsumeOKTPS.getStatsDataInMinute(key)
+}
+
+// getConsumeFailedTPS returns the one-minute consume-failed snapshot of
+// topic@group.
+func getConsumeFailedTPS(group, topic string) statsSnapshot {
+	key := topic + "@" + group
+	return topicAndGroupConsumeFailedTPS.getStatsDataInMinute(key)
}
+// statsItemSet groups the statsItem values that share one statistic name.
type statsItemSet struct {
statsName string
- statsItemTable map[string]statsItem
+ // statsItemTable maps a key (the callers in this file use "topic@group")
+ // to its *statsItem.
+ statsItemTable sync.Map
+ // closed is polled by the background loops started in init; once it is
+ // true each loop exits after its current sleep.
+ closed bool
+}
+
+// newStatsItemSet builds a set named statsName and starts its background
+// sampling and printing loops.
+func newStatsItemSet(statsName string) *statsItemSet {
+	set := new(statsItemSet)
+	set.statsName = statsName
+	set.init()
+	return set
+}
+
+// init starts the background goroutines of the set: three sampling loops
+// (every 10 seconds, every 10 minutes, every hour) and three printing loops
+// (every minute, every hour, every day). Every loop stops once sis.closed
+// is observed to be true.
+func (sis *statsItemSet) init() {
+	// run waits for delay, then calls task every period until the set is
+	// closed.
+	// NOTE(review): closed is a plain bool written by ShutDownStatis from
+	// another goroutine; consider an atomic flag or a done channel.
+	run := func(delay, period time.Duration, task func()) {
+		go func() {
+			time.Sleep(delay)
+			for !sis.closed {
+				task()
+				time.Sleep(period)
+			}
+		}()
+	}
+
+	run(0, 10*time.Second, sis.samplingInSeconds)
+	run(0, 10*time.Minute, sis.samplingInMinutes)
+	run(0, time.Hour, sis.samplingInHour)
+
+	run(nextMinutesTime().Sub(time.Now()), time.Minute, sis.printAtMinutes)
+	run(nextHourTime().Sub(time.Now()), time.Hour, sis.printAtHour)
+	// The daily report starts after one day, in line with the minute and
+	// hour reports; waiting a whole month would suppress it for weeks.
+	run(24*time.Hour, 24*time.Hour, sis.printAtDay)
+}
+
+
+// samplingInSeconds asks every item of the set to take its minute-window
+// snapshot.
+func (sis *statsItemSet) samplingInSeconds() {
+	visit := func(_, v interface{}) bool {
+		v.(*statsItem).samplingInSeconds()
+		return true
+	}
+	sis.statsItemTable.Range(visit)
+}
+
+// samplingInMinutes asks every item of the set to take its hour-window
+// snapshot.
+func (sis *statsItemSet) samplingInMinutes() {
+	visit := func(_, v interface{}) bool {
+		v.(*statsItem).samplingInMinutes()
+		return true
+	}
+	sis.statsItemTable.Range(visit)
+}
+
+// samplingInHour asks every item of the set to take its day-window snapshot.
+func (sis *statsItemSet) samplingInHour() {
+	visit := func(_, v interface{}) bool {
+		v.(*statsItem).samplingInHour()
+		return true
+	}
+	sis.statsItemTable.Range(visit)
+}
+
+// printAtMinutes logs the one-minute statistics of every item in the set.
+func (sis *statsItemSet) printAtMinutes() {
+	visit := func(_, v interface{}) bool {
+		v.(*statsItem).printAtMinutes()
+		return true
+	}
+	sis.statsItemTable.Range(visit)
+}
+
+// printAtHour logs the one-hour statistics of every item in the set.
+func (sis *statsItemSet) printAtHour() {
+	visit := func(_, v interface{}) bool {
+		v.(*statsItem).printAtHour()
+		return true
+	}
+	sis.statsItemTable.Range(visit)
}
-func (set *statsItemSet) addValue(key string, incValue, incTimes int) {
+// printAtDay logs the one-day statistics of every item in the set.
+func (sis *statsItemSet) printAtDay() {
+	visit := func(_, v interface{}) bool {
+		v.(*statsItem).printAtDay()
+		return true
+	}
+	sis.statsItemTable.Range(visit)
+}
+// addValue adds incValue to the accumulated value and incTimes to the call
+// counter of the item stored under key, creating the item on first use.
+func (sis *statsItemSet) addValue(key string, incValue, incTimes int64) {
+	item := sis.getAndCreateStateItem(key)
+	atomic.AddInt64(&item.value, incValue)
+	atomic.AddInt64(&item.times, incTimes)
}
-func increasePullRT(group, topic string, rt time.Duration) {
+// getAndCreateStateItem returns the item stored under key, creating and
+// registering a new one when the key is not present yet.
+func (sis *statsItemSet) getAndCreateStateItem(key string) *statsItem {
+	// Fast path: the item already exists, so no allocation is needed.
+	if val, ok := sis.statsItemTable.Load(key); ok {
+		return val.(*statsItem)
+	}
+	// LoadOrStore makes creation atomic: when two goroutines race on the
+	// same new key, both get the one stored item, so no update is lost to
+	// an item that was overwritten in the map.
+	val, _ := sis.statsItemTable.LoadOrStore(key, newStatsItem(sis.statsName, key))
+	return val.(*statsItem)
+}
+// getStatsDataInMinute returns the one-minute snapshot of the item stored
+// under key, or the zero statsSnapshot when the key is unknown.
+func (sis *statsItemSet) getStatsDataInMinute(key string) statsSnapshot {
+	val, found := sis.statsItemTable.Load(key)
+	if !found {
+		return statsSnapshot{}
+	}
+	return val.(*statsItem).getStatsDataInMinute()
}
-func increaseConsumeRT(group, topic string, rt time.Duration) {
+// getStatsDataInHour returns the one-hour snapshot of the item stored under
+// key, or the zero statsSnapshot when the key is unknown.
+func (sis *statsItemSet) getStatsDataInHour(key string) statsSnapshot {
+	val, found := sis.statsItemTable.Load(key)
+	if !found {
+		return statsSnapshot{}
+	}
+	return val.(*statsItem).getStatsDataInHour()
+}
+// getStatsDataInDay returns the one-day snapshot of the item stored under
+// key, or the zero statsSnapshot when the key is unknown.
+func (sis *statsItemSet) getStatsDataInDay(key string) statsSnapshot {
+	val, found := sis.statsItemTable.Load(key)
+	if !found {
+		return statsSnapshot{}
+	}
+	return val.(*statsItem).getStatsDataInDay()
}
-func increasePullTPS(group, topic string, msgNumber int) {
+// getStatsItem returns the item stored under key, or nil when no item has
+// been created for that key yet.
+func (sis *statsItemSet) getStatsItem(key string) *statsItem {
+	val, ok := sis.statsItemTable.Load(key)
+	if !ok {
+		// Asserting a nil interface to *statsItem would panic.
+		return nil
+	}
+	return val.(*statsItem)
+}
+// statsItem holds the running counters of one key together with the
+// snapshot lists used to compute minute, hour and day statistics.
+type statsItem struct {
+ // value and times are running totals updated with atomic adds in addValue.
+ value int64
+ times int64
+ // Snapshot lists filled by samplingInSeconds, samplingInMinutes and
+ // samplingInHour respectively; elements are callSnapshot values.
+ csListMinute *list.List
+ csListHour *list.List
+ csListDay *list.List
+ statsName string
+ statsKey string
+ // Each lock is held by the sampling method that mutates the matching list.
+ csListMinuteLock sync.Mutex
+ csListHourLock sync.Mutex
+ csListDayLock sync.Mutex
}
-func increaseConsumeOKTPS(group, topic string, msgNumber int) {
+// getStatsDataInMinute computes the statistics of the one-minute window.
+func (si *statsItem) getStatsDataInMinute() statsSnapshot {
+	// samplingInSeconds mutates csListMinute under this lock; container/list
+	// is not safe for concurrent use, so readers must hold it too.
+	si.csListMinuteLock.Lock()
+	defer si.csListMinuteLock.Unlock()
+	return computeStatsData(si.csListMinute)
+}
+
+// getStatsDataInHour computes the statistics of the one-hour window.
+func (si *statsItem) getStatsDataInHour() statsSnapshot {
+	// samplingInMinutes mutates csListHour under this lock; container/list
+	// is not safe for concurrent use, so readers must hold it too.
+	si.csListHourLock.Lock()
+	defer si.csListHourLock.Unlock()
+	return computeStatsData(si.csListHour)
+}
+
+// getStatsDataInDay computes the statistics of the one-day window.
+func (si *statsItem) getStatsDataInDay() statsSnapshot {
+	// samplingInHour mutates csListDay under this lock; container/list is
+	// not safe for concurrent use, so readers must hold it too.
+	si.csListDayLock.Lock()
+	defer si.csListDayLock.Unlock()
+	return computeStatsData(si.csListDay)
+}
+// newStatsItem returns an item for statsName/statsKey with zeroed counters
+// and empty minute, hour and day snapshot lists.
+func newStatsItem(statsName, statsKey string) *statsItem {
+	item := new(statsItem)
+	item.statsName = statsName
+	item.statsKey = statsKey
+	item.csListMinute = list.New()
+	item.csListHour = list.New()
+	item.csListDay = list.New()
+	return item
}
-func increaseConsumeFailedTPS(group, topic string, msgNumber int) {
+// samplingInSeconds appends a snapshot of the current counters to the
+// minute list and drops the oldest entry once the list holds more than 7.
+func (si *statsItem) samplingInSeconds() {
+	si.csListMinuteLock.Lock()
+	defer si.csListMinuteLock.Unlock()
+
+	snap := callSnapshot{
+		timestamp: time.Now().Unix() * 1000,
+		time:      atomic.LoadInt64(&si.times),
+		value:     atomic.LoadInt64(&si.value),
+	}
+	si.csListMinute.PushBack(snap)
+	if si.csListMinute.Len() <= 7 {
+		return
+	}
+	si.csListMinute.Remove(si.csListMinute.Front())
+}
+
+// samplingInMinutes appends a snapshot of the current counters to the hour
+// list and drops the oldest entry once the list holds more than 7.
+func (si *statsItem) samplingInMinutes() {
+	si.csListHourLock.Lock()
+	defer si.csListHourLock.Unlock()
+
+	snap := callSnapshot{
+		timestamp: time.Now().Unix() * 1000,
+		time:      atomic.LoadInt64(&si.times),
+		value:     atomic.LoadInt64(&si.value),
+	}
+	si.csListHour.PushBack(snap)
+	if si.csListHour.Len() <= 7 {
+		return
+	}
+	si.csListHour.Remove(si.csListHour.Front())
+}
+
+// samplingInHour appends a snapshot of the current counters to the day list
+// and drops the oldest entry once the list holds more than 25.
+func (si *statsItem) samplingInHour() {
+	si.csListDayLock.Lock()
+	defer si.csListDayLock.Unlock()
+	si.csListDay.PushBack(callSnapshot{
+		timestamp: time.Now().Unix() * 1000,
+		time:      atomic.LoadInt64(&si.times),
+		value:     atomic.LoadInt64(&si.value),
+	})
+	if si.csListDay.Len() > 25 {
+		// The element must be removed from the list that owns it: Remove on
+		// a different list leaves both lists untouched, so the day list
+		// would never be trimmed.
+		si.csListDay.Remove(si.csListDay.Front())
+	}
+}
+
+// printAtMinutes logs the statistics of the one-minute window.
+func (si *statsItem) printAtMinutes() {
+	// Hold the list lock while reading so samplingInSeconds cannot mutate
+	// the list at the same time; release it before logging.
+	si.csListMinuteLock.Lock()
+	ss := computeStatsData(si.csListMinute)
+	si.csListMinuteLock.Unlock()
+	rlog.Infof("[%s] [%s] Stats In One Minute, SUM: %d TPS: %.2f AVGPT: %.2f",
+		si.statsName, si.statsKey, ss.sum, ss.tps, ss.avgpt)
+}
+
+// printAtHour logs the statistics of the one-hour window.
+func (si *statsItem) printAtHour() {
+	// Hold the list lock while reading so samplingInMinutes cannot mutate
+	// the list at the same time; release it before logging.
+	si.csListHourLock.Lock()
+	ss := computeStatsData(si.csListHour)
+	si.csListHourLock.Unlock()
+	rlog.Infof("[%s] [%s] Stats In One Hour, SUM: %d TPS: %.2f AVGPT: %.2f",
+		si.statsName, si.statsKey, ss.sum, ss.tps, ss.avgpt)
+}
+
+// printAtDay logs the statistics of the one-day window.
+func (si *statsItem) printAtDay() {
+	// Hold the list lock while reading so samplingInHour cannot mutate the
+	// list at the same time; release it before logging.
+	si.csListDayLock.Lock()
+	ss := computeStatsData(si.csListDay)
+	si.csListDayLock.Unlock()
+	rlog.Infof("[%s] [%s] Stats In One Day, SUM: %d TPS: %.2f AVGPT: %.2f",
+		si.statsName, si.statsKey, ss.sum, ss.tps, ss.avgpt)
+}
+
+// nextMinutesTime returns the instant one minute after the current time.
+func nextMinutesTime() time.Time {
+	return time.Now().Add(time.Minute)
+}
+
+// nextHourTime returns the instant one hour after the current time.
+func nextHourTime() time.Time {
+	return time.Now().Add(time.Hour)
+}
+
+// nextMonthTime returns the current time shifted forward by one calendar
+// month.
+func nextMonthTime() time.Time {
+	const monthsAhead = 1
+	return time.Now().AddDate(0, monthsAhead, 0)
+}
+
+// computeStatsData derives sum, TPS and average-per-call from the oldest and
+// newest snapshots in csList.
+//
+// sum is the growth of value across the window, tps is sum per second (the
+// snapshot timestamps are in milliseconds) and avgpt is sum divided by the
+// growth of the call counter. An empty list yields the zero statsSnapshot.
+func computeStatsData(csList *list.List) statsSnapshot {
+	csListLock.Lock()
+	defer csListLock.Unlock()
+
+	if csList.Len() == 0 {
+		return statsSnapshot{}
+	}
+	first := csList.Front().Value.(callSnapshot)
+	last := csList.Back().Value.(callSnapshot)
+
+	ss := statsSnapshot{sum: last.value - first.value}
+	// With a single snapshot (or two taken within the same second) the
+	// window has zero length; leave tps at 0 instead of producing NaN or Inf.
+	if elapsed := last.timestamp - first.timestamp; elapsed > 0 {
+		ss.tps = float64(ss.sum) * 1000 / float64(elapsed)
+	}
+	if timesDiff := last.time - first.time; timesDiff > 0 {
+		ss.avgpt = float64(ss.sum) / float64(timesDiff)
+	}
+	return ss
+}
+
+// callSnapshot is one sample of a statsItem's running counters.
+type callSnapshot struct {
+ // timestamp is the sample time as Unix seconds multiplied by 1000.
+ timestamp int64
+ // time is the value of statsItem.times when the sample was taken.
+ time int64
+ // value is the value of statsItem.value when the sample was taken.
+ value int64
+}
+// statsSnapshot is the result computeStatsData derives from one window.
+type statsSnapshot struct {
+ // sum is the growth of value between the oldest and newest sample.
+ sum int64
+ // tps is sum scaled by 1000 and divided by the timestamp difference.
+ tps float64
+ // avgpt is sum divided by the growth of the call counter.
+ avgpt float64
}