You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by di...@apache.org on 2019/08/14 06:42:03 UTC
[rocketmq-client-go] branch native updated: [ISSUE #148] add
consume/statistic.go unit test (#149)
This is an automated email from the ASF dual-hosted git repository.
dinglei pushed a commit to branch native
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-go.git
The following commit(s) were added to refs/heads/native by this push:
new ae7a2cf [ISSUE #148] add consume/statistic.go unit test (#149)
ae7a2cf is described below
commit ae7a2cf084c9fc4aabfca50fcfc24a52083b52f9
Author: 高峰 <ga...@foxmail.com>
AuthorDate: Wed Aug 14 14:41:58 2019 +0800
[ISSUE #148] add consume/statistic.go unit test (#149)
---
consumer/statistics.go | 4 +-
consumer/statistics_test.go | 192 ++++++++++++++++++++++++++++++++++++++++++++
2 files changed, 194 insertions(+), 2 deletions(-)
diff --git a/consumer/statistics.go b/consumer/statistics.go
index b85e056..43f547a 100644
--- a/consumer/statistics.go
+++ b/consumer/statistics.go
@@ -38,8 +38,8 @@ var (
func init() {
topicAndGroupConsumeOKTPS = newStatsItemSet("CONSUME_OK_TPS")
- topicAndGroupConsumeRT = newStatsItemSet("CONSUME_FAILED_TPS")
- topicAndGroupConsumeFailedTPS = newStatsItemSet("CONSUME_RT")
+ topicAndGroupConsumeRT = newStatsItemSet("CONSUME_RT")
+ topicAndGroupConsumeFailedTPS = newStatsItemSet("CONSUME_FAILED_TPS")
topicAndGroupPullTPS = newStatsItemSet("PULL_TPS")
topicAndGroupPullRT = newStatsItemSet("PULL_RT")
}
diff --git a/consumer/statistics_test.go b/consumer/statistics_test.go
new file mode 100644
index 0000000..370940f
--- /dev/null
+++ b/consumer/statistics_test.go
@@ -0,0 +1,192 @@
+package consumer
+
+import (
+ "testing"
+ "time"
+)
+
+// almostEqual reports whether b lies within 1% of a, measured relative to the
+// magnitude of a. Identical inputs (including 0 and 0) are always considered
+// equal.
+func almostEqual(a, b float64) bool {
+	if a == b {
+		return true
+	}
+	// Compare against |a| instead of dividing by a: dividing by a negative a
+	// yields a negative ratio that always passes the threshold, and dividing
+	// by zero yields Inf/NaN.
+	return abs(a-b) < 0.01*abs(a)
+}
+
+// abs returns the magnitude of a: positive inputs are returned unchanged and
+// every other input is negated.
+func abs(a float64) float64 {
+	switch {
+	case a > 0:
+		return a
+	default:
+		return -a
+	}
+}
+
+// TestNextMinuteTime checks that the instant returned by nextMinutesTime lies
+// about one minute in the future (within the tolerance of almostEqual).
+func TestNextMinuteTime(t *testing.T) {
+	nextMinute := nextMinutesTime()
+	// time.Until(x) is the idiomatic spelling of x.Sub(time.Now()).
+	minuteElapse := time.Until(nextMinute).Minutes()
+	if !almostEqual(minuteElapse, 1.0) {
+		t.Errorf("wrong next one minute. want=%f, got=%f", 1.0, minuteElapse)
+	}
+}
+
+// TestNextHourTime checks that the instant returned by nextHourTime lies about
+// one hour in the future (within the tolerance of almostEqual).
+func TestNextHourTime(t *testing.T) {
+	nextHour := nextHourTime()
+	// time.Until(x) is the idiomatic spelling of x.Sub(time.Now()).
+	hourElapse := time.Until(nextHour).Hours()
+	if !almostEqual(hourElapse, 1.0) {
+		t.Errorf("wrong next one hour. want=%f, got=%f", 1.0, hourElapse)
+	}
+}
+
+func TestIncreasePullRTGetPullRT(t *testing.T) {
+ ShutDownStatis()
+
+ tests := []struct {
+ RT int64
+ ExpectSum int64
+ }{
+ {1, 0},
+ {1, 1},
+ {1, 2},
+ {1, 3},
+ {1, 4},
+ {1, 5},
+ {1, 6},
+ {1, 6},
+ }
+ for _, tt := range tests {
+ increasePullRT("rocketmq", "default", tt.RT)
+ topicAndGroupPullRT.samplingInSeconds()
+ snapshot := getPullRT("rocketmq", "default")
+ if snapshot.sum != tt.ExpectSum {
+ t.Errorf("wrong Pull RT sum. want=%d, got=%d", tt.ExpectSum, snapshot.sum)
+ }
+ }
+}
+
+//func TestIncreaseConsumeRTGetConsumeRT(t *testing.T) {
+// ShutDownStatis()
+// tests := []struct {
+// RT int64
+// ExpectSum int64
+// }{
+// {1, 0},
+// {1, 1},
+// {1, 2},
+// {1, 3},
+// {1, 4},
+// {1, 5},
+// {1, 6},
+// {1, 6},
+// }
+// for _, tt := range tests {
+// increaseConsumeRT("rocketmq", "default", tt.RT)
+// topicAndGroupConsumeRT.samplingInMinutes()
+// snapshot := getConsumeRT("rocketmq", "default")
+// if snapshot.sum != tt.ExpectSum {
+// t.Errorf("wrong consume RT sum. want=%d, got=%d", tt.ExpectSum, snapshot.sum)
+// }
+// }
+//}
+
+// TestIncreasePullTPSGetPullTPS adds one pull count per step, runs a
+// per-second sampling pass on the pull-TPS set, and compares the sum reported
+// by getPullTPS with the expected value for that step.
+func TestIncreasePullTPSGetPullTPS(t *testing.T) {
+	ShutDownStatis()
+	cases := []struct {
+		Count   int
+		WantSum int64
+	}{
+		{1, 0},
+		{1, 1},
+		{1, 2},
+		{1, 3},
+		{1, 4},
+		{1, 5},
+		{1, 6},
+		{1, 6},
+	}
+	for i := range cases {
+		tc := cases[i]
+		increasePullTPS("rocketmq", "default", tc.Count)
+		topicAndGroupPullTPS.samplingInSeconds()
+		got := getPullTPS("rocketmq", "default").sum
+		if got == tc.WantSum {
+			continue
+		}
+		t.Errorf("wrong Pull TPS sum. want=%d, got=%d", tc.WantSum, got)
+	}
+}
+
+func TestIncreaseConsumeOKTPSGetConsumeOKTPS(t *testing.T) {
+ ShutDownStatis()
+ tests := []struct {
+ RT int
+ ExpectSum int64
+ }{
+ {1, 0},
+ {1, 1},
+ {1, 2},
+ {1, 3},
+ {1, 4},
+ {1, 5},
+ {1, 6},
+ {1, 6},
+ }
+ for _, tt := range tests {
+ increaseConsumeOKTPS("rocketmq", "default", tt.RT)
+ topicAndGroupConsumeOKTPS.samplingInSeconds()
+ snapshot := getConsumeOKTPS("rocketmq", "default")
+ if snapshot.sum != tt.ExpectSum {
+ t.Errorf("wrong Consume OK TPS sum. want=%d, got=%d", tt.ExpectSum, snapshot.sum)
+ }
+ }
+}
+
+// TestIncreaseConsumeFailedTPSGetConsumeFailedTPS adds one failed-consume
+// count per step, runs a per-second sampling pass on the consume-failed set,
+// and compares the sum reported by getConsumeFailedTPS with the expectation.
+func TestIncreaseConsumeFailedTPSGetConsumeFailedTPS(t *testing.T) {
+	ShutDownStatis()
+
+	// Every step adds exactly 1, so only the expected sums need listing.
+	wantSums := []int64{0, 1, 2, 3, 4, 5, 6, 6}
+	for _, want := range wantSums {
+		increaseConsumeFailedTPS("rocketmq", "default", 1)
+		topicAndGroupConsumeFailedTPS.samplingInSeconds()
+		if got := getConsumeFailedTPS("rocketmq", "default").sum; got != want {
+			t.Errorf("wrong Consume Failed TPS sum. want=%d, got=%d", want, got)
+		}
+	}
+}
+
+// TestGetConsumeStatus records one sample in each of the five statistics sets
+// per step, triggers a sampling pass on every set, and compares the
+// ConsumeFailedMsgs field returned by GetConsumeStatus with the table.
+func TestGetConsumeStatus(t *testing.T) {
+	ShutDownStatis()
+	group, topic := "rocketmq", "default"
+
+	tests := []struct {
+		RT                int
+		ExpectFailMessage int64
+	}{
+		{1, 0},
+		{1, 1},
+		{1, 2},
+		{1, 3},
+		{1, 4},
+	}
+	for _, tt := range tests {
+		increasePullRT(group, topic, int64(tt.RT))
+		increasePullTPS(group, topic, tt.RT)
+		increaseConsumeRT(group, topic, int64(tt.RT))
+		increaseConsumeOKTPS(group, topic, tt.RT)
+		increaseConsumeFailedTPS(group, topic, tt.RT)
+		topicAndGroupPullRT.samplingInSeconds()
+		topicAndGroupPullTPS.samplingInSeconds()
+		topicAndGroupConsumeRT.samplingInMinutes()
+		topicAndGroupConsumeOKTPS.samplingInSeconds()
+		topicAndGroupConsumeFailedTPS.samplingInMinutes()
+		status := GetConsumeStatus(group, topic)
+		if status.ConsumeFailedMsgs != tt.ExpectFailMessage {
+			// Print the expectation of the current step; it is not always 0.
+			t.Errorf("wrong ConsumeFailedMsg. want=%d, got=%d", tt.ExpectFailMessage, status.ConsumeFailedMsgs)
+		}
+	}
+}