You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by ma...@apache.org on 2021/09/16 03:48:21 UTC
[rocketmq-exporter] branch master updated: fix rocketmq-exporter causes OOM Error (#65)
This is an automated email from the ASF dual-hosted git repository.
maixiaohai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-exporter.git
The following commit(s) were added to refs/heads/master by this push:
new c4277a5 fix rocketmq-exporter causes OOM Error (#65)
c4277a5 is described below
commit c4277a5145b98d45a025afaba0473852344a41ff
Author: humkum <50...@users.noreply.github.com>
AuthorDate: Thu Sep 16 11:48:17 2021 +0800
fix rocketmq-exporter causes OOM Error (#65)
Co-authored-by: hankunming <ha...@xiaomi.com>
---
.../exporter/collector/RMQMetricsCollector.java | 1170 +++++++++++---------
.../rocketmq/exporter/config/RMQConfigure.java | 10 +
.../service/impl/RMQMetricsServiceImpl.java | 10 +-
.../rocketmq/exporter/task/MetricsCollectTask.java | 4 -
src/main/resources/application.yml | 1 +
5 files changed, 643 insertions(+), 552 deletions(-)
diff --git a/src/main/java/org/apache/rocketmq/exporter/collector/RMQMetricsCollector.java b/src/main/java/org/apache/rocketmq/exporter/collector/RMQMetricsCollector.java
index e6f7bc9..4463111 100644
--- a/src/main/java/org/apache/rocketmq/exporter/collector/RMQMetricsCollector.java
+++ b/src/main/java/org/apache/rocketmq/exporter/collector/RMQMetricsCollector.java
@@ -16,8 +16,11 @@
*/
package org.apache.rocketmq.exporter.collector;
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
import io.prometheus.client.Collector;
import io.prometheus.client.GaugeMetricFamily;
+import java.util.concurrent.TimeUnit;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.exporter.model.BrokerRuntimeStats;
import org.apache.rocketmq.exporter.model.metrics.BrokerMetric;
@@ -41,179 +44,270 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
public class RMQMetricsCollector extends Collector {
- //max offset of normal consume queue
- private ConcurrentHashMap<ProducerMetric, Double> topicOffset = new ConcurrentHashMap<>();
+
+ private Cache<ProducerMetric, Double> topicOffset;
//max offset of retry topic consume queue
- private ConcurrentHashMap<ProducerMetric, Double> topicRetryOffset = new ConcurrentHashMap<>();
+ private Cache<ProducerMetric, Double> topicRetryOffset;
//max offset of dlq consume queue
- private ConcurrentHashMap<DLQTopicOffsetMetric, Double> topicDLQOffset = new ConcurrentHashMap<>();
+ private Cache<DLQTopicOffsetMetric, Double> topicDLQOffset;
//total put numbers for topics
- private ConcurrentHashMap<TopicPutNumMetric, Double> topicPutNums = new ConcurrentHashMap<>();
+ private Cache<TopicPutNumMetric, Double> topicPutNums;
//total get numbers for topics
- private ConcurrentHashMap<TopicPutNumMetric, Double> topicPutSize = new ConcurrentHashMap<>();
+ private Cache<TopicPutNumMetric, Double> topicPutSize;
//diff for consumer group
- private ConcurrentHashMap<ConsumerTopicDiffMetric, Long> consumerDiff = new ConcurrentHashMap<>();
+ private Cache<ConsumerTopicDiffMetric, Long> consumerDiff;
//retry diff for consumer group
- private ConcurrentHashMap<ConsumerTopicDiffMetric, Long> consumerRetryDiff = new ConcurrentHashMap<>();
+ private Cache<ConsumerTopicDiffMetric, Long> consumerRetryDiff;
//dlq diff for consumer group
- private ConcurrentHashMap<ConsumerTopicDiffMetric, Long> consumerDLQDiff = new ConcurrentHashMap<>();
+ private Cache<ConsumerTopicDiffMetric, Long> consumerDLQDiff;
//consumer count
- private ConcurrentHashMap<ConsumerCountMetric, Integer> consumerCounts = new ConcurrentHashMap<>();
+ private Cache<ConsumerCountMetric, Integer> consumerCounts;
//count of consume fail
- private ConcurrentHashMap<ConsumerRuntimeConsumeFailedMsgsMetric, Long> consumerClientFailedMsgCounts = new ConcurrentHashMap<>();
+ private Cache<ConsumerRuntimeConsumeFailedMsgsMetric, Long> consumerClientFailedMsgCounts;
//TPS of consume fail
- private ConcurrentHashMap<ConsumerRuntimeConsumeFailedTPSMetric, Double> consumerClientFailedTPS = new ConcurrentHashMap<>();
+ private Cache<ConsumerRuntimeConsumeFailedTPSMetric, Double> consumerClientFailedTPS;
//TPS of consume success
- private ConcurrentHashMap<ConsumerRuntimeConsumeOKTPSMetric, Double> consumerClientOKTPS = new ConcurrentHashMap<>();
+ private Cache<ConsumerRuntimeConsumeOKTPSMetric, Double> consumerClientOKTPS;
//rt of consume
- private ConcurrentHashMap<ConsumerRuntimeConsumeRTMetric, Double> consumerClientRT = new ConcurrentHashMap<>();
+ private Cache<ConsumerRuntimeConsumeRTMetric, Double> consumerClientRT;
//pull RT
- private ConcurrentHashMap<ConsumerRuntimePullRTMetric, Double> consumerClientPullRT = new ConcurrentHashMap<>();
+ private Cache<ConsumerRuntimePullRTMetric, Double> consumerClientPullRT;
//pull tps
- private ConcurrentHashMap<ConsumerRuntimePullTPSMetric, Double> consumerClientPullTPS = new ConcurrentHashMap<>();
+ private Cache<ConsumerRuntimePullTPSMetric, Double> consumerClientPullTPS;
//broker offset for consumer-topic
- private ConcurrentHashMap<ConsumerMetric, Long> groupBrokerTotalOffset = new ConcurrentHashMap<>();
+ private Cache<ConsumerMetric, Long> groupBrokerTotalOffset;
//consumer offset for consumer-topic
- private ConcurrentHashMap<ConsumerMetric, Long> groupConsumeTotalOffset = new ConcurrentHashMap<>();
+ private Cache<ConsumerMetric, Long> groupConsumeTotalOffset;
//consume tps
- private ConcurrentHashMap<ConsumerMetric, Double> groupConsumeTPS = new ConcurrentHashMap<>();
+ private Cache<ConsumerMetric, Double> groupConsumeTPS;
//consumed message count for consumer-topic
- private ConcurrentHashMap<ConsumerMetric, Double> groupGetNums = new ConcurrentHashMap<>();
+ private Cache<ConsumerMetric, Double> groupGetNums;
//consumed message size(byte) for consumer-topic
- private ConcurrentHashMap<ConsumerMetric, Double> groupGetSize = new ConcurrentHashMap<>();
+ private Cache<ConsumerMetric, Double> groupGetSize;
//re-consumed message count for consumer-topic
- private ConcurrentHashMap<ConsumerMetric, Long> sendBackNums = new ConcurrentHashMap<>();
+ private Cache<ConsumerMetric, Double> sendBackNums;
// group latency time
- private ConcurrentHashMap<ConsumerMetric, Long> groupLatencyByTime = new ConcurrentHashMap<>();
+ private Cache<ConsumerMetric, Long> groupLatencyByTime;
//total put message count for one broker
- private ConcurrentHashMap<BrokerMetric, Double> brokerPutNums = new ConcurrentHashMap<>();
+ private Cache<BrokerMetric, Double> brokerPutNums;
//total get message count for one broker
- private ConcurrentHashMap<BrokerMetric, Double> brokerGetNums = new ConcurrentHashMap<>();
-
- private ConcurrentHashMap<BrokerRuntimeMetric, Long> brokerRuntimeMsgPutTotalTodayNow = new ConcurrentHashMap<>();
- private ConcurrentHashMap<BrokerRuntimeMetric, Long> brokerRuntimeMsgGetTotalTodayNow = new ConcurrentHashMap<>();
-
- private ConcurrentHashMap<BrokerRuntimeMetric, Long> brokerRuntimeMsgGetTotalYesterdayMorning = new ConcurrentHashMap<>();
- private ConcurrentHashMap<BrokerRuntimeMetric, Long> brokerRuntimeMsgPutTotalYesterdayMorning = new ConcurrentHashMap<>();
- private ConcurrentHashMap<BrokerRuntimeMetric, Long> brokerRuntimeMsgGetTotalTodayMorning = new ConcurrentHashMap<>();
- private ConcurrentHashMap<BrokerRuntimeMetric, Long> brokerRuntimeMsgPutTotalTodayMorning = new ConcurrentHashMap<>();
-
- private ConcurrentHashMap<BrokerRuntimeMetric, Long> brokerRuntimeDispatchBehindBytes = new ConcurrentHashMap<>();
- private ConcurrentHashMap<BrokerRuntimeMetric, Long> brokerRuntimePutMessageSizeTotal = new ConcurrentHashMap<>();
-
- private ConcurrentHashMap<BrokerRuntimeMetric, Double> brokerRuntimePutMessageAverageSize = new ConcurrentHashMap<>();
- private ConcurrentHashMap<BrokerRuntimeMetric, Long> brokerRuntimeQueryThreadPoolQueueCapacity = new ConcurrentHashMap<>();
- private ConcurrentHashMap<BrokerRuntimeMetric, Long> brokerRuntimeRemainTransientStoreBufferNumbs = new ConcurrentHashMap<>();
- private ConcurrentHashMap<BrokerRuntimeMetric, Long> brokerRuntimeEarliestMessageTimeStamp = new ConcurrentHashMap<>();
- private ConcurrentHashMap<BrokerRuntimeMetric, Long> brokerRuntimePutMessageEntireTimeMax = new ConcurrentHashMap<>();
- private ConcurrentHashMap<BrokerRuntimeMetric, Long> brokerRuntimeStartAcceptSendRequestTimeStamp = new ConcurrentHashMap<>();
- private ConcurrentHashMap<BrokerRuntimeMetric, Long> brokerRuntimeSendThreadPoolQueueSize = new ConcurrentHashMap<>();
- private ConcurrentHashMap<BrokerRuntimeMetric, Long> brokerRuntimePutMessageTimesTotal = new ConcurrentHashMap<>();
- private ConcurrentHashMap<BrokerRuntimeMetric, Long> brokerRuntimeGetMessageEntireTimeMax = new ConcurrentHashMap<>();
- private ConcurrentHashMap<BrokerRuntimeMetric, Long> brokerRuntimePageCacheLockTimeMills = new ConcurrentHashMap<>();
- private ConcurrentHashMap<BrokerRuntimeMetric, Double> brokerRuntimeCommitLogDiskRatio = new ConcurrentHashMap<>();
- private ConcurrentHashMap<BrokerRuntimeMetric, Double> brokerRuntimeConsumeQueueDiskRatio = new ConcurrentHashMap<>();
-
- private ConcurrentHashMap<BrokerRuntimeMetric, Double> brokerRuntimeGetFoundTps600 = new ConcurrentHashMap<>();
- private ConcurrentHashMap<BrokerRuntimeMetric, Double> brokerRuntimeGetFoundTps60 = new ConcurrentHashMap<>();
- private ConcurrentHashMap<BrokerRuntimeMetric, Double> brokerRuntimeGetFoundTps10 = new ConcurrentHashMap<>();
- private ConcurrentHashMap<BrokerRuntimeMetric, Double> brokerRuntimeGetTotalTps600 = new ConcurrentHashMap<>();
- private ConcurrentHashMap<BrokerRuntimeMetric, Double> brokerRuntimeGetTotalTps60 = new ConcurrentHashMap<>();
- private ConcurrentHashMap<BrokerRuntimeMetric, Double> brokerRuntimeGetTotalTps10 = new ConcurrentHashMap<>();
- private ConcurrentHashMap<BrokerRuntimeMetric, Double> brokerRuntimeGetTransferedTps600 = new ConcurrentHashMap<>();
- private ConcurrentHashMap<BrokerRuntimeMetric, Double> brokerRuntimeGetTransferedTps60 = new ConcurrentHashMap<>();
- private ConcurrentHashMap<BrokerRuntimeMetric, Double> brokerRuntimeGetTransferedTps10 = new ConcurrentHashMap<>();
- private ConcurrentHashMap<BrokerRuntimeMetric, Double> brokerRuntimeGetMissTps600 = new ConcurrentHashMap<>();
- private ConcurrentHashMap<BrokerRuntimeMetric, Double> brokerRuntimeGetMissTps60 = new ConcurrentHashMap<>();
- private ConcurrentHashMap<BrokerRuntimeMetric, Double> brokerRuntimeGetMissTps10 = new ConcurrentHashMap<>();
- private ConcurrentHashMap<BrokerRuntimeMetric, Double> brokerRuntimePutTps600 = new ConcurrentHashMap<>();
- private ConcurrentHashMap<BrokerRuntimeMetric, Double> brokerRuntimePutTps60 = new ConcurrentHashMap<>();
- private ConcurrentHashMap<BrokerRuntimeMetric, Double> brokerRuntimePutTps10 = new ConcurrentHashMap<>();
-
- private ConcurrentHashMap<BrokerRuntimeMetric, Long> brokerRuntimeDispatchMaxBuffer = new ConcurrentHashMap<>();
-
- private ConcurrentHashMap<BrokerRuntimeMetric, Integer> brokerRuntimePutMessageDistributeTimeMap10toMore = new ConcurrentHashMap<>();
- private ConcurrentHashMap<BrokerRuntimeMetric, Integer> brokerRuntimePutMessageDistributeTimeMap5to10s = new ConcurrentHashMap<>();
- private ConcurrentHashMap<BrokerRuntimeMetric, Integer> brokerRuntimePutMessageDistributeTimeMap4to5s = new ConcurrentHashMap<>();
- private ConcurrentHashMap<BrokerRuntimeMetric, Integer> brokerRuntimePutMessageDistributeTimeMap3to4s = new ConcurrentHashMap<>();
- private ConcurrentHashMap<BrokerRuntimeMetric, Integer> brokerRuntimePutMessageDistributeTimeMap2to3s = new ConcurrentHashMap<>();
- private ConcurrentHashMap<BrokerRuntimeMetric, Integer> brokerRuntimePutMessageDistributeTimeMap1to2s = new ConcurrentHashMap<>();
- private ConcurrentHashMap<BrokerRuntimeMetric, Integer> brokerRuntimePutMessageDistributeTimeMap500to1s = new ConcurrentHashMap<>();
- private ConcurrentHashMap<BrokerRuntimeMetric, Integer> brokerRuntimePutMessageDistributeTimeMap200to500ms = new ConcurrentHashMap<>();
- private ConcurrentHashMap<BrokerRuntimeMetric, Integer> brokerRuntimePutMessageDistributeTimeMap100to200ms = new ConcurrentHashMap<>();
- private ConcurrentHashMap<BrokerRuntimeMetric, Integer> brokerRuntimePutMessageDistributeTimeMap50to100ms = new ConcurrentHashMap<>();
- private ConcurrentHashMap<BrokerRuntimeMetric, Integer> brokerRuntimePutMessageDistributeTimeMap10to50ms = new ConcurrentHashMap<>();
- private ConcurrentHashMap<BrokerRuntimeMetric, Integer> brokerRuntimePutMessageDistributeTimeMap0to10ms = new ConcurrentHashMap<>();
- private ConcurrentHashMap<BrokerRuntimeMetric, Integer> brokerRuntimePutMessageDistributeTimeMap0ms = new ConcurrentHashMap<>();
-
- private ConcurrentHashMap<BrokerRuntimeMetric, Long> brokerRuntimePullThreadPoolQueueCapacity = new ConcurrentHashMap<>();
- private ConcurrentHashMap<BrokerRuntimeMetric, Long> brokerRuntimeSendThreadPoolQueueCapacity = new ConcurrentHashMap<>();
- private ConcurrentHashMap<BrokerRuntimeMetric, Long> brokerRuntimePullThreadPoolQueueSize = new ConcurrentHashMap<>();
- private ConcurrentHashMap<BrokerRuntimeMetric, Long> brokerRuntimeQueryThreadPoolQueueSize = new ConcurrentHashMap<>();
-
- private ConcurrentHashMap<BrokerRuntimeMetric, Long> brokerRuntimePullThreadPoolQueueHeadWaitTimeMills = new ConcurrentHashMap<>();
- private ConcurrentHashMap<BrokerRuntimeMetric, Long> brokerRuntimeQueryThreadPoolQueueHeadWaitTimeMills = new ConcurrentHashMap<>();
- private ConcurrentHashMap<BrokerRuntimeMetric, Long> brokerRuntimeSendThreadPoolQueueHeadWaitTimeMills = new ConcurrentHashMap<>();
- private ConcurrentHashMap<BrokerRuntimeMetric, Double> brokerRuntimeCommitLogDirCapacityFree = new ConcurrentHashMap<>();
- private ConcurrentHashMap<BrokerRuntimeMetric, Double> brokerRuntimeCommitLogDirCapacityTotal = new ConcurrentHashMap<>();
-
- private ConcurrentHashMap<BrokerRuntimeMetric, Long> brokerRuntimeCommitLogMaxOffset = new ConcurrentHashMap<>();
- private ConcurrentHashMap<BrokerRuntimeMetric, Long> brokerRuntimeCommitLogMinOffset = new ConcurrentHashMap<>();
- private ConcurrentHashMap<BrokerRuntimeMetric, Double> brokerRuntimeRemainHowManyDataToFlush = new ConcurrentHashMap<>();
+ private Cache<BrokerMetric, Double> brokerGetNums;
+
+ private Cache<BrokerRuntimeMetric, Long> brokerRuntimeMsgPutTotalTodayNow;
+ private Cache<BrokerRuntimeMetric, Long> brokerRuntimeMsgGetTotalTodayNow;
+
+ private Cache<BrokerRuntimeMetric, Long> brokerRuntimeMsgGetTotalYesterdayMorning;
+ private Cache<BrokerRuntimeMetric, Long> brokerRuntimeMsgPutTotalYesterdayMorning;
+ private Cache<BrokerRuntimeMetric, Long> brokerRuntimeMsgGetTotalTodayMorning;
+ private Cache<BrokerRuntimeMetric, Long> brokerRuntimeMsgPutTotalTodayMorning;
+
+ private Cache<BrokerRuntimeMetric, Long> brokerRuntimeDispatchBehindBytes;
+ private Cache<BrokerRuntimeMetric, Long> brokerRuntimePutMessageSizeTotal;
+
+ private Cache<BrokerRuntimeMetric, Double> brokerRuntimePutMessageAverageSize;
+ private Cache<BrokerRuntimeMetric, Long> brokerRuntimeQueryThreadPoolQueueCapacity;
+ private Cache<BrokerRuntimeMetric, Long> brokerRuntimeRemainTransientStoreBufferNumbs;
+ private Cache<BrokerRuntimeMetric, Long> brokerRuntimeEarliestMessageTimeStamp;
+ private Cache<BrokerRuntimeMetric, Long> brokerRuntimePutMessageEntireTimeMax;
+ private Cache<BrokerRuntimeMetric, Long> brokerRuntimeStartAcceptSendRequestTimeStamp;
+ private Cache<BrokerRuntimeMetric, Long> brokerRuntimeSendThreadPoolQueueSize;
+ private Cache<BrokerRuntimeMetric, Long> brokerRuntimePutMessageTimesTotal;
+ private Cache<BrokerRuntimeMetric, Long> brokerRuntimeGetMessageEntireTimeMax;
+ private Cache<BrokerRuntimeMetric, Long> brokerRuntimePageCacheLockTimeMills;
+ private Cache<BrokerRuntimeMetric, Double> brokerRuntimeCommitLogDiskRatio;
+ private Cache<BrokerRuntimeMetric, Double> brokerRuntimeConsumeQueueDiskRatio;
+
+ private Cache<BrokerRuntimeMetric, Double> brokerRuntimeGetFoundTps600;
+ private Cache<BrokerRuntimeMetric, Double> brokerRuntimeGetFoundTps60;
+ private Cache<BrokerRuntimeMetric, Double> brokerRuntimeGetFoundTps10;
+ private Cache<BrokerRuntimeMetric, Double> brokerRuntimeGetTotalTps600;
+ private Cache<BrokerRuntimeMetric, Double> brokerRuntimeGetTotalTps60;
+ private Cache<BrokerRuntimeMetric, Double> brokerRuntimeGetTotalTps10;
+ private Cache<BrokerRuntimeMetric, Double> brokerRuntimeGetTransferedTps600;
+ private Cache<BrokerRuntimeMetric, Double> brokerRuntimeGetTransferedTps60;
+ private Cache<BrokerRuntimeMetric, Double> brokerRuntimeGetTransferedTps10;
+ private Cache<BrokerRuntimeMetric, Double> brokerRuntimeGetMissTps600;
+ private Cache<BrokerRuntimeMetric, Double> brokerRuntimeGetMissTps60;
+ private Cache<BrokerRuntimeMetric, Double> brokerRuntimeGetMissTps10;
+ private Cache<BrokerRuntimeMetric, Double> brokerRuntimePutTps600;
+ private Cache<BrokerRuntimeMetric, Double> brokerRuntimePutTps60;
+ private Cache<BrokerRuntimeMetric, Double> brokerRuntimePutTps10;
+
+ private Cache<BrokerRuntimeMetric, Long> brokerRuntimeDispatchMaxBuffer;
+
+ private Cache<BrokerRuntimeMetric, Integer> brokerRuntimePutMessageDistributeTimeMap10toMore;
+ private Cache<BrokerRuntimeMetric, Integer> brokerRuntimePutMessageDistributeTimeMap5to10s;
+ private Cache<BrokerRuntimeMetric, Integer> brokerRuntimePutMessageDistributeTimeMap4to5s;
+ private Cache<BrokerRuntimeMetric, Integer> brokerRuntimePutMessageDistributeTimeMap3to4s;
+ private Cache<BrokerRuntimeMetric, Integer> brokerRuntimePutMessageDistributeTimeMap2to3s;
+ private Cache<BrokerRuntimeMetric, Integer> brokerRuntimePutMessageDistributeTimeMap1to2s;
+ private Cache<BrokerRuntimeMetric, Integer> brokerRuntimePutMessageDistributeTimeMap500to1s;
+ private Cache<BrokerRuntimeMetric, Integer> brokerRuntimePutMessageDistributeTimeMap200to500ms;
+ private Cache<BrokerRuntimeMetric, Integer> brokerRuntimePutMessageDistributeTimeMap100to200ms;
+ private Cache<BrokerRuntimeMetric, Integer> brokerRuntimePutMessageDistributeTimeMap50to100ms;
+ private Cache<BrokerRuntimeMetric, Integer> brokerRuntimePutMessageDistributeTimeMap10to50ms;
+ private Cache<BrokerRuntimeMetric, Integer> brokerRuntimePutMessageDistributeTimeMap0to10ms;
+ private Cache<BrokerRuntimeMetric, Integer> brokerRuntimePutMessageDistributeTimeMap0ms;
+
+ private Cache<BrokerRuntimeMetric, Long> brokerRuntimePullThreadPoolQueueCapacity;
+ private Cache<BrokerRuntimeMetric, Long> brokerRuntimeSendThreadPoolQueueCapacity;
+ private Cache<BrokerRuntimeMetric, Long> brokerRuntimePullThreadPoolQueueSize;
+ private Cache<BrokerRuntimeMetric, Long> brokerRuntimeQueryThreadPoolQueueSize;
+
+ private Cache<BrokerRuntimeMetric, Long> brokerRuntimePullThreadPoolQueueHeadWaitTimeMills;
+ private Cache<BrokerRuntimeMetric, Long> brokerRuntimeQueryThreadPoolQueueHeadWaitTimeMills;
+ private Cache<BrokerRuntimeMetric, Long> brokerRuntimeSendThreadPoolQueueHeadWaitTimeMills;
+ private Cache<BrokerRuntimeMetric, Double> brokerRuntimeCommitLogDirCapacityFree;
+ private Cache<BrokerRuntimeMetric, Double> brokerRuntimeCommitLogDirCapacityTotal;
+
+ private Cache<BrokerRuntimeMetric, Long> brokerRuntimeCommitLogMaxOffset;
+ private Cache<BrokerRuntimeMetric, Long> brokerRuntimeCommitLogMinOffset;
+ private Cache<BrokerRuntimeMetric, Double> brokerRuntimeRemainHowManyDataToFlush;
+
+ /**
+ * Creates a collector and builds every metric cache through
+ * {@link #initCache(long)}, so that all caches share the same
+ * expire-after-write timeout.
+ *
+ * @param outOfTimeSeconds timeout, in seconds, handed to every cache on creation
+ */
+ public RMQMetricsCollector(long outOfTimeSeconds) {
+ this.topicOffset = initCache(outOfTimeSeconds);
+ this.topicRetryOffset = initCache(outOfTimeSeconds);
+ this.topicDLQOffset = initCache(outOfTimeSeconds);
+ this.topicPutNums = initCache(outOfTimeSeconds);
+ this.topicPutSize = initCache(outOfTimeSeconds);
+ this.consumerDiff = initCache(outOfTimeSeconds);
+ this.consumerRetryDiff = initCache(outOfTimeSeconds);
+ this.consumerDLQDiff = initCache(outOfTimeSeconds);
+ this.consumerCounts = initCache(outOfTimeSeconds);
+ this.consumerClientFailedMsgCounts = initCache(outOfTimeSeconds);
+ this.consumerClientFailedTPS = initCache(outOfTimeSeconds);
+ this.consumerClientOKTPS = initCache(outOfTimeSeconds);
+ this.consumerClientRT = initCache(outOfTimeSeconds);
+ this.consumerClientPullRT = initCache(outOfTimeSeconds);
+ this.consumerClientPullTPS = initCache(outOfTimeSeconds);
+ this.groupBrokerTotalOffset = initCache(outOfTimeSeconds);
+ this.groupConsumeTotalOffset = initCache(outOfTimeSeconds);
+ this.groupConsumeTPS = initCache(outOfTimeSeconds);
+ this.groupGetNums = initCache(outOfTimeSeconds);
+ this.groupGetSize = initCache(outOfTimeSeconds);
+ this.sendBackNums = initCache(outOfTimeSeconds);
+ this.groupLatencyByTime = initCache(outOfTimeSeconds);
+ this.brokerPutNums = initCache(outOfTimeSeconds);
+ this.brokerGetNums = initCache(outOfTimeSeconds);
+ this.brokerRuntimeMsgPutTotalTodayNow = initCache(outOfTimeSeconds);
+ this.brokerRuntimeMsgGetTotalTodayNow = initCache(outOfTimeSeconds);
+ this.brokerRuntimeMsgGetTotalYesterdayMorning = initCache(outOfTimeSeconds);
+ this.brokerRuntimeMsgPutTotalYesterdayMorning = initCache(outOfTimeSeconds);
+ this.brokerRuntimeMsgGetTotalTodayMorning = initCache(outOfTimeSeconds);
+ this.brokerRuntimeMsgPutTotalTodayMorning = initCache(outOfTimeSeconds);
+ this.brokerRuntimeDispatchBehindBytes = initCache(outOfTimeSeconds);
+ this.brokerRuntimePutMessageSizeTotal = initCache(outOfTimeSeconds);
+ this.brokerRuntimePutMessageAverageSize = initCache(outOfTimeSeconds);
+ this.brokerRuntimeQueryThreadPoolQueueCapacity = initCache(outOfTimeSeconds);
+ this.brokerRuntimeRemainTransientStoreBufferNumbs = initCache(outOfTimeSeconds);
+ this.brokerRuntimeEarliestMessageTimeStamp = initCache(outOfTimeSeconds);
+ this.brokerRuntimePutMessageEntireTimeMax = initCache(outOfTimeSeconds);
+ this.brokerRuntimeStartAcceptSendRequestTimeStamp = initCache(outOfTimeSeconds);
+ this.brokerRuntimeSendThreadPoolQueueSize = initCache(outOfTimeSeconds);
+ this.brokerRuntimePutMessageTimesTotal = initCache(outOfTimeSeconds);
+ this.brokerRuntimeGetMessageEntireTimeMax = initCache(outOfTimeSeconds);
+ this.brokerRuntimePageCacheLockTimeMills = initCache(outOfTimeSeconds);
+ this.brokerRuntimeCommitLogDiskRatio = initCache(outOfTimeSeconds);
+ this.brokerRuntimeConsumeQueueDiskRatio = initCache(outOfTimeSeconds);
+ this.brokerRuntimeGetFoundTps600 = initCache(outOfTimeSeconds);
+ this.brokerRuntimeGetFoundTps60 = initCache(outOfTimeSeconds);
+ this.brokerRuntimeGetFoundTps10 = initCache(outOfTimeSeconds);
+ this.brokerRuntimeGetTotalTps600 = initCache(outOfTimeSeconds);
+ this.brokerRuntimeGetTotalTps60 = initCache(outOfTimeSeconds);
+ this.brokerRuntimeGetTotalTps10 = initCache(outOfTimeSeconds);
+ this.brokerRuntimeGetTransferedTps600 = initCache(outOfTimeSeconds);
+ this.brokerRuntimeGetTransferedTps60 = initCache(outOfTimeSeconds);
+ this.brokerRuntimeGetTransferedTps10 = initCache(outOfTimeSeconds);
+ this.brokerRuntimeGetMissTps600 = initCache(outOfTimeSeconds);
+ this.brokerRuntimeGetMissTps60 = initCache(outOfTimeSeconds);
+ this.brokerRuntimeGetMissTps10 = initCache(outOfTimeSeconds);
+ this.brokerRuntimePutTps600 = initCache(outOfTimeSeconds);
+ this.brokerRuntimePutTps60 = initCache(outOfTimeSeconds);
+ this.brokerRuntimePutTps10 = initCache(outOfTimeSeconds);
+ this.brokerRuntimeDispatchMaxBuffer = initCache(outOfTimeSeconds);
+ this.brokerRuntimePutMessageDistributeTimeMap10toMore = initCache(outOfTimeSeconds);
+ this.brokerRuntimePutMessageDistributeTimeMap5to10s = initCache(outOfTimeSeconds);
+ this.brokerRuntimePutMessageDistributeTimeMap4to5s = initCache(outOfTimeSeconds);
+ this.brokerRuntimePutMessageDistributeTimeMap3to4s = initCache(outOfTimeSeconds);
+ this.brokerRuntimePutMessageDistributeTimeMap2to3s = initCache(outOfTimeSeconds);
+ this.brokerRuntimePutMessageDistributeTimeMap1to2s = initCache(outOfTimeSeconds);
+ this.brokerRuntimePutMessageDistributeTimeMap500to1s = initCache(outOfTimeSeconds);
+ this.brokerRuntimePutMessageDistributeTimeMap200to500ms = initCache(outOfTimeSeconds);
+ this.brokerRuntimePutMessageDistributeTimeMap100to200ms = initCache(outOfTimeSeconds);
+ this.brokerRuntimePutMessageDistributeTimeMap50to100ms = initCache(outOfTimeSeconds);
+ this.brokerRuntimePutMessageDistributeTimeMap10to50ms = initCache(outOfTimeSeconds);
+ this.brokerRuntimePutMessageDistributeTimeMap0to10ms = initCache(outOfTimeSeconds);
+ this.brokerRuntimePutMessageDistributeTimeMap0ms = initCache(outOfTimeSeconds);
+ this.brokerRuntimePullThreadPoolQueueCapacity = initCache(outOfTimeSeconds);
+ this.brokerRuntimeSendThreadPoolQueueCapacity = initCache(outOfTimeSeconds);
+ this.brokerRuntimePullThreadPoolQueueSize = initCache(outOfTimeSeconds);
+ this.brokerRuntimeQueryThreadPoolQueueSize = initCache(outOfTimeSeconds);
+ this.brokerRuntimePullThreadPoolQueueHeadWaitTimeMills = initCache(outOfTimeSeconds);
+ this.brokerRuntimeQueryThreadPoolQueueHeadWaitTimeMills = initCache(outOfTimeSeconds);
+ this.brokerRuntimeSendThreadPoolQueueHeadWaitTimeMills = initCache(outOfTimeSeconds);
+ this.brokerRuntimeCommitLogDirCapacityFree = initCache(outOfTimeSeconds);
+ this.brokerRuntimeCommitLogDirCapacityTotal = initCache(outOfTimeSeconds);
+ this.brokerRuntimeCommitLogMaxOffset = initCache(outOfTimeSeconds);
+ this.brokerRuntimeCommitLogMinOffset = initCache(outOfTimeSeconds);
+ this.brokerRuntimeRemainHowManyDataToFlush = initCache(outOfTimeSeconds);
+ }
+
+ /**
+ * Builds a new, empty Guava cache whose entries expire a fixed time after
+ * they were written.
+ *
+ * The key and value types are inferred from the assignment target, so no
+ * raw type or unchecked cast is needed and a caller cannot ask for a
+ * {@code Cache} subtype that {@code CacheBuilder} does not actually return.
+ *
+ * @param outOfTimeSeconds expire-after-write timeout, in seconds
+ * @param <K> key type of the cache
+ * @param <V> value type of the cache
+ * @return a new cache configured with the given expiry
+ */
+ private <K, V> Cache<K, V> initCache(long outOfTimeSeconds) {
+ return CacheBuilder.newBuilder().expireAfterWrite(outOfTimeSeconds, TimeUnit.SECONDS).build();
+ }
+
private final static Logger log = LoggerFactory.getLogger(RMQMetricsCollector.class);
private static final List<String> GROUP_DIFF_LABEL_NAMES = Arrays.asList("group", "topic", "countOfOnlineConsumers", "msgModel");
private static <T extends Number> void loadGroupDiffMetric(GaugeMetricFamily family, Map.Entry<ConsumerTopicDiffMetric, T> entry) {
family.addMetric(
- Arrays.asList(
- entry.getKey().getGroup(),
- entry.getKey().getTopic(),
- entry.getKey().getCountOfOnlineConsumers(),
- entry.getKey().getMsgModel()
- ),
- entry.getValue().doubleValue());
+ Arrays.asList(
+ entry.getKey().getGroup(),
+ entry.getKey().getTopic(),
+ entry.getKey().getCountOfOnlineConsumers(),
+ entry.getKey().getMsgModel()
+ ),
+ entry.getValue().doubleValue());
}
private static final List<String> GROUP_COUNT_LABEL_NAMES = Arrays.asList("caddr", "localaddr", "group");
private void collectConsumerMetric(List<MetricFamilySamples> mfs) {
GaugeMetricFamily groupGetLatencyByConsumerDiff = new GaugeMetricFamily("rocketmq_group_diff", "GroupDiff", GROUP_DIFF_LABEL_NAMES);
- for (Map.Entry<ConsumerTopicDiffMetric, Long> entry : consumerDiff.entrySet()) {
+ for (Map.Entry<ConsumerTopicDiffMetric, Long> entry : consumerDiff.asMap().entrySet()) {
loadGroupDiffMetric(groupGetLatencyByConsumerDiff, entry);
}
mfs.add(groupGetLatencyByConsumerDiff);
GaugeMetricFamily groupGetLatencyByConsumerRetryDiff = new GaugeMetricFamily("rocketmq_group_retrydiff", "GroupRetryDiff", GROUP_DIFF_LABEL_NAMES);
- for (Map.Entry<ConsumerTopicDiffMetric, Long> entry : consumerRetryDiff.entrySet()) {
+ for (Map.Entry<ConsumerTopicDiffMetric, Long> entry : consumerRetryDiff.asMap().entrySet()) {
loadGroupDiffMetric(groupGetLatencyByConsumerRetryDiff, entry);
}
mfs.add(groupGetLatencyByConsumerRetryDiff);
GaugeMetricFamily groupGetLatencyByConsumerDLQDiff = new GaugeMetricFamily("rocketmq_group_dlqdiff", "GroupDLQDiff", GROUP_DIFF_LABEL_NAMES);
- for (Map.Entry<ConsumerTopicDiffMetric, Long> entry : consumerDLQDiff.entrySet()) {
+ for (Map.Entry<ConsumerTopicDiffMetric, Long> entry : consumerDLQDiff.asMap().entrySet()) {
loadGroupDiffMetric(groupGetLatencyByConsumerDLQDiff, entry);
}
mfs.add(groupGetLatencyByConsumerDLQDiff);
GaugeMetricFamily consumerCountsF = new GaugeMetricFamily("rocketmq_group_count", "GroupCount", GROUP_COUNT_LABEL_NAMES);
- for (Map.Entry<ConsumerCountMetric, Integer> entry : consumerCounts.entrySet()) {
+ for (Map.Entry<ConsumerCountMetric, Integer> entry : consumerCounts.asMap().entrySet()) {
consumerCountsF.addMetric(
- Arrays.asList(
- entry.getKey().getCaddrs(),
- entry.getKey().getLocaladdrs(),
- entry.getKey().getGroup()
- ),
- entry.getValue().doubleValue());
+ Arrays.asList(
+ entry.getKey().getCaddrs(),
+ entry.getKey().getLocaladdrs(),
+ entry.getKey().getGroup()
+ ),
+ entry.getValue().doubleValue());
}
mfs.add(consumerCountsF);
@@ -221,47 +315,47 @@ public class RMQMetricsCollector extends Collector {
private static final List<String> TOPIC_OFFSET_LABEL_NAMES = Arrays.asList(
- "cluster", "broker", "topic", "lastUpdateTimestamp"
+ "cluster", "broker", "topic", "lastUpdateTimestamp"
);
private static final List<String> DLQ_TOPIC_OFFSET_LABEL_NAMES = Arrays.asList(
- "cluster", "broker", "group", "lastUpdateTimestamp"
+ "cluster", "broker", "group", "lastUpdateTimestamp"
);
private void loadTopicOffsetMetric(GaugeMetricFamily family, Map.Entry<ProducerMetric, Double> entry) {
family.addMetric(
- Arrays.asList(
- entry.getKey().getClusterName(),
- entry.getKey().getBrokerName(),
- entry.getKey().getTopicName(),
- String.valueOf(entry.getKey().getLastUpdateTimestamp())
- ),
- entry.getValue());
+ Arrays.asList(
+ entry.getKey().getClusterName(),
+ entry.getKey().getBrokerName(),
+ entry.getKey().getTopicName(),
+ String.valueOf(entry.getKey().getLastUpdateTimestamp())
+ ),
+ entry.getValue());
}
private void collectTopicOffsetMetric(List<MetricFamilySamples> mfs) {
GaugeMetricFamily topicOffsetF = new GaugeMetricFamily("rocketmq_producer_offset", "TopicOffset", TOPIC_OFFSET_LABEL_NAMES);
- for (Map.Entry<ProducerMetric, Double> entry : topicOffset.entrySet()) {
+ for (Map.Entry<ProducerMetric, Double> entry : topicOffset.asMap().entrySet()) {
loadTopicOffsetMetric(topicOffsetF, entry);
}
mfs.add(topicOffsetF);
GaugeMetricFamily topicRetryOffsetF = new GaugeMetricFamily("rocketmq_topic_retry_offset", "TopicRetryOffset", TOPIC_OFFSET_LABEL_NAMES);
- for (Map.Entry<ProducerMetric, Double> entry : topicRetryOffset.entrySet()) {
+ for (Map.Entry<ProducerMetric, Double> entry : topicRetryOffset.asMap().entrySet()) {
loadTopicOffsetMetric(topicRetryOffsetF, entry);
}
mfs.add(topicRetryOffsetF);
GaugeMetricFamily topicDLQOffsetF = new GaugeMetricFamily("rocketmq_topic_dlq_offset", "TopicRetryOffset", DLQ_TOPIC_OFFSET_LABEL_NAMES);
- for (Map.Entry<DLQTopicOffsetMetric, Double> entry : topicDLQOffset.entrySet()) {
+ for (Map.Entry<DLQTopicOffsetMetric, Double> entry : topicDLQOffset.asMap().entrySet()) {
topicDLQOffsetF.addMetric(
- Arrays.asList(
- entry.getKey().getClusterName(),
- entry.getKey().getBrokerName(),
- entry.getKey().getGroup(),
- String.valueOf(entry.getKey().getLastUpdateTimestamp())
- ),
- entry.getValue());
+ Arrays.asList(
+ entry.getKey().getClusterName(),
+ entry.getKey().getBrokerName(),
+ entry.getKey().getGroup(),
+ String.valueOf(entry.getKey().getLastUpdateTimestamp())
+ ),
+ entry.getValue());
}
mfs.add(topicDLQOffsetF);
@@ -290,42 +384,42 @@ public class RMQMetricsCollector extends Collector {
}
private static final List<String> GROUP_CLIENT_METRIC_LABEL_NAMES = Arrays.asList(
- "clientAddr", "clientId", "group", "topic"
+ "clientAddr", "clientId", "group", "topic"
);
private void collectClientGroupMetric(List<MetricFamilySamples> mfs) {
GaugeMetricFamily consumerClientFailedMsgCountsF = new GaugeMetricFamily("rocketmq_client_consume_fail_msg_count", "consumerClientFailedMsgCounts", GROUP_CLIENT_METRIC_LABEL_NAMES);
- for (Map.Entry<ConsumerRuntimeConsumeFailedMsgsMetric, Long> entry : consumerClientFailedMsgCounts.entrySet()) {
+ for (Map.Entry<ConsumerRuntimeConsumeFailedMsgsMetric, Long> entry : consumerClientFailedMsgCounts.asMap().entrySet()) {
loadClientRuntimeStatsMetric(consumerClientFailedMsgCountsF, entry);
}
mfs.add(consumerClientFailedMsgCountsF);
GaugeMetricFamily consumerClientFailedTPSF = new GaugeMetricFamily("rocketmq_client_consume_fail_msg_tps", "consumerClientFailedTPS", GROUP_CLIENT_METRIC_LABEL_NAMES);
- for (Map.Entry<ConsumerRuntimeConsumeFailedTPSMetric, Double> entry : consumerClientFailedTPS.entrySet()) {
+ for (Map.Entry<ConsumerRuntimeConsumeFailedTPSMetric, Double> entry : consumerClientFailedTPS.asMap().entrySet()) {
loadClientRuntimeStatsMetric(consumerClientFailedTPSF, entry);
}
mfs.add(consumerClientFailedTPSF);
GaugeMetricFamily consumerClientOKTPSF = new GaugeMetricFamily("rocketmq_client_consume_ok_msg_tps", "consumerClientOKTPS", GROUP_CLIENT_METRIC_LABEL_NAMES);
- for (Map.Entry<ConsumerRuntimeConsumeOKTPSMetric, Double> entry : consumerClientOKTPS.entrySet()) {
+ for (Map.Entry<ConsumerRuntimeConsumeOKTPSMetric, Double> entry : consumerClientOKTPS.asMap().entrySet()) {
loadClientRuntimeStatsMetric(consumerClientOKTPSF, entry);
}
mfs.add(consumerClientOKTPSF);
GaugeMetricFamily consumerClientRTF = new GaugeMetricFamily("rocketmq_client_consume_rt", "consumerClientRT", GROUP_CLIENT_METRIC_LABEL_NAMES);
- for (Map.Entry<ConsumerRuntimeConsumeRTMetric, Double> entry : consumerClientRT.entrySet()) {
+ for (Map.Entry<ConsumerRuntimeConsumeRTMetric, Double> entry : consumerClientRT.asMap().entrySet()) {
loadClientRuntimeStatsMetric(consumerClientRTF, entry);
}
mfs.add(consumerClientRTF);
GaugeMetricFamily consumerClientPullRTF = new GaugeMetricFamily("rocketmq_client_consumer_pull_rt", "consumerClientPullRT", GROUP_CLIENT_METRIC_LABEL_NAMES);
- for (Map.Entry<ConsumerRuntimePullRTMetric, Double> entry : consumerClientPullRT.entrySet()) {
+ for (Map.Entry<ConsumerRuntimePullRTMetric, Double> entry : consumerClientPullRT.asMap().entrySet()) {
loadClientRuntimeStatsMetric(consumerClientPullRTF, entry);
}
mfs.add(consumerClientPullRTF);
GaugeMetricFamily consumerClientPullTPSF = new GaugeMetricFamily("rocketmq_client_consumer_pull_tps", "consumerClientPullTPS", GROUP_CLIENT_METRIC_LABEL_NAMES);
- for (Map.Entry<ConsumerRuntimePullTPSMetric, Double> entry : consumerClientPullTPS.entrySet()) {
+ for (Map.Entry<ConsumerRuntimePullTPSMetric, Double> entry : consumerClientPullTPS.asMap().entrySet()) {
loadClientRuntimeStatsMetric(consumerClientPullTPSF, entry);
}
mfs.add(consumerClientPullTPSF);
@@ -334,40 +428,40 @@ public class RMQMetricsCollector extends Collector {
private <T2 extends Number, T1 extends ConsumerRuntimeConsumeFailedMsgsMetric> void loadClientRuntimeStatsMetric(GaugeMetricFamily family, Map.Entry<T1, T2> entry) {
family.addMetric(Arrays.asList(
- entry.getKey().getCaddrs(),
- entry.getKey().getLocaladdrs(),
- entry.getKey().getGroup(),
- entry.getKey().getTopic()
+ entry.getKey().getCaddrs(),
+ entry.getKey().getLocaladdrs(),
+ entry.getKey().getGroup(),
+ entry.getKey().getTopic()
), entry.getValue().doubleValue());
}
private static final List<String> GROUP_PULL_LATENCY_LABEL_NAMES = Arrays.asList(
- "cluster", "broker", "topic", "group", "queueid"
+ "cluster", "broker", "topic", "group", "queueid"
);
private static final List<String> GROUP_LATENCY_BY_STORETIME_LABEL_NAMES = Arrays.asList(
- "cluster", "broker", "topic", "group"
+ "cluster", "broker", "topic", "group"
);
private static final List<String> BROKER_NUMS_LABEL_NAMES = Arrays.asList("cluster", "brokerIP", "broker");
private static void loadBrokerNums(GaugeMetricFamily family, Map.Entry<BrokerMetric, Double> entry) {
family.addMetric(Arrays.asList(
- entry.getKey().getClusterName(),
- entry.getKey().getBrokerIP(),
- entry.getKey().getBrokerName()),
- entry.getValue()
+ entry.getKey().getClusterName(),
+ entry.getKey().getBrokerIP(),
+ entry.getKey().getBrokerName()),
+ entry.getValue()
);
}
private void collectBrokerNums(List<MetricFamilySamples> mfs) {
GaugeMetricFamily brokerPutNumsGauge = new GaugeMetricFamily("rocketmq_broker_tps", "BrokerPutNums", BROKER_NUMS_LABEL_NAMES);
- for (Map.Entry<BrokerMetric, Double> entry : brokerPutNums.entrySet()) {
+ for (Map.Entry<BrokerMetric, Double> entry : brokerPutNums.asMap().entrySet()) {
loadBrokerNums(brokerPutNumsGauge, entry);
}
mfs.add(brokerPutNumsGauge);
GaugeMetricFamily brokerGetNumsGauge = new GaugeMetricFamily("rocketmq_broker_qps", "BrokerGetNums", BROKER_NUMS_LABEL_NAMES);
- for (Map.Entry<BrokerMetric, Double> entry : brokerGetNums.entrySet()) {
+ for (Map.Entry<BrokerMetric, Double> entry : brokerGetNums.asMap().entrySet()) {
loadBrokerNums(brokerGetNumsGauge, entry);
}
mfs.add(brokerGetNumsGauge);
@@ -375,94 +469,94 @@ public class RMQMetricsCollector extends Collector {
private static final List<String> GROUP_NUMS_LABEL_NAMES = Arrays.asList(
- "cluster", "broker", "topic", "group"
+ "cluster", "broker", "topic", "group"
);
private static <T extends Number> void loadGroupNumsMetric(GaugeMetricFamily family, Map.Entry<ConsumerMetric, T> entry) {
family.addMetric(Arrays.asList(
- entry.getKey().getClusterName(),
- entry.getKey().getBrokerName(),
- entry.getKey().getTopicName(),
- entry.getKey().getConsumerGroupName()),
- entry.getValue().doubleValue()
+ entry.getKey().getClusterName(),
+ entry.getKey().getBrokerName(),
+ entry.getKey().getTopicName(),
+ entry.getKey().getConsumerGroupName()),
+ entry.getValue().doubleValue()
);
}
private void collectBrokerRuntimeStatsPutMessageDistributeTime(List<MetricFamilySamples> mfs) {
GaugeMetricFamily pmdt0 = new GaugeMetricFamily("rocketmq_brokeruntime_pmdt_0ms", "PutMessageDistributeTimeMap0ms", BROKER_RUNTIME_METRIC_LABEL_NAMES);
- for (Map.Entry<BrokerRuntimeMetric, Integer> entry : brokerRuntimePutMessageDistributeTimeMap0ms.entrySet()) {
+ for (Map.Entry<BrokerRuntimeMetric, Integer> entry : brokerRuntimePutMessageDistributeTimeMap0ms.asMap().entrySet()) {
loadBrokerRuntimeStatsMetric(pmdt0, entry);
}
mfs.add(pmdt0);
GaugeMetricFamily pmdt0to10ms = new GaugeMetricFamily("rocketmq_brokeruntime_pmdt_0to10ms", "PutMessageDistributeTimeMap0to10ms", BROKER_RUNTIME_METRIC_LABEL_NAMES);
- for (Map.Entry<BrokerRuntimeMetric, Integer> entry : brokerRuntimePutMessageDistributeTimeMap0to10ms.entrySet()) {
+ for (Map.Entry<BrokerRuntimeMetric, Integer> entry : brokerRuntimePutMessageDistributeTimeMap0to10ms.asMap().entrySet()) {
loadBrokerRuntimeStatsMetric(pmdt0to10ms, entry);
}
mfs.add(pmdt0to10ms);
GaugeMetricFamily pmdt10to50ms = new GaugeMetricFamily("rocketmq_brokeruntime_pmdt_10to50ms", "PutMessageDistributeTimeMap10to50ms", BROKER_RUNTIME_METRIC_LABEL_NAMES);
- for (Map.Entry<BrokerRuntimeMetric, Integer> entry : brokerRuntimePutMessageDistributeTimeMap10to50ms.entrySet()) {
+ for (Map.Entry<BrokerRuntimeMetric, Integer> entry : brokerRuntimePutMessageDistributeTimeMap10to50ms.asMap().entrySet()) {
loadBrokerRuntimeStatsMetric(pmdt10to50ms, entry);
}
mfs.add(pmdt10to50ms);
GaugeMetricFamily pmdt50to100ms = new GaugeMetricFamily("rocketmq_brokeruntime_pmdt_50to100ms", "PutMessageDistributeTimeMap50to100ms", BROKER_RUNTIME_METRIC_LABEL_NAMES);
- for (Map.Entry<BrokerRuntimeMetric, Integer> entry : brokerRuntimePutMessageDistributeTimeMap50to100ms.entrySet()) {
+ for (Map.Entry<BrokerRuntimeMetric, Integer> entry : brokerRuntimePutMessageDistributeTimeMap50to100ms.asMap().entrySet()) {
loadBrokerRuntimeStatsMetric(pmdt50to100ms, entry);
}
mfs.add(pmdt50to100ms);
GaugeMetricFamily pmdt100to200ms = new GaugeMetricFamily("rocketmq_brokeruntime_pmdt_100to200ms", "PutMessageDistributeTimeMap100to200ms", BROKER_RUNTIME_METRIC_LABEL_NAMES);
- for (Map.Entry<BrokerRuntimeMetric, Integer> entry : brokerRuntimePutMessageDistributeTimeMap100to200ms.entrySet()) {
+ for (Map.Entry<BrokerRuntimeMetric, Integer> entry : brokerRuntimePutMessageDistributeTimeMap100to200ms.asMap().entrySet()) {
loadBrokerRuntimeStatsMetric(pmdt100to200ms, entry);
}
mfs.add(pmdt100to200ms);
GaugeMetricFamily pmdt200to500ms = new GaugeMetricFamily("rocketmq_brokeruntime_pmdt_200to500ms", "PutMessageDistributeTimeMap200to500ms", BROKER_RUNTIME_METRIC_LABEL_NAMES);
- for (Map.Entry<BrokerRuntimeMetric, Integer> entry : brokerRuntimePutMessageDistributeTimeMap200to500ms.entrySet()) {
+ for (Map.Entry<BrokerRuntimeMetric, Integer> entry : brokerRuntimePutMessageDistributeTimeMap200to500ms.asMap().entrySet()) {
loadBrokerRuntimeStatsMetric(pmdt200to500ms, entry);
}
mfs.add(pmdt200to500ms);
GaugeMetricFamily pmdt500to1s = new GaugeMetricFamily("rocketmq_brokeruntime_pmdt_500to1s", "PutMessageDistributeTimeMap500to1s", BROKER_RUNTIME_METRIC_LABEL_NAMES);
- for (Map.Entry<BrokerRuntimeMetric, Integer> entry : brokerRuntimePutMessageDistributeTimeMap500to1s.entrySet()) {
+ for (Map.Entry<BrokerRuntimeMetric, Integer> entry : brokerRuntimePutMessageDistributeTimeMap500to1s.asMap().entrySet()) {
loadBrokerRuntimeStatsMetric(pmdt500to1s, entry);
}
mfs.add(pmdt500to1s);
GaugeMetricFamily pmdt1to2s = new GaugeMetricFamily("rocketmq_brokeruntime_pmdt_1to2s", "PutMessageDistributeTimeMap1to2s", BROKER_RUNTIME_METRIC_LABEL_NAMES);
- for (Map.Entry<BrokerRuntimeMetric, Integer> entry : brokerRuntimePutMessageDistributeTimeMap1to2s.entrySet()) {
+ for (Map.Entry<BrokerRuntimeMetric, Integer> entry : brokerRuntimePutMessageDistributeTimeMap1to2s.asMap().entrySet()) {
loadBrokerRuntimeStatsMetric(pmdt1to2s, entry);
}
mfs.add(pmdt1to2s);
GaugeMetricFamily pmdt2to3s = new GaugeMetricFamily("rocketmq_brokeruntime_pmdt_2to3s", "PutMessageDistributeTimeMap2to3s", BROKER_RUNTIME_METRIC_LABEL_NAMES);
- for (Map.Entry<BrokerRuntimeMetric, Integer> entry : brokerRuntimePutMessageDistributeTimeMap2to3s.entrySet()) {
+ for (Map.Entry<BrokerRuntimeMetric, Integer> entry : brokerRuntimePutMessageDistributeTimeMap2to3s.asMap().entrySet()) {
loadBrokerRuntimeStatsMetric(pmdt2to3s, entry);
}
mfs.add(pmdt2to3s);
GaugeMetricFamily pmdt3to4s = new GaugeMetricFamily("rocketmq_brokeruntime_pmdt_3to4s", "PutMessageDistributeTimeMap3to4s", BROKER_RUNTIME_METRIC_LABEL_NAMES);
- for (Map.Entry<BrokerRuntimeMetric, Integer> entry : brokerRuntimePutMessageDistributeTimeMap3to4s.entrySet()) {
+ for (Map.Entry<BrokerRuntimeMetric, Integer> entry : brokerRuntimePutMessageDistributeTimeMap3to4s.asMap().entrySet()) {
loadBrokerRuntimeStatsMetric(pmdt3to4s, entry);
}
mfs.add(pmdt3to4s);
GaugeMetricFamily pmdt4to5s = new GaugeMetricFamily("rocketmq_brokeruntime_pmdt_4to5s", "PutMessageDistributeTimeMap4to5s", BROKER_RUNTIME_METRIC_LABEL_NAMES);
- for (Map.Entry<BrokerRuntimeMetric, Integer> entry : brokerRuntimePutMessageDistributeTimeMap4to5s.entrySet()) {
+ for (Map.Entry<BrokerRuntimeMetric, Integer> entry : brokerRuntimePutMessageDistributeTimeMap4to5s.asMap().entrySet()) {
loadBrokerRuntimeStatsMetric(pmdt4to5s, entry);
}
mfs.add(pmdt4to5s);
GaugeMetricFamily pmdt5to10s = new GaugeMetricFamily("rocketmq_brokeruntime_pmdt_5to10s", "PutMessageDistributeTimeMap5to10s", BROKER_RUNTIME_METRIC_LABEL_NAMES);
- for (Map.Entry<BrokerRuntimeMetric, Integer> entry : brokerRuntimePutMessageDistributeTimeMap5to10s.entrySet()) {
+ for (Map.Entry<BrokerRuntimeMetric, Integer> entry : brokerRuntimePutMessageDistributeTimeMap5to10s.asMap().entrySet()) {
loadBrokerRuntimeStatsMetric(pmdt5to10s, entry);
}
mfs.add(pmdt5to10s);
GaugeMetricFamily pmdt10stoMore = new GaugeMetricFamily("rocketmq_brokeruntime_pmdt_10stomore", "PutMessageDistributeTimeMap10toMore", BROKER_RUNTIME_METRIC_LABEL_NAMES);
- for (Map.Entry<BrokerRuntimeMetric, Integer> entry : brokerRuntimePutMessageDistributeTimeMap10toMore.entrySet()) {
+ for (Map.Entry<BrokerRuntimeMetric, Integer> entry : brokerRuntimePutMessageDistributeTimeMap10toMore.asMap().entrySet()) {
loadBrokerRuntimeStatsMetric(pmdt10stoMore, entry);
}
mfs.add(pmdt10stoMore);
@@ -470,44 +564,44 @@ public class RMQMetricsCollector extends Collector {
private void collectGroupNums(List<MetricFamilySamples> mfs) {
GaugeMetricFamily groupGetNumsGauge = new GaugeMetricFamily("rocketmq_consumer_tps", "GroupGetNums", GROUP_NUMS_LABEL_NAMES);
- for (Map.Entry<ConsumerMetric, Double> entry : groupGetNums.entrySet()) {
+ for (Map.Entry<ConsumerMetric, Double> entry : groupGetNums.asMap().entrySet()) {
loadGroupNumsMetric(groupGetNumsGauge, entry);
}
mfs.add(groupGetNumsGauge);
GaugeMetricFamily groupConsumeTPSF = new GaugeMetricFamily("rocketmq_group_consume_tps", "GroupConsumeTPS", GROUP_NUMS_LABEL_NAMES);
- for (Map.Entry<ConsumerMetric, Double> entry : groupConsumeTPS.entrySet()) {
+ for (Map.Entry<ConsumerMetric, Double> entry : groupConsumeTPS.asMap().entrySet()) {
loadGroupNumsMetric(groupConsumeTPSF, entry);
}
mfs.add(groupConsumeTPSF);
GaugeMetricFamily groupBrokerTotalOffsetF = new GaugeMetricFamily("rocketmq_consumer_offset", "GroupBrokerTotalOffset", GROUP_NUMS_LABEL_NAMES);
- for (Map.Entry<ConsumerMetric, Long> entry : groupBrokerTotalOffset.entrySet()) {
+ for (Map.Entry<ConsumerMetric, Long> entry : groupBrokerTotalOffset.asMap().entrySet()) {
loadGroupNumsMetric(groupBrokerTotalOffsetF, entry);
}
mfs.add(groupBrokerTotalOffsetF);
GaugeMetricFamily groupConsumeTotalOffsetF = new GaugeMetricFamily("rocketmq_group_consume_total_offset", "GroupConsumeTotalOffset", GROUP_NUMS_LABEL_NAMES);
- for (Map.Entry<ConsumerMetric, Long> entry : groupConsumeTotalOffset.entrySet()) {
+ for (Map.Entry<ConsumerMetric, Long> entry : groupConsumeTotalOffset.asMap().entrySet()) {
loadGroupNumsMetric(groupConsumeTotalOffsetF, entry);
}
mfs.add(groupConsumeTotalOffsetF);
GaugeMetricFamily groupGetSizeGauge = new GaugeMetricFamily("rocketmq_consumer_message_size", "GroupGetMessageSize", GROUP_NUMS_LABEL_NAMES);
- for (Map.Entry<ConsumerMetric, Double> entry : groupGetSize.entrySet()) {
+ for (Map.Entry<ConsumerMetric, Double> entry : groupGetSize.asMap().entrySet()) {
loadGroupNumsMetric(groupGetSizeGauge, entry);
}
mfs.add(groupGetSizeGauge);
GaugeMetricFamily sendBackNumsGauge = new GaugeMetricFamily("rocketmq_send_back_nums", "SendBackNums", GROUP_NUMS_LABEL_NAMES);
- for (Map.Entry<ConsumerMetric, Long> entry : sendBackNums.entrySet()) {
+ for (Map.Entry<ConsumerMetric, Double> entry : sendBackNums.asMap().entrySet()) {
loadGroupNumsMetric(sendBackNumsGauge, entry);
}
mfs.add(sendBackNumsGauge);
GaugeMetricFamily groupLatencyByTimeF = new GaugeMetricFamily("rocketmq_group_get_latency_by_storetime",
- "GroupGetLatencyByStoreTime", GROUP_LATENCY_BY_STORETIME_LABEL_NAMES);
- for (Map.Entry<ConsumerMetric, Long> entry : groupLatencyByTime.entrySet()) {
+ "GroupGetLatencyByStoreTime", GROUP_LATENCY_BY_STORETIME_LABEL_NAMES);
+ for (Map.Entry<ConsumerMetric, Long> entry : groupLatencyByTime.asMap().entrySet()) {
loadGroupNumsMetric(groupLatencyByTimeF, entry);
}
mfs.add(groupLatencyByTimeF);
@@ -516,13 +610,13 @@ public class RMQMetricsCollector extends Collector {
private void collectTopicNums(List<MetricFamilySamples> mfs) {
GaugeMetricFamily topicPutNumsGauge = new GaugeMetricFamily("rocketmq_producer_tps", "TopicPutNums", TOPIC_NUMS_LABEL_NAMES);
- for (Map.Entry<TopicPutNumMetric, Double> entry : topicPutNums.entrySet()) {
+ for (Map.Entry<TopicPutNumMetric, Double> entry : topicPutNums.asMap().entrySet()) {
loadTopicNumsMetric(topicPutNumsGauge, entry);
}
mfs.add(topicPutNumsGauge);
GaugeMetricFamily topicPutSizeGauge = new GaugeMetricFamily("rocketmq_producer_message_size", "TopicPutMessageSize", TOPIC_NUMS_LABEL_NAMES);
- for (Map.Entry<TopicPutNumMetric, Double> entry : topicPutSize.entrySet()) {
+ for (Map.Entry<TopicPutNumMetric, Double> entry : topicPutSize.asMap().entrySet()) {
loadTopicNumsMetric(topicPutSizeGauge, entry);
}
mfs.add(topicPutSizeGauge);
@@ -532,12 +626,12 @@ public class RMQMetricsCollector extends Collector {
private void loadTopicNumsMetric(GaugeMetricFamily family, Map.Entry<TopicPutNumMetric, Double> entry) {
family.addMetric(
- Arrays.asList(
- entry.getKey().getClusterName(),
- entry.getKey().getBrokerName(),
- entry.getKey().getTopicName()
- ),
- entry.getValue()
+ Arrays.asList(
+ entry.getKey().getClusterName(),
+ entry.getKey().getBrokerName(),
+ entry.getKey().getTopicName()
+ ),
+ entry.getValue()
);
}
@@ -597,7 +691,7 @@ public class RMQMetricsCollector extends Collector {
groupGetSize.put(new ConsumerMetric(clusterName, brokerName, topic, group), value);
}
- public void addSendBackNumsMetric(String clusterName, String brokerName, String topic, String group, long value) {
+ public void addSendBackNumsMetric(String clusterName, String brokerName, String topic, String group, double value) {
sendBackNums.put(new ConsumerMetric(clusterName, brokerName, topic, group), value);
}
@@ -629,403 +723,387 @@ public class RMQMetricsCollector extends Collector {
brokerPutNums.put(new BrokerMetric(clusterName, brokerIP, brokerName), value);
}
- public void clearBrokerPutNumsMetric(Set<BrokerMetric> masterMetricsSet) {
- for (BrokerMetric brokerMetric : brokerPutNums.keySet()) {
- if (!masterMetricsSet.contains(brokerMetric)) {
- brokerPutNums.remove(brokerMetric);
- }
- }
- }
-
public void addBrokerGetNumsMetric(String clusterName, String brokerIP, String brokerName, double value) {
brokerGetNums.put(new BrokerMetric(clusterName, brokerIP, brokerName), value);
}
- public void clearBrokerGetNumsMetric(Set<BrokerMetric> masterMetricsSet) {
- for (BrokerMetric brokerMetric : brokerGetNums.keySet()) {
- if (!masterMetricsSet.contains(brokerMetric)) {
- brokerGetNums.remove(brokerMetric);
- }
- }
- }
-
public void addBrokerRuntimeStatsMetric(BrokerRuntimeStats stats, String clusterName, String brokerAddress, String brokerHost) {
addBrokerRuntimePutMessageDistributeTimeMap(
- clusterName, brokerAddress, brokerHost,
- stats.getBrokerVersionDesc(),
- stats.getBootTimestamp(),
- stats.getBrokerVersion(), stats);
+ clusterName, brokerAddress, brokerHost,
+ stats.getBrokerVersionDesc(),
+ stats.getBootTimestamp(),
+ stats.getBrokerVersion(), stats);
addCommitLogDirCapacity(clusterName, brokerAddress, brokerHost, stats);
addAllKindOfTps(clusterName, brokerAddress, brokerHost, stats);
brokerRuntimeMsgPutTotalTodayNow.put(new BrokerRuntimeMetric(
- clusterName, brokerAddress, brokerHost,
- stats.getBrokerVersionDesc(),
- stats.getBootTimestamp(),
- stats.getBrokerVersion()), stats.getMsgPutTotalTodayNow());
+ clusterName, brokerAddress, brokerHost,
+ stats.getBrokerVersionDesc(),
+ stats.getBootTimestamp(),
+ stats.getBrokerVersion()), stats.getMsgPutTotalTodayNow());
brokerRuntimeMsgGetTotalTodayNow.put(new BrokerRuntimeMetric(
- clusterName, brokerAddress, brokerHost,
- stats.getBrokerVersionDesc(),
- stats.getBootTimestamp(),
- stats.getBrokerVersion()), stats.getMsgGetTotalTodayNow());
+ clusterName, brokerAddress, brokerHost,
+ stats.getBrokerVersionDesc(),
+ stats.getBootTimestamp(),
+ stats.getBrokerVersion()), stats.getMsgGetTotalTodayNow());
brokerRuntimeMsgPutTotalTodayMorning.put(new BrokerRuntimeMetric(
- clusterName, brokerAddress, brokerHost,
- stats.getBrokerVersionDesc(),
- stats.getBootTimestamp(),
- stats.getBrokerVersion()), stats.getMsgPutTotalTodayMorning());
+ clusterName, brokerAddress, brokerHost,
+ stats.getBrokerVersionDesc(),
+ stats.getBootTimestamp(),
+ stats.getBrokerVersion()), stats.getMsgPutTotalTodayMorning());
brokerRuntimeMsgGetTotalTodayMorning.put(new BrokerRuntimeMetric(
- clusterName, brokerAddress, brokerHost,
- stats.getBrokerVersionDesc(),
- stats.getBootTimestamp(),
- stats.getBrokerVersion()), stats.getMsgGetTotalTodayMorning());
+ clusterName, brokerAddress, brokerHost,
+ stats.getBrokerVersionDesc(),
+ stats.getBootTimestamp(),
+ stats.getBrokerVersion()), stats.getMsgGetTotalTodayMorning());
brokerRuntimeMsgPutTotalYesterdayMorning.put(new BrokerRuntimeMetric(
- clusterName, brokerAddress, brokerHost,
- stats.getBrokerVersionDesc(),
- stats.getBootTimestamp(),
- stats.getBrokerVersion()), stats.getMsgPutTotalYesterdayMorning());
+ clusterName, brokerAddress, brokerHost,
+ stats.getBrokerVersionDesc(),
+ stats.getBootTimestamp(),
+ stats.getBrokerVersion()), stats.getMsgPutTotalYesterdayMorning());
brokerRuntimeMsgGetTotalYesterdayMorning.put(new BrokerRuntimeMetric(
- clusterName, brokerAddress, brokerHost,
- stats.getBrokerVersionDesc(),
- stats.getBootTimestamp(),
- stats.getBrokerVersion()), stats.getMsgGetTotalYesterdayMorning());
+ clusterName, brokerAddress, brokerHost,
+ stats.getBrokerVersionDesc(),
+ stats.getBootTimestamp(),
+ stats.getBrokerVersion()), stats.getMsgGetTotalYesterdayMorning());
brokerRuntimeSendThreadPoolQueueHeadWaitTimeMills.put(new BrokerRuntimeMetric(
- clusterName, brokerAddress, brokerHost,
- stats.getBrokerVersionDesc(),
- stats.getBootTimestamp(),
- stats.getBrokerVersion()), stats.getSendThreadPoolQueueHeadWaitTimeMills());
+ clusterName, brokerAddress, brokerHost,
+ stats.getBrokerVersionDesc(),
+ stats.getBootTimestamp(),
+ stats.getBrokerVersion()), stats.getSendThreadPoolQueueHeadWaitTimeMills());
brokerRuntimeQueryThreadPoolQueueHeadWaitTimeMills.put(new BrokerRuntimeMetric(
- clusterName, brokerAddress, brokerHost,
- stats.getBrokerVersionDesc(),
- stats.getBootTimestamp(),
- stats.getBrokerVersion()), stats.getQueryThreadPoolQueueHeadWaitTimeMills());
+ clusterName, brokerAddress, brokerHost,
+ stats.getBrokerVersionDesc(),
+ stats.getBootTimestamp(),
+ stats.getBrokerVersion()), stats.getQueryThreadPoolQueueHeadWaitTimeMills());
brokerRuntimePullThreadPoolQueueHeadWaitTimeMills.put(new BrokerRuntimeMetric(
- clusterName, brokerAddress, brokerHost,
- stats.getBrokerVersionDesc(),
- stats.getBootTimestamp(),
- stats.getBrokerVersion()), stats.getPullThreadPoolQueueHeadWaitTimeMills());
+ clusterName, brokerAddress, brokerHost,
+ stats.getBrokerVersionDesc(),
+ stats.getBootTimestamp(),
+ stats.getBrokerVersion()), stats.getPullThreadPoolQueueHeadWaitTimeMills());
brokerRuntimeQueryThreadPoolQueueSize.put(new BrokerRuntimeMetric(
- clusterName, brokerAddress, brokerHost,
- stats.getBrokerVersionDesc(),
- stats.getBootTimestamp(),
- stats.getBrokerVersion()), stats.getQueryThreadPoolQueueSize());
+ clusterName, brokerAddress, brokerHost,
+ stats.getBrokerVersionDesc(),
+ stats.getBootTimestamp(),
+ stats.getBrokerVersion()), stats.getQueryThreadPoolQueueSize());
brokerRuntimePullThreadPoolQueueSize.put(new BrokerRuntimeMetric(
- clusterName, brokerAddress, brokerHost,
- stats.getBrokerVersionDesc(),
- stats.getBootTimestamp(),
- stats.getBrokerVersion()), stats.getPullThreadPoolQueueSize());
+ clusterName, brokerAddress, brokerHost,
+ stats.getBrokerVersionDesc(),
+ stats.getBootTimestamp(),
+ stats.getBrokerVersion()), stats.getPullThreadPoolQueueSize());
brokerRuntimeSendThreadPoolQueueCapacity.put(new BrokerRuntimeMetric(
- clusterName, brokerAddress, brokerHost,
- stats.getBrokerVersionDesc(),
- stats.getBootTimestamp(),
- stats.getBrokerVersion()), stats.getSendThreadPoolQueueCapacity());
+ clusterName, brokerAddress, brokerHost,
+ stats.getBrokerVersionDesc(),
+ stats.getBootTimestamp(),
+ stats.getBrokerVersion()), stats.getSendThreadPoolQueueCapacity());
brokerRuntimePullThreadPoolQueueCapacity.put(new BrokerRuntimeMetric(
- clusterName, brokerAddress, brokerHost,
- stats.getBrokerVersionDesc(),
- stats.getBootTimestamp(),
- stats.getBrokerVersion()), stats.getPullThreadPoolQueueCapacity());
+ clusterName, brokerAddress, brokerHost,
+ stats.getBrokerVersionDesc(),
+ stats.getBootTimestamp(),
+ stats.getBrokerVersion()), stats.getPullThreadPoolQueueCapacity());
brokerRuntimeRemainHowManyDataToFlush.put(new BrokerRuntimeMetric(
- clusterName, brokerAddress, brokerHost,
- stats.getBrokerVersionDesc(),
- stats.getBootTimestamp(),
- stats.getBrokerVersion()), stats.getRemainHowManyDataToFlush());
+ clusterName, brokerAddress, brokerHost,
+ stats.getBrokerVersionDesc(),
+ stats.getBootTimestamp(),
+ stats.getBrokerVersion()), stats.getRemainHowManyDataToFlush());
brokerRuntimeCommitLogMinOffset.put(new BrokerRuntimeMetric(
- clusterName, brokerAddress, brokerHost,
- stats.getBrokerVersionDesc(),
- stats.getBootTimestamp(),
- stats.getBrokerVersion()), stats.getCommitLogMinOffset());
+ clusterName, brokerAddress, brokerHost,
+ stats.getBrokerVersionDesc(),
+ stats.getBootTimestamp(),
+ stats.getBrokerVersion()), stats.getCommitLogMinOffset());
brokerRuntimeCommitLogMaxOffset.put(new BrokerRuntimeMetric(
- clusterName, brokerAddress, brokerHost,
- stats.getBrokerVersionDesc(),
- stats.getBootTimestamp(),
- stats.getBrokerVersion()), stats.getCommitLogMaxOffset());
+ clusterName, brokerAddress, brokerHost,
+ stats.getBrokerVersionDesc(),
+ stats.getBootTimestamp(),
+ stats.getBrokerVersion()), stats.getCommitLogMaxOffset());
brokerRuntimeDispatchMaxBuffer.put(new BrokerRuntimeMetric(
- clusterName, brokerAddress, brokerHost,
- stats.getBrokerVersionDesc(),
- stats.getBootTimestamp(),
- stats.getBrokerVersion()), stats.getDispatchMaxBuffer());
+ clusterName, brokerAddress, brokerHost,
+ stats.getBrokerVersionDesc(),
+ stats.getBootTimestamp(),
+ stats.getBrokerVersion()), stats.getDispatchMaxBuffer());
brokerRuntimeConsumeQueueDiskRatio.put(new BrokerRuntimeMetric(
- clusterName, brokerAddress, brokerHost,
- stats.getBrokerVersionDesc(),
- stats.getBootTimestamp(),
- stats.getBrokerVersion()), stats.getConsumeQueueDiskRatio());
+ clusterName, brokerAddress, brokerHost,
+ stats.getBrokerVersionDesc(),
+ stats.getBootTimestamp(),
+ stats.getBrokerVersion()), stats.getConsumeQueueDiskRatio());
brokerRuntimeCommitLogDiskRatio.put(new BrokerRuntimeMetric(
- clusterName, brokerAddress, brokerHost,
- stats.getBrokerVersionDesc(),
- stats.getBootTimestamp(),
- stats.getBrokerVersion()), stats.getCommitLogDiskRatio());
+ clusterName, brokerAddress, brokerHost,
+ stats.getBrokerVersionDesc(),
+ stats.getBootTimestamp(),
+ stats.getBrokerVersion()), stats.getCommitLogDiskRatio());
brokerRuntimePageCacheLockTimeMills.put(new BrokerRuntimeMetric(
- clusterName, brokerAddress, brokerHost,
- stats.getBrokerVersionDesc(),
- stats.getBootTimestamp(),
- stats.getBrokerVersion()), stats.getPageCacheLockTimeMills());
+ clusterName, brokerAddress, brokerHost,
+ stats.getBrokerVersionDesc(),
+ stats.getBootTimestamp(),
+ stats.getBrokerVersion()), stats.getPageCacheLockTimeMills());
brokerRuntimeGetMessageEntireTimeMax.put(new BrokerRuntimeMetric(
- clusterName, brokerAddress, brokerHost,
- stats.getBrokerVersionDesc(),
- stats.getBootTimestamp(),
- stats.getBrokerVersion()), stats.getGetMessageEntireTimeMax());
+ clusterName, brokerAddress, brokerHost,
+ stats.getBrokerVersionDesc(),
+ stats.getBootTimestamp(),
+ stats.getBrokerVersion()), stats.getGetMessageEntireTimeMax());
brokerRuntimePutMessageTimesTotal.put(new BrokerRuntimeMetric(
- clusterName, brokerAddress, brokerHost,
- stats.getBrokerVersionDesc(),
- stats.getBootTimestamp(),
- stats.getBrokerVersion()), stats.getPutMessageTimesTotal());
+ clusterName, brokerAddress, brokerHost,
+ stats.getBrokerVersionDesc(),
+ stats.getBootTimestamp(),
+ stats.getBrokerVersion()), stats.getPutMessageTimesTotal());
brokerRuntimeSendThreadPoolQueueSize.put(new BrokerRuntimeMetric(
- clusterName, brokerAddress, brokerHost,
- stats.getBrokerVersionDesc(),
- stats.getBootTimestamp(),
- stats.getBrokerVersion()), stats.getSendThreadPoolQueueSize());
+ clusterName, brokerAddress, brokerHost,
+ stats.getBrokerVersionDesc(),
+ stats.getBootTimestamp(),
+ stats.getBrokerVersion()), stats.getSendThreadPoolQueueSize());
brokerRuntimeStartAcceptSendRequestTimeStamp.put(new BrokerRuntimeMetric(
- clusterName, brokerAddress, brokerHost,
- stats.getBrokerVersionDesc(),
- stats.getBootTimestamp(),
- stats.getBrokerVersion()), stats.getStartAcceptSendRequestTimeStamp());
+ clusterName, brokerAddress, brokerHost,
+ stats.getBrokerVersionDesc(),
+ stats.getBootTimestamp(),
+ stats.getBrokerVersion()), stats.getStartAcceptSendRequestTimeStamp());
brokerRuntimePutMessageEntireTimeMax.put(new BrokerRuntimeMetric(
- clusterName, brokerAddress, brokerHost,
- stats.getBrokerVersionDesc(),
- stats.getBootTimestamp(),
- stats.getBrokerVersion()), stats.getPutMessageEntireTimeMax());
+ clusterName, brokerAddress, brokerHost,
+ stats.getBrokerVersionDesc(),
+ stats.getBootTimestamp(),
+ stats.getBrokerVersion()), stats.getPutMessageEntireTimeMax());
brokerRuntimeEarliestMessageTimeStamp.put(new BrokerRuntimeMetric(
- clusterName, brokerAddress, brokerHost,
- stats.getBrokerVersionDesc(),
- stats.getBootTimestamp(),
- stats.getBrokerVersion()), stats.getEarliestMessageTimeStamp());
+ clusterName, brokerAddress, brokerHost,
+ stats.getBrokerVersionDesc(),
+ stats.getBootTimestamp(),
+ stats.getBrokerVersion()), stats.getEarliestMessageTimeStamp());
brokerRuntimeRemainTransientStoreBufferNumbs.put(new BrokerRuntimeMetric(
- clusterName, brokerAddress, brokerHost,
- stats.getBrokerVersionDesc(),
- stats.getBootTimestamp(),
- stats.getBrokerVersion()), stats.getRemainTransientStoreBufferNumbs());
+ clusterName, brokerAddress, brokerHost,
+ stats.getBrokerVersionDesc(),
+ stats.getBootTimestamp(),
+ stats.getBrokerVersion()), stats.getRemainTransientStoreBufferNumbs());
brokerRuntimeQueryThreadPoolQueueCapacity.put(new BrokerRuntimeMetric(
- clusterName, brokerAddress, brokerHost,
- stats.getBrokerVersionDesc(),
- stats.getBootTimestamp(),
- stats.getBrokerVersion()), stats.getQueryThreadPoolQueueCapacity());
+ clusterName, brokerAddress, brokerHost,
+ stats.getBrokerVersionDesc(),
+ stats.getBootTimestamp(),
+ stats.getBrokerVersion()), stats.getQueryThreadPoolQueueCapacity());
brokerRuntimePutMessageAverageSize.put(new BrokerRuntimeMetric(
- clusterName, brokerAddress, brokerHost,
- stats.getBrokerVersionDesc(),
- stats.getBootTimestamp(),
- stats.getBrokerVersion()), stats.getPutMessageAverageSize());
+ clusterName, brokerAddress, brokerHost,
+ stats.getBrokerVersionDesc(),
+ stats.getBootTimestamp(),
+ stats.getBrokerVersion()), stats.getPutMessageAverageSize());
brokerRuntimePutMessageSizeTotal.put(new BrokerRuntimeMetric(
- clusterName, brokerAddress, brokerHost,
- stats.getBrokerVersionDesc(),
- stats.getBootTimestamp(),
- stats.getBrokerVersion()), stats.getPutMessageSizeTotal());
+ clusterName, brokerAddress, brokerHost,
+ stats.getBrokerVersionDesc(),
+ stats.getBootTimestamp(),
+ stats.getBrokerVersion()), stats.getPutMessageSizeTotal());
brokerRuntimeDispatchBehindBytes.put(new BrokerRuntimeMetric(
- clusterName, brokerAddress, brokerHost,
- stats.getBrokerVersionDesc(),
- stats.getBootTimestamp(),
- stats.getBrokerVersion()), stats.getDispatchBehindBytes());
+ clusterName, brokerAddress, brokerHost,
+ stats.getBrokerVersionDesc(),
+ stats.getBootTimestamp(),
+ stats.getBrokerVersion()), stats.getDispatchBehindBytes());
}
private void addAllKindOfTps(String clusterName, String brokerAddress, String brokerHost, BrokerRuntimeStats stats) {
brokerRuntimePutTps10.put(new BrokerRuntimeMetric(
- clusterName,
- brokerAddress, brokerHost,
- stats.getBrokerVersionDesc(),
- stats.getBootTimestamp(),
- stats.getBrokerVersion()), stats.getPutTps().getTen());
+ clusterName,
+ brokerAddress, brokerHost,
+ stats.getBrokerVersionDesc(),
+ stats.getBootTimestamp(),
+ stats.getBrokerVersion()), stats.getPutTps().getTen());
brokerRuntimePutTps60.put(new BrokerRuntimeMetric(
- clusterName,
- brokerAddress, brokerHost,
- stats.getBrokerVersionDesc(),
- stats.getBootTimestamp(),
- stats.getBrokerVersion()), stats.getPutTps().getSixty());
+ clusterName,
+ brokerAddress, brokerHost,
+ stats.getBrokerVersionDesc(),
+ stats.getBootTimestamp(),
+ stats.getBrokerVersion()), stats.getPutTps().getSixty());
brokerRuntimePutTps600.put(new BrokerRuntimeMetric(
- clusterName,
- brokerAddress, brokerHost,
- stats.getBrokerVersionDesc(),
- stats.getBootTimestamp(),
- stats.getBrokerVersion()), stats.getPutTps().getSixHundred());
+ clusterName,
+ brokerAddress, brokerHost,
+ stats.getBrokerVersionDesc(),
+ stats.getBootTimestamp(),
+ stats.getBrokerVersion()), stats.getPutTps().getSixHundred());
brokerRuntimeGetMissTps10.put(new BrokerRuntimeMetric(
- clusterName,
- brokerAddress, brokerHost,
- stats.getBrokerVersionDesc(),
- stats.getBootTimestamp(),
- stats.getBrokerVersion()), stats.getGetMissTps().getTen());
+ clusterName,
+ brokerAddress, brokerHost,
+ stats.getBrokerVersionDesc(),
+ stats.getBootTimestamp(),
+ stats.getBrokerVersion()), stats.getGetMissTps().getTen());
brokerRuntimeGetMissTps60.put(new BrokerRuntimeMetric(
- clusterName,
- brokerAddress, brokerHost,
- stats.getBrokerVersionDesc(),
- stats.getBootTimestamp(),
- stats.getBrokerVersion()), stats.getGetMissTps().getSixty());
+ clusterName,
+ brokerAddress, brokerHost,
+ stats.getBrokerVersionDesc(),
+ stats.getBootTimestamp(),
+ stats.getBrokerVersion()), stats.getGetMissTps().getSixty());
brokerRuntimeGetMissTps600.put(new BrokerRuntimeMetric(
- clusterName,
- brokerAddress, brokerHost,
- stats.getBrokerVersionDesc(),
- stats.getBootTimestamp(),
- stats.getBrokerVersion()), stats.getGetMissTps().getSixHundred());
+ clusterName,
+ brokerAddress, brokerHost,
+ stats.getBrokerVersionDesc(),
+ stats.getBootTimestamp(),
+ stats.getBrokerVersion()), stats.getGetMissTps().getSixHundred());
brokerRuntimeGetTransferedTps10.put(new BrokerRuntimeMetric(
- clusterName,
- brokerAddress, brokerHost,
- stats.getBrokerVersionDesc(),
- stats.getBootTimestamp(),
- stats.getBrokerVersion()), stats.getGetTransferedTps().getTen());
+ clusterName,
+ brokerAddress, brokerHost,
+ stats.getBrokerVersionDesc(),
+ stats.getBootTimestamp(),
+ stats.getBrokerVersion()), stats.getGetTransferedTps().getTen());
brokerRuntimeGetTransferedTps60.put(new BrokerRuntimeMetric(
- clusterName,
- brokerAddress, brokerHost,
- stats.getBrokerVersionDesc(),
- stats.getBootTimestamp(),
- stats.getBrokerVersion()), stats.getGetTransferedTps().getSixty());
+ clusterName,
+ brokerAddress, brokerHost,
+ stats.getBrokerVersionDesc(),
+ stats.getBootTimestamp(),
+ stats.getBrokerVersion()), stats.getGetTransferedTps().getSixty());
brokerRuntimeGetTransferedTps600.put(new BrokerRuntimeMetric(
- clusterName,
- brokerAddress, brokerHost,
- stats.getBrokerVersionDesc(),
- stats.getBootTimestamp(),
- stats.getBrokerVersion()), stats.getGetTransferedTps().getSixHundred());
+ clusterName,
+ brokerAddress, brokerHost,
+ stats.getBrokerVersionDesc(),
+ stats.getBootTimestamp(),
+ stats.getBrokerVersion()), stats.getGetTransferedTps().getSixHundred());
brokerRuntimeGetTotalTps10.put(new BrokerRuntimeMetric(
- clusterName,
- brokerAddress, brokerHost,
- stats.getBrokerVersionDesc(),
- stats.getBootTimestamp(),
- stats.getBrokerVersion()), stats.getGetTotalTps().getTen());
+ clusterName,
+ brokerAddress, brokerHost,
+ stats.getBrokerVersionDesc(),
+ stats.getBootTimestamp(),
+ stats.getBrokerVersion()), stats.getGetTotalTps().getTen());
brokerRuntimeGetTotalTps60.put(new BrokerRuntimeMetric(
- clusterName,
- brokerAddress, brokerHost,
- stats.getBrokerVersionDesc(),
- stats.getBootTimestamp(),
- stats.getBrokerVersion()), stats.getGetTotalTps().getSixty());
+ clusterName,
+ brokerAddress, brokerHost,
+ stats.getBrokerVersionDesc(),
+ stats.getBootTimestamp(),
+ stats.getBrokerVersion()), stats.getGetTotalTps().getSixty());
brokerRuntimeGetTotalTps600.put(new BrokerRuntimeMetric(
- clusterName,
- brokerAddress, brokerHost,
- stats.getBrokerVersionDesc(),
- stats.getBootTimestamp(),
- stats.getBrokerVersion()), stats.getGetTotalTps().getSixHundred());
+ clusterName,
+ brokerAddress, brokerHost,
+ stats.getBrokerVersionDesc(),
+ stats.getBootTimestamp(),
+ stats.getBrokerVersion()), stats.getGetTotalTps().getSixHundred());
brokerRuntimeGetFoundTps10.put(new BrokerRuntimeMetric(
- clusterName,
- brokerAddress, brokerHost,
- stats.getBrokerVersionDesc(),
- stats.getBootTimestamp(),
- stats.getBrokerVersion()), stats.getGetFoundTps().getTen());
+ clusterName,
+ brokerAddress, brokerHost,
+ stats.getBrokerVersionDesc(),
+ stats.getBootTimestamp(),
+ stats.getBrokerVersion()), stats.getGetFoundTps().getTen());
brokerRuntimeGetFoundTps60.put(new BrokerRuntimeMetric(
- clusterName,
- brokerAddress, brokerHost,
- stats.getBrokerVersionDesc(),
- stats.getBootTimestamp(),
- stats.getBrokerVersion()), stats.getGetFoundTps().getSixty());
+ clusterName,
+ brokerAddress, brokerHost,
+ stats.getBrokerVersionDesc(),
+ stats.getBootTimestamp(),
+ stats.getBrokerVersion()), stats.getGetFoundTps().getSixty());
brokerRuntimeGetFoundTps600.put(new BrokerRuntimeMetric(
- clusterName,
- brokerAddress, brokerHost,
- stats.getBrokerVersionDesc(),
- stats.getBootTimestamp(),
- stats.getBrokerVersion()), stats.getGetFoundTps().getSixHundred());
+ clusterName,
+ brokerAddress, brokerHost,
+ stats.getBrokerVersionDesc(),
+ stats.getBootTimestamp(),
+ stats.getBrokerVersion()), stats.getGetFoundTps().getSixHundred());
brokerRuntimeGetFoundTps600.put(new BrokerRuntimeMetric(
- clusterName,
- brokerAddress, brokerHost,
- stats.getBrokerVersionDesc(),
- stats.getBootTimestamp(),
- stats.getBrokerVersion()), stats.getGetFoundTps().getSixHundred());
+ clusterName,
+ brokerAddress, brokerHost,
+ stats.getBrokerVersionDesc(),
+ stats.getBootTimestamp(),
+ stats.getBrokerVersion()), stats.getGetFoundTps().getSixHundred());
}
private void addCommitLogDirCapacity(String clusterName, String brokerAddress, String brokerHost, BrokerRuntimeStats stats) {
brokerRuntimeCommitLogDirCapacityTotal.put(new BrokerRuntimeMetric(
- clusterName,
- brokerAddress, brokerHost,
- stats.getBrokerVersionDesc(),
- stats.getBootTimestamp(),
- stats.getBrokerVersion()), stats.getCommitLogDirCapacityTotal());
+ clusterName,
+ brokerAddress, brokerHost,
+ stats.getBrokerVersionDesc(),
+ stats.getBootTimestamp(),
+ stats.getBrokerVersion()), stats.getCommitLogDirCapacityTotal());
brokerRuntimeCommitLogDirCapacityFree.put(new BrokerRuntimeMetric(
- clusterName,
- brokerAddress, brokerHost,
- stats.getBrokerVersionDesc(),
- stats.getBootTimestamp(),
- stats.getBrokerVersion()), stats.getCommitLogDirCapacityFree());
+ clusterName,
+ brokerAddress, brokerHost,
+ stats.getBrokerVersionDesc(),
+ stats.getBootTimestamp(),
+ stats.getBrokerVersion()), stats.getCommitLogDirCapacityFree());
}
private void addBrokerRuntimePutMessageDistributeTimeMap(
- String clusterName, String brokerAddress, String brokerHost,
- String brokerDes, long bootTimestamp, int brokerVersion,
- BrokerRuntimeStats stats) {
+ String clusterName, String brokerAddress, String brokerHost,
+ String brokerDes, long bootTimestamp, int brokerVersion,
+ BrokerRuntimeStats stats) {
brokerRuntimePutMessageDistributeTimeMap0ms.put(new BrokerRuntimeMetric(
- clusterName,
- brokerAddress, brokerHost,
- brokerDes,
- bootTimestamp,
- brokerVersion), stats.getPutMessageDistributeTimeMap().get("<=0ms"));
+ clusterName,
+ brokerAddress, brokerHost,
+ brokerDes,
+ bootTimestamp,
+ brokerVersion), stats.getPutMessageDistributeTimeMap().get("<=0ms"));
brokerRuntimePutMessageDistributeTimeMap0to10ms.put(new BrokerRuntimeMetric(
- clusterName,
- brokerAddress, brokerHost,
- brokerDes,
- bootTimestamp,
- brokerVersion), stats.getPutMessageDistributeTimeMap().get("0~10ms"));
+ clusterName,
+ brokerAddress, brokerHost,
+ brokerDes,
+ bootTimestamp,
+ brokerVersion), stats.getPutMessageDistributeTimeMap().get("0~10ms"));
brokerRuntimePutMessageDistributeTimeMap10to50ms.put(new BrokerRuntimeMetric(
- clusterName,
- brokerAddress, brokerHost,
- brokerDes,
- bootTimestamp,
- brokerVersion), stats.getPutMessageDistributeTimeMap().get("10~50ms"));
+ clusterName,
+ brokerAddress, brokerHost,
+ brokerDes,
+ bootTimestamp,
+ brokerVersion), stats.getPutMessageDistributeTimeMap().get("10~50ms"));
brokerRuntimePutMessageDistributeTimeMap50to100ms.put(new BrokerRuntimeMetric(
- clusterName,
- brokerAddress, brokerHost,
- brokerDes,
- bootTimestamp,
- brokerVersion), stats.getPutMessageDistributeTimeMap().get("50~100ms"));
+ clusterName,
+ brokerAddress, brokerHost,
+ brokerDes,
+ bootTimestamp,
+ brokerVersion), stats.getPutMessageDistributeTimeMap().get("50~100ms"));
brokerRuntimePutMessageDistributeTimeMap100to200ms.put(new BrokerRuntimeMetric(
- clusterName,
- brokerAddress, brokerHost,
- brokerDes,
- bootTimestamp,
- brokerVersion), stats.getPutMessageDistributeTimeMap().get("100~200ms"));
+ clusterName,
+ brokerAddress, brokerHost,
+ brokerDes,
+ bootTimestamp,
+ brokerVersion), stats.getPutMessageDistributeTimeMap().get("100~200ms"));
brokerRuntimePutMessageDistributeTimeMap200to500ms.put(new BrokerRuntimeMetric(
- clusterName,
- brokerAddress, brokerHost,
- brokerDes,
- bootTimestamp,
- brokerVersion), stats.getPutMessageDistributeTimeMap().get("200~500ms"));
+ clusterName,
+ brokerAddress, brokerHost,
+ brokerDes,
+ bootTimestamp,
+ brokerVersion), stats.getPutMessageDistributeTimeMap().get("200~500ms"));
brokerRuntimePutMessageDistributeTimeMap500to1s.put(new BrokerRuntimeMetric(
- clusterName,
- brokerAddress, brokerHost,
- brokerDes,
- bootTimestamp,
- brokerVersion), stats.getPutMessageDistributeTimeMap().get("500ms~1s"));
+ clusterName,
+ brokerAddress, brokerHost,
+ brokerDes,
+ bootTimestamp,
+ brokerVersion), stats.getPutMessageDistributeTimeMap().get("500ms~1s"));
brokerRuntimePutMessageDistributeTimeMap1to2s.put(new BrokerRuntimeMetric(
- clusterName,
- brokerAddress, brokerHost,
- brokerDes,
- bootTimestamp,
- brokerVersion), stats.getPutMessageDistributeTimeMap().get("1~2s"));
+ clusterName,
+ brokerAddress, brokerHost,
+ brokerDes,
+ bootTimestamp,
+ brokerVersion), stats.getPutMessageDistributeTimeMap().get("1~2s"));
brokerRuntimePutMessageDistributeTimeMap2to3s.put(new BrokerRuntimeMetric(
- clusterName,
- brokerAddress, brokerHost,
- brokerDes,
- bootTimestamp,
- brokerVersion), stats.getPutMessageDistributeTimeMap().get("2~3s"));
+ clusterName,
+ brokerAddress, brokerHost,
+ brokerDes,
+ bootTimestamp,
+ brokerVersion), stats.getPutMessageDistributeTimeMap().get("2~3s"));
brokerRuntimePutMessageDistributeTimeMap3to4s.put(new BrokerRuntimeMetric(
- clusterName,
- brokerAddress, brokerHost,
- brokerDes,
- bootTimestamp,
- brokerVersion), stats.getPutMessageDistributeTimeMap().get("3~4s"));
+ clusterName,
+ brokerAddress, brokerHost,
+ brokerDes,
+ bootTimestamp,
+ brokerVersion), stats.getPutMessageDistributeTimeMap().get("3~4s"));
brokerRuntimePutMessageDistributeTimeMap4to5s.put(new BrokerRuntimeMetric(
- clusterName,
- brokerAddress, brokerHost,
- brokerDes,
- bootTimestamp,
- brokerVersion), stats.getPutMessageDistributeTimeMap().get("4~5s"));
+ clusterName,
+ brokerAddress, brokerHost,
+ brokerDes,
+ bootTimestamp,
+ brokerVersion), stats.getPutMessageDistributeTimeMap().get("4~5s"));
brokerRuntimePutMessageDistributeTimeMap5to10s.put(new BrokerRuntimeMetric(
- clusterName,
- brokerAddress, brokerHost,
- brokerDes,
- bootTimestamp,
- brokerVersion), stats.getPutMessageDistributeTimeMap().get("5~10s"));
+ clusterName,
+ brokerAddress, brokerHost,
+ brokerDes,
+ bootTimestamp,
+ brokerVersion), stats.getPutMessageDistributeTimeMap().get("5~10s"));
brokerRuntimePutMessageDistributeTimeMap10toMore.put(new BrokerRuntimeMetric(
- clusterName,
- brokerAddress, brokerHost,
- brokerDes,
- bootTimestamp,
- brokerVersion), stats.getPutMessageDistributeTimeMap().get("10s~"));
+ clusterName,
+ brokerAddress, brokerHost,
+ brokerDes,
+ bootTimestamp,
+ brokerVersion), stats.getPutMessageDistributeTimeMap().get("10s~"));
}
private static <T extends Number> void loadBrokerRuntimeStatsMetric(GaugeMetricFamily family, Map.Entry<BrokerRuntimeMetric, T> entry) {
family.addMetric(Arrays.asList(
- entry.getKey().getClusterName(),
- entry.getKey().getBrokerAddress(),
- entry.getKey().getBrokerHost(),
- entry.getKey().getBrokerDes(),
- String.valueOf(entry.getKey().getBootTimestamp()),
- String.valueOf(entry.getKey().getBrokerVersion())
+ entry.getKey().getClusterName(),
+ entry.getKey().getBrokerAddress(),
+ entry.getKey().getBrokerHost(),
+ entry.getKey().getBrokerDes(),
+ String.valueOf(entry.getKey().getBootTimestamp()),
+ String.valueOf(entry.getKey().getBrokerVersion())
), entry.getValue().doubleValue());
}
@@ -1035,291 +1113,291 @@ public class RMQMetricsCollector extends Collector {
collectBrokerRuntimeStatsPutMessageDistributeTime(mfs);
GaugeMetricFamily brokerRuntimeMsgPutTotalTodayNowF = new GaugeMetricFamily("rocketmq_brokeruntime_msg_put_total_today_now", "brokerRuntimeMsgPutTotalTodayNow", BROKER_RUNTIME_METRIC_LABEL_NAMES);
- for (Map.Entry<BrokerRuntimeMetric, Long> entry : brokerRuntimeMsgPutTotalTodayNow.entrySet()) {
+ for (Map.Entry<BrokerRuntimeMetric, Long> entry : brokerRuntimeMsgPutTotalTodayNow.asMap().entrySet()) {
loadBrokerRuntimeStatsMetric(brokerRuntimeMsgPutTotalTodayNowF, entry);
}
mfs.add(brokerRuntimeMsgPutTotalTodayNowF);
GaugeMetricFamily brokerRuntimeMsgGetTotalTodayNowF = new GaugeMetricFamily("rocketmq_brokeruntime_msg_gettotal_today_now", "brokerRuntimeMsgGetTotalTodayNow", BROKER_RUNTIME_METRIC_LABEL_NAMES);
- for (Map.Entry<BrokerRuntimeMetric, Long> entry : brokerRuntimeMsgGetTotalTodayNow.entrySet()) {
+ for (Map.Entry<BrokerRuntimeMetric, Long> entry : brokerRuntimeMsgGetTotalTodayNow.asMap().entrySet()) {
loadBrokerRuntimeStatsMetric(brokerRuntimeMsgGetTotalTodayNowF, entry);
}
mfs.add(brokerRuntimeMsgGetTotalTodayNowF);
GaugeMetricFamily brokerRuntimeDispatchBehindBytesF = new GaugeMetricFamily("rocketmq_brokeruntime_dispatch_behind_bytes", "brokerRuntimeDispatchBehindBytes", BROKER_RUNTIME_METRIC_LABEL_NAMES);
- for (Map.Entry<BrokerRuntimeMetric, Long> entry : brokerRuntimeDispatchBehindBytes.entrySet()) {
+ for (Map.Entry<BrokerRuntimeMetric, Long> entry : brokerRuntimeDispatchBehindBytes.asMap().entrySet()) {
loadBrokerRuntimeStatsMetric(brokerRuntimeDispatchBehindBytesF, entry);
}
mfs.add(brokerRuntimeDispatchBehindBytesF);
GaugeMetricFamily brokerRuntimePutMessageSizeTotalF = new GaugeMetricFamily("rocketmq_brokeruntime_put_message_size_total", "brokerRuntimePutMessageSizeTotal", BROKER_RUNTIME_METRIC_LABEL_NAMES);
- for (Map.Entry<BrokerRuntimeMetric, Long> entry : brokerRuntimePutMessageSizeTotal.entrySet()) {
+ for (Map.Entry<BrokerRuntimeMetric, Long> entry : brokerRuntimePutMessageSizeTotal.asMap().entrySet()) {
loadBrokerRuntimeStatsMetric(brokerRuntimePutMessageSizeTotalF, entry);
}
mfs.add(brokerRuntimePutMessageSizeTotalF);
GaugeMetricFamily brokerRuntimePutMessageAverageSizeF = new GaugeMetricFamily("rocketmq_brokeruntime_put_message_average_size", "brokerRuntimePutMessageAverageSize", BROKER_RUNTIME_METRIC_LABEL_NAMES);
- for (Map.Entry<BrokerRuntimeMetric, Double> entry : brokerRuntimePutMessageAverageSize.entrySet()) {
+ for (Map.Entry<BrokerRuntimeMetric, Double> entry : brokerRuntimePutMessageAverageSize.asMap().entrySet()) {
loadBrokerRuntimeStatsMetric(brokerRuntimePutMessageAverageSizeF, entry);
}
mfs.add(brokerRuntimePutMessageAverageSizeF);
GaugeMetricFamily brokerRuntimeQueryThreadPoolQueueCapacityF = new GaugeMetricFamily("rocketmq_brokeruntime_query_threadpool_queue_capacity", "brokerRuntimeQueryThreadPoolQueueCapacity", BROKER_RUNTIME_METRIC_LABEL_NAMES);
- for (Map.Entry<BrokerRuntimeMetric, Long> entry : brokerRuntimeQueryThreadPoolQueueCapacity.entrySet()) {
+ for (Map.Entry<BrokerRuntimeMetric, Long> entry : brokerRuntimeQueryThreadPoolQueueCapacity.asMap().entrySet()) {
loadBrokerRuntimeStatsMetric(brokerRuntimeQueryThreadPoolQueueCapacityF, entry);
}
mfs.add(brokerRuntimeQueryThreadPoolQueueCapacityF);
GaugeMetricFamily brokerRuntimeRemainTransientStoreBufferNumbsF = new GaugeMetricFamily("rocketmq_brokeruntime_remain_transientstore_buffer_numbs", "brokerRuntimeRemainTransientStoreBufferNumbs", BROKER_RUNTIME_METRIC_LABEL_NAMES);
- for (Map.Entry<BrokerRuntimeMetric, Long> entry : brokerRuntimeRemainTransientStoreBufferNumbs.entrySet()) {
+ for (Map.Entry<BrokerRuntimeMetric, Long> entry : brokerRuntimeRemainTransientStoreBufferNumbs.asMap().entrySet()) {
loadBrokerRuntimeStatsMetric(brokerRuntimeRemainTransientStoreBufferNumbsF, entry);
}
mfs.add(brokerRuntimeRemainTransientStoreBufferNumbsF);
GaugeMetricFamily brokerRuntimeEarliestMessageTimeStampF = new GaugeMetricFamily("rocketmq_brokeruntime_earliest_message_timestamp", "brokerRuntimeEarliestMessageTimeStamp", BROKER_RUNTIME_METRIC_LABEL_NAMES);
- for (Map.Entry<BrokerRuntimeMetric, Long> entry : brokerRuntimeEarliestMessageTimeStamp.entrySet()) {
+ for (Map.Entry<BrokerRuntimeMetric, Long> entry : brokerRuntimeEarliestMessageTimeStamp.asMap().entrySet()) {
loadBrokerRuntimeStatsMetric(brokerRuntimeEarliestMessageTimeStampF, entry);
}
mfs.add(brokerRuntimeEarliestMessageTimeStampF);
GaugeMetricFamily brokerRuntimePutMessageEntireTimeMaxF = new GaugeMetricFamily("rocketmq_brokeruntime_putmessage_entire_time_max", "brokerRuntimePutMessageEntireTimeMax", BROKER_RUNTIME_METRIC_LABEL_NAMES);
- for (Map.Entry<BrokerRuntimeMetric, Long> entry : brokerRuntimePutMessageEntireTimeMax.entrySet()) {
+ for (Map.Entry<BrokerRuntimeMetric, Long> entry : brokerRuntimePutMessageEntireTimeMax.asMap().entrySet()) {
loadBrokerRuntimeStatsMetric(brokerRuntimePutMessageEntireTimeMaxF, entry);
}
mfs.add(brokerRuntimePutMessageEntireTimeMaxF);
GaugeMetricFamily brokerRuntimeStartAcceptSendRequestTimeStampF = new GaugeMetricFamily("rocketmq_brokeruntime_start_accept_sendrequest_time", "brokerRuntimeStartAcceptSendRequestTimeStamp", BROKER_RUNTIME_METRIC_LABEL_NAMES);
- for (Map.Entry<BrokerRuntimeMetric, Long> entry : brokerRuntimeStartAcceptSendRequestTimeStamp.entrySet()) {
+ for (Map.Entry<BrokerRuntimeMetric, Long> entry : brokerRuntimeStartAcceptSendRequestTimeStamp.asMap().entrySet()) {
loadBrokerRuntimeStatsMetric(brokerRuntimeStartAcceptSendRequestTimeStampF, entry);
}
mfs.add(brokerRuntimeStartAcceptSendRequestTimeStampF);
GaugeMetricFamily brokerRuntimeSendThreadPoolQueueSizeF = new GaugeMetricFamily("rocketmq_brokeruntime_send_threadpool_queue_size", "brokerRuntimeSendThreadPoolQueueSize", BROKER_RUNTIME_METRIC_LABEL_NAMES);
- for (Map.Entry<BrokerRuntimeMetric, Long> entry : brokerRuntimeSendThreadPoolQueueSize.entrySet()) {
+ for (Map.Entry<BrokerRuntimeMetric, Long> entry : brokerRuntimeSendThreadPoolQueueSize.asMap().entrySet()) {
loadBrokerRuntimeStatsMetric(brokerRuntimeSendThreadPoolQueueSizeF, entry);
}
mfs.add(brokerRuntimeSendThreadPoolQueueSizeF);
GaugeMetricFamily brokerRuntimePutMessageTimesTotalF = new GaugeMetricFamily("rocketmq_brokeruntime_putmessage_times_total", "brokerRuntimePutMessageTimesTotal", BROKER_RUNTIME_METRIC_LABEL_NAMES);
- for (Map.Entry<BrokerRuntimeMetric, Long> entry : brokerRuntimePutMessageTimesTotal.entrySet()) {
+ for (Map.Entry<BrokerRuntimeMetric, Long> entry : brokerRuntimePutMessageTimesTotal.asMap().entrySet()) {
loadBrokerRuntimeStatsMetric(brokerRuntimePutMessageTimesTotalF, entry);
}
mfs.add(brokerRuntimePutMessageTimesTotalF);
GaugeMetricFamily brokerRuntimeGetMessageEntireTimeMaxF = new GaugeMetricFamily("rocketmq_brokeruntime_getmessage_entire_time_max", "brokerRuntimeGetMessageEntireTimeMax", BROKER_RUNTIME_METRIC_LABEL_NAMES);
- for (Map.Entry<BrokerRuntimeMetric, Long> entry : brokerRuntimeGetMessageEntireTimeMax.entrySet()) {
+ for (Map.Entry<BrokerRuntimeMetric, Long> entry : brokerRuntimeGetMessageEntireTimeMax.asMap().entrySet()) {
loadBrokerRuntimeStatsMetric(brokerRuntimeGetMessageEntireTimeMaxF, entry);
}
mfs.add(brokerRuntimeGetMessageEntireTimeMaxF);
GaugeMetricFamily brokerRuntimePageCacheLockTimeMillsF = new GaugeMetricFamily("rocketmq_brokeruntime_pagecache_lock_time_mills", "brokerRuntimePageCacheLockTimeMills", BROKER_RUNTIME_METRIC_LABEL_NAMES);
- for (Map.Entry<BrokerRuntimeMetric, Long> entry : brokerRuntimePageCacheLockTimeMills.entrySet()) {
+ for (Map.Entry<BrokerRuntimeMetric, Long> entry : brokerRuntimePageCacheLockTimeMills.asMap().entrySet()) {
loadBrokerRuntimeStatsMetric(brokerRuntimePageCacheLockTimeMillsF, entry);
}
mfs.add(brokerRuntimePageCacheLockTimeMillsF);
GaugeMetricFamily brokerRuntimeCommitLogDiskRatioF = new GaugeMetricFamily("rocketmq_brokeruntime_commitlog_disk_ratio", "brokerRuntimeCommitLogDiskRatio", BROKER_RUNTIME_METRIC_LABEL_NAMES);
- for (Map.Entry<BrokerRuntimeMetric, Double> entry : brokerRuntimeCommitLogDiskRatio.entrySet()) {
+ for (Map.Entry<BrokerRuntimeMetric, Double> entry : brokerRuntimeCommitLogDiskRatio.asMap().entrySet()) {
loadBrokerRuntimeStatsMetric(brokerRuntimeCommitLogDiskRatioF, entry);
}
mfs.add(brokerRuntimeCommitLogDiskRatioF);
GaugeMetricFamily brokerRuntimeConsumeQueueDiskRatioF = new GaugeMetricFamily("rocketmq_brokeruntime_consumequeue_disk_ratio", "brokerRuntimeConsumeQueueDiskRatio", BROKER_RUNTIME_METRIC_LABEL_NAMES);
- for (Map.Entry<BrokerRuntimeMetric, Double> entry : brokerRuntimeConsumeQueueDiskRatio.entrySet()) {
+ for (Map.Entry<BrokerRuntimeMetric, Double> entry : brokerRuntimeConsumeQueueDiskRatio.asMap().entrySet()) {
loadBrokerRuntimeStatsMetric(brokerRuntimeConsumeQueueDiskRatioF, entry);
}
mfs.add(brokerRuntimeConsumeQueueDiskRatioF);
GaugeMetricFamily brokerRuntimeGetFoundTps600F = new GaugeMetricFamily("rocketmq_brokeruntime_getfound_tps600", "brokerRuntimeGetFoundTps600", BROKER_RUNTIME_METRIC_LABEL_NAMES);
- for (Map.Entry<BrokerRuntimeMetric, Double> entry : brokerRuntimeGetFoundTps600.entrySet()) {
+ for (Map.Entry<BrokerRuntimeMetric, Double> entry : brokerRuntimeGetFoundTps600.asMap().entrySet()) {
loadBrokerRuntimeStatsMetric(brokerRuntimeGetFoundTps600F, entry);
}
mfs.add(brokerRuntimeGetFoundTps600F);
GaugeMetricFamily brokerRuntimeGetFoundTps60F = new GaugeMetricFamily("rocketmq_brokeruntime_getfound_tps60", "brokerRuntimeGetFoundTps60", BROKER_RUNTIME_METRIC_LABEL_NAMES);
- for (Map.Entry<BrokerRuntimeMetric, Double> entry : brokerRuntimeGetFoundTps60.entrySet()) {
+ for (Map.Entry<BrokerRuntimeMetric, Double> entry : brokerRuntimeGetFoundTps60.asMap().entrySet()) {
loadBrokerRuntimeStatsMetric(brokerRuntimeGetFoundTps60F, entry);
}
mfs.add(brokerRuntimeGetFoundTps60F);
GaugeMetricFamily brokerRuntimeGetFoundTps10F = new GaugeMetricFamily("rocketmq_brokeruntime_getfound_tps10", "brokerRuntimeGetFoundTps10", BROKER_RUNTIME_METRIC_LABEL_NAMES);
- for (Map.Entry<BrokerRuntimeMetric, Double> entry : brokerRuntimeGetFoundTps10.entrySet()) {
+ for (Map.Entry<BrokerRuntimeMetric, Double> entry : brokerRuntimeGetFoundTps10.asMap().entrySet()) {
loadBrokerRuntimeStatsMetric(brokerRuntimeGetFoundTps10F, entry);
}
mfs.add(brokerRuntimeGetFoundTps10F);
GaugeMetricFamily brokerRuntimeGetTotalTps600F = new GaugeMetricFamily("rocketmq_brokeruntime_gettotal_tps600", "brokerRuntimeGetTotalTps600", BROKER_RUNTIME_METRIC_LABEL_NAMES);
- for (Map.Entry<BrokerRuntimeMetric, Double> entry : brokerRuntimeGetTotalTps600.entrySet()) {
+ for (Map.Entry<BrokerRuntimeMetric, Double> entry : brokerRuntimeGetTotalTps600.asMap().entrySet()) {
loadBrokerRuntimeStatsMetric(brokerRuntimeGetTotalTps600F, entry);
}
mfs.add(brokerRuntimeGetTotalTps600F);
GaugeMetricFamily brokerRuntimeGetTotalTps60F = new GaugeMetricFamily("rocketmq_brokeruntime_gettotal_tps60", "brokerRuntimeGetTotalTps60", BROKER_RUNTIME_METRIC_LABEL_NAMES);
- for (Map.Entry<BrokerRuntimeMetric, Double> entry : brokerRuntimeGetTotalTps60.entrySet()) {
+ for (Map.Entry<BrokerRuntimeMetric, Double> entry : brokerRuntimeGetTotalTps60.asMap().entrySet()) {
loadBrokerRuntimeStatsMetric(brokerRuntimeGetTotalTps60F, entry);
}
mfs.add(brokerRuntimeGetTotalTps60F);
GaugeMetricFamily brokerRuntimeGetTotalTps10F = new GaugeMetricFamily("rocketmq_brokeruntime_gettotal_tps10", "brokerRuntimeGetTotalTps10", BROKER_RUNTIME_METRIC_LABEL_NAMES);
- for (Map.Entry<BrokerRuntimeMetric, Double> entry : brokerRuntimeGetTotalTps10.entrySet()) {
+ for (Map.Entry<BrokerRuntimeMetric, Double> entry : brokerRuntimeGetTotalTps10.asMap().entrySet()) {
loadBrokerRuntimeStatsMetric(brokerRuntimeGetTotalTps10F, entry);
}
mfs.add(brokerRuntimeGetTotalTps10F);
GaugeMetricFamily brokerRuntimeGetTransferedTps600F = new GaugeMetricFamily("rocketmq_brokeruntime_gettransfered_tps600", "brokerRuntimeGetTransferedTps600", BROKER_RUNTIME_METRIC_LABEL_NAMES);
- for (Map.Entry<BrokerRuntimeMetric, Double> entry : brokerRuntimeGetTransferedTps600.entrySet()) {
+ for (Map.Entry<BrokerRuntimeMetric, Double> entry : brokerRuntimeGetTransferedTps600.asMap().entrySet()) {
loadBrokerRuntimeStatsMetric(brokerRuntimeGetTransferedTps600F, entry);
}
mfs.add(brokerRuntimeGetTransferedTps600F);
GaugeMetricFamily brokerRuntimeGetTransferedTps60F = new GaugeMetricFamily("rocketmq_brokeruntime_gettransfered_tps60", "brokerRuntimeGetTransferedTps60", BROKER_RUNTIME_METRIC_LABEL_NAMES);
- for (Map.Entry<BrokerRuntimeMetric, Double> entry : brokerRuntimeGetTransferedTps60.entrySet()) {
+ for (Map.Entry<BrokerRuntimeMetric, Double> entry : brokerRuntimeGetTransferedTps60.asMap().entrySet()) {
loadBrokerRuntimeStatsMetric(brokerRuntimeGetTransferedTps60F, entry);
}
mfs.add(brokerRuntimeGetTransferedTps60F);
GaugeMetricFamily brokerRuntimeGetTransferedTps10F = new GaugeMetricFamily("rocketmq_brokeruntime_gettransfered_tps10", "brokerRuntimeGetTransferedTps10", BROKER_RUNTIME_METRIC_LABEL_NAMES);
- for (Map.Entry<BrokerRuntimeMetric, Double> entry : brokerRuntimeGetTransferedTps10.entrySet()) {
+ for (Map.Entry<BrokerRuntimeMetric, Double> entry : brokerRuntimeGetTransferedTps10.asMap().entrySet()) {
loadBrokerRuntimeStatsMetric(brokerRuntimeGetTransferedTps10F, entry);
}
mfs.add(brokerRuntimeGetTransferedTps10F);
GaugeMetricFamily brokerRuntimeGetMissTps600F = new GaugeMetricFamily("rocketmq_brokeruntime_getmiss_tps600", "brokerRuntimeGetMissTps600", BROKER_RUNTIME_METRIC_LABEL_NAMES);
- for (Map.Entry<BrokerRuntimeMetric, Double> entry : brokerRuntimeGetMissTps600.entrySet()) {
+ for (Map.Entry<BrokerRuntimeMetric, Double> entry : brokerRuntimeGetMissTps600.asMap().entrySet()) {
loadBrokerRuntimeStatsMetric(brokerRuntimeGetMissTps600F, entry);
}
mfs.add(brokerRuntimeGetMissTps600F);
GaugeMetricFamily brokerRuntimeGetMissTps60F = new GaugeMetricFamily("rocketmq_brokeruntime_getmiss_tps60", "brokerRuntimeGetMissTps60", BROKER_RUNTIME_METRIC_LABEL_NAMES);
- for (Map.Entry<BrokerRuntimeMetric, Double> entry : brokerRuntimeGetMissTps60.entrySet()) {
+ for (Map.Entry<BrokerRuntimeMetric, Double> entry : brokerRuntimeGetMissTps60.asMap().entrySet()) {
loadBrokerRuntimeStatsMetric(brokerRuntimeGetMissTps60F, entry);
}
mfs.add(brokerRuntimeGetMissTps60F);
GaugeMetricFamily brokerRuntimeGetMissTps10F = new GaugeMetricFamily("rocketmq_brokeruntime_getmiss_tps10", "brokerRuntimeGetMissTps10", BROKER_RUNTIME_METRIC_LABEL_NAMES);
- for (Map.Entry<BrokerRuntimeMetric, Double> entry : brokerRuntimeGetMissTps10.entrySet()) {
+ for (Map.Entry<BrokerRuntimeMetric, Double> entry : brokerRuntimeGetMissTps10.asMap().entrySet()) {
loadBrokerRuntimeStatsMetric(brokerRuntimeGetMissTps10F, entry);
}
mfs.add(brokerRuntimeGetMissTps10F);
GaugeMetricFamily brokerRuntimePutTps600F = new GaugeMetricFamily("rocketmq_brokeruntime_put_tps600", "brokerRuntimePutTps600", BROKER_RUNTIME_METRIC_LABEL_NAMES);
- for (Map.Entry<BrokerRuntimeMetric, Double> entry : brokerRuntimePutTps600.entrySet()) {
+ for (Map.Entry<BrokerRuntimeMetric, Double> entry : brokerRuntimePutTps600.asMap().entrySet()) {
loadBrokerRuntimeStatsMetric(brokerRuntimePutTps600F, entry);
}
mfs.add(brokerRuntimePutTps600F);
GaugeMetricFamily brokerRuntimePutTps60F = new GaugeMetricFamily("rocketmq_brokeruntime_put_tps60", "brokerRuntimePutTps60", BROKER_RUNTIME_METRIC_LABEL_NAMES);
- for (Map.Entry<BrokerRuntimeMetric, Double> entry : brokerRuntimePutTps60.entrySet()) {
+ for (Map.Entry<BrokerRuntimeMetric, Double> entry : brokerRuntimePutTps60.asMap().entrySet()) {
loadBrokerRuntimeStatsMetric(brokerRuntimePutTps60F, entry);
}
mfs.add(brokerRuntimePutTps60F);
GaugeMetricFamily brokerRuntimePutTps10F = new GaugeMetricFamily("rocketmq_brokeruntime_put_tps10", "brokerRuntimePutTps10", BROKER_RUNTIME_METRIC_LABEL_NAMES);
- for (Map.Entry<BrokerRuntimeMetric, Double> entry : brokerRuntimePutTps10.entrySet()) {
+ for (Map.Entry<BrokerRuntimeMetric, Double> entry : brokerRuntimePutTps10.asMap().entrySet()) {
loadBrokerRuntimeStatsMetric(brokerRuntimePutTps10F, entry);
}
mfs.add(brokerRuntimePutTps10F);
GaugeMetricFamily brokerRuntimeDispatchMaxBufferF = new GaugeMetricFamily("rocketmq_brokeruntime_dispatch_maxbuffer", "brokerRuntimeDispatchMaxBuffer", BROKER_RUNTIME_METRIC_LABEL_NAMES);
- for (Map.Entry<BrokerRuntimeMetric, Long> entry : brokerRuntimeDispatchMaxBuffer.entrySet()) {
+ for (Map.Entry<BrokerRuntimeMetric, Long> entry : brokerRuntimeDispatchMaxBuffer.asMap().entrySet()) {
loadBrokerRuntimeStatsMetric(brokerRuntimeDispatchMaxBufferF, entry);
}
mfs.add(brokerRuntimeDispatchMaxBufferF);
GaugeMetricFamily brokerRuntimePullThreadPoolQueueCapacityF = new GaugeMetricFamily("rocketmq_brokeruntime_pull_threadpoolqueue_capacity", "brokerRuntimePullThreadPoolQueueCapacity", BROKER_RUNTIME_METRIC_LABEL_NAMES);
- for (Map.Entry<BrokerRuntimeMetric, Long> entry : brokerRuntimePullThreadPoolQueueCapacity.entrySet()) {
+ for (Map.Entry<BrokerRuntimeMetric, Long> entry : brokerRuntimePullThreadPoolQueueCapacity.asMap().entrySet()) {
loadBrokerRuntimeStatsMetric(brokerRuntimePullThreadPoolQueueCapacityF, entry);
}
mfs.add(brokerRuntimePullThreadPoolQueueCapacityF);
GaugeMetricFamily brokerRuntimeSendThreadPoolQueueCapacityF = new GaugeMetricFamily("rocketmq_brokeruntime_send_threadpoolqueue_capacity", "brokerRuntimeSendThreadPoolQueueCapacity", BROKER_RUNTIME_METRIC_LABEL_NAMES);
- for (Map.Entry<BrokerRuntimeMetric, Long> entry : brokerRuntimeSendThreadPoolQueueCapacity.entrySet()) {
+ for (Map.Entry<BrokerRuntimeMetric, Long> entry : brokerRuntimeSendThreadPoolQueueCapacity.asMap().entrySet()) {
loadBrokerRuntimeStatsMetric(brokerRuntimeSendThreadPoolQueueCapacityF, entry);
}
mfs.add(brokerRuntimeSendThreadPoolQueueCapacityF);
GaugeMetricFamily brokerRuntimePullThreadPoolQueueSizeF = new GaugeMetricFamily("rocketmq_brokeruntime_pull_threadpoolqueue_size", "brokerRuntimePullThreadPoolQueueSizeF", BROKER_RUNTIME_METRIC_LABEL_NAMES);
- for (Map.Entry<BrokerRuntimeMetric, Long> entry : brokerRuntimePullThreadPoolQueueSize.entrySet()) {
+ for (Map.Entry<BrokerRuntimeMetric, Long> entry : brokerRuntimePullThreadPoolQueueSize.asMap().entrySet()) {
loadBrokerRuntimeStatsMetric(brokerRuntimePullThreadPoolQueueSizeF, entry);
}
mfs.add(brokerRuntimePullThreadPoolQueueSizeF);
GaugeMetricFamily brokerRuntimeQueryThreadPoolQueueSizeF = new GaugeMetricFamily("rocketmq_brokeruntime_query_threadpoolqueue_size", "brokerRuntimeQueryThreadPoolQueueSize", BROKER_RUNTIME_METRIC_LABEL_NAMES);
- for (Map.Entry<BrokerRuntimeMetric, Long> entry : brokerRuntimeQueryThreadPoolQueueSize.entrySet()) {
+ for (Map.Entry<BrokerRuntimeMetric, Long> entry : brokerRuntimeQueryThreadPoolQueueSize.asMap().entrySet()) {
loadBrokerRuntimeStatsMetric(brokerRuntimeQueryThreadPoolQueueSizeF, entry);
}
mfs.add(brokerRuntimeQueryThreadPoolQueueSizeF);
GaugeMetricFamily brokerRuntimePullThreadPoolQueueHeadWaitTimeMillsF = new GaugeMetricFamily("rocketmq_brokeruntime_pull_threadpoolqueue_headwait_timemills", "brokerRuntimePullThreadPoolQueueHeadWaitTimeMills", BROKER_RUNTIME_METRIC_LABEL_NAMES);
- for (Map.Entry<BrokerRuntimeMetric, Long> entry : brokerRuntimePullThreadPoolQueueHeadWaitTimeMills.entrySet()) {
+ for (Map.Entry<BrokerRuntimeMetric, Long> entry : brokerRuntimePullThreadPoolQueueHeadWaitTimeMills.asMap().entrySet()) {
loadBrokerRuntimeStatsMetric(brokerRuntimePullThreadPoolQueueHeadWaitTimeMillsF, entry);
}
mfs.add(brokerRuntimePullThreadPoolQueueHeadWaitTimeMillsF);
GaugeMetricFamily brokerRuntimeQueryThreadPoolQueueHeadWaitTimeMillsF = new GaugeMetricFamily("rocketmq_brokeruntime_query_threadpoolqueue_headwait_timemills", "brokerRuntimeQueryThreadPoolQueueHeadWaitTimeMills", BROKER_RUNTIME_METRIC_LABEL_NAMES);
- for (Map.Entry<BrokerRuntimeMetric, Long> entry : brokerRuntimeQueryThreadPoolQueueHeadWaitTimeMills.entrySet()) {
+ for (Map.Entry<BrokerRuntimeMetric, Long> entry : brokerRuntimeQueryThreadPoolQueueHeadWaitTimeMills.asMap().entrySet()) {
loadBrokerRuntimeStatsMetric(brokerRuntimeQueryThreadPoolQueueHeadWaitTimeMillsF, entry);
}
mfs.add(brokerRuntimeQueryThreadPoolQueueHeadWaitTimeMillsF);
GaugeMetricFamily brokerRuntimeSendThreadPoolQueueHeadWaitTimeMillsF = new GaugeMetricFamily("rocketmq_brokeruntime_send_threadpoolqueue_headwait_timemills", "brokerRuntimeSendThreadPoolQueueHeadWaitTimeMills", BROKER_RUNTIME_METRIC_LABEL_NAMES);
- for (Map.Entry<BrokerRuntimeMetric, Long> entry : brokerRuntimeSendThreadPoolQueueHeadWaitTimeMills.entrySet()) {
+ for (Map.Entry<BrokerRuntimeMetric, Long> entry : brokerRuntimeSendThreadPoolQueueHeadWaitTimeMills.asMap().entrySet()) {
loadBrokerRuntimeStatsMetric(brokerRuntimeSendThreadPoolQueueHeadWaitTimeMillsF, entry);
}
mfs.add(brokerRuntimeSendThreadPoolQueueHeadWaitTimeMillsF);
GaugeMetricFamily brokerRuntimeMsgGetTotalYesterdayMorningF = new GaugeMetricFamily("rocketmq_brokeruntime_msg_gettotal_yesterdaymorning", "brokerRuntimeMsgGetTotalYesterdayMorning", BROKER_RUNTIME_METRIC_LABEL_NAMES);
- for (Map.Entry<BrokerRuntimeMetric, Long> entry : brokerRuntimeMsgGetTotalYesterdayMorning.entrySet()) {
+ for (Map.Entry<BrokerRuntimeMetric, Long> entry : brokerRuntimeMsgGetTotalYesterdayMorning.asMap().entrySet()) {
loadBrokerRuntimeStatsMetric(brokerRuntimeMsgGetTotalYesterdayMorningF, entry);
}
mfs.add(brokerRuntimeMsgGetTotalYesterdayMorningF);
GaugeMetricFamily brokerRuntimeMsgPutTotalYesterdayMorningF = new GaugeMetricFamily("rocketmq_brokeruntime_msg_puttotal_yesterdaymorning", "brokerRuntimeMsgPutTotalYesterdayMorning", BROKER_RUNTIME_METRIC_LABEL_NAMES);
- for (Map.Entry<BrokerRuntimeMetric, Long> entry : brokerRuntimeMsgPutTotalYesterdayMorning.entrySet()) {
+ for (Map.Entry<BrokerRuntimeMetric, Long> entry : brokerRuntimeMsgPutTotalYesterdayMorning.asMap().entrySet()) {
loadBrokerRuntimeStatsMetric(brokerRuntimeMsgPutTotalYesterdayMorningF, entry);
}
mfs.add(brokerRuntimeMsgPutTotalYesterdayMorningF);
GaugeMetricFamily brokerRuntimeMsgGetTotalTodayMorningF = new GaugeMetricFamily("rocketmq_brokeruntime_msg_gettotal_todaymorning", "brokerRuntimeMsgGetTotalTodayMorning", BROKER_RUNTIME_METRIC_LABEL_NAMES);
- for (Map.Entry<BrokerRuntimeMetric, Long> entry : brokerRuntimeMsgGetTotalTodayMorning.entrySet()) {
+ for (Map.Entry<BrokerRuntimeMetric, Long> entry : brokerRuntimeMsgGetTotalTodayMorning.asMap().entrySet()) {
loadBrokerRuntimeStatsMetric(brokerRuntimeMsgGetTotalTodayMorningF, entry);
}
mfs.add(brokerRuntimeMsgGetTotalTodayMorningF);
GaugeMetricFamily brokerRuntimeMsgPutTotalTodayMorningF = new GaugeMetricFamily("rocketmq_brokeruntime_msg_puttotal_todaymorning", "brokerRuntimeMsgPutTotalTodayMorning", BROKER_RUNTIME_METRIC_LABEL_NAMES);
- for (Map.Entry<BrokerRuntimeMetric, Long> entry : brokerRuntimeMsgPutTotalTodayMorning.entrySet()) {
+ for (Map.Entry<BrokerRuntimeMetric, Long> entry : brokerRuntimeMsgPutTotalTodayMorning.asMap().entrySet()) {
loadBrokerRuntimeStatsMetric(brokerRuntimeMsgPutTotalTodayMorningF, entry);
}
mfs.add(brokerRuntimeMsgPutTotalTodayMorningF);
GaugeMetricFamily brokerRuntimeCommitLogDirCapacityFreeF = new GaugeMetricFamily("rocketmq_brokeruntime_commitlogdir_capacity_free", "brokerRuntimeCommitLogDirCapacityFree", BROKER_RUNTIME_METRIC_LABEL_NAMES);
- for (Map.Entry<BrokerRuntimeMetric, Double> entry : brokerRuntimeCommitLogDirCapacityFree.entrySet()) {
+ for (Map.Entry<BrokerRuntimeMetric, Double> entry : brokerRuntimeCommitLogDirCapacityFree.asMap().entrySet()) {
loadBrokerRuntimeStatsMetric(brokerRuntimeCommitLogDirCapacityFreeF, entry);
}
mfs.add(brokerRuntimeCommitLogDirCapacityFreeF);
GaugeMetricFamily brokerRuntimeCommitLogDirCapacityTotalF = new GaugeMetricFamily("rocketmq_brokeruntime_commitlogdir_capacity_total", "brokerRuntimeCommitLogDirCapacityTotal", BROKER_RUNTIME_METRIC_LABEL_NAMES);
- for (Map.Entry<BrokerRuntimeMetric, Double> entry : brokerRuntimeCommitLogDirCapacityTotal.entrySet()) {
+ for (Map.Entry<BrokerRuntimeMetric, Double> entry : brokerRuntimeCommitLogDirCapacityTotal.asMap().entrySet()) {
loadBrokerRuntimeStatsMetric(brokerRuntimeCommitLogDirCapacityTotalF, entry);
}
mfs.add(brokerRuntimeCommitLogDirCapacityTotalF);
GaugeMetricFamily brokerRuntimeCommitLogMaxOffsetF = new GaugeMetricFamily("rocketmq_brokeruntime_commitlog_maxoffset", "brokerRuntimeCommitLogMaxOffset", BROKER_RUNTIME_METRIC_LABEL_NAMES);
- for (Map.Entry<BrokerRuntimeMetric, Long> entry : brokerRuntimeCommitLogMaxOffset.entrySet()) {
+ for (Map.Entry<BrokerRuntimeMetric, Long> entry : brokerRuntimeCommitLogMaxOffset.asMap().entrySet()) {
loadBrokerRuntimeStatsMetric(brokerRuntimeCommitLogMaxOffsetF, entry);
}
mfs.add(brokerRuntimeCommitLogMaxOffsetF);
GaugeMetricFamily brokerRuntimeCommitLogMinOffsetF = new GaugeMetricFamily("rocketmq_brokeruntime_commitlog_minoffset", "brokerRuntimeCommitLogMinOffset", BROKER_RUNTIME_METRIC_LABEL_NAMES);
- for (Map.Entry<BrokerRuntimeMetric, Long> entry : brokerRuntimeCommitLogMinOffset.entrySet()) {
+ for (Map.Entry<BrokerRuntimeMetric, Long> entry : brokerRuntimeCommitLogMinOffset.asMap().entrySet()) {
loadBrokerRuntimeStatsMetric(brokerRuntimeCommitLogMinOffsetF, entry);
}
mfs.add(brokerRuntimeCommitLogMinOffsetF);
GaugeMetricFamily brokerRuntimeRemainHowManyDataToFlushF = new GaugeMetricFamily("rocketmq_brokeruntime_remain_howmanydata_toflush", "brokerRuntimeRemainHowManyDataToFlush", BROKER_RUNTIME_METRIC_LABEL_NAMES);
- for (Map.Entry<BrokerRuntimeMetric, Double> entry : brokerRuntimeRemainHowManyDataToFlush.entrySet()) {
+ for (Map.Entry<BrokerRuntimeMetric, Double> entry : brokerRuntimeRemainHowManyDataToFlush.asMap().entrySet()) {
loadBrokerRuntimeStatsMetric(brokerRuntimeRemainHowManyDataToFlushF, entry);
}
mfs.add(brokerRuntimeRemainHowManyDataToFlushF);
}
-}
\ No newline at end of file
+}
diff --git a/src/main/java/org/apache/rocketmq/exporter/config/RMQConfigure.java b/src/main/java/org/apache/rocketmq/exporter/config/RMQConfigure.java
index d42f1a8..3689fc0 100644
--- a/src/main/java/org/apache/rocketmq/exporter/config/RMQConfigure.java
+++ b/src/main/java/org/apache/rocketmq/exporter/config/RMQConfigure.java
@@ -54,6 +54,8 @@ public class RMQConfigure {
private String secretKey;
+ private long outOfTimeSeconds;
+
public boolean enableACL() {
return this.enableACL;
}
@@ -133,4 +135,12 @@ public class RMQConfigure {
public void setSecretKey(String secretKey) {
this.secretKey = secretKey;
}
+
+ public long getOutOfTimeSeconds() {
+ return outOfTimeSeconds;
+ }
+
+ public void setOutOfTimeSeconds(long outOfTimeSeconds) {
+ this.outOfTimeSeconds = outOfTimeSeconds;
+ }
}
diff --git a/src/main/java/org/apache/rocketmq/exporter/service/impl/RMQMetricsServiceImpl.java b/src/main/java/org/apache/rocketmq/exporter/service/impl/RMQMetricsServiceImpl.java
index b38b921..e0088d2 100644
--- a/src/main/java/org/apache/rocketmq/exporter/service/impl/RMQMetricsServiceImpl.java
+++ b/src/main/java/org/apache/rocketmq/exporter/service/impl/RMQMetricsServiceImpl.java
@@ -19,7 +19,9 @@ package org.apache.rocketmq.exporter.service.impl;
import io.prometheus.client.Collector;
import io.prometheus.client.CollectorRegistry;
import org.apache.rocketmq.exporter.collector.RMQMetricsCollector;
+import org.apache.rocketmq.exporter.config.RMQConfigure;
import org.apache.rocketmq.exporter.service.RMQMetricsService;
+import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.io.IOException;
@@ -31,6 +33,9 @@ import java.util.Iterator;
@Service
public class RMQMetricsServiceImpl implements RMQMetricsService {
+ @Autowired
+ private RMQConfigure configure;
+
private CollectorRegistry registry = new CollectorRegistry();
private final RMQMetricsCollector rmqMetricsCollector;
@@ -38,8 +43,9 @@ public class RMQMetricsServiceImpl implements RMQMetricsService {
return rmqMetricsCollector;
}
- public RMQMetricsServiceImpl() {
- rmqMetricsCollector = new RMQMetricsCollector();
+ public RMQMetricsServiceImpl(RMQConfigure configure) {
+ this.configure = configure;
+ rmqMetricsCollector = new RMQMetricsCollector(configure.getOutOfTimeSeconds());
rmqMetricsCollector.register(registry);
}
diff --git a/src/main/java/org/apache/rocketmq/exporter/task/MetricsCollectTask.java b/src/main/java/org/apache/rocketmq/exporter/task/MetricsCollectTask.java
index 35586df..19be6f4 100644
--- a/src/main/java/org/apache/rocketmq/exporter/task/MetricsCollectTask.java
+++ b/src/main/java/org/apache/rocketmq/exporter/task/MetricsCollectTask.java
@@ -546,7 +546,6 @@ public class MetricsCollectTask {
return;
}
- Set<BrokerMetric> masterMetricsSet = new HashSet<>();
Set<Map.Entry<String, BrokerData>> clusterEntries = clusterInfo.getBrokerAddrTable().entrySet();
for (Map.Entry<String, BrokerData> clusterEntry : clusterEntries) {
String masterAddr = clusterEntry.getValue().getBrokerAddrs().get(MixAll.MASTER_ID);
@@ -577,11 +576,8 @@ public class MetricsCollectTask {
} catch (Exception ex) {
log.error(String.format("BROKER_GET_NUMS-error, master broker=%s", masterAddr), ex);
}
- masterMetricsSet.add(new BrokerMetric(clusterName, brokerIP, brokerName));
}
- metricsService.getCollector().clearBrokerPutNumsMetric(masterMetricsSet);
- metricsService.getCollector().clearBrokerGetNumsMetric(masterMetricsSet);
log.info("broker stats collection task finished...." + (System.currentTimeMillis() - start));
}
diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml
index dd85a06..86954a4 100644
--- a/src/main/resources/application.yml
+++ b/src/main/resources/application.yml
@@ -20,6 +20,7 @@ rocketmq:
enableACL: false # if >=4.4.0
accessKey: # if >=4.4.0
secretKey: # if >=4.4.0
+ outOfTimeSeconds: # Seconds without an update before a cached metric entry is cleared
threadpool:
collect-client-metric-executor: