You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by wl...@apache.org on 2020/03/08 13:16:59 UTC
[rocketmq-exporter] 39/43: add all metrics
This is an automated email from the ASF dual-hosted git repository.
wlliqipeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-exporter.git
commit f0e1125a8ca54e65edf299aab66c38e18564355b
Author: liwei5 <li...@vipkid.com.cn>
AuthorDate: Thu Dec 12 00:17:32 2019 +0800
add all metrics
---
pom.xml | 10 +-
.../exporter/aspect/admin/MQAdminAspect.java | 71 --
.../admin/annotation/MultiMQAdminCmdMethod.java | 30 -
.../exporter/collector/RMQMetricsCollector.java | 1178 ++++++++++++++++++--
.../rocketmq/exporter/config/RMQConfigure.java | 4 +-
.../rocketmq/exporter/config/ScheduleConfig.java | 26 +
.../exporter/controller/RMQMetricsController.java | 9 +-
.../exporter/exception/ServiceException.java | 31 -
.../exporter/model/BrokerRuntimeStats.java | 609 ++++++++++
.../exporter/model/metrics/BrokerMetric.java | 43 +-
.../model/metrics/ConsumerCountMetric.java | 58 +
.../exporter/model/metrics/ConsumerMetric.java | 45 +-
.../model/metrics/ConsumerQueueMetric.java | 93 --
.../model/metrics/ConsumerTopicDiffMetric.java | 61 +
.../model/metrics/DLQTopicOffsetMetric.java | 72 ++
.../exporter/model/metrics/ProducerMetric.java | 61 +-
.../exporter/model/metrics/TopicPutNumMetric.java | 83 ++
.../metrics/brokerruntime/BrokerRuntimeMetric.java | 91 ++
.../exporter/service/AbstractCommonService.java | 50 -
.../exporter/service/RMQMetricsService.java | 6 +-
.../exporter/service/client/MQAdminExtImpl.java | 279 ++---
.../exporter/service/client/MQAdminInstance.java | 118 +-
.../service/impl/RMQMetricsServiceImpl.java | 16 +-
.../rocketmq/exporter/task/MetricsCollectTask.java | 603 ++++++----
.../apache/rocketmq/exporter/util/JsonUtil.java | 39 +-
src/main/resources/application.properties | 14 -
src/main/resources/application.yml | 32 +
src/main/resources/logback.xml | 55 +-
28 files changed, 2849 insertions(+), 938 deletions(-)
diff --git a/pom.xml b/pom.xml
index cda549d..8a84c73 100644
--- a/pom.xml
+++ b/pom.xml
@@ -130,7 +130,15 @@
</resources>
</configuration>
</plugin>
- </plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-compiler-plugin</artifactId>
+ <configuration>
+ <source>8</source>
+ <target>8</target>
+ </configuration>
+ </plugin>
+ </plugins>
</build>
diff --git a/src/main/java/org/apache/rocketmq/exporter/aspect/admin/MQAdminAspect.java b/src/main/java/org/apache/rocketmq/exporter/aspect/admin/MQAdminAspect.java
deleted file mode 100644
index 6b5435f..0000000
--- a/src/main/java/org/apache/rocketmq/exporter/aspect/admin/MQAdminAspect.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.rocketmq.exporter.aspect.admin;
-
-import java.lang.reflect.Method;
-import org.apache.rocketmq.exporter.aspect.admin.annotation.MultiMQAdminCmdMethod;
-import org.apache.rocketmq.exporter.service.client.MQAdminInstance;
-import org.aspectj.lang.ProceedingJoinPoint;
-import org.aspectj.lang.annotation.Around;
-import org.aspectj.lang.annotation.Aspect;
-import org.aspectj.lang.annotation.Pointcut;
-import org.aspectj.lang.reflect.MethodSignature;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.stereotype.Service;
-
-@Aspect
-@Service
-public class MQAdminAspect {
- private Logger logger = LoggerFactory.getLogger(MQAdminAspect.class);
-
- public MQAdminAspect() {
- }
-
- @Pointcut("execution(* org.apache.rocketmq.exporter.service.client.MQAdminExtImpl..*(..))")
- public void mQAdminMethodPointCut() {
-
- }
-
- @Pointcut("@annotation(org.apache.rocketmq.exporter.aspect.admin.annotation.MultiMQAdminCmdMethod)")
- public void multiMQAdminMethodPointCut() {
-
- }
-
- @Around(value = "mQAdminMethodPointCut() || multiMQAdminMethodPointCut()")
- public Object aroundMQAdminMethod(ProceedingJoinPoint joinPoint) throws Throwable {
- long start = System.currentTimeMillis();
- Object obj;
- try {
- MethodSignature signature = (MethodSignature)joinPoint.getSignature();
- Method method = signature.getMethod();
- MultiMQAdminCmdMethod multiMQAdminCmdMethod = method.getAnnotation(MultiMQAdminCmdMethod.class);
- if (multiMQAdminCmdMethod != null && multiMQAdminCmdMethod.timeoutMillis() > 0) {
- MQAdminInstance.initMQAdminInstance(multiMQAdminCmdMethod.timeoutMillis());
- }
- else {
- MQAdminInstance.initMQAdminInstance(0);
- }
- obj = joinPoint.proceed();
- }
- finally {
- MQAdminInstance.destroyMQAdminInstance();
- logger.debug("op=look method={} cost={}", joinPoint.getSignature().getName(), System.currentTimeMillis() - start);
- }
- return obj;
- }
-}
diff --git a/src/main/java/org/apache/rocketmq/exporter/aspect/admin/annotation/MultiMQAdminCmdMethod.java b/src/main/java/org/apache/rocketmq/exporter/aspect/admin/annotation/MultiMQAdminCmdMethod.java
deleted file mode 100644
index 7953fb3..0000000
--- a/src/main/java/org/apache/rocketmq/exporter/aspect/admin/annotation/MultiMQAdminCmdMethod.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.rocketmq.exporter.aspect.admin.annotation;
-
-import java.lang.annotation.Documented;
-import java.lang.annotation.ElementType;
-import java.lang.annotation.Retention;
-import java.lang.annotation.RetentionPolicy;
-import java.lang.annotation.Target;
-
-@Target({ElementType.METHOD})
-@Retention(RetentionPolicy.RUNTIME)
-@Documented
-public @interface MultiMQAdminCmdMethod {
- long timeoutMillis() default 0;
-}
diff --git a/src/main/java/org/apache/rocketmq/exporter/collector/RMQMetricsCollector.java b/src/main/java/org/apache/rocketmq/exporter/collector/RMQMetricsCollector.java
index 2854712..253e9f9 100644
--- a/src/main/java/org/apache/rocketmq/exporter/collector/RMQMetricsCollector.java
+++ b/src/main/java/org/apache/rocketmq/exporter/collector/RMQMetricsCollector.java
@@ -17,12 +17,17 @@
package org.apache.rocketmq.exporter.collector;
import io.prometheus.client.Collector;
-import io.prometheus.client.CounterMetricFamily;
import io.prometheus.client.GaugeMetricFamily;
+import org.apache.rocketmq.common.MixAll;
+import org.apache.rocketmq.exporter.model.BrokerRuntimeStats;
import org.apache.rocketmq.exporter.model.metrics.BrokerMetric;
+import org.apache.rocketmq.exporter.model.metrics.ConsumerCountMetric;
import org.apache.rocketmq.exporter.model.metrics.ConsumerMetric;
-import org.apache.rocketmq.exporter.model.metrics.ConsumerQueueMetric;
+import org.apache.rocketmq.exporter.model.metrics.ConsumerTopicDiffMetric;
+import org.apache.rocketmq.exporter.model.metrics.DLQTopicOffsetMetric;
import org.apache.rocketmq.exporter.model.metrics.ProducerMetric;
+import org.apache.rocketmq.exporter.model.metrics.TopicPutNumMetric;
+import org.apache.rocketmq.exporter.model.metrics.brokerruntime.BrokerRuntimeMetric;
import java.util.ArrayList;
import java.util.Arrays;
@@ -31,160 +36,1159 @@ import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
public class RMQMetricsCollector extends Collector {
+ //max offset of normal consume queue
+ private ConcurrentHashMap<ProducerMetric, Double> topicOffset = new ConcurrentHashMap<>();
+ //max offset of retry topic consume queue
+ private ConcurrentHashMap<ProducerMetric, Double> topicRetryOffset = new ConcurrentHashMap<>();
+ //max offset of dlq consume queue
+ private ConcurrentHashMap<DLQTopicOffsetMetric, Double> topicDLQOffset = new ConcurrentHashMap<>();
- private ConcurrentHashMap<ProducerMetric, Double> topicPutNums = new ConcurrentHashMap<>();
- private ConcurrentHashMap<ProducerMetric, Double> topicPutSize = new ConcurrentHashMap<>();
+ //total put numbers for topics
+ private ConcurrentHashMap<TopicPutNumMetric, Double> topicPutNums = new ConcurrentHashMap<>();
+ //total get numbers for topics
+ private ConcurrentHashMap<TopicPutNumMetric, Double> topicPutSize = new ConcurrentHashMap<>();
- private ConcurrentHashMap<ProducerMetric, Double> topicOffset = new ConcurrentHashMap<>();
+ //diff for consumer group
+ private ConcurrentHashMap<ConsumerTopicDiffMetric, Long> consumerDiff = new ConcurrentHashMap<>();
+ //retry diff for consumer group
+ private ConcurrentHashMap<ConsumerTopicDiffMetric, Long> consumerRetryDiff = new ConcurrentHashMap<>();
+ //死信堆积 todo 检查是否存在这个数据 应该不存在
+ private ConcurrentHashMap<ConsumerTopicDiffMetric, Long> consumerDLQDiff = new ConcurrentHashMap<>();
+ //consumer count
+ private ConcurrentHashMap<ConsumerCountMetric, Integer> consumerCounts = new ConcurrentHashMap<>();
- private ConcurrentHashMap<BrokerMetric, Double> brokerPutNums = new ConcurrentHashMap<>();
- private ConcurrentHashMap<BrokerMetric, Double> brokerGetNums = new ConcurrentHashMap<>();
+ //broker offset for consumer-topic
+ private ConcurrentHashMap<ConsumerMetric, Long> groupBrokerTotalOffset = new ConcurrentHashMap<>();
+ //consumer offset for consumer-topic
+ private ConcurrentHashMap<ConsumerMetric, Long> groupConsumeTotalOffset = new ConcurrentHashMap<>();
+ //consume tps
+ private ConcurrentHashMap<ConsumerMetric, Double> groupConsumeTPS = new ConcurrentHashMap<>();
+ //consumed message count for consumer-topic
+ private ConcurrentHashMap<ConsumerMetric, Double> groupGetNums = new ConcurrentHashMap<>();
+ //consumed message size(byte) for consumer-topic
+ private ConcurrentHashMap<ConsumerMetric, Double> groupGetSize = new ConcurrentHashMap<>();
+ //re-consumed message count for consumer-topic
+ private ConcurrentHashMap<ConsumerMetric, Double> sendBackNums = new ConcurrentHashMap<>();
- private ConcurrentHashMap<ConsumerMetric, Double> groupGetNums = new ConcurrentHashMap<>();
- private ConcurrentHashMap<ConsumerMetric, Double> groupGetSize = new ConcurrentHashMap<>();
+ //total put message count for one broker
+ private ConcurrentHashMap<BrokerMetric, Double> brokerPutNums = new ConcurrentHashMap<>();
+ //total get message count for one broker
+ private ConcurrentHashMap<BrokerMetric, Double> brokerGetNums = new ConcurrentHashMap<>();
- private ConcurrentHashMap<ConsumerMetric, Double> sendBackNums = new ConcurrentHashMap<>();
- private ConcurrentHashMap<ConsumerMetric, Double> groupOffset = new ConcurrentHashMap<>();
+ private ConcurrentHashMap<BrokerRuntimeMetric, Long> brokerRuntimeMsgPutTotalTodayNow = new ConcurrentHashMap<>();
+ private ConcurrentHashMap<BrokerRuntimeMetric, Long> brokerRuntimeMsgGetTotalTodayNow = new ConcurrentHashMap<>();
- private ConcurrentHashMap<ConsumerQueueMetric, Double> groupGetLatency = new ConcurrentHashMap<>();
+ private ConcurrentHashMap<BrokerRuntimeMetric, Long> brokerRuntimeMsgGetTotalYesterdayMorning = new ConcurrentHashMap<>();
+ private ConcurrentHashMap<BrokerRuntimeMetric, Long> brokerRuntimeMsgPutTotalYesterdayMorning = new ConcurrentHashMap<>();
+ private ConcurrentHashMap<BrokerRuntimeMetric, Long> brokerRuntimeMsgGetTotalTodayMorning = new ConcurrentHashMap<>();
+ private ConcurrentHashMap<BrokerRuntimeMetric, Long> brokerRuntimeMsgPutTotalTodayMorning = new ConcurrentHashMap<>();
- private ConcurrentHashMap<ConsumerMetric, Double> groupGetLatencyByStoreTime = new ConcurrentHashMap<>();
+ private ConcurrentHashMap<BrokerRuntimeMetric, Long> brokerRuntimeDispatchBehindBytes = new ConcurrentHashMap<>();
+ private ConcurrentHashMap<BrokerRuntimeMetric, Long> brokerRuntimePutMessageSizeTotal = new ConcurrentHashMap<>();
+
+ private ConcurrentHashMap<BrokerRuntimeMetric, Double> brokerRuntimePutMessageAverageSize = new ConcurrentHashMap<>();
+ private ConcurrentHashMap<BrokerRuntimeMetric, Long> brokerRuntimeQueryThreadPoolQueueCapacity = new ConcurrentHashMap<>();
+ private ConcurrentHashMap<BrokerRuntimeMetric, Long> brokerRuntimeRemainTransientStoreBufferNumbs = new ConcurrentHashMap<>();
+ private ConcurrentHashMap<BrokerRuntimeMetric, Long> brokerRuntimeEarliestMessageTimeStamp = new ConcurrentHashMap<>();
+ private ConcurrentHashMap<BrokerRuntimeMetric, Long> brokerRuntimePutMessageEntireTimeMax = new ConcurrentHashMap<>();
+ private ConcurrentHashMap<BrokerRuntimeMetric, Long> brokerRuntimeStartAcceptSendRequestTimeStamp = new ConcurrentHashMap<>();
+ private ConcurrentHashMap<BrokerRuntimeMetric, Long> brokerRuntimeSendThreadPoolQueueSize = new ConcurrentHashMap<>();
+ private ConcurrentHashMap<BrokerRuntimeMetric, Long> brokerRuntimePutMessageTimesTotal = new ConcurrentHashMap<>();
+ private ConcurrentHashMap<BrokerRuntimeMetric, Long> brokerRuntimeGetMessageEntireTimeMax = new ConcurrentHashMap<>();
+ private ConcurrentHashMap<BrokerRuntimeMetric, Long> brokerRuntimePageCacheLockTimeMills = new ConcurrentHashMap<>();
+ private ConcurrentHashMap<BrokerRuntimeMetric, Double> brokerRuntimeCommitLogDiskRatio = new ConcurrentHashMap<>();
+ private ConcurrentHashMap<BrokerRuntimeMetric, Double> brokerRuntimeConsumeQueueDiskRatio = new ConcurrentHashMap<>();
+
+ private ConcurrentHashMap<BrokerRuntimeMetric, Double> brokerRuntimeGetFoundTps600 = new ConcurrentHashMap<>();
+ private ConcurrentHashMap<BrokerRuntimeMetric, Double> brokerRuntimeGetFoundTps60 = new ConcurrentHashMap<>();
+ private ConcurrentHashMap<BrokerRuntimeMetric, Double> brokerRuntimeGetFoundTps10 = new ConcurrentHashMap<>();
+ private ConcurrentHashMap<BrokerRuntimeMetric, Double> brokerRuntimeGetTotalTps600 = new ConcurrentHashMap<>();
+ private ConcurrentHashMap<BrokerRuntimeMetric, Double> brokerRuntimeGetTotalTps60 = new ConcurrentHashMap<>();
+ private ConcurrentHashMap<BrokerRuntimeMetric, Double> brokerRuntimeGetTotalTps10 = new ConcurrentHashMap<>();
+ private ConcurrentHashMap<BrokerRuntimeMetric, Double> brokerRuntimeGetTransferedTps600 = new ConcurrentHashMap<>();
+ private ConcurrentHashMap<BrokerRuntimeMetric, Double> brokerRuntimeGetTransferedTps60 = new ConcurrentHashMap<>();
+ private ConcurrentHashMap<BrokerRuntimeMetric, Double> brokerRuntimeGetTransferedTps10 = new ConcurrentHashMap<>();
+ private ConcurrentHashMap<BrokerRuntimeMetric, Double> brokerRuntimeGetMissTps600 = new ConcurrentHashMap<>();
+ private ConcurrentHashMap<BrokerRuntimeMetric, Double> brokerRuntimeGetMissTps60 = new ConcurrentHashMap<>();
+ private ConcurrentHashMap<BrokerRuntimeMetric, Double> brokerRuntimeGetMissTps10 = new ConcurrentHashMap<>();
+ private ConcurrentHashMap<BrokerRuntimeMetric, Double> brokerRuntimePutTps600 = new ConcurrentHashMap<>();
+ private ConcurrentHashMap<BrokerRuntimeMetric, Double> brokerRuntimePutTps60 = new ConcurrentHashMap<>();
+ private ConcurrentHashMap<BrokerRuntimeMetric, Double> brokerRuntimePutTps10 = new ConcurrentHashMap<>();
+
+ private ConcurrentHashMap<BrokerRuntimeMetric, Long> brokerRuntimeDispatchMaxBuffer = new ConcurrentHashMap<>();
+
+ private ConcurrentHashMap<BrokerRuntimeMetric, Integer> brokerRuntimePutMessageDistributeTimeMap10toMore = new ConcurrentHashMap<>();
+ private ConcurrentHashMap<BrokerRuntimeMetric, Integer> brokerRuntimePutMessageDistributeTimeMap5to10s = new ConcurrentHashMap<>();
+ private ConcurrentHashMap<BrokerRuntimeMetric, Integer> brokerRuntimePutMessageDistributeTimeMap4to5s = new ConcurrentHashMap<>();
+ private ConcurrentHashMap<BrokerRuntimeMetric, Integer> brokerRuntimePutMessageDistributeTimeMap3to4s = new ConcurrentHashMap<>();
+ private ConcurrentHashMap<BrokerRuntimeMetric, Integer> brokerRuntimePutMessageDistributeTimeMap2to3s = new ConcurrentHashMap<>();
+ private ConcurrentHashMap<BrokerRuntimeMetric, Integer> brokerRuntimePutMessageDistributeTimeMap1to2s = new ConcurrentHashMap<>();
+ private ConcurrentHashMap<BrokerRuntimeMetric, Integer> brokerRuntimePutMessageDistributeTimeMap500to1s = new ConcurrentHashMap<>();
+ private ConcurrentHashMap<BrokerRuntimeMetric, Integer> brokerRuntimePutMessageDistributeTimeMap200to500ms = new ConcurrentHashMap<>();
+ private ConcurrentHashMap<BrokerRuntimeMetric, Integer> brokerRuntimePutMessageDistributeTimeMap100to200ms = new ConcurrentHashMap<>();
+ private ConcurrentHashMap<BrokerRuntimeMetric, Integer> brokerRuntimePutMessageDistributeTimeMap50to100ms = new ConcurrentHashMap<>();
+ private ConcurrentHashMap<BrokerRuntimeMetric, Integer> brokerRuntimePutMessageDistributeTimeMap10to50ms = new ConcurrentHashMap<>();
+ private ConcurrentHashMap<BrokerRuntimeMetric, Integer> brokerRuntimePutMessageDistributeTimeMap0to10ms = new ConcurrentHashMap<>();
+ private ConcurrentHashMap<BrokerRuntimeMetric, Integer> brokerRuntimePutMessageDistributeTimeMap0ms = new ConcurrentHashMap<>();
+
+ private ConcurrentHashMap<BrokerRuntimeMetric, Long> brokerRuntimePullThreadPoolQueueCapacity = new ConcurrentHashMap<>();
+ private ConcurrentHashMap<BrokerRuntimeMetric, Long> brokerRuntimeSendThreadPoolQueueCapacity = new ConcurrentHashMap<>();
+ private ConcurrentHashMap<BrokerRuntimeMetric, Long> brokerRuntimePullThreadPoolQueueSize = new ConcurrentHashMap<>();
+ private ConcurrentHashMap<BrokerRuntimeMetric, Long> brokerRuntimeQueryThreadPoolQueueSize = new ConcurrentHashMap<>();
+
+ private ConcurrentHashMap<BrokerRuntimeMetric, Long> brokerRuntimePullThreadPoolQueueHeadWaitTimeMills = new ConcurrentHashMap<>();
+ private ConcurrentHashMap<BrokerRuntimeMetric, Long> brokerRuntimeQueryThreadPoolQueueHeadWaitTimeMills = new ConcurrentHashMap<>();
+ private ConcurrentHashMap<BrokerRuntimeMetric, Long> brokerRuntimeSendThreadPoolQueueHeadWaitTimeMills = new ConcurrentHashMap<>();
+
+ private ConcurrentHashMap<BrokerRuntimeMetric, Double> brokerRuntimeCommitLogDirCapacityFree = new ConcurrentHashMap<>();
+ private ConcurrentHashMap<BrokerRuntimeMetric, Double> brokerRuntimeCommitLogDirCapacityTotal = new ConcurrentHashMap<>();
+
+ private ConcurrentHashMap<BrokerRuntimeMetric, Long> brokerRuntimeCommitLogMaxOffset = new ConcurrentHashMap<>();
+ private ConcurrentHashMap<BrokerRuntimeMetric, Long> brokerRuntimeCommitLogMinOffset = new ConcurrentHashMap<>();
+ private ConcurrentHashMap<BrokerRuntimeMetric, Double> brokerRuntimeRemainHowManyDataToFlush = new ConcurrentHashMap<>();
+
+ private static List<String> GROUP_DIFF_LABEL_NAMES = Arrays.asList("group", "topic", "countOfOnlineConsumers");
+
+ private static <T extends Number> void loadGroupDiffMetric(GaugeMetricFamily family, Map.Entry<ConsumerTopicDiffMetric, T> entry) {
+ family.addMetric(
+ Arrays.asList(
+ entry.getKey().getGroup(),
+ entry.getKey().getTopic(),
+ entry.getKey().getCountOfOnlineConsumers()
+ ),
+ entry.getValue().doubleValue());
+ }
+
+ private static List<String> GROUP_COUNT_LABEL_NAMES = Arrays.asList("group", "caddr", "localaddr");
+
+ private void collectConsumerMetric(List<MetricFamilySamples> mfs) {
+ GaugeMetricFamily groupGetLatencyByConsumerDiff = new GaugeMetricFamily("rocketmq_group_diff", "GroupDiff", GROUP_DIFF_LABEL_NAMES);
+ for (Map.Entry<ConsumerTopicDiffMetric, Long> entry : consumerDiff.entrySet()) {
+ loadGroupDiffMetric(groupGetLatencyByConsumerDiff, entry);
+ }
+ mfs.add(groupGetLatencyByConsumerDiff);
+
+ GaugeMetricFamily groupGetLatencyByConsumerRetryDiff = new GaugeMetricFamily("rocketmq_group_retrydiff", "GroupRetryDiff", GROUP_DIFF_LABEL_NAMES);
+ for (Map.Entry<ConsumerTopicDiffMetric, Long> entry : consumerRetryDiff.entrySet()) {
+ loadGroupDiffMetric(groupGetLatencyByConsumerRetryDiff, entry);
+ }
+ mfs.add(groupGetLatencyByConsumerRetryDiff);
+
+ GaugeMetricFamily groupGetLatencyByConsumerDLQDiff = new GaugeMetricFamily("rocketmq_group_dlqdiff", "GroupDLQDiff", GROUP_DIFF_LABEL_NAMES);
+ for (Map.Entry<ConsumerTopicDiffMetric, Long> entry : consumerDLQDiff.entrySet()) {
+ loadGroupDiffMetric(groupGetLatencyByConsumerDLQDiff, entry);
+ }
+ mfs.add(groupGetLatencyByConsumerDLQDiff);
+
+ GaugeMetricFamily consumerCountsF = new GaugeMetricFamily("rocketmq_group_count", "GroupCount", GROUP_COUNT_LABEL_NAMES);
+ for (Map.Entry<ConsumerCountMetric, Integer> entry : consumerCounts.entrySet()) {
+ consumerCountsF.addMetric(
+ Arrays.asList(
+ entry.getKey().getGroup(),
+ entry.getKey().getCaddr(),
+ entry.getKey().getLocaladdr()
+ ),
+ entry.getValue().doubleValue());
+ }
+ mfs.add(consumerCountsF);
+
+ }
+
+
+ private static List<String> TOPIC_OFFSET_LABEL_NAMES = Arrays.asList(
+ "cluster", "brokerNames", "topic", "lastUpdateTimestamp"
+ );
+
+ private static List<String> DLQ_TOPIC_OFFSET_LABEL_NAMES = Arrays.asList(
+ "cluster", "brokerNames", "group", "lastUpdateTimestamp"
+ );
+
+ private void loadTopicOffsetMetric(GaugeMetricFamily family, Map.Entry<ProducerMetric, Double> entry) {
+ family.addMetric(
+ Arrays.asList(
+ entry.getKey().getClusterName(),
+ entry.getKey().getBrokerNames(),
+ entry.getKey().getTopicName(),
+ String.valueOf(entry.getKey().getLastUpdateTimestamp())
+ ),
+ entry.getValue());
+ }
+
+ private void collectTopicOffsetMetric(List<MetricFamilySamples> mfs) {
+ GaugeMetricFamily topicOffsetF = new GaugeMetricFamily("rocketmq_topic_offset", "TopicOffset", TOPIC_OFFSET_LABEL_NAMES);
+ for (Map.Entry<ProducerMetric, Double> entry : topicOffset.entrySet()) {
+ loadTopicOffsetMetric(topicOffsetF, entry);
+ }
+ mfs.add(topicOffsetF);
+
+ GaugeMetricFamily topicRetryOffsetF = new GaugeMetricFamily("rocketmq_topic_retry_offset", "TopicRetryOffset", TOPIC_OFFSET_LABEL_NAMES);
+ for (Map.Entry<ProducerMetric, Double> entry : topicRetryOffset.entrySet()) {
+ loadTopicOffsetMetric(topicRetryOffsetF, entry);
+ }
+ mfs.add(topicRetryOffsetF);
+
+ GaugeMetricFamily topicDLQOffsetF = new GaugeMetricFamily("rocketmq_topic_dlq_offset", "TopicRetryOffset", DLQ_TOPIC_OFFSET_LABEL_NAMES);
+ for (Map.Entry<DLQTopicOffsetMetric, Double> entry : topicDLQOffset.entrySet()) {
+ topicDLQOffsetF.addMetric(
+ Arrays.asList(
+ entry.getKey().getClusterName(),
+ entry.getKey().getBrokerNames(),
+ entry.getKey().getGroup(),
+ String.valueOf(entry.getKey().getLastUpdateTimestamp())
+ ),
+ entry.getValue());
+ }
+ mfs.add(topicDLQOffsetF);
+
+ }
@Override
public List<MetricFamilySamples> collect() {
List<MetricFamilySamples> mfs = new ArrayList<MetricFamilySamples>();
- GaugeMetricFamily topicPutNumsGauge = new GaugeMetricFamily("rocketmq_producer_tps", "TopicPutNums", Arrays.asList("cluster","broker","topic"));
- for (Map.Entry<ProducerMetric,Double> entry:topicPutNums.entrySet()) {
- topicPutNumsGauge.addMetric(Arrays.asList(entry.getKey().getClusterName(),entry.getKey().getBrokerName(),entry.getKey().getTopicName()), entry.getValue());
+ collectConsumerMetric(mfs);
+
+ collectTopicOffsetMetric(mfs);
+
+ collectTopicNums(mfs);
+
+ collectGroupNums(mfs);
+
+ collectBrokerNums(mfs);
+
+ collectBrokerRuntimeStats(mfs);
+
+ return mfs;
+ }
+
+ private static List<String> GROUP_PULL_LATENCY_LABEL_NAMES = Arrays.asList(
+ "cluster", "broker", "topic", "group", "queueid"
+ );
+ private static List<String> GROUP_LATENCY_BY_STORETIME_LABEL_NAMES = Arrays.asList(
+ "topic", "group"
+ );
+
+ private static List<String> BROKER_NUMS_LABEL_NAMES = Arrays.asList("cluster", "brokerIP", "brokerHost");
+
+ private static void loadBrokerNums(GaugeMetricFamily family, Map.Entry<BrokerMetric, Double> entry) {
+ family.addMetric(Arrays.asList(
+ entry.getKey().getClusterName(),
+ entry.getKey().getBrokerIP(),
+ entry.getKey().getBrokerHost()),
+ entry.getValue()
+ );
+ }
+
+ private void collectBrokerNums(List<MetricFamilySamples> mfs) {
+ GaugeMetricFamily brokerPutNumsGauge = new GaugeMetricFamily("rocketmq_broker_put_nums", "BrokerPutNums", BROKER_NUMS_LABEL_NAMES);
+ for (Map.Entry<BrokerMetric, Double> entry : brokerPutNums.entrySet()) {
+ loadBrokerNums(brokerPutNumsGauge, entry);
}
- mfs.add(topicPutNumsGauge);
+ mfs.add(brokerPutNumsGauge);
+ GaugeMetricFamily brokerGetNumsGauge = new GaugeMetricFamily("rocketmq_broker_get_nums", "BrokerGetNums", BROKER_NUMS_LABEL_NAMES);
+ for (Map.Entry<BrokerMetric, Double> entry : brokerGetNums.entrySet()) {
+ loadBrokerNums(brokerGetNumsGauge, entry);
+ }
+ mfs.add(brokerGetNumsGauge);
+ }
+
+
+ private static List<String> GROUP_NUMS_LABEL_NAMES = Arrays.asList(
+ "topic", "group"
+ );
- GaugeMetricFamily topicPutSizeGauge = new GaugeMetricFamily("rocketmq_producer_message_size", "TopicPutMessageSize", Arrays.asList("cluster","broker","topic"));
- for (Map.Entry<ProducerMetric, Double> entry: topicPutSize.entrySet()) {
- topicPutSizeGauge.addMetric(Arrays.asList(entry.getKey().getClusterName(),entry.getKey().getBrokerName(),entry.getKey().getTopicName()), entry.getValue());
+ private static <T extends Number> void loadGroupNumsMetric(GaugeMetricFamily family, Map.Entry<ConsumerMetric, T> entry) {
+ family.addMetric(Arrays.asList(
+ entry.getKey().getTopicName(),
+ entry.getKey().getConsumerGroupName()),
+ entry.getValue().doubleValue()
+ );
+ }
+
+ private void collectBrokerRuntimeStatsPutMessageDistributeTime(List<MetricFamilySamples> mfs) {
+ GaugeMetricFamily pmdt0 = new GaugeMetricFamily("rocketmq_brokeruntime_pmdt_0ms", "PutMessageDistributeTimeMap0ms", BROKER_RUNTIME_METRIC_LABEL_NAMES);
+ for (Map.Entry<BrokerRuntimeMetric, Integer> entry : brokerRuntimePutMessageDistributeTimeMap0ms.entrySet()) {
+ loadBrokerRuntimeStatsMetric(pmdt0, entry);
}
- mfs.add(topicPutSizeGauge);
+ mfs.add(pmdt0);
+ GaugeMetricFamily pmdt0to10ms = new GaugeMetricFamily("rocketmq_brokeruntime_pmdt_0to10ms", "PutMessageDistributeTimeMap0to10ms", BROKER_RUNTIME_METRIC_LABEL_NAMES);
+ for (Map.Entry<BrokerRuntimeMetric, Integer> entry : brokerRuntimePutMessageDistributeTimeMap0to10ms.entrySet()) {
+ loadBrokerRuntimeStatsMetric(pmdt0to10ms, entry);
+ }
+ mfs.add(pmdt0to10ms);
- CounterMetricFamily topicOffsetGauge = new CounterMetricFamily("rocketmq_producer_offset", "TopicOffset", Arrays.asList("cluster","broker","topic"));
- for (Map.Entry<ProducerMetric, Double> entry: topicOffset.entrySet()) {
- topicOffsetGauge.addMetric(Arrays.asList(entry.getKey().getClusterName(),entry.getKey().getBrokerName(),entry.getKey().getTopicName()), entry.getValue());
+ GaugeMetricFamily pmdt10to50ms = new GaugeMetricFamily("rocketmq_brokeruntime_pmdt_10to50ms", "PutMessageDistributeTimeMap10to50ms", BROKER_RUNTIME_METRIC_LABEL_NAMES);
+ for (Map.Entry<BrokerRuntimeMetric, Integer> entry : brokerRuntimePutMessageDistributeTimeMap10to50ms.entrySet()) {
+ loadBrokerRuntimeStatsMetric(pmdt10to50ms, entry);
}
- mfs.add(topicOffsetGauge);
+ mfs.add(pmdt10to50ms);
+ GaugeMetricFamily pmdt50to100ms = new GaugeMetricFamily("rocketmq_brokeruntime_pmdt_50to100ms", "PutMessageDistributeTimeMap50to100ms", BROKER_RUNTIME_METRIC_LABEL_NAMES);
+ for (Map.Entry<BrokerRuntimeMetric, Integer> entry : brokerRuntimePutMessageDistributeTimeMap50to100ms.entrySet()) {
+ loadBrokerRuntimeStatsMetric(pmdt50to100ms, entry);
+ }
+ mfs.add(pmdt50to100ms);
- GaugeMetricFamily brokerPutNumsGauge = new GaugeMetricFamily("rocketmq_broker_tps", "BrokerPutNums", Arrays.asList("cluster","broker"));
- for (Map.Entry<BrokerMetric, Double> entry: brokerPutNums.entrySet()) {
- brokerPutNumsGauge.addMetric(Arrays.asList(entry.getKey().getClusterName(),entry.getKey().getBrokerName()), entry.getValue());
+ GaugeMetricFamily pmdt100to200ms = new GaugeMetricFamily("rocketmq_brokeruntime_pmdt_100to200ms", "PutMessageDistributeTimeMap100to200ms", BROKER_RUNTIME_METRIC_LABEL_NAMES);
+ for (Map.Entry<BrokerRuntimeMetric, Integer> entry : brokerRuntimePutMessageDistributeTimeMap100to200ms.entrySet()) {
+ loadBrokerRuntimeStatsMetric(pmdt100to200ms, entry);
}
- mfs.add(brokerPutNumsGauge);
+ mfs.add(pmdt100to200ms);
+ GaugeMetricFamily pmdt200to500ms = new GaugeMetricFamily("rocketmq_brokeruntime_pmdt_200to500ms", "PutMessageDistributeTimeMap200to500ms", BROKER_RUNTIME_METRIC_LABEL_NAMES);
+ for (Map.Entry<BrokerRuntimeMetric, Integer> entry : brokerRuntimePutMessageDistributeTimeMap200to500ms.entrySet()) {
+ loadBrokerRuntimeStatsMetric(pmdt200to500ms, entry);
+ }
+ mfs.add(pmdt200to500ms);
- GaugeMetricFamily brokerGetNumsGauge = new GaugeMetricFamily("rocketmq_broker_qps", "BrokerGetNums", Arrays.asList("cluster","broker"));
- for (Map.Entry<BrokerMetric, Double> entry: brokerGetNums.entrySet()) {
- brokerGetNumsGauge.addMetric(Arrays.asList(entry.getKey().getClusterName(),entry.getKey().getBrokerName()), entry.getValue());
+ GaugeMetricFamily pmdt500to1s = new GaugeMetricFamily("rocketmq_brokeruntime_pmdt_500to1s", "PutMessageDistributeTimeMap500to1s", BROKER_RUNTIME_METRIC_LABEL_NAMES);
+ for (Map.Entry<BrokerRuntimeMetric, Integer> entry : brokerRuntimePutMessageDistributeTimeMap500to1s.entrySet()) {
+ loadBrokerRuntimeStatsMetric(pmdt500to1s, entry);
}
- mfs.add(brokerGetNumsGauge);
+ mfs.add(pmdt500to1s);
+ GaugeMetricFamily pmdt1to2s = new GaugeMetricFamily("rocketmq_brokeruntime_pmdt_1to2s", "PutMessageDistributeTimeMap1to2s", BROKER_RUNTIME_METRIC_LABEL_NAMES);
+ for (Map.Entry<BrokerRuntimeMetric, Integer> entry : brokerRuntimePutMessageDistributeTimeMap1to2s.entrySet()) {
+ loadBrokerRuntimeStatsMetric(pmdt1to2s, entry);
+ }
+ mfs.add(pmdt1to2s);
- GaugeMetricFamily groupGetNumsGauge = new GaugeMetricFamily("rocketmq_consumer_tps", "GroupGetNums", Arrays.asList("cluster","broker","topic","group"));
- for (Map.Entry<ConsumerMetric, Double> entry: groupGetNums.entrySet()) {
- groupGetNumsGauge.addMetric(Arrays.asList(entry.getKey().getClusterName(),entry.getKey().getBrokerName(),entry.getKey().getTopicName(),entry.getKey().getConsumerGroupName()), entry.getValue());
+ GaugeMetricFamily pmdt2to3s = new GaugeMetricFamily("rocketmq_brokeruntime_pmdt_2to3s", "PutMessageDistributeTimeMap2to3s", BROKER_RUNTIME_METRIC_LABEL_NAMES);
+ for (Map.Entry<BrokerRuntimeMetric, Integer> entry : brokerRuntimePutMessageDistributeTimeMap2to3s.entrySet()) {
+ loadBrokerRuntimeStatsMetric(pmdt2to3s, entry);
}
+ mfs.add(pmdt2to3s);
+ GaugeMetricFamily pmdt3to4s = new GaugeMetricFamily("rocketmq_brokeruntime_pmdt_3to4s", "PutMessageDistributeTimeMap3to4s", BROKER_RUNTIME_METRIC_LABEL_NAMES);
+ for (Map.Entry<BrokerRuntimeMetric, Integer> entry : brokerRuntimePutMessageDistributeTimeMap3to4s.entrySet()) {
+ loadBrokerRuntimeStatsMetric(pmdt3to4s, entry);
+ }
+ mfs.add(pmdt3to4s);
+
+ GaugeMetricFamily pmdt4to5s = new GaugeMetricFamily("rocketmq_brokeruntime_pmdt_4to5s", "PutMessageDistributeTimeMap4to5s", BROKER_RUNTIME_METRIC_LABEL_NAMES);
+ for (Map.Entry<BrokerRuntimeMetric, Integer> entry : brokerRuntimePutMessageDistributeTimeMap4to5s.entrySet()) {
+ loadBrokerRuntimeStatsMetric(pmdt4to5s, entry);
+ }
+ mfs.add(pmdt4to5s);
+
+ GaugeMetricFamily pmdt5to10s = new GaugeMetricFamily("rocketmq_brokeruntime_pmdt_5to10s", "PutMessageDistributeTimeMap5to10s", BROKER_RUNTIME_METRIC_LABEL_NAMES);
+ for (Map.Entry<BrokerRuntimeMetric, Integer> entry : brokerRuntimePutMessageDistributeTimeMap5to10s.entrySet()) {
+ loadBrokerRuntimeStatsMetric(pmdt5to10s, entry);
+ }
+ mfs.add(pmdt5to10s);
+
+ GaugeMetricFamily pmdt10stoMore = new GaugeMetricFamily("rocketmq_brokeruntime_pmdt_10stomore", "PutMessageDistributeTimeMap10toMore", BROKER_RUNTIME_METRIC_LABEL_NAMES);
+ for (Map.Entry<BrokerRuntimeMetric, Integer> entry : brokerRuntimePutMessageDistributeTimeMap10toMore.entrySet()) {
+ loadBrokerRuntimeStatsMetric(pmdt10stoMore, entry);
+ }
+ mfs.add(pmdt10stoMore);
+ }
+
+ private void collectGroupNums(List<MetricFamilySamples> mfs) {
+ GaugeMetricFamily groupGetNumsGauge = new GaugeMetricFamily("rocketmq_group_get_nums", "GroupGetNums", GROUP_NUMS_LABEL_NAMES);
+ for (Map.Entry<ConsumerMetric, Double> entry : groupGetNums.entrySet()) {
+ loadGroupNumsMetric(groupGetNumsGauge, entry);
+ }
mfs.add(groupGetNumsGauge);
+ GaugeMetricFamily groupConsumeTPSF = new GaugeMetricFamily("rocketmq_group_consume_tps", "GroupConsumeTPS", GROUP_NUMS_LABEL_NAMES);
+ for (Map.Entry<ConsumerMetric, Double> entry : groupConsumeTPS.entrySet()) {
+ loadGroupNumsMetric(groupConsumeTPSF, entry);
+ }
+ mfs.add(groupConsumeTPSF);
- GaugeMetricFamily groupGetSizeGauge = new GaugeMetricFamily("rocketmq_consumer_message_size", "GroupGetMessageSize", Arrays.asList("cluster","broker","topic","group"));
- for (Map.Entry<ConsumerMetric, Double> entry: groupGetSize.entrySet()) {
- groupGetSizeGauge.addMetric(Arrays.asList(entry.getKey().getClusterName(),entry.getKey().getBrokerName(),entry.getKey().getTopicName(),entry.getKey().getConsumerGroupName()), entry.getValue());
+ GaugeMetricFamily groupBrokerTotalOffsetF = new GaugeMetricFamily("rocketmq_group_broker_total_offset", "GroupBrokerTotalOffset", GROUP_NUMS_LABEL_NAMES);
+ for (Map.Entry<ConsumerMetric, Long> entry : groupBrokerTotalOffset.entrySet()) {
+ loadGroupNumsMetric(groupBrokerTotalOffsetF, entry);
}
- mfs.add(groupGetSizeGauge);
+ mfs.add(groupBrokerTotalOffsetF);
- CounterMetricFamily groupOffsetGauge = new CounterMetricFamily("rocketmq_consumer_offset", "GroupOffset", Arrays.asList("cluster","broker","topic","group"));
- for (Map.Entry<ConsumerMetric, Double> entry: groupOffset.entrySet()) {
- groupOffsetGauge.addMetric(Arrays.asList(entry.getKey().getClusterName(),entry.getKey().getBrokerName(),entry.getKey().getTopicName(),entry.getKey().getConsumerGroupName()), entry.getValue());
+ GaugeMetricFamily groupConsumeTotalOffsetF = new GaugeMetricFamily("rocketmq_group_consume_total_offset", "GroupConsumeTotalOffset", GROUP_NUMS_LABEL_NAMES);
+ for (Map.Entry<ConsumerMetric, Long> entry : groupBrokerTotalOffset.entrySet()) {
+ loadGroupNumsMetric(groupConsumeTotalOffsetF, entry);
}
- mfs.add(groupOffsetGauge);
+ mfs.add(groupConsumeTotalOffsetF);
+ GaugeMetricFamily groupGetSizeGauge = new GaugeMetricFamily("rocketmq_group_get_messagesize", "GroupGetMessageSize", GROUP_NUMS_LABEL_NAMES);
+ for (Map.Entry<ConsumerMetric, Double> entry : groupGetSize.entrySet()) {
+ loadGroupNumsMetric(groupGetSizeGauge, entry);
+ }
+ mfs.add(groupGetSizeGauge);
- GaugeMetricFamily sendBackNumsGauge = new GaugeMetricFamily("rocketmq_send_back_nums", "SendBackNums", Arrays.asList("cluster","broker","topic","group"));
- for (Map.Entry<ConsumerMetric, Double> entry: sendBackNums.entrySet()) {
- sendBackNumsGauge.addMetric(Arrays.asList(entry.getKey().getClusterName(),entry.getKey().getBrokerName(),entry.getKey().getTopicName(),entry.getKey().getConsumerGroupName()), entry.getValue());
+ GaugeMetricFamily sendBackNumsGauge = new GaugeMetricFamily("rocketmq_send_back_nums", "SendBackNums", GROUP_NUMS_LABEL_NAMES);
+ for (Map.Entry<ConsumerMetric, Double> entry : sendBackNums.entrySet()) {
+ loadGroupNumsMetric(sendBackNumsGauge, entry);
}
mfs.add(sendBackNumsGauge);
+ }
+ private void collectTopicNums(List<MetricFamilySamples> mfs) {
+ GaugeMetricFamily topicPutNumsGauge = new GaugeMetricFamily("rocketmq_topic_put_nums", "TopicPutNums", TOPIC_NUMS_LABEL_NAMES);
+ for (Map.Entry<TopicPutNumMetric, Double> entry : topicPutNums.entrySet()) {
+ loadTopicNumsMetric(topicPutNumsGauge, entry);
+ }
+ mfs.add(topicPutNumsGauge);
- GaugeMetricFamily groupGetLatencyGauge = new GaugeMetricFamily("rocketmq_group_get_latency", "GroupGetLatency", Arrays.asList("cluster","broker","topic","group","queueid"));
- for (Map.Entry<ConsumerQueueMetric, Double> entry: groupGetLatency.entrySet()) {
- groupGetLatencyGauge.addMetric(Arrays.asList(entry.getKey().getClusterName(),entry.getKey().getBrokerName(),entry.getKey().getTopicName(),entry.getKey().getConsumerGroupName(),entry.getKey().getQueueId()), entry.getValue());
+ GaugeMetricFamily topicPutSizeGauge = new GaugeMetricFamily("rocketmq_topic_put_messagesize", "TopicPutMessageSize", TOPIC_NUMS_LABEL_NAMES);
+ for (Map.Entry<TopicPutNumMetric, Double> entry : topicPutSize.entrySet()) {
+ loadTopicNumsMetric(topicPutSizeGauge, entry);
}
- mfs.add(groupGetLatencyGauge);
+ mfs.add(topicPutSizeGauge);
+ }
- GaugeMetricFamily groupGetLatencyByStoretimeGauge = new GaugeMetricFamily("rocketmq_group_get_latency_by_storetime", "GroupGetLatencyByStoreTime", Arrays.asList("cluster","broker","topic","group"));
- for (Map.Entry<ConsumerMetric, Double> entry: groupGetLatencyByStoreTime.entrySet()) {
- groupGetLatencyByStoretimeGauge.addMetric(Arrays.asList(entry.getKey().getClusterName(),entry.getKey().getBrokerName(),entry.getKey().getTopicName(),entry.getKey().getConsumerGroupName()), entry.getValue());
+ private static List<String> TOPIC_NUMS_LABEL_NAMES = Arrays.asList("cluster", "brokers", "topic");
+
+ private void loadTopicNumsMetric(GaugeMetricFamily family, Map.Entry<TopicPutNumMetric, Double> entry) {
+ family.addMetric(
+ Arrays.asList(
+ entry.getKey().getClusterName(),
+ entry.getKey().getBrokerNames(),
+ entry.getKey().getTopicName()
+ ),
+ entry.getValue()
+ );
+ }
+
+ public void addTopicOffsetMetric(String clusterName, String brokerNames, String topic, long lastUpdateTimestamp, double value) {
+ if (topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
+ topicRetryOffset.put(new ProducerMetric(clusterName, brokerNames, topic, lastUpdateTimestamp), value);
+ } else if (topic.startsWith(MixAll.DLQ_GROUP_TOPIC_PREFIX)) {
+ topicDLQOffset.put(new DLQTopicOffsetMetric(clusterName, brokerNames, topic.replace(MixAll.DLQ_GROUP_TOPIC_PREFIX, ""), lastUpdateTimestamp), value);
+ } else {
+ topicOffset.put(new ProducerMetric(clusterName, brokerNames, topic, lastUpdateTimestamp), value);
}
- mfs.add(groupGetLatencyByStoretimeGauge);
+ }
- return mfs;
+ public void addGroupCountMetric(String group, String caddr, String localaddr, int count) {
+ this.consumerCounts.put(new ConsumerCountMetric(group, caddr, localaddr), count);
}
- public void AddTopicPutNumsMetric(String clusterName, String brokerName, String topic, double value)
- {
- topicPutNums.put(new ProducerMetric(clusterName,brokerName,topic),value);
+
+ public void addGroupDiffMetric(String countOfOnlineConsumers, String group, String topic, long value) {
+ if (topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
+ this.consumerRetryDiff.put(new ConsumerTopicDiffMetric(group, topic, countOfOnlineConsumers), value);
+ } else if (topic.startsWith(MixAll.DLQ_GROUP_TOPIC_PREFIX)) {
+ this.consumerDLQDiff.put(new ConsumerTopicDiffMetric(group, topic, countOfOnlineConsumers), value);
+ } else {
+ this.consumerDiff.put(new ConsumerTopicDiffMetric(group, topic, countOfOnlineConsumers), value);
+ }
+ }
+
+ public void addTopicPutNumsMetric(String cluster, String brokerNames, String brokerIP, String brokerHost,
+ String topic, double value) {
+ topicPutNums.put(new TopicPutNumMetric(cluster, brokerNames, brokerIP, brokerHost, topic), value);
+ }
+
+ public void addTopicPutSizeMetric(String cluster, String brokerName, String brokerIP, String brokerHost,
+ String topic, double value) {
+ topicPutSize.put(new TopicPutNumMetric(cluster, brokerName, brokerIP, brokerHost, topic), value);
+ }
+
+ public void addGroupBrokerTotalOffsetMetric(String topic, String group, long value) {
+ groupBrokerTotalOffset.put(new ConsumerMetric(topic, group), value);
+ }
+
+ public void addGroupConsumerTotalOffsetMetric(String topic, String group, long value) {
+ groupConsumeTotalOffset.put(new ConsumerMetric(topic, group), value);
}
- public void AddTopicPutSizeMetric(String clusterName, String brokerName, String topic, double value)
- {
- topicPutSize.put(new ProducerMetric(clusterName,brokerName,topic),value);
+ public void addGroupConsumeTPSMetric(String topic, String group, double value) {
+ groupConsumeTPS.put(new ConsumerMetric(topic, group), value);
}
- public void AddTopicOffsetMetric(String clusterName, String brokerName, String topic, double value)
- {
- topicOffset.put(new ProducerMetric(clusterName,brokerName,topic),value);
+ public void addGroupGetNumsMetric(String topic, String group, double value) {
+ groupGetNums.put(new ConsumerMetric(topic, group), value);
}
- public void AddBrokerPutNumsMetric(String clusterName, String brokerName, double value)
- {
- brokerPutNums.put(new BrokerMetric(clusterName,brokerName),value);
+ public void addGroupGetSizeMetric(String topic, String group, double value) {
+ groupGetSize.put(new ConsumerMetric(topic, group), value);
}
- public void AddBrokerGetNumsMetric(String clusterName, String brokerName, double value)
- {
- brokerGetNums.put(new BrokerMetric(clusterName,brokerName),value);
+ public void addSendBackNumsMetric(String topic, String group, double value) {
+ sendBackNums.put(new ConsumerMetric(topic, group), value);
}
- public void AddGroupGetNumsMetric(String clusterName, String brokerName, String topic, String group, double value)
- {
- groupGetNums.put(new ConsumerMetric(clusterName,brokerName,topic,group),value);
+ public void addBrokerPutNumsMetric(String clusterName, String brokerIP, String brokerHost, double value) {
+ brokerPutNums.put(new BrokerMetric(clusterName, brokerIP, brokerHost), value);
}
- public void AddGroupGetSizeMetric(String clusterName, String brokerName, String topic, String group, double value)
- {
- groupGetSize.put(new ConsumerMetric(clusterName,brokerName,topic,group),value);
+ public void addBrokerGetNumsMetric(String clusterName, String brokerIP, String brokerHost, double value) {
+ brokerGetNums.put(new BrokerMetric(clusterName, brokerIP, brokerHost), value);
}
- public void AddGroupOffsetMetric(String clusterName, String brokerName, String topic, String group, double value)
- {
- groupOffset.put(new ConsumerMetric(clusterName,brokerName,topic,group),value);
+ public void addBrokerRuntimeStatsMetric(BrokerRuntimeStats stats, String clusterName, String brokerAddress, String brokerHost) {
+ addBrokerRuntimePutMessageDistributeTimeMap(
+ clusterName, brokerAddress, brokerHost,
+ stats.getBrokerVersionDesc(),
+ stats.getBootTimestamp(),
+ stats.getBrokerVersion(), stats);
+ addCommitLogDirCapacity(clusterName, brokerAddress, brokerHost, stats);
+ addAllKindOfTps(clusterName, brokerAddress, brokerHost, stats);
+
+ brokerRuntimeMsgPutTotalTodayNow.put(new BrokerRuntimeMetric(
+ clusterName, brokerAddress, brokerHost,
+ stats.getBrokerVersionDesc(),
+ stats.getBootTimestamp(),
+ stats.getBrokerVersion()), stats.getMsgPutTotalTodayNow());
+
+ brokerRuntimeMsgGetTotalTodayNow.put(new BrokerRuntimeMetric(
+ clusterName, brokerAddress, brokerHost,
+ stats.getBrokerVersionDesc(),
+ stats.getBootTimestamp(),
+ stats.getBrokerVersion()), stats.getMsgGetTotalTodayNow());
+
+ brokerRuntimeMsgPutTotalTodayMorning.put(new BrokerRuntimeMetric(
+ clusterName, brokerAddress, brokerHost,
+ stats.getBrokerVersionDesc(),
+ stats.getBootTimestamp(),
+ stats.getBrokerVersion()), stats.getMsgPutTotalTodayMorning());
+ brokerRuntimeMsgGetTotalTodayMorning.put(new BrokerRuntimeMetric(
+ clusterName, brokerAddress, brokerHost,
+ stats.getBrokerVersionDesc(),
+ stats.getBootTimestamp(),
+ stats.getBrokerVersion()), stats.getMsgGetTotalTodayMorning());
+ brokerRuntimeMsgPutTotalYesterdayMorning.put(new BrokerRuntimeMetric(
+ clusterName, brokerAddress, brokerHost,
+ stats.getBrokerVersionDesc(),
+ stats.getBootTimestamp(),
+ stats.getBrokerVersion()), stats.getMsgPutTotalYesterdayMorning());
+ brokerRuntimeMsgGetTotalYesterdayMorning.put(new BrokerRuntimeMetric(
+ clusterName, brokerAddress, brokerHost,
+ stats.getBrokerVersionDesc(),
+ stats.getBootTimestamp(),
+ stats.getBrokerVersion()), stats.getMsgGetTotalYesterdayMorning());
+ brokerRuntimeSendThreadPoolQueueHeadWaitTimeMills.put(new BrokerRuntimeMetric(
+ clusterName, brokerAddress, brokerHost,
+ stats.getBrokerVersionDesc(),
+ stats.getBootTimestamp(),
+ stats.getBrokerVersion()), stats.getSendThreadPoolQueueHeadWaitTimeMills());
+ brokerRuntimeQueryThreadPoolQueueHeadWaitTimeMills.put(new BrokerRuntimeMetric(
+ clusterName, brokerAddress, brokerHost,
+ stats.getBrokerVersionDesc(),
+ stats.getBootTimestamp(),
+ stats.getBrokerVersion()), stats.getQueryThreadPoolQueueHeadWaitTimeMills());
+ brokerRuntimePullThreadPoolQueueHeadWaitTimeMills.put(new BrokerRuntimeMetric(
+ clusterName, brokerAddress, brokerHost,
+ stats.getBrokerVersionDesc(),
+ stats.getBootTimestamp(),
+ stats.getBrokerVersion()), stats.getPullThreadPoolQueueHeadWaitTimeMills());
+ brokerRuntimeQueryThreadPoolQueueSize.put(new BrokerRuntimeMetric(
+ clusterName, brokerAddress, brokerHost,
+ stats.getBrokerVersionDesc(),
+ stats.getBootTimestamp(),
+ stats.getBrokerVersion()), stats.getQueryThreadPoolQueueSize());
+ brokerRuntimePullThreadPoolQueueSize.put(new BrokerRuntimeMetric(
+ clusterName, brokerAddress, brokerHost,
+ stats.getBrokerVersionDesc(),
+ stats.getBootTimestamp(),
+ stats.getBrokerVersion()), stats.getPullThreadPoolQueueSize());
+ brokerRuntimeSendThreadPoolQueueCapacity.put(new BrokerRuntimeMetric(
+ clusterName, brokerAddress, brokerHost,
+ stats.getBrokerVersionDesc(),
+ stats.getBootTimestamp(),
+ stats.getBrokerVersion()), stats.getSendThreadPoolQueueCapacity());
+ brokerRuntimePullThreadPoolQueueCapacity.put(new BrokerRuntimeMetric(
+ clusterName, brokerAddress, brokerHost,
+ stats.getBrokerVersionDesc(),
+ stats.getBootTimestamp(),
+ stats.getBrokerVersion()), stats.getPullThreadPoolQueueCapacity());
+
+ brokerRuntimeRemainHowManyDataToFlush.put(new BrokerRuntimeMetric(
+ clusterName, brokerAddress, brokerHost,
+ stats.getBrokerVersionDesc(),
+ stats.getBootTimestamp(),
+ stats.getBrokerVersion()), stats.getRemainHowManyDataToFlush());
+ brokerRuntimeCommitLogMinOffset.put(new BrokerRuntimeMetric(
+ clusterName, brokerAddress, brokerHost,
+ stats.getBrokerVersionDesc(),
+ stats.getBootTimestamp(),
+ stats.getBrokerVersion()), stats.getCommitLogMinOffset());
+ brokerRuntimeCommitLogMaxOffset.put(new BrokerRuntimeMetric(
+ clusterName, brokerAddress, brokerHost,
+ stats.getBrokerVersionDesc(),
+ stats.getBootTimestamp(),
+ stats.getBrokerVersion()), stats.getCommitLogMaxOffset());
+
+
+ brokerRuntimeDispatchMaxBuffer.put(new BrokerRuntimeMetric(
+ clusterName, brokerAddress, brokerHost,
+ stats.getBrokerVersionDesc(),
+ stats.getBootTimestamp(),
+ stats.getBrokerVersion()), stats.getDispatchMaxBuffer());
+ brokerRuntimeConsumeQueueDiskRatio.put(new BrokerRuntimeMetric(
+ clusterName, brokerAddress, brokerHost,
+ stats.getBrokerVersionDesc(),
+ stats.getBootTimestamp(),
+ stats.getBrokerVersion()), stats.getConsumeQueueDiskRatio());
+ brokerRuntimeCommitLogDiskRatio.put(new BrokerRuntimeMetric(
+ clusterName, brokerAddress, brokerHost,
+ stats.getBrokerVersionDesc(),
+ stats.getBootTimestamp(),
+ stats.getBrokerVersion()), stats.getCommitLogDiskRatio());
+ brokerRuntimePageCacheLockTimeMills.put(new BrokerRuntimeMetric(
+ clusterName, brokerAddress, brokerHost,
+ stats.getBrokerVersionDesc(),
+ stats.getBootTimestamp(),
+ stats.getBrokerVersion()), stats.getPageCacheLockTimeMills());
+ brokerRuntimeGetMessageEntireTimeMax.put(new BrokerRuntimeMetric(
+ clusterName, brokerAddress, brokerHost,
+ stats.getBrokerVersionDesc(),
+ stats.getBootTimestamp(),
+ stats.getBrokerVersion()), stats.getGetMessageEntireTimeMax());
+ brokerRuntimePutMessageTimesTotal.put(new BrokerRuntimeMetric(
+ clusterName, brokerAddress, brokerHost,
+ stats.getBrokerVersionDesc(),
+ stats.getBootTimestamp(),
+ stats.getBrokerVersion()), stats.getPutMessageTimesTotal());
+ brokerRuntimeSendThreadPoolQueueSize.put(new BrokerRuntimeMetric(
+ clusterName, brokerAddress, brokerHost,
+ stats.getBrokerVersionDesc(),
+ stats.getBootTimestamp(),
+ stats.getBrokerVersion()), stats.getSendThreadPoolQueueSize());
+ brokerRuntimeStartAcceptSendRequestTimeStamp.put(new BrokerRuntimeMetric(
+ clusterName, brokerAddress, brokerHost,
+ stats.getBrokerVersionDesc(),
+ stats.getBootTimestamp(),
+ stats.getBrokerVersion()), stats.getStartAcceptSendRequestTimeStamp());
+ brokerRuntimePutMessageEntireTimeMax.put(new BrokerRuntimeMetric(
+ clusterName, brokerAddress, brokerHost,
+ stats.getBrokerVersionDesc(),
+ stats.getBootTimestamp(),
+ stats.getBrokerVersion()), stats.getPutMessageEntireTimeMax());
+ brokerRuntimeEarliestMessageTimeStamp.put(new BrokerRuntimeMetric(
+ clusterName, brokerAddress, brokerHost,
+ stats.getBrokerVersionDesc(),
+ stats.getBootTimestamp(),
+ stats.getBrokerVersion()), stats.getEarliestMessageTimeStamp());
+ brokerRuntimeRemainTransientStoreBufferNumbs.put(new BrokerRuntimeMetric(
+ clusterName, brokerAddress, brokerHost,
+ stats.getBrokerVersionDesc(),
+ stats.getBootTimestamp(),
+ stats.getBrokerVersion()), stats.getRemainTransientStoreBufferNumbs());
+ brokerRuntimeQueryThreadPoolQueueCapacity.put(new BrokerRuntimeMetric(
+ clusterName, brokerAddress, brokerHost,
+ stats.getBrokerVersionDesc(),
+ stats.getBootTimestamp(),
+ stats.getBrokerVersion()), stats.getQueryThreadPoolQueueCapacity());
+ brokerRuntimePutMessageAverageSize.put(new BrokerRuntimeMetric(
+ clusterName, brokerAddress, brokerHost,
+ stats.getBrokerVersionDesc(),
+ stats.getBootTimestamp(),
+ stats.getBrokerVersion()), stats.getPutMessageAverageSize());
+ brokerRuntimePutMessageSizeTotal.put(new BrokerRuntimeMetric(
+ clusterName, brokerAddress, brokerHost,
+ stats.getBrokerVersionDesc(),
+ stats.getBootTimestamp(),
+ stats.getBrokerVersion()), stats.getQueryThreadPoolQueueCapacity());
+ brokerRuntimeDispatchBehindBytes.put(new BrokerRuntimeMetric(
+ clusterName, brokerAddress, brokerHost,
+ stats.getBrokerVersionDesc(),
+ stats.getBootTimestamp(),
+ stats.getBrokerVersion()), stats.getDispatchBehindBytes());
}
+ private void addAllKindOfTps(String brokerAddress, String clusterName, String brokerHost, BrokerRuntimeStats stats) {
+ brokerRuntimePutTps10.put(new BrokerRuntimeMetric(
+ clusterName,
+ brokerAddress, brokerHost,
+ stats.getBrokerVersionDesc(),
+ stats.getBootTimestamp(),
+ stats.getBrokerVersion()), stats.getPutTps().getTen());
+ brokerRuntimePutTps60.put(new BrokerRuntimeMetric(
+ clusterName,
+ brokerAddress, brokerHost,
+ stats.getBrokerVersionDesc(),
+ stats.getBootTimestamp(),
+ stats.getBrokerVersion()), stats.getPutTps().getSixty());
+ brokerRuntimePutTps600.put(new BrokerRuntimeMetric(
+ clusterName,
+ brokerAddress, brokerHost,
+ stats.getBrokerVersionDesc(),
+ stats.getBootTimestamp(),
+ stats.getBrokerVersion()), stats.getPutTps().getSixHundred());
+ brokerRuntimeGetMissTps10.put(new BrokerRuntimeMetric(
+ clusterName,
+ brokerAddress, brokerHost,
+ stats.getBrokerVersionDesc(),
+ stats.getBootTimestamp(),
+ stats.getBrokerVersion()), stats.getGetMissTps().getTen());
+ brokerRuntimeGetMissTps60.put(new BrokerRuntimeMetric(
+ clusterName,
+ brokerAddress, brokerHost,
+ stats.getBrokerVersionDesc(),
+ stats.getBootTimestamp(),
+ stats.getBrokerVersion()), stats.getGetMissTps().getSixty());
+ brokerRuntimeGetMissTps600.put(new BrokerRuntimeMetric(
+ clusterName,
+ brokerAddress, brokerHost,
+ stats.getBrokerVersionDesc(),
+ stats.getBootTimestamp(),
+ stats.getBrokerVersion()), stats.getGetMissTps().getSixHundred());
+ brokerRuntimeGetTransferedTps10.put(new BrokerRuntimeMetric(
+ clusterName,
+ brokerAddress, brokerHost,
+ stats.getBrokerVersionDesc(),
+ stats.getBootTimestamp(),
+ stats.getBrokerVersion()), stats.getGetTransferedTps().getTen());
+ brokerRuntimeGetTransferedTps60.put(new BrokerRuntimeMetric(
+ clusterName,
+ brokerAddress, brokerHost,
+ stats.getBrokerVersionDesc(),
+ stats.getBootTimestamp(),
+ stats.getBrokerVersion()), stats.getGetTransferedTps().getSixty());
+ brokerRuntimeGetTransferedTps600.put(new BrokerRuntimeMetric(
+ clusterName,
+ brokerAddress, brokerHost,
+ stats.getBrokerVersionDesc(),
+ stats.getBootTimestamp(),
+ stats.getBrokerVersion()), stats.getGetTransferedTps().getSixHundred());
+ brokerRuntimeGetTotalTps10.put(new BrokerRuntimeMetric(
+ clusterName,
+ brokerAddress, brokerHost,
+ stats.getBrokerVersionDesc(),
+ stats.getBootTimestamp(),
+ stats.getBrokerVersion()), stats.getGetTotalTps().getTen());
+ brokerRuntimeGetTotalTps60.put(new BrokerRuntimeMetric(
+ clusterName,
+ brokerAddress, brokerHost,
+ stats.getBrokerVersionDesc(),
+ stats.getBootTimestamp(),
+ stats.getBrokerVersion()), stats.getGetTotalTps().getSixty());
+ brokerRuntimeGetTotalTps600.put(new BrokerRuntimeMetric(
+ clusterName,
+ brokerAddress, brokerHost,
+ stats.getBrokerVersionDesc(),
+ stats.getBootTimestamp(),
+ stats.getBrokerVersion()), stats.getGetTotalTps().getSixHundred());
+ brokerRuntimeGetFoundTps10.put(new BrokerRuntimeMetric(
+ clusterName,
+ brokerAddress, brokerHost,
+ stats.getBrokerVersionDesc(),
+ stats.getBootTimestamp(),
+ stats.getBrokerVersion()), stats.getGetFoundTps().getTen());
+ brokerRuntimeGetFoundTps60.put(new BrokerRuntimeMetric(
+ clusterName,
+ brokerAddress, brokerHost,
+ stats.getBrokerVersionDesc(),
+ stats.getBootTimestamp(),
+ stats.getBrokerVersion()), stats.getGetFoundTps().getSixty());
+ brokerRuntimeGetFoundTps600.put(new BrokerRuntimeMetric(
+ clusterName,
+ brokerAddress, brokerHost,
+ stats.getBrokerVersionDesc(),
+ stats.getBootTimestamp(),
+ stats.getBrokerVersion()), stats.getGetFoundTps().getSixHundred());
+ brokerRuntimeGetFoundTps600.put(new BrokerRuntimeMetric(
+ clusterName,
+ brokerAddress, brokerHost,
+ stats.getBrokerVersionDesc(),
+ stats.getBootTimestamp(),
+ stats.getBrokerVersion()), stats.getGetFoundTps().getSixHundred());
+ }
- public void AddsendBackNumsMetric(String clusterName, String brokerName, String topic, String group, double value)
- {
- sendBackNums.put(new ConsumerMetric(clusterName,brokerName,topic,group),value);
+ private void addCommitLogDirCapacity(String clusterName, String brokerAddress, String brokerHost, BrokerRuntimeStats stats) {
+ brokerRuntimeCommitLogDirCapacityTotal.put(new BrokerRuntimeMetric(
+ clusterName,
+ brokerAddress, brokerHost,
+ stats.getBrokerVersionDesc(),
+ stats.getBootTimestamp(),
+ stats.getBrokerVersion()), stats.getCommitLogDirCapacityTotal());
+ brokerRuntimeCommitLogDirCapacityFree.put(new BrokerRuntimeMetric(
+ clusterName,
+ brokerAddress, brokerHost,
+ stats.getBrokerVersionDesc(),
+ stats.getBootTimestamp(),
+ stats.getBrokerVersion()), stats.getCommitLogDirCapacityFree());
}
- public void AddGroupGetLatencyMetric(String clusterName, String brokerName, String topic, String group, String queueId,double value) {
+ private void addBrokerRuntimePutMessageDistributeTimeMap(
+ String clusterName, String brokerAddress, String brokerHost,
+ String brokerDes, long bootTimestamp, int brokerVersion,
+ BrokerRuntimeStats stats) {
+ brokerRuntimePutMessageDistributeTimeMap0ms.put(new BrokerRuntimeMetric(
+ clusterName,
+ brokerAddress, brokerHost,
+ brokerDes,
+ bootTimestamp,
+ brokerVersion), stats.getPutMessageDistributeTimeMap().get("<=0ms"));
+ brokerRuntimePutMessageDistributeTimeMap0to10ms.put(new BrokerRuntimeMetric(
+ clusterName,
+ brokerAddress, brokerHost,
+ brokerDes,
+ bootTimestamp,
+ brokerVersion), stats.getPutMessageDistributeTimeMap().get("0~10ms"));
+ brokerRuntimePutMessageDistributeTimeMap10to50ms.put(new BrokerRuntimeMetric(
+ clusterName,
+ brokerAddress, brokerHost,
+ brokerDes,
+ bootTimestamp,
+ brokerVersion), stats.getPutMessageDistributeTimeMap().get("10~50ms"));
+ brokerRuntimePutMessageDistributeTimeMap50to100ms.put(new BrokerRuntimeMetric(
+ clusterName,
+ brokerAddress, brokerHost,
+ brokerDes,
+ bootTimestamp,
+ brokerVersion), stats.getPutMessageDistributeTimeMap().get("50~100ms"));
+ brokerRuntimePutMessageDistributeTimeMap100to200ms.put(new BrokerRuntimeMetric(
+ clusterName,
+ brokerAddress, brokerHost,
+ brokerDes,
+ bootTimestamp,
+ brokerVersion), stats.getPutMessageDistributeTimeMap().get("100~200ms"));
+ brokerRuntimePutMessageDistributeTimeMap200to500ms.put(new BrokerRuntimeMetric(
+ clusterName,
+ brokerAddress, brokerHost,
+ brokerDes,
+ bootTimestamp,
+ brokerVersion), stats.getPutMessageDistributeTimeMap().get("200~500ms"));
+ brokerRuntimePutMessageDistributeTimeMap500to1s.put(new BrokerRuntimeMetric(
+ clusterName,
+ brokerAddress, brokerHost,
+ brokerDes,
+ bootTimestamp,
+ brokerVersion), stats.getPutMessageDistributeTimeMap().get("500ms~1s"));
+ brokerRuntimePutMessageDistributeTimeMap1to2s.put(new BrokerRuntimeMetric(
+ clusterName,
+ brokerAddress, brokerHost,
+ brokerDes,
+ bootTimestamp,
+ brokerVersion), stats.getPutMessageDistributeTimeMap().get("1~2s"));
+ brokerRuntimePutMessageDistributeTimeMap2to3s.put(new BrokerRuntimeMetric(
+ clusterName,
+ brokerAddress, brokerHost,
+ brokerDes,
+ bootTimestamp,
+ brokerVersion), stats.getPutMessageDistributeTimeMap().get("2~3s"));
+ brokerRuntimePutMessageDistributeTimeMap3to4s.put(new BrokerRuntimeMetric(
+ clusterName,
+ brokerAddress, brokerHost,
+ brokerDes,
+ bootTimestamp,
+ brokerVersion), stats.getPutMessageDistributeTimeMap().get("3~4s"));
+ brokerRuntimePutMessageDistributeTimeMap4to5s.put(new BrokerRuntimeMetric(
+ clusterName,
+ brokerAddress, brokerHost,
+ brokerDes,
+ bootTimestamp,
+ brokerVersion), stats.getPutMessageDistributeTimeMap().get("4~5s"));
+ brokerRuntimePutMessageDistributeTimeMap5to10s.put(new BrokerRuntimeMetric(
+ clusterName,
+ brokerAddress, brokerHost,
+ brokerDes,
+ bootTimestamp,
+ brokerVersion), stats.getPutMessageDistributeTimeMap().get("5~10s"));
+ brokerRuntimePutMessageDistributeTimeMap10toMore.put(new BrokerRuntimeMetric(
+ clusterName,
+ brokerAddress, brokerHost,
+ brokerDes,
+ bootTimestamp,
+ brokerVersion), stats.getPutMessageDistributeTimeMap().get("10s~"));
+ }
- groupGetLatency.put(new ConsumerQueueMetric(clusterName,brokerName,topic,group,queueId),value);
+ private static <T extends Number> void loadBrokerRuntimeStatsMetric(GaugeMetricFamily family, Map.Entry<BrokerRuntimeMetric, T> entry) {
+ family.addMetric(Arrays.asList(
+ entry.getKey().getClusterName(),
+ entry.getKey().getBrokerAddress(),
+ entry.getKey().getBrokerHost(),
+ entry.getKey().getBrokerDes(),
+ String.valueOf(entry.getKey().getBootTimestamp()),
+ String.valueOf(entry.getKey().getBrokerVersion())
+ ), entry.getValue().doubleValue());
}
- public void AddGroupGetLatencyByStoreTimeMetric(String clusterName, String brokerName, String topic, String group,double value) {
+ private static List<String> BROKER_RUNTIME_METRIC_LABEL_NAMES = Arrays.asList("cluster", "brokerIP", "brokerHost", "des", "boottime", "broker_version");
+
+ private void collectBrokerRuntimeStats(List<MetricFamilySamples> mfs) {
+ collectBrokerRuntimeStatsPutMessageDistributeTime(mfs);
+
+ GaugeMetricFamily brokerRuntimeMsgPutTotalTodayNowF = new GaugeMetricFamily("rocketmq_brokeruntime_msg_put_total_today_now", "brokerRuntimeMsgPutTotalTodayNow", BROKER_RUNTIME_METRIC_LABEL_NAMES);
+ for (Map.Entry<BrokerRuntimeMetric, Long> entry : brokerRuntimeMsgPutTotalTodayNow.entrySet()) {
+ loadBrokerRuntimeStatsMetric(brokerRuntimeMsgPutTotalTodayNowF, entry);
+ }
+ mfs.add(brokerRuntimeMsgPutTotalTodayNowF);
+
+ GaugeMetricFamily brokerRuntimeMsgGetTotalTodayNowF = new GaugeMetricFamily("rocketmq_brokeruntime_msg_gettotal_today_now", "brokerRuntimeMsgGetTotalTodayNow", BROKER_RUNTIME_METRIC_LABEL_NAMES);
+ for (Map.Entry<BrokerRuntimeMetric, Long> entry : brokerRuntimeMsgGetTotalTodayNow.entrySet()) {
+ loadBrokerRuntimeStatsMetric(brokerRuntimeMsgGetTotalTodayNowF, entry);
+ }
+ mfs.add(brokerRuntimeMsgGetTotalTodayNowF);
+
+ GaugeMetricFamily brokerRuntimeDispatchBehindBytesF = new GaugeMetricFamily("rocketmq_brokeruntime_dispatch_behind_bytes", "brokerRuntimeDispatchBehindBytes", BROKER_RUNTIME_METRIC_LABEL_NAMES);
+ for (Map.Entry<BrokerRuntimeMetric, Long> entry : brokerRuntimeDispatchBehindBytes.entrySet()) {
+ loadBrokerRuntimeStatsMetric(brokerRuntimeDispatchBehindBytesF, entry);
+ }
+ mfs.add(brokerRuntimeDispatchBehindBytesF);
+
+ GaugeMetricFamily brokerRuntimePutMessageSizeTotalF = new GaugeMetricFamily("rocketmq_brokeruntime_put_message_size_total", "brokerRuntimePutMessageSizeTotal", BROKER_RUNTIME_METRIC_LABEL_NAMES);
+ for (Map.Entry<BrokerRuntimeMetric, Long> entry : brokerRuntimePutMessageSizeTotal.entrySet()) {
+ loadBrokerRuntimeStatsMetric(brokerRuntimePutMessageSizeTotalF, entry);
+ }
+ mfs.add(brokerRuntimePutMessageSizeTotalF);
+
+ GaugeMetricFamily brokerRuntimePutMessageAverageSizeF = new GaugeMetricFamily("rocketmq_brokeruntime_put_message_average_size", "brokerRuntimePutMessageAverageSize", BROKER_RUNTIME_METRIC_LABEL_NAMES);
+ for (Map.Entry<BrokerRuntimeMetric, Double> entry : brokerRuntimePutMessageAverageSize.entrySet()) {
+ loadBrokerRuntimeStatsMetric(brokerRuntimePutMessageAverageSizeF, entry);
+ }
+ mfs.add(brokerRuntimePutMessageAverageSizeF);
+
+ GaugeMetricFamily brokerRuntimeQueryThreadPoolQueueCapacityF = new GaugeMetricFamily("rocketmq_brokeruntime_query_threadpool_queue_capacity", "brokerRuntimeQueryThreadPoolQueueCapacity", BROKER_RUNTIME_METRIC_LABEL_NAMES);
+ for (Map.Entry<BrokerRuntimeMetric, Long> entry : brokerRuntimeQueryThreadPoolQueueCapacity.entrySet()) {
+ loadBrokerRuntimeStatsMetric(brokerRuntimeQueryThreadPoolQueueCapacityF, entry);
+ }
+ mfs.add(brokerRuntimeQueryThreadPoolQueueCapacityF);
+
+ GaugeMetricFamily brokerRuntimeRemainTransientStoreBufferNumbsF = new GaugeMetricFamily("rocketmq_brokeruntime_remain_transientstore_buffer_numbs", "brokerRuntimeRemainTransientStoreBufferNumbs", BROKER_RUNTIME_METRIC_LABEL_NAMES);
+ for (Map.Entry<BrokerRuntimeMetric, Long> entry : brokerRuntimeRemainTransientStoreBufferNumbs.entrySet()) {
+ loadBrokerRuntimeStatsMetric(brokerRuntimeRemainTransientStoreBufferNumbsF, entry);
+ }
+ mfs.add(brokerRuntimeRemainTransientStoreBufferNumbsF);
+
+ GaugeMetricFamily brokerRuntimeEarliestMessageTimeStampF = new GaugeMetricFamily("rocketmq_brokeruntime_earliest_message_timestamp", "brokerRuntimeEarliestMessageTimeStamp", BROKER_RUNTIME_METRIC_LABEL_NAMES);
+ for (Map.Entry<BrokerRuntimeMetric, Long> entry : brokerRuntimeEarliestMessageTimeStamp.entrySet()) {
+ loadBrokerRuntimeStatsMetric(brokerRuntimeEarliestMessageTimeStampF, entry);
+ }
+ mfs.add(brokerRuntimeEarliestMessageTimeStampF);
+
+ GaugeMetricFamily brokerRuntimePutMessageEntireTimeMaxF = new GaugeMetricFamily("rocketmq_brokeruntime_putmessage_entire_time_max", "brokerRuntimePutMessageEntireTimeMax", BROKER_RUNTIME_METRIC_LABEL_NAMES);
+ for (Map.Entry<BrokerRuntimeMetric, Long> entry : brokerRuntimePutMessageEntireTimeMax.entrySet()) {
+ loadBrokerRuntimeStatsMetric(brokerRuntimePutMessageEntireTimeMaxF, entry);
+ }
+ mfs.add(brokerRuntimePutMessageEntireTimeMaxF);
+
+ GaugeMetricFamily brokerRuntimeStartAcceptSendRequestTimeStampF = new GaugeMetricFamily("rocketmq_brokeruntime_start_accept_sendrequest_time", "brokerRuntimeStartAcceptSendRequestTimeStamp", BROKER_RUNTIME_METRIC_LABEL_NAMES);
+ for (Map.Entry<BrokerRuntimeMetric, Long> entry : brokerRuntimeStartAcceptSendRequestTimeStamp.entrySet()) {
+ loadBrokerRuntimeStatsMetric(brokerRuntimeStartAcceptSendRequestTimeStampF, entry);
+ }
+ mfs.add(brokerRuntimeStartAcceptSendRequestTimeStampF);
- groupGetLatencyByStoreTime.put(new ConsumerMetric(clusterName,brokerName,topic,group),value);
+ GaugeMetricFamily brokerRuntimeSendThreadPoolQueueSizeF = new GaugeMetricFamily("rocketmq_brokeruntime_send_threadpool_queue_size", "brokerRuntimeSendThreadPoolQueueSize", BROKER_RUNTIME_METRIC_LABEL_NAMES);
+ for (Map.Entry<BrokerRuntimeMetric, Long> entry : brokerRuntimeSendThreadPoolQueueSize.entrySet()) {
+ loadBrokerRuntimeStatsMetric(brokerRuntimeSendThreadPoolQueueSizeF, entry);
+ }
+ mfs.add(brokerRuntimeSendThreadPoolQueueSizeF);
+
+ GaugeMetricFamily brokerRuntimePutMessageTimesTotalF = new GaugeMetricFamily("rocketmq_brokeruntime_putmessage_times_total", "brokerRuntimePutMessageTimesTotal", BROKER_RUNTIME_METRIC_LABEL_NAMES);
+ for (Map.Entry<BrokerRuntimeMetric, Long> entry : brokerRuntimePutMessageTimesTotal.entrySet()) {
+ loadBrokerRuntimeStatsMetric(brokerRuntimePutMessageTimesTotalF, entry);
+ }
+ mfs.add(brokerRuntimePutMessageTimesTotalF);
+
+ GaugeMetricFamily brokerRuntimeGetMessageEntireTimeMaxF = new GaugeMetricFamily("rocketmq_brokeruntime_getmessage_entire_time_max", "brokerRuntimeGetMessageEntireTimeMax", BROKER_RUNTIME_METRIC_LABEL_NAMES);
+ for (Map.Entry<BrokerRuntimeMetric, Long> entry : brokerRuntimeGetMessageEntireTimeMax.entrySet()) {
+ loadBrokerRuntimeStatsMetric(brokerRuntimeGetMessageEntireTimeMaxF, entry);
+ }
+ mfs.add(brokerRuntimeGetMessageEntireTimeMaxF);
+
+ GaugeMetricFamily brokerRuntimePageCacheLockTimeMillsF = new GaugeMetricFamily("rocketmq_brokeruntime_pagecache_lock_time_mills", "brokerRuntimePageCacheLockTimeMills", BROKER_RUNTIME_METRIC_LABEL_NAMES);
+ for (Map.Entry<BrokerRuntimeMetric, Long> entry : brokerRuntimePageCacheLockTimeMills.entrySet()) {
+ loadBrokerRuntimeStatsMetric(brokerRuntimePageCacheLockTimeMillsF, entry);
+ }
+ mfs.add(brokerRuntimePageCacheLockTimeMillsF);
+
+ GaugeMetricFamily brokerRuntimeCommitLogDiskRatioF = new GaugeMetricFamily("rocketmq_brokeruntime_commitlog_disk_ratio", "brokerRuntimeCommitLogDiskRatio", BROKER_RUNTIME_METRIC_LABEL_NAMES);
+ for (Map.Entry<BrokerRuntimeMetric, Double> entry : brokerRuntimeCommitLogDiskRatio.entrySet()) {
+ loadBrokerRuntimeStatsMetric(brokerRuntimeCommitLogDiskRatioF, entry);
+ }
+ mfs.add(brokerRuntimeCommitLogDiskRatioF);
+
+ GaugeMetricFamily brokerRuntimeConsumeQueueDiskRatioF = new GaugeMetricFamily("rocketmq_brokeruntime_consumequeue_disk_ratio", "brokerRuntimeConsumeQueueDiskRatio", BROKER_RUNTIME_METRIC_LABEL_NAMES);
+ for (Map.Entry<BrokerRuntimeMetric, Double> entry : brokerRuntimeConsumeQueueDiskRatio.entrySet()) {
+ loadBrokerRuntimeStatsMetric(brokerRuntimeConsumeQueueDiskRatioF, entry);
+ }
+ mfs.add(brokerRuntimeConsumeQueueDiskRatioF);
+
+ GaugeMetricFamily brokerRuntimeGetFoundTps600F = new GaugeMetricFamily("rocketmq_brokeruntime_getfound_tps600", "brokerRuntimeGetFoundTps600", BROKER_RUNTIME_METRIC_LABEL_NAMES);
+ for (Map.Entry<BrokerRuntimeMetric, Double> entry : brokerRuntimeGetFoundTps600.entrySet()) {
+ loadBrokerRuntimeStatsMetric(brokerRuntimeGetFoundTps600F, entry);
+ }
+ mfs.add(brokerRuntimeGetFoundTps600F);
+
+ GaugeMetricFamily brokerRuntimeGetFoundTps60F = new GaugeMetricFamily("rocketmq_brokeruntime_getfound_tps60", "brokerRuntimeGetFoundTps60", BROKER_RUNTIME_METRIC_LABEL_NAMES);
+ for (Map.Entry<BrokerRuntimeMetric, Double> entry : brokerRuntimeGetFoundTps60.entrySet()) {
+ loadBrokerRuntimeStatsMetric(brokerRuntimeGetFoundTps60F, entry);
+ }
+ mfs.add(brokerRuntimeGetFoundTps60F);
+
+ GaugeMetricFamily brokerRuntimeGetFoundTps10F = new GaugeMetricFamily("rocketmq_brokeruntime_getfound_tps10", "brokerRuntimeGetFoundTps10", BROKER_RUNTIME_METRIC_LABEL_NAMES);
+ for (Map.Entry<BrokerRuntimeMetric, Double> entry : brokerRuntimeGetFoundTps10.entrySet()) {
+ loadBrokerRuntimeStatsMetric(brokerRuntimeGetFoundTps10F, entry);
+ }
+ mfs.add(brokerRuntimeGetFoundTps10F);
+
+ GaugeMetricFamily brokerRuntimeGetTotalTps600F = new GaugeMetricFamily("rocketmq_brokeruntime_gettotal_tps600", "brokerRuntimeGetTotalTps600", BROKER_RUNTIME_METRIC_LABEL_NAMES);
+ for (Map.Entry<BrokerRuntimeMetric, Double> entry : brokerRuntimeGetTotalTps600.entrySet()) {
+ loadBrokerRuntimeStatsMetric(brokerRuntimeGetTotalTps600F, entry);
+ }
+ mfs.add(brokerRuntimeGetTotalTps600F);
+
+ GaugeMetricFamily brokerRuntimeGetTotalTps60F = new GaugeMetricFamily("rocketmq_brokeruntime_gettotal_tps60", "brokerRuntimeGetTotalTps60", BROKER_RUNTIME_METRIC_LABEL_NAMES);
+ for (Map.Entry<BrokerRuntimeMetric, Double> entry : brokerRuntimeGetTotalTps60.entrySet()) {
+ loadBrokerRuntimeStatsMetric(brokerRuntimeGetTotalTps60F, entry);
+ }
+ mfs.add(brokerRuntimeGetTotalTps60F);
+
+ GaugeMetricFamily brokerRuntimeGetTotalTps10F = new GaugeMetricFamily("rocketmq_brokeruntime_gettotal_tps10", "brokerRuntimeGetTotalTps10", BROKER_RUNTIME_METRIC_LABEL_NAMES);
+ for (Map.Entry<BrokerRuntimeMetric, Double> entry : brokerRuntimeGetTotalTps10.entrySet()) {
+ loadBrokerRuntimeStatsMetric(brokerRuntimeGetTotalTps10F, entry);
+ }
+ mfs.add(brokerRuntimeGetTotalTps10F);
+
+ GaugeMetricFamily brokerRuntimeGetTransferedTps600F = new GaugeMetricFamily("rocketmq_brokeruntime_gettransfered_tps600", "brokerRuntimeGetTransferedTps600", BROKER_RUNTIME_METRIC_LABEL_NAMES);
+ for (Map.Entry<BrokerRuntimeMetric, Double> entry : brokerRuntimeGetTransferedTps600.entrySet()) {
+ loadBrokerRuntimeStatsMetric(brokerRuntimeGetTransferedTps600F, entry);
+ }
+ mfs.add(brokerRuntimeGetTransferedTps600F);
+
+ GaugeMetricFamily brokerRuntimeGetTransferedTps60F = new GaugeMetricFamily("rocketmq_brokeruntime_gettransfered_tps60", "brokerRuntimeGetTransferedTps60", BROKER_RUNTIME_METRIC_LABEL_NAMES);
+ for (Map.Entry<BrokerRuntimeMetric, Double> entry : brokerRuntimeGetTransferedTps60.entrySet()) {
+ loadBrokerRuntimeStatsMetric(brokerRuntimeGetTransferedTps60F, entry);
+ }
+ mfs.add(brokerRuntimeGetTransferedTps60F);
+
+ GaugeMetricFamily brokerRuntimeGetTransferedTps10F = new GaugeMetricFamily("rocketmq_brokeruntime_gettransfered_tps10", "brokerRuntimeGetTransferedTps10", BROKER_RUNTIME_METRIC_LABEL_NAMES);
+ for (Map.Entry<BrokerRuntimeMetric, Double> entry : brokerRuntimeGetTransferedTps10.entrySet()) {
+ loadBrokerRuntimeStatsMetric(brokerRuntimeGetTransferedTps10F, entry);
+ }
+ mfs.add(brokerRuntimeGetTransferedTps10F);
+
+ GaugeMetricFamily brokerRuntimeGetMissTps600F = new GaugeMetricFamily("rocketmq_brokeruntime_getmiss_tps600", "brokerRuntimeGetMissTps600", BROKER_RUNTIME_METRIC_LABEL_NAMES);
+ for (Map.Entry<BrokerRuntimeMetric, Double> entry : brokerRuntimeGetMissTps600.entrySet()) {
+ loadBrokerRuntimeStatsMetric(brokerRuntimeGetMissTps600F, entry);
+ }
+ mfs.add(brokerRuntimeGetMissTps600F);
+
+ GaugeMetricFamily brokerRuntimeGetMissTps60F = new GaugeMetricFamily("rocketmq_brokeruntime_getmiss_tps60", "brokerRuntimeGetMissTps60", BROKER_RUNTIME_METRIC_LABEL_NAMES);
+ for (Map.Entry<BrokerRuntimeMetric, Double> entry : brokerRuntimeGetMissTps60.entrySet()) {
+ loadBrokerRuntimeStatsMetric(brokerRuntimeGetMissTps60F, entry);
+ }
+ mfs.add(brokerRuntimeGetMissTps60F);
+
+ GaugeMetricFamily brokerRuntimeGetMissTps10F = new GaugeMetricFamily("rocketmq_brokeruntime_getmiss_tps10", "brokerRuntimeGetMissTps10", BROKER_RUNTIME_METRIC_LABEL_NAMES);
+ for (Map.Entry<BrokerRuntimeMetric, Double> entry : brokerRuntimeGetMissTps10.entrySet()) {
+ loadBrokerRuntimeStatsMetric(brokerRuntimeGetMissTps10F, entry);
+ }
+ mfs.add(brokerRuntimeGetMissTps10F);
+
+ GaugeMetricFamily brokerRuntimePutTps600F = new GaugeMetricFamily("rocketmq_brokeruntime_put_tps600", "brokerRuntimePutTps600", BROKER_RUNTIME_METRIC_LABEL_NAMES);
+ for (Map.Entry<BrokerRuntimeMetric, Double> entry : brokerRuntimePutTps600.entrySet()) {
+ loadBrokerRuntimeStatsMetric(brokerRuntimePutTps600F, entry);
+ }
+ mfs.add(brokerRuntimePutTps600F);
+
+ GaugeMetricFamily brokerRuntimePutTps60F = new GaugeMetricFamily("rocketmq_brokeruntime_put_tps60", "brokerRuntimePutTps60", BROKER_RUNTIME_METRIC_LABEL_NAMES);
+ for (Map.Entry<BrokerRuntimeMetric, Double> entry : brokerRuntimePutTps60.entrySet()) {
+ loadBrokerRuntimeStatsMetric(brokerRuntimePutTps60F, entry);
+ }
+ mfs.add(brokerRuntimePutTps60F);
+
+ GaugeMetricFamily brokerRuntimePutTps10F = new GaugeMetricFamily("rocketmq_brokeruntime_put_tps10", "brokerRuntimePutTps10", BROKER_RUNTIME_METRIC_LABEL_NAMES);
+ for (Map.Entry<BrokerRuntimeMetric, Double> entry : brokerRuntimePutTps10.entrySet()) {
+ loadBrokerRuntimeStatsMetric(brokerRuntimePutTps10F, entry);
+ }
+ mfs.add(brokerRuntimePutTps10F);
+
+ GaugeMetricFamily brokerRuntimeDispatchMaxBufferF = new GaugeMetricFamily("rocketmq_brokeruntime_dispatch_maxbuffer", "brokerRuntimeDispatchMaxBuffer", BROKER_RUNTIME_METRIC_LABEL_NAMES);
+ for (Map.Entry<BrokerRuntimeMetric, Long> entry : brokerRuntimeDispatchMaxBuffer.entrySet()) {
+ loadBrokerRuntimeStatsMetric(brokerRuntimeDispatchMaxBufferF, entry);
+ }
+ mfs.add(brokerRuntimeDispatchMaxBufferF);
+
+ GaugeMetricFamily brokerRuntimePullThreadPoolQueueCapacityF = new GaugeMetricFamily("rocketmq_brokeruntime_pull_threadpoolqueue_capacity", "brokerRuntimePullThreadPoolQueueCapacity", BROKER_RUNTIME_METRIC_LABEL_NAMES);
+ for (Map.Entry<BrokerRuntimeMetric, Long> entry : brokerRuntimePullThreadPoolQueueCapacity.entrySet()) {
+ loadBrokerRuntimeStatsMetric(brokerRuntimePullThreadPoolQueueCapacityF, entry);
+ }
+ mfs.add(brokerRuntimePullThreadPoolQueueCapacityF);
+
+ GaugeMetricFamily brokerRuntimeSendThreadPoolQueueCapacityF = new GaugeMetricFamily("rocketmq_brokeruntime_send_threadpoolqueue_capacity", "brokerRuntimeSendThreadPoolQueueCapacity", BROKER_RUNTIME_METRIC_LABEL_NAMES);
+ for (Map.Entry<BrokerRuntimeMetric, Long> entry : brokerRuntimeSendThreadPoolQueueCapacity.entrySet()) {
+ loadBrokerRuntimeStatsMetric(brokerRuntimeSendThreadPoolQueueCapacityF, entry);
+ }
+ mfs.add(brokerRuntimeSendThreadPoolQueueCapacityF);
+
+ GaugeMetricFamily brokerRuntimePullThreadPoolQueueSizeF = new GaugeMetricFamily("rocketmq_brokeruntime_pull_threadpoolqueue_size", "brokerRuntimePullThreadPoolQueueSizeF", BROKER_RUNTIME_METRIC_LABEL_NAMES);
+ for (Map.Entry<BrokerRuntimeMetric, Long> entry : brokerRuntimePullThreadPoolQueueSize.entrySet()) {
+ loadBrokerRuntimeStatsMetric(brokerRuntimePullThreadPoolQueueSizeF, entry);
+ }
+ mfs.add(brokerRuntimePullThreadPoolQueueSizeF);
+
+ GaugeMetricFamily brokerRuntimeQueryThreadPoolQueueSizeF = new GaugeMetricFamily("rocketmq_brokeruntime_query_threadpoolqueue_size", "brokerRuntimeQueryThreadPoolQueueSize", BROKER_RUNTIME_METRIC_LABEL_NAMES);
+ for (Map.Entry<BrokerRuntimeMetric, Long> entry : brokerRuntimeQueryThreadPoolQueueSize.entrySet()) {
+ loadBrokerRuntimeStatsMetric(brokerRuntimeQueryThreadPoolQueueSizeF, entry);
+ }
+ mfs.add(brokerRuntimeQueryThreadPoolQueueSizeF);
+
+ GaugeMetricFamily brokerRuntimePullThreadPoolQueueHeadWaitTimeMillsF = new GaugeMetricFamily("rocketmq_brokeruntime_pull_threadpoolqueue_headwait_timemills", "brokerRuntimePullThreadPoolQueueHeadWaitTimeMills", BROKER_RUNTIME_METRIC_LABEL_NAMES);
+ for (Map.Entry<BrokerRuntimeMetric, Long> entry : brokerRuntimePullThreadPoolQueueHeadWaitTimeMills.entrySet()) {
+ loadBrokerRuntimeStatsMetric(brokerRuntimePullThreadPoolQueueHeadWaitTimeMillsF, entry);
+ }
+ mfs.add(brokerRuntimePullThreadPoolQueueHeadWaitTimeMillsF);
+
+ GaugeMetricFamily brokerRuntimeQueryThreadPoolQueueHeadWaitTimeMillsF = new GaugeMetricFamily("rocketmq_brokeruntime_query_threadpoolqueue_headwait_timemills", "brokerRuntimeQueryThreadPoolQueueHeadWaitTimeMills", BROKER_RUNTIME_METRIC_LABEL_NAMES);
+ for (Map.Entry<BrokerRuntimeMetric, Long> entry : brokerRuntimeQueryThreadPoolQueueHeadWaitTimeMills.entrySet()) {
+ loadBrokerRuntimeStatsMetric(brokerRuntimeQueryThreadPoolQueueHeadWaitTimeMillsF, entry);
+ }
+ mfs.add(brokerRuntimeQueryThreadPoolQueueHeadWaitTimeMillsF);
+
+ GaugeMetricFamily brokerRuntimeSendThreadPoolQueueHeadWaitTimeMillsF = new GaugeMetricFamily("rocketmq_brokeruntime_send_threadpoolqueue_headwait_timemills", "brokerRuntimeSendThreadPoolQueueHeadWaitTimeMills", BROKER_RUNTIME_METRIC_LABEL_NAMES);
+ for (Map.Entry<BrokerRuntimeMetric, Long> entry : brokerRuntimeSendThreadPoolQueueHeadWaitTimeMills.entrySet()) {
+ loadBrokerRuntimeStatsMetric(brokerRuntimeSendThreadPoolQueueHeadWaitTimeMillsF, entry);
+ }
+ mfs.add(brokerRuntimeSendThreadPoolQueueHeadWaitTimeMillsF);
+
+ GaugeMetricFamily brokerRuntimeMsgGetTotalYesterdayMorningF = new GaugeMetricFamily("rocketmq_brokeruntime_msg_gettotal_yesterdaymorning", "brokerRuntimeMsgGetTotalYesterdayMorning", BROKER_RUNTIME_METRIC_LABEL_NAMES);
+ for (Map.Entry<BrokerRuntimeMetric, Long> entry : brokerRuntimeMsgGetTotalYesterdayMorning.entrySet()) {
+ loadBrokerRuntimeStatsMetric(brokerRuntimeMsgGetTotalYesterdayMorningF, entry);
+ }
+ mfs.add(brokerRuntimeMsgGetTotalYesterdayMorningF);
+
+ GaugeMetricFamily brokerRuntimeMsgPutTotalYesterdayMorningF = new GaugeMetricFamily("rocketmq_brokeruntime_msg_puttotal_yesterdaymorning", "brokerRuntimeMsgPutTotalYesterdayMorning", BROKER_RUNTIME_METRIC_LABEL_NAMES);
+ for (Map.Entry<BrokerRuntimeMetric, Long> entry : brokerRuntimeMsgPutTotalYesterdayMorning.entrySet()) {
+ loadBrokerRuntimeStatsMetric(brokerRuntimeMsgPutTotalYesterdayMorningF, entry);
+ }
+ mfs.add(brokerRuntimeMsgPutTotalYesterdayMorningF);
+
+ GaugeMetricFamily brokerRuntimeMsgGetTotalTodayMorningF = new GaugeMetricFamily("rocketmq_brokeruntime_msg_gettotal_todaymorning", "brokerRuntimeMsgGetTotalTodayMorning", BROKER_RUNTIME_METRIC_LABEL_NAMES);
+ for (Map.Entry<BrokerRuntimeMetric, Long> entry : brokerRuntimeMsgGetTotalTodayMorning.entrySet()) {
+ loadBrokerRuntimeStatsMetric(brokerRuntimeMsgGetTotalTodayMorningF, entry);
+ }
+ mfs.add(brokerRuntimeMsgGetTotalTodayMorningF);
+
+ GaugeMetricFamily brokerRuntimeMsgPutTotalTodayMorningF = new GaugeMetricFamily("rocketmq_brokeruntime_msg_puttotal_todaymorning", "brokerRuntimeMsgPutTotalTodayMorning", BROKER_RUNTIME_METRIC_LABEL_NAMES);
+ for (Map.Entry<BrokerRuntimeMetric, Long> entry : brokerRuntimeMsgPutTotalTodayMorning.entrySet()) {
+ loadBrokerRuntimeStatsMetric(brokerRuntimeMsgPutTotalTodayMorningF, entry);
+ }
+ mfs.add(brokerRuntimeMsgPutTotalTodayMorningF);
+
+ GaugeMetricFamily brokerRuntimeCommitLogDirCapacityFreeF = new GaugeMetricFamily("rocketmq_brokeruntime_commitlogdir_capacity_free", "brokerRuntimeCommitLogDirCapacityFree", BROKER_RUNTIME_METRIC_LABEL_NAMES);
+ for (Map.Entry<BrokerRuntimeMetric, Double> entry : brokerRuntimeCommitLogDirCapacityFree.entrySet()) {
+ loadBrokerRuntimeStatsMetric(brokerRuntimeCommitLogDirCapacityFreeF, entry);
+ }
+ mfs.add(brokerRuntimeCommitLogDirCapacityFreeF);
+
+ GaugeMetricFamily brokerRuntimeCommitLogDirCapacityTotalF = new GaugeMetricFamily("rocketmq_brokeruntime_commitlogdir_capacity_total", "brokerRuntimeCommitLogDirCapacityTotal", BROKER_RUNTIME_METRIC_LABEL_NAMES);
+ for (Map.Entry<BrokerRuntimeMetric, Double> entry : brokerRuntimeCommitLogDirCapacityTotal.entrySet()) {
+ loadBrokerRuntimeStatsMetric(brokerRuntimeCommitLogDirCapacityTotalF, entry);
+ }
+ mfs.add(brokerRuntimeCommitLogDirCapacityTotalF);
+
+ GaugeMetricFamily brokerRuntimeCommitLogMaxOffsetF = new GaugeMetricFamily("rocketmq_brokeruntime_commitlog_maxoffset", "brokerRuntimeCommitLogMaxOffset", BROKER_RUNTIME_METRIC_LABEL_NAMES);
+ for (Map.Entry<BrokerRuntimeMetric, Long> entry : brokerRuntimeCommitLogMaxOffset.entrySet()) {
+ loadBrokerRuntimeStatsMetric(brokerRuntimeCommitLogMaxOffsetF, entry);
+ }
+ mfs.add(brokerRuntimeCommitLogMaxOffsetF);
+
+ GaugeMetricFamily brokerRuntimeCommitLogMinOffsetF = new GaugeMetricFamily("rocketmq_brokeruntime_commitlog_minoffset", "brokerRuntimeCommitLogMinOffset", BROKER_RUNTIME_METRIC_LABEL_NAMES);
+ for (Map.Entry<BrokerRuntimeMetric, Long> entry : brokerRuntimeCommitLogMinOffset.entrySet()) {
+ loadBrokerRuntimeStatsMetric(brokerRuntimeCommitLogMinOffsetF, entry);
+ }
+ mfs.add(brokerRuntimeCommitLogMinOffsetF);
+
+ GaugeMetricFamily brokerRuntimeRemainHowManyDataToFlushF = new GaugeMetricFamily("rocketmq_brokeruntime_remain_howmanydata_toflush", "brokerRuntimeRemainHowManyDataToFlush", BROKER_RUNTIME_METRIC_LABEL_NAMES);
+ for (Map.Entry<BrokerRuntimeMetric, Double> entry : brokerRuntimeRemainHowManyDataToFlush.entrySet()) {
+ loadBrokerRuntimeStatsMetric(brokerRuntimeRemainHowManyDataToFlushF, entry);
+ }
+ mfs.add(brokerRuntimeRemainHowManyDataToFlushF);
}
}
\ No newline at end of file
diff --git a/src/main/java/org/apache/rocketmq/exporter/config/RMQConfigure.java b/src/main/java/org/apache/rocketmq/exporter/config/RMQConfigure.java
index 2b3956b..e090b68 100644
--- a/src/main/java/org/apache/rocketmq/exporter/config/RMQConfigure.java
+++ b/src/main/java/org/apache/rocketmq/exporter/config/RMQConfigure.java
@@ -23,8 +23,6 @@ import org.slf4j.LoggerFactory;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;
-
-
import static org.apache.rocketmq.client.ClientConfig.SEND_MESSAGE_WITH_VIP_CHANNEL_PROPERTY;
@Configuration
@@ -69,9 +67,11 @@ public class RMQConfigure {
logger.info("setIsVIPChannel isVIPChannel={}", isVIPChannel);
}
}
+
public boolean isEnableCollect() {
return enableCollect;
}
+
public void setEnableCollect(boolean enableCollect) {
this.enableCollect = enableCollect;
}
diff --git a/src/main/java/org/apache/rocketmq/exporter/config/ScheduleConfig.java b/src/main/java/org/apache/rocketmq/exporter/config/ScheduleConfig.java
new file mode 100644
index 0000000..9422272
--- /dev/null
+++ b/src/main/java/org/apache/rocketmq/exporter/config/ScheduleConfig.java
@@ -0,0 +1,26 @@
+package org.apache.rocketmq.exporter.config;
+
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.scheduling.annotation.SchedulingConfigurer;
+import org.springframework.scheduling.config.ScheduledTaskRegistrar;
+
+import java.util.concurrent.Executor;
+import java.util.concurrent.Executors;
+
+@Configuration
+public class ScheduleConfig implements SchedulingConfigurer {
+ @Override
+ public void configureTasks(ScheduledTaskRegistrar taskRegistrar) {
+ taskRegistrar.setScheduler(taskExecutor());
+ }
+
+ @Value("${task.count}")
+ private int taskCount;
+
+ @Bean(destroyMethod = "shutdown")
+ public Executor taskExecutor() {
+ return Executors.newScheduledThreadPool(this.taskCount);
+ }
+}
\ No newline at end of file
diff --git a/src/main/java/org/apache/rocketmq/exporter/controller/RMQMetricsController.java b/src/main/java/org/apache/rocketmq/exporter/controller/RMQMetricsController.java
index 4df69d1..e6fe31c 100644
--- a/src/main/java/org/apache/rocketmq/exporter/controller/RMQMetricsController.java
+++ b/src/main/java/org/apache/rocketmq/exporter/controller/RMQMetricsController.java
@@ -18,8 +18,6 @@ package org.apache.rocketmq.exporter.controller;
import org.apache.rocketmq.exporter.service.RMQMetricsService;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseBody;
@@ -33,19 +31,14 @@ import java.io.StringWriter;
@RestController
@EnableAutoConfiguration
public class RMQMetricsController {
-
- private final static Logger log = LoggerFactory.getLogger(RMQMetricsController.class);
-
@Resource
RMQMetricsService metricsService;
@RequestMapping(value = "${rocketmq.config.webTelemetryPath}")
@ResponseBody
private void metrics(HttpServletResponse response) throws IOException {
-
StringWriter writer = new StringWriter();
- metricsService.Metrics(writer);
-
+ metricsService.metrics(writer);
response.setHeader("Content-Type", "text/plain; version=0.0.4; charset=utf-8");
response.getOutputStream().print(writer.toString());
}
diff --git a/src/main/java/org/apache/rocketmq/exporter/exception/ServiceException.java b/src/main/java/org/apache/rocketmq/exporter/exception/ServiceException.java
deleted file mode 100644
index ccf3fdc..0000000
--- a/src/main/java/org/apache/rocketmq/exporter/exception/ServiceException.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.rocketmq.exporter.exception;
-
-public class ServiceException extends RuntimeException {
- private static final long serialVersionUID = 9213584003139969215L;
- private int code;
-
- public ServiceException(int code, String message) {
- super(message);
- this.code = code;
- }
-
- public int getCode() {
- return code;
- }
-}
diff --git a/src/main/java/org/apache/rocketmq/exporter/model/BrokerRuntimeStats.java b/src/main/java/org/apache/rocketmq/exporter/model/BrokerRuntimeStats.java
new file mode 100644
index 0000000..b6fe3dc
--- /dev/null
+++ b/src/main/java/org/apache/rocketmq/exporter/model/BrokerRuntimeStats.java
@@ -0,0 +1,609 @@
+package org.apache.rocketmq.exporter.model;
+
+import org.apache.rocketmq.common.protocol.body.KVTable;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class BrokerRuntimeStats {
+ //今天生产的消息总量
+ private long msgPutTotalTodayNow;
+ //今天消费的消息总量
+ private long msgGetTotalTodayNow;
+
+ //今天早上生产消息总量
+ private long msgPutTotalTodayMorning;
+ //今天早上消费消息总量
+ private long msgGetTotalTodayMorning;
+
+ //昨天早上生产的消息总量
+ private long msgPutTotalYesterdayMorning;
+ //昨天早上消费的消息总量
+ private long msgGetTotalYesterdayMorning;
+
+ //延迟消息位点
+ private List<ScheduleMessageOffsetTable> scheduleMessageOffsetTables = new ArrayList<>();
+
+ //发送线程最大等待时间
+ private long sendThreadPoolQueueHeadWaitTimeMills;
+ //拉取消息线程最大等待时间
+ private long queryThreadPoolQueueHeadWaitTimeMills;
+ //拉取线程最大等待时间
+ private long pullThreadPoolQueueHeadWaitTimeMills;
+
+ //查询线程任务个数
+ private long queryThreadPoolQueueSize;
+ //拉取线程任务个数
+ private long pullThreadPoolQueueSize;
+ //发送线程等待队列长度
+ private long sendThreadPoolQueueCapacity;
+ //拉取线程等待队列长度
+ private long pullThreadPoolQueueCapacity;
+
+ //刷pagecache时间统计
+ private Map<String, Integer> putMessageDistributeTimeMap = new HashMap<>();
+ //还有多少字节的数据没有刷盘
+ private double remainHowManyDataToFlush;
+
+ //commitlog 最小位点
+ private long commitLogMinOffset;
+ //commitlog 最大位点
+ private long commitLogMaxOffset;
+
+ //broker运行时间描述
+ private String runtime;
+ //broker 启动时间
+ private long bootTimestamp;
+ //broker 磁盘总量
+ private double commitLogDirCapacityTotal;
+ //broker 磁盘剩余
+ private double commitLogDirCapacityFree;
+ //broker 版本号
+ private int brokerVersion;
+ //
+ private long dispatchMaxBuffer;
+
+ private PutTps putTps = new PutTps();
+ private GetMissTps getMissTps = new GetMissTps();
+ private GetTransferedTps getTransferedTps = new GetTransferedTps();
+ private GetTotalTps getTotalTps = new GetTotalTps();
+ private GetFoundTps getFoundTps = new GetFoundTps();
+
+ private double consumeQueueDiskRatio;
+ private double commitLogDiskRatio;
+
+ //page cache锁定时间
+ private long pageCacheLockTimeMills;
+
+ private long getMessageEntireTimeMax;
+
+ private long putMessageTimesTotal;
+
+ private String brokerVersionDesc;
+ private long sendThreadPoolQueueSize;
+ private long startAcceptSendRequestTimeStamp;
+ private long putMessageEntireTimeMax;
+ private long earliestMessageTimeStamp;
+
+ private long remainTransientStoreBufferNumbs;
+ private long queryThreadPoolQueueCapacity;
+ //发送消息平均体积大小
+ private double putMessageAverageSize;
+ //全部发送消息数
+ private long putMessageSizeTotal;
+ private long dispatchBehindBytes;
+
+
+ public BrokerRuntimeStats(KVTable kvTable) {
+ this.msgPutTotalTodayNow = Long.parseLong(kvTable.getTable().get("msgPutTotalTodayNow"));
+
+ loadScheduleMessageOffsets(kvTable);
+ loadPutMessageDistributeTime(kvTable.getTable().get("putMessageDistributeTime"));
+
+ loadTps(this.putTps, kvTable.getTable().get("putTps"));
+ loadTps(this.getMissTps, kvTable.getTable().get("getMissTps"));
+ loadTps(this.getTransferedTps, kvTable.getTable().get("getTransferedTps"));
+ loadTps(this.getTotalTps, kvTable.getTable().get("getTotalTps"));
+ loadTps(this.getFoundTps, kvTable.getTable().get("getFoundTps"));
+
+ loadCommitLogDirCapacity(kvTable.getTable().get("commitLogDirCapacity"));
+
+ this.sendThreadPoolQueueHeadWaitTimeMills = Long.parseLong(kvTable.getTable().get("sendThreadPoolQueueHeadWaitTimeMills"));
+ this.queryThreadPoolQueueHeadWaitTimeMills = Long.parseLong(kvTable.getTable().get("queryThreadPoolQueueHeadWaitTimeMills"));
+
+ this.remainHowManyDataToFlush = Double.parseDouble(kvTable.getTable().get("remainHowManyDataToFlush").split(" ")[0]);//byte
+ this.msgGetTotalTodayNow = Long.parseLong(kvTable.getTable().get("msgGetTotalTodayNow"));
+ this.queryThreadPoolQueueSize = Long.parseLong(kvTable.getTable().get("queryThreadPoolQueueSize"));
+ this.bootTimestamp = Long.parseLong(kvTable.getTable().get("bootTimestamp"));
+ this.msgPutTotalYesterdayMorning = Long.parseLong(kvTable.getTable().get("msgPutTotalYesterdayMorning"));
+ this.msgGetTotalYesterdayMorning = Long.parseLong(kvTable.getTable().get("msgGetTotalYesterdayMorning"));
+ this.pullThreadPoolQueueSize = Long.parseLong(kvTable.getTable().get("pullThreadPoolQueueSize"));
+ this.commitLogMinOffset = Long.parseLong(kvTable.getTable().get("commitLogMinOffset"));
+ this.pullThreadPoolQueueHeadWaitTimeMills = Long.parseLong(kvTable.getTable().get("pullThreadPoolQueueHeadWaitTimeMills"));
+ this.runtime = kvTable.getTable().get("runtime");
+ this.dispatchMaxBuffer = Long.parseLong(kvTable.getTable().get("dispatchMaxBuffer"));
+ this.brokerVersion = Integer.parseInt(kvTable.getTable().get("brokerVersion"));
+ this.consumeQueueDiskRatio = Double.parseDouble(kvTable.getTable().get("consumeQueueDiskRatio"));
+ this.pageCacheLockTimeMills = Long.parseLong(kvTable.getTable().get("pageCacheLockTimeMills"));
+ this.commitLogDiskRatio = Double.parseDouble(kvTable.getTable().get("commitLogDiskRatio"));
+ this.commitLogMaxOffset = Long.parseLong(kvTable.getTable().get("commitLogMaxOffset"));
+ this.getMessageEntireTimeMax = Long.parseLong(kvTable.getTable().get("getMessageEntireTimeMax"));
+ this.msgPutTotalTodayMorning = Long.parseLong(kvTable.getTable().get("msgPutTotalTodayMorning"));
+ this.putMessageTimesTotal = Long.parseLong(kvTable.getTable().get("putMessageTimesTotal"));
+ this.msgGetTotalTodayMorning = Long.parseLong(kvTable.getTable().get("msgGetTotalTodayMorning"));
+ this.brokerVersionDesc = kvTable.getTable().get("brokerVersionDesc");
+ this.sendThreadPoolQueueSize = Long.parseLong(kvTable.getTable().get("sendThreadPoolQueueSize"));
+ this.startAcceptSendRequestTimeStamp = Long.parseLong(kvTable.getTable().get("startAcceptSendRequestTimeStamp"));
+ this.putMessageEntireTimeMax = Long.parseLong(kvTable.getTable().get("putMessageEntireTimeMax"));
+ this.earliestMessageTimeStamp = Long.parseLong(kvTable.getTable().get("earliestMessageTimeStamp"));
+ this.remainTransientStoreBufferNumbs = Long.parseLong(kvTable.getTable().get("remainTransientStoreBufferNumbs"));
+ this.queryThreadPoolQueueCapacity = Long.parseLong(kvTable.getTable().get("queryThreadPoolQueueCapacity"));
+ this.putMessageAverageSize = Double.parseDouble(kvTable.getTable().get("putMessageAverageSize"));
+ this.dispatchBehindBytes = Long.parseLong(kvTable.getTable().get("dispatchBehindBytes"));
+ this.putMessageSizeTotal = Long.parseLong(kvTable.getTable().get("putMessageSizeTotal"));
+ this.sendThreadPoolQueueCapacity = Long.parseLong(kvTable.getTable().get("sendThreadPoolQueueCapacity"));
+ this.pullThreadPoolQueueCapacity = Long.parseLong(kvTable.getTable().get("pullThreadPoolQueueCapacity"));
+
+ }
+
+ private void loadCommitLogDirCapacity(String commitLogDirCapacity) {
+ String[] arr = commitLogDirCapacity.split(" ");
+ double total = Double.parseDouble(arr[2]);
+ double free = Double.parseDouble(arr[6]);
+ this.commitLogDirCapacityTotal = total;
+ this.commitLogDirCapacityFree = free;
+ }
+
+ private void loadTps(PutTps putTps, String value) {
+ String[] arr = value.split(" ");
+ putTps.ten = Double.parseDouble(arr[0]);
+ putTps.sixty = Double.parseDouble(arr[1]);
+ putTps.sixHundred = Double.parseDouble(arr[2]);
+ }
+
+ private void loadPutMessageDistributeTime(String str) {
+ String[] arr = str.split(" ");
+ String key = "", value = "";
+ for (String ar : arr) {
+ String[] tarr = ar.split(":");
+ key = tarr[0].replace("[", "").replace("]", "");
+ value = tarr[1];
+ this.putMessageDistributeTimeMap.put(key, Integer.parseInt(value));
+ }
+ }
+
+ public void loadScheduleMessageOffsets(KVTable kvTable) {
+ for (String key : kvTable.getTable().keySet()) {
+ if (key.startsWith("scheduleMessageOffset")) {
+ String[] arr = kvTable.getTable().get(key).split(",");
+ ScheduleMessageOffsetTable table = new ScheduleMessageOffsetTable(
+ Long.parseLong(arr[0]),
+ Long.parseLong(arr[1])
+ );
+ this.scheduleMessageOffsetTables.add(table);
+ }
+ }
+ }
+
+ public static class ScheduleMessageOffsetTable {
+ private long delayOffset;
+ private long maxOffset;
+
+ public ScheduleMessageOffsetTable(long first, long second) {
+ this.delayOffset = first;
+ this.maxOffset = second;
+ }
+
+ public long getDelayOffset() {
+ return delayOffset;
+ }
+
+ public void setDelayOffset(long delayOffset) {
+ this.delayOffset = delayOffset;
+ }
+
+ public long getMaxOffset() {
+ return maxOffset;
+ }
+
+ public void setMaxOffset(long maxOffset) {
+ this.maxOffset = maxOffset;
+ }
+ }
+
+ public class PutTps {
+ private double ten;
+ private double sixty;
+ private double sixHundred;
+
+ public double getTen() {
+ return ten;
+ }
+
+ public void setTen(double ten) {
+ this.ten = ten;
+ }
+
+ public double getSixty() {
+ return sixty;
+ }
+
+ public void setSixty(double sixty) {
+ this.sixty = sixty;
+ }
+
+ public double getSixHundred() {
+ return sixHundred;
+ }
+
+ public void setSixHundred(double sixHundred) {
+ this.sixHundred = sixHundred;
+ }
+ }
+
+ public class GetMissTps extends PutTps {
+ }
+
+ public class GetTransferedTps extends PutTps {
+ }
+
+ public class GetTotalTps extends PutTps {
+ }
+
+ public class GetFoundTps extends PutTps {
+ }
+
+ public long getMsgPutTotalTodayNow() {
+ return msgPutTotalTodayNow;
+ }
+
+ public void setMsgPutTotalTodayNow(long msgPutTotalTodayNow) {
+ this.msgPutTotalTodayNow = msgPutTotalTodayNow;
+ }
+
+ public long getMsgGetTotalTodayNow() {
+ return msgGetTotalTodayNow;
+ }
+
+ public void setMsgGetTotalTodayNow(long msgGetTotalTodayNow) {
+ this.msgGetTotalTodayNow = msgGetTotalTodayNow;
+ }
+
+ public long getMsgPutTotalTodayMorning() {
+ return msgPutTotalTodayMorning;
+ }
+
+ public void setMsgPutTotalTodayMorning(long msgPutTotalTodayMorning) {
+ this.msgPutTotalTodayMorning = msgPutTotalTodayMorning;
+ }
+
+ public long getMsgGetTotalTodayMorning() {
+ return msgGetTotalTodayMorning;
+ }
+
+ public void setMsgGetTotalTodayMorning(long msgGetTotalTodayMorning) {
+ this.msgGetTotalTodayMorning = msgGetTotalTodayMorning;
+ }
+
+ public long getMsgPutTotalYesterdayMorning() {
+ return msgPutTotalYesterdayMorning;
+ }
+
+ public void setMsgPutTotalYesterdayMorning(long msgPutTotalYesterdayMorning) {
+ this.msgPutTotalYesterdayMorning = msgPutTotalYesterdayMorning;
+ }
+
+ public long getMsgGetTotalYesterdayMorning() {
+ return msgGetTotalYesterdayMorning;
+ }
+
+ public void setMsgGetTotalYesterdayMorning(long msgGetTotalYesterdayMorning) {
+ this.msgGetTotalYesterdayMorning = msgGetTotalYesterdayMorning;
+ }
+
+ public List<ScheduleMessageOffsetTable> getScheduleMessageOffsetTables() {
+ return scheduleMessageOffsetTables;
+ }
+
+ public void setScheduleMessageOffsetTables(List<ScheduleMessageOffsetTable> scheduleMessageOffsetTables) {
+ this.scheduleMessageOffsetTables = scheduleMessageOffsetTables;
+ }
+
+ public long getSendThreadPoolQueueHeadWaitTimeMills() {
+ return sendThreadPoolQueueHeadWaitTimeMills;
+ }
+
+ public void setSendThreadPoolQueueHeadWaitTimeMills(long sendThreadPoolQueueHeadWaitTimeMills) {
+ this.sendThreadPoolQueueHeadWaitTimeMills = sendThreadPoolQueueHeadWaitTimeMills;
+ }
+
+ public long getQueryThreadPoolQueueHeadWaitTimeMills() {
+ return queryThreadPoolQueueHeadWaitTimeMills;
+ }
+
+ public void setQueryThreadPoolQueueHeadWaitTimeMills(long queryThreadPoolQueueHeadWaitTimeMills) {
+ this.queryThreadPoolQueueHeadWaitTimeMills = queryThreadPoolQueueHeadWaitTimeMills;
+ }
+
+ public long getPullThreadPoolQueueHeadWaitTimeMills() {
+ return pullThreadPoolQueueHeadWaitTimeMills;
+ }
+
+ public void setPullThreadPoolQueueHeadWaitTimeMills(long pullThreadPoolQueueHeadWaitTimeMills) {
+ this.pullThreadPoolQueueHeadWaitTimeMills = pullThreadPoolQueueHeadWaitTimeMills;
+ }
+
+ public long getQueryThreadPoolQueueSize() {
+ return queryThreadPoolQueueSize;
+ }
+
+ public void setQueryThreadPoolQueueSize(long queryThreadPoolQueueSize) {
+ this.queryThreadPoolQueueSize = queryThreadPoolQueueSize;
+ }
+
+ public long getPullThreadPoolQueueSize() {
+ return pullThreadPoolQueueSize;
+ }
+
+ public void setPullThreadPoolQueueSize(long pullThreadPoolQueueSize) {
+ this.pullThreadPoolQueueSize = pullThreadPoolQueueSize;
+ }
+
+ public long getSendThreadPoolQueueCapacity() {
+ return sendThreadPoolQueueCapacity;
+ }
+
+ public void setSendThreadPoolQueueCapacity(long sendThreadPoolQueueCapacity) {
+ this.sendThreadPoolQueueCapacity = sendThreadPoolQueueCapacity;
+ }
+
+ public long getPullThreadPoolQueueCapacity() {
+ return pullThreadPoolQueueCapacity;
+ }
+
+ public void setPullThreadPoolQueueCapacity(long pullThreadPoolQueueCapacity) {
+ this.pullThreadPoolQueueCapacity = pullThreadPoolQueueCapacity;
+ }
+
+ public Map<String, Integer> getPutMessageDistributeTimeMap() {
+ return putMessageDistributeTimeMap;
+ }
+
+ public void setPutMessageDistributeTimeMap(Map<String, Integer> putMessageDistributeTimeMap) {
+ this.putMessageDistributeTimeMap = putMessageDistributeTimeMap;
+ }
+
+ public double getRemainHowManyDataToFlush() {
+ return remainHowManyDataToFlush;
+ }
+
+ public void setRemainHowManyDataToFlush(double remainHowManyDataToFlush) {
+ this.remainHowManyDataToFlush = remainHowManyDataToFlush;
+ }
+
+ public long getCommitLogMinOffset() {
+ return commitLogMinOffset;
+ }
+
+ public void setCommitLogMinOffset(long commitLogMinOffset) {
+ this.commitLogMinOffset = commitLogMinOffset;
+ }
+
+ public long getCommitLogMaxOffset() {
+ return commitLogMaxOffset;
+ }
+
+ public void setCommitLogMaxOffset(long commitLogMaxOffset) {
+ this.commitLogMaxOffset = commitLogMaxOffset;
+ }
+
+ public String getRuntime() {
+ return runtime;
+ }
+
+ public void setRuntime(String runtime) {
+ this.runtime = runtime;
+ }
+
+ public long getBootTimestamp() {
+ return bootTimestamp;
+ }
+
+ public void setBootTimestamp(long bootTimestamp) {
+ this.bootTimestamp = bootTimestamp;
+ }
+
+ public double getCommitLogDirCapacityTotal() {
+ return commitLogDirCapacityTotal;
+ }
+
+ public void setCommitLogDirCapacityTotal(double commitLogDirCapacityTotal) {
+ this.commitLogDirCapacityTotal = commitLogDirCapacityTotal;
+ }
+
+ public double getCommitLogDirCapacityFree() {
+ return commitLogDirCapacityFree;
+ }
+
+ public void setCommitLogDirCapacityFree(double commitLogDirCapacityFree) {
+ this.commitLogDirCapacityFree = commitLogDirCapacityFree;
+ }
+
+ public int getBrokerVersion() {
+ return brokerVersion;
+ }
+
+ public void setBrokerVersion(int brokerVersion) {
+ this.brokerVersion = brokerVersion;
+ }
+
+ public long getDispatchMaxBuffer() {
+ return dispatchMaxBuffer;
+ }
+
+ public void setDispatchMaxBuffer(long dispatchMaxBuffer) {
+ this.dispatchMaxBuffer = dispatchMaxBuffer;
+ }
+
+ public PutTps getPutTps() {
+ return putTps;
+ }
+
+ public void setPutTps(PutTps putTps) {
+ this.putTps = putTps;
+ }
+
+ public GetMissTps getGetMissTps() {
+ return getMissTps;
+ }
+
+ public void setGetMissTps(GetMissTps getMissTps) {
+ this.getMissTps = getMissTps;
+ }
+
+ public GetTransferedTps getGetTransferedTps() {
+ return getTransferedTps;
+ }
+
+ public void setGetTransferedTps(GetTransferedTps getTransferedTps) {
+ this.getTransferedTps = getTransferedTps;
+ }
+
+ public GetTotalTps getGetTotalTps() {
+ return getTotalTps;
+ }
+
+ public void setGetTotalTps(GetTotalTps getTotalTps) {
+ this.getTotalTps = getTotalTps;
+ }
+
+ public GetFoundTps getGetFoundTps() {
+ return getFoundTps;
+ }
+
+ public void setGetFoundTps(GetFoundTps getFoundTps) {
+ this.getFoundTps = getFoundTps;
+ }
+
+ public double getConsumeQueueDiskRatio() {
+ return consumeQueueDiskRatio;
+ }
+
+ public void setConsumeQueueDiskRatio(double consumeQueueDiskRatio) {
+ this.consumeQueueDiskRatio = consumeQueueDiskRatio;
+ }
+
+ public double getCommitLogDiskRatio() {
+ return commitLogDiskRatio;
+ }
+
+ public void setCommitLogDiskRatio(double commitLogDiskRatio) {
+ this.commitLogDiskRatio = commitLogDiskRatio;
+ }
+
+ public long getPageCacheLockTimeMills() {
+ return pageCacheLockTimeMills;
+ }
+
+ public void setPageCacheLockTimeMills(long pageCacheLockTimeMills) {
+ this.pageCacheLockTimeMills = pageCacheLockTimeMills;
+ }
+
+ public long getGetMessageEntireTimeMax() {
+ return getMessageEntireTimeMax;
+ }
+
+ public void setGetMessageEntireTimeMax(long getMessageEntireTimeMax) {
+ this.getMessageEntireTimeMax = getMessageEntireTimeMax;
+ }
+
+ public long getPutMessageTimesTotal() {
+ return putMessageTimesTotal;
+ }
+
+ public void setPutMessageTimesTotal(long putMessageTimesTotal) {
+ this.putMessageTimesTotal = putMessageTimesTotal;
+ }
+
+ public String getBrokerVersionDesc() {
+ return brokerVersionDesc;
+ }
+
+ public void setBrokerVersionDesc(String brokerVersionDesc) {
+ this.brokerVersionDesc = brokerVersionDesc;
+ }
+
+ public long getSendThreadPoolQueueSize() {
+ return sendThreadPoolQueueSize;
+ }
+
+ public void setSendThreadPoolQueueSize(long sendThreadPoolQueueSize) {
+ this.sendThreadPoolQueueSize = sendThreadPoolQueueSize;
+ }
+
+ public long getStartAcceptSendRequestTimeStamp() {
+ return startAcceptSendRequestTimeStamp;
+ }
+
+ public void setStartAcceptSendRequestTimeStamp(long startAcceptSendRequestTimeStamp) {
+ this.startAcceptSendRequestTimeStamp = startAcceptSendRequestTimeStamp;
+ }
+
+ public long getPutMessageEntireTimeMax() {
+ return putMessageEntireTimeMax;
+ }
+
+ public void setPutMessageEntireTimeMax(long putMessageEntireTimeMax) {
+ this.putMessageEntireTimeMax = putMessageEntireTimeMax;
+ }
+
+ public long getEarliestMessageTimeStamp() {
+ return earliestMessageTimeStamp;
+ }
+
+ public void setEarliestMessageTimeStamp(long earliestMessageTimeStamp) {
+ this.earliestMessageTimeStamp = earliestMessageTimeStamp;
+ }
+
+ public long getRemainTransientStoreBufferNumbs() {
+ return remainTransientStoreBufferNumbs;
+ }
+
+ public void setRemainTransientStoreBufferNumbs(long remainTransientStoreBufferNumbs) {
+ this.remainTransientStoreBufferNumbs = remainTransientStoreBufferNumbs;
+ }
+
+ public long getQueryThreadPoolQueueCapacity() {
+ return queryThreadPoolQueueCapacity;
+ }
+
+ public void setQueryThreadPoolQueueCapacity(long queryThreadPoolQueueCapacity) {
+ this.queryThreadPoolQueueCapacity = queryThreadPoolQueueCapacity;
+ }
+
+ public double getPutMessageAverageSize() {
+ return putMessageAverageSize;
+ }
+
+ public void setPutMessageAverageSize(double putMessageAverageSize) {
+ this.putMessageAverageSize = putMessageAverageSize;
+ }
+
+ public long getPutMessageSizeTotal() {
+ return putMessageSizeTotal;
+ }
+
+ public void setPutMessageSizeTotal(long putMessageSizeTotal) {
+ this.putMessageSizeTotal = putMessageSizeTotal;
+ }
+
+ public long getDispatchBehindBytes() {
+ return dispatchBehindBytes;
+ }
+
+ public void setDispatchBehindBytes(long dispatchBehindBytes) {
+ this.dispatchBehindBytes = dispatchBehindBytes;
+ }
+}
diff --git a/src/main/java/org/apache/rocketmq/exporter/model/metrics/BrokerMetric.java b/src/main/java/org/apache/rocketmq/exporter/model/metrics/BrokerMetric.java
index c7a0727..45caff2 100644
--- a/src/main/java/org/apache/rocketmq/exporter/model/metrics/BrokerMetric.java
+++ b/src/main/java/org/apache/rocketmq/exporter/model/metrics/BrokerMetric.java
@@ -17,32 +17,38 @@
package org.apache.rocketmq.exporter.model.metrics;
public class BrokerMetric {
+ private String clusterName;
+ private String brokerIP;
+ private String brokerHost;
- private String clusterName;
- private String brokerName;
-
-
- public void setClusterName(String cluster) {
-
- clusterName = cluster;
+ public BrokerMetric(String clusterName, String brokerIP, String brokerHost) {
+ this.clusterName = clusterName;
+ this.brokerIP = brokerIP;
+ this.brokerHost = brokerHost;
}
- public String getClusterName() {
+ public String getClusterName() {
return clusterName;
}
- void setBrokerName(String broker) {
- brokerName = broker;
+ public void setClusterName(String clusterName) {
+ this.clusterName = clusterName;
}
- public String getBrokerName() {
+ public String getBrokerIP() {
+ return brokerIP;
+ }
+
+ public void setBrokerIP(String brokerIP) {
+ this.brokerIP = brokerIP;
+ }
- return brokerName;
+ public String getBrokerHost() {
+ return brokerHost;
}
- public BrokerMetric(String cluster, String broker) {
- clusterName = cluster;
- brokerName = broker;
+ public void setBrokerHost(String brokerHost) {
+ this.brokerHost = brokerHost;
}
@Override
@@ -52,19 +58,20 @@ public class BrokerMetric {
}
BrokerMetric other = (BrokerMetric) obj;
- return other.clusterName.equals(clusterName) && other.brokerName.equals(brokerName);
+ return other.clusterName.equals(clusterName) &&
+ other.brokerIP.equals(brokerIP);
}
@Override
public int hashCode() {
int hash = 1;
hash = 37 * hash + clusterName.hashCode();
- hash = 37 * hash + brokerName.hashCode();
+ hash = 37 * hash + brokerIP.hashCode();
return hash;
}
@Override
public String toString() {
- return "ClusterName: " + clusterName + " BrokerName: " + brokerName;
+ return "ClusterName: " + clusterName + " brokerIP: " + brokerIP + " brokerHost: " + brokerHost;
}
}
diff --git a/src/main/java/org/apache/rocketmq/exporter/model/metrics/ConsumerCountMetric.java b/src/main/java/org/apache/rocketmq/exporter/model/metrics/ConsumerCountMetric.java
new file mode 100644
index 0000000..67d9f20
--- /dev/null
+++ b/src/main/java/org/apache/rocketmq/exporter/model/metrics/ConsumerCountMetric.java
@@ -0,0 +1,58 @@
+package org.apache.rocketmq.exporter.model.metrics;
+
+public class ConsumerCountMetric {
+ private String caddr;
+ private String localaddr;
+ private String group;
+
+ public ConsumerCountMetric(String group, String caddr, String localaddr) {
+ this.group = group;
+ this.caddr = caddr;
+ this.localaddr = localaddr;
+ }
+
+ public String getCaddr() {
+ return caddr;
+ }
+
+ public void setCaddr(String caddr) {
+ this.caddr = caddr;
+ }
+
+ public String getLocaladdr() {
+ return localaddr;
+ }
+
+ public void setLocaladdr(String localaddr) {
+ this.localaddr = localaddr;
+ }
+
+ public String getGroup() {
+ return group;
+ }
+
+ public void setGroup(String group) {
+ this.group = group;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (!(obj instanceof ConsumerCountMetric)) {
+ return false;
+ }
+ ConsumerCountMetric other = (ConsumerCountMetric) obj;
+ return other.group.equals(group);
+ }
+
+ @Override
+ public int hashCode() {
+ int hash = 1;
+ hash = 37 * hash + group.hashCode();
+ return hash;
+ }
+
+ @Override
+ public String toString() {
+ return "group: " + group + " caddr: " + caddr + " localaddr: " + localaddr;
+ }
+}
diff --git a/src/main/java/org/apache/rocketmq/exporter/model/metrics/ConsumerMetric.java b/src/main/java/org/apache/rocketmq/exporter/model/metrics/ConsumerMetric.java
index 9530fff..a0036a2 100644
--- a/src/main/java/org/apache/rocketmq/exporter/model/metrics/ConsumerMetric.java
+++ b/src/main/java/org/apache/rocketmq/exporter/model/metrics/ConsumerMetric.java
@@ -17,32 +17,22 @@
package org.apache.rocketmq.exporter.model.metrics;
public class ConsumerMetric {
+ private String topicName;
+ private String consumerGroupName;
- private String clusterName;
- private String brokerName;
- private String topicName;
- private String consumerGroupName;
-
- public void setClusterName(String cluster) {
- clusterName = cluster;
- }
- public String getClusterName() {
- return clusterName;
- }
- void setBrokerName(String broker) {
- brokerName = broker;
- }
-
- public String getBrokerName() {
- return brokerName;
+ public ConsumerMetric(String topicName, String consumerGroupName) {
+ this.topicName = topicName;
+ this.consumerGroupName = consumerGroupName;
}
- public void setTopicName(String topic) {
- topicName = topic;
- }
public String getTopicName() {
return topicName;
}
+
+ public void setTopicName(String topicName) {
+ this.topicName = topicName;
+ }
+
public String getConsumerGroupName() {
return consumerGroupName;
}
@@ -51,13 +41,6 @@ public class ConsumerMetric {
this.consumerGroupName = consumerGroupName;
}
- public ConsumerMetric(String cluster, String broker, String topic,String consumerGroup) {
- clusterName = cluster;
- brokerName = broker;
- topicName = topic;
- consumerGroupName = consumerGroup;
- }
-
@Override
public boolean equals(Object obj) {
if (!(obj instanceof ConsumerMetric)) {
@@ -65,15 +48,13 @@ public class ConsumerMetric {
}
ConsumerMetric other = (ConsumerMetric) obj;
- return other.clusterName.equals(clusterName) && other.brokerName.equals(brokerName)
- && other.topicName.equals(topicName) && other.consumerGroupName.equals(consumerGroupName);
+ return other.topicName.equals(topicName) &&
+ other.consumerGroupName.equals(consumerGroupName);
}
@Override
public int hashCode() {
int hash = 1;
- hash = 37 * hash + clusterName.hashCode();
- hash = 37 * hash + brokerName.hashCode();
hash = 37 * hash + topicName.hashCode();
hash = 37 * hash + consumerGroupName.hashCode();
return hash;
@@ -81,6 +62,6 @@ public class ConsumerMetric {
@Override
public String toString() {
- return "ClusterName: " + clusterName + " BrokerName: " + brokerName + " topicName: " + topicName + " ConsumeGroupName: " + consumerGroupName;
+ return "topicName: " + topicName + " ConsumeGroupName: " + consumerGroupName;
}
}
diff --git a/src/main/java/org/apache/rocketmq/exporter/model/metrics/ConsumerQueueMetric.java b/src/main/java/org/apache/rocketmq/exporter/model/metrics/ConsumerQueueMetric.java
deleted file mode 100644
index a6453fc..0000000
--- a/src/main/java/org/apache/rocketmq/exporter/model/metrics/ConsumerQueueMetric.java
+++ /dev/null
@@ -1,93 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.rocketmq.exporter.model.metrics;
-
-public class ConsumerQueueMetric {
-
- private String clusterName;
- private String brokerName;
- private String topicName;
- private String consumerGroupName;
- private String queueId;
-
- public void setClusterName(String cluster) {
- clusterName = cluster;
- }
- public String getClusterName() {
- return clusterName;
- }
- void setBrokerName(String broker) {
- brokerName = broker;
- }
- public String getBrokerName() {
- return brokerName;
- }
- public void setTopicName(String topic) {
- topicName = topic;
- }
- public String getTopicName() {
- return topicName;
- }
- public String getConsumerGroupName() {
- return consumerGroupName;
- }
-
- public void setConsumerGroupName(String consumerGroupName) {
- this.consumerGroupName = consumerGroupName;
- }
- public String getQueueId() {
- return queueId;
- }
- public void setQueueId(String queueId) {
- this.queueId = queueId;
- }
- public ConsumerQueueMetric(String cluster, String broker, String topic, String consumerGroup,String queue) {
- clusterName = cluster;
- brokerName = broker;
- topicName = topic;
- consumerGroupName = consumerGroup;
- queueId = queue;
- }
-
- @Override
- public boolean equals(Object obj) {
- if (!(obj instanceof ConsumerQueueMetric)) {
- return false;
- }
- ConsumerQueueMetric other = (ConsumerQueueMetric) obj;
-
- return other.clusterName.equals(clusterName) && other.brokerName.equals(brokerName)
- && other.topicName.equals(topicName) && other.consumerGroupName.equals(consumerGroupName)
- && other.queueId.equals(queueId);
- }
-
- @Override
- public int hashCode() {
- int hash = 1;
- hash = 37 * hash + clusterName.hashCode();
- hash = 37 * hash + brokerName.hashCode();
- hash = 37 * hash + topicName.hashCode();
- hash = 37 * hash + consumerGroupName.hashCode();
- hash = 37 * hash + queueId.hashCode();
- return hash;
- }
-
- @Override
- public String toString() {
- return "ClusterName: " + clusterName + " BrokerName: " + brokerName + " topicName: " + topicName + " ConsumeGroupName: " + consumerGroupName + "queueId: " + queueId;
- }
-}
diff --git a/src/main/java/org/apache/rocketmq/exporter/model/metrics/ConsumerTopicDiffMetric.java b/src/main/java/org/apache/rocketmq/exporter/model/metrics/ConsumerTopicDiffMetric.java
new file mode 100644
index 0000000..84b15c1
--- /dev/null
+++ b/src/main/java/org/apache/rocketmq/exporter/model/metrics/ConsumerTopicDiffMetric.java
@@ -0,0 +1,61 @@
+package org.apache.rocketmq.exporter.model.metrics;
+
+public class ConsumerTopicDiffMetric {
+ public ConsumerTopicDiffMetric(String group, String topic, String countOfOnlineConsumers) {
+ this.group = group;
+ this.topic = topic;
+ this.countOfOnlineConsumers = countOfOnlineConsumers;
+ }
+
+ private String group;
+ private String topic;
+ private String countOfOnlineConsumers;
+
+ public String getGroup() {
+ return group;
+ }
+
+ public void setGroup(String group) {
+ this.group = group;
+ }
+
+ public String getTopic() {
+ return topic;
+ }
+
+ public void setTopic(String topic) {
+ this.topic = topic;
+ }
+
+ public String getCountOfOnlineConsumers() {
+ return countOfOnlineConsumers;
+ }
+
+ public void setCountOfOnlineConsumers(String countOfOnlineConsumers) {
+ this.countOfOnlineConsumers = countOfOnlineConsumers;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (!(obj instanceof ConsumerTopicDiffMetric)) {
+ return false;
+ }
+ ConsumerTopicDiffMetric other = (ConsumerTopicDiffMetric) obj;
+
+ return other.group.equals(group) &&
+ other.topic.equals(topic);
+ }
+
+ @Override
+ public int hashCode() {
+ int hash = 1;
+ hash = 37 * hash + group.hashCode();
+ hash = 37 * hash + topic.hashCode();
+ return hash;
+ }
+
+ @Override
+ public String toString() {
+ return "ConsumerGroup: " + group + " Topic: " + topic;
+ }
+}
diff --git a/src/main/java/org/apache/rocketmq/exporter/model/metrics/DLQTopicOffsetMetric.java b/src/main/java/org/apache/rocketmq/exporter/model/metrics/DLQTopicOffsetMetric.java
new file mode 100644
index 0000000..da530de
--- /dev/null
+++ b/src/main/java/org/apache/rocketmq/exporter/model/metrics/DLQTopicOffsetMetric.java
@@ -0,0 +1,72 @@
+package org.apache.rocketmq.exporter.model.metrics;
+
+public class DLQTopicOffsetMetric {
+ private String clusterName;
+ private String brokerNames;
+ private String group;
+ private long lastUpdateTimestamp;
+
+ public DLQTopicOffsetMetric(String clusterName, String brokerNames, String group, long lastUpdateTimestamp) {
+ this.clusterName = clusterName;
+ this.brokerNames = brokerNames;
+ this.group = group;
+ this.lastUpdateTimestamp = lastUpdateTimestamp;
+ }
+
+ public String getClusterName() {
+ return clusterName;
+ }
+
+ public void setClusterName(String clusterName) {
+ this.clusterName = clusterName;
+ }
+
+ public String getBrokerNames() {
+ return brokerNames;
+ }
+
+ public void setBrokerNames(String brokerNames) {
+ this.brokerNames = brokerNames;
+ }
+
+ public String getGroup() {
+ return group;
+ }
+
+ public void setGroup(String group) {
+ this.group = group;
+ }
+
+ public long getLastUpdateTimestamp() {
+ return lastUpdateTimestamp;
+ }
+
+ public void setLastUpdateTimestamp(long lastUpdateTimestamp) {
+ this.lastUpdateTimestamp = lastUpdateTimestamp;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (!(obj instanceof DLQTopicOffsetMetric)) {
+ return false;
+ }
+ DLQTopicOffsetMetric other = (DLQTopicOffsetMetric) obj;
+
+ return other.clusterName.equals(clusterName) &&
+ other.group.equals(group);
+
+ }
+
+ @Override
+ public int hashCode() {
+ int hash = 1;
+ hash = 37 * hash + clusterName.hashCode();
+ hash = 37 * hash + group.hashCode();
+ return hash;
+ }
+
+ @Override
+ public String toString() {
+ return "ClusterName: " + clusterName + " BrokerNames: " + brokerNames + " group: " + group;
+ }
+}
diff --git a/src/main/java/org/apache/rocketmq/exporter/model/metrics/ProducerMetric.java b/src/main/java/org/apache/rocketmq/exporter/model/metrics/ProducerMetric.java
index 72baa73..fb9c5a7 100644
--- a/src/main/java/org/apache/rocketmq/exporter/model/metrics/ProducerMetric.java
+++ b/src/main/java/org/apache/rocketmq/exporter/model/metrics/ProducerMetric.java
@@ -16,34 +16,50 @@
*/
package org.apache.rocketmq.exporter.model.metrics;
+//每个topic最大位点
public class ProducerMetric {
+ private String clusterName;
+ private String brokerNames;
+ private String topicName;
+ private long lastUpdateTimestamp;
- private String clusterName;
- private String brokerName;
- private String topicName;
-
- public void setClusterName(String cluster) {
- clusterName = cluster;
- }
- public String getClusterName() {
+ public String getClusterName() {
return clusterName;
}
- void setBrokerName(String broker) {
- brokerName = broker;
+
+ public void setClusterName(String clusterName) {
+ this.clusterName = clusterName;
}
- public String getBrokerName() {
- return brokerName;
+
+ public String getBrokerNames() {
+ return brokerNames;
}
- public void setTopicName(String topic) {
- topicName = topic;
+
+ public void setBrokerNames(String brokerNames) {
+ this.brokerNames = brokerNames;
}
- public String getTopicName() {
+
+ public String getTopicName() {
return topicName;
}
- public ProducerMetric(String cluster,String broker,String topic) {
- clusterName = cluster;
- brokerName = broker;
- topicName = topic;
+
+ public void setTopicName(String topicName) {
+ this.topicName = topicName;
+ }
+
+ public long getLastUpdateTimestamp() {
+ return lastUpdateTimestamp;
+ }
+
+ public void setLastUpdateTimestamp(long lastUpdateTimestamp) {
+ this.lastUpdateTimestamp = lastUpdateTimestamp;
+ }
+
+ public ProducerMetric(String clusterName, String brokerNames, String topicName, long lastUpdateTimestamp) {
+ this.clusterName = clusterName;
+ this.brokerNames = brokerNames;
+ this.topicName = topicName;
+ this.lastUpdateTimestamp = lastUpdateTimestamp;
}
@Override
@@ -53,21 +69,20 @@ public class ProducerMetric {
}
ProducerMetric other = (ProducerMetric) obj;
- return other.clusterName.equals(clusterName) && other.brokerName.equals(brokerName)
- && other.topicName.equals(topicName);
+ return other.clusterName.equals(clusterName) &&
+ other.topicName.equals(topicName);
}
@Override
public int hashCode() {
int hash = 1;
hash = 37 * hash + clusterName.hashCode();
- hash = 37 * hash + brokerName.hashCode();
hash = 37 * hash + topicName.hashCode();
return hash;
}
@Override
public String toString() {
- return "ClusterName: " + clusterName + " BrokerName: " + brokerName + " topicName: " + topicName;
+ return "ClusterName: " + clusterName + " BrokerNames: " + brokerNames + " topicName: " + topicName;
}
}
diff --git a/src/main/java/org/apache/rocketmq/exporter/model/metrics/TopicPutNumMetric.java b/src/main/java/org/apache/rocketmq/exporter/model/metrics/TopicPutNumMetric.java
new file mode 100644
index 0000000..37f4e86
--- /dev/null
+++ b/src/main/java/org/apache/rocketmq/exporter/model/metrics/TopicPutNumMetric.java
@@ -0,0 +1,83 @@
+package org.apache.rocketmq.exporter.model.metrics;
+
+public class TopicPutNumMetric {
+ private String clusterName;
+ private String brokerNames;
+ private String brokerIP;
+ private String brokerHost;
+ private String topicName;
+
+ public TopicPutNumMetric(String clusterName, String brokerNames, String brokerIP, String brokerHost, String topicName) {
+ this.clusterName = clusterName;
+ this.brokerNames = brokerNames;
+ this.brokerIP = brokerIP;
+ this.brokerHost = brokerHost;
+ this.topicName = topicName;
+ }
+
+ public String getClusterName() {
+ return clusterName;
+ }
+
+ public void setClusterName(String clusterName) {
+ this.clusterName = clusterName;
+ }
+
+ public String getBrokerNames() {
+ return brokerNames;
+ }
+
+ public void setBrokerNames(String brokerNames) {
+ this.brokerNames = brokerNames;
+ }
+
+ public String getTopicName() {
+ return topicName;
+ }
+
+ public void setTopicName(String topicName) {
+ this.topicName = topicName;
+ }
+
+ public String getBrokerIP() {
+ return brokerIP;
+ }
+
+ public void setBrokerIP(String brokerIP) {
+ this.brokerIP = brokerIP;
+ }
+
+ public String getBrokerHost() {
+ return brokerHost;
+ }
+
+ public void setBrokerHost(String brokerHost) {
+ this.brokerHost = brokerHost;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (!(obj instanceof TopicPutNumMetric)) {
+ return false;
+ }
+ TopicPutNumMetric other = (TopicPutNumMetric) obj;
+
+ return other.clusterName.equals(clusterName) &&
+ other.brokerIP.equals(brokerIP) &&
+ other.topicName.equals(topicName);
+ }
+
+ @Override
+ public int hashCode() {
+ int hash = 1;
+ hash = 37 * hash + clusterName.hashCode();
+ hash = 37 * hash + topicName.hashCode();
+ hash = 37 * hash + brokerIP.hashCode();
+ return hash;
+ }
+
+ @Override
+ public String toString() {
+ return "ClusterName: " + clusterName + " brokerIP: " + brokerIP + " topicName: " + topicName;
+ }
+}
diff --git a/src/main/java/org/apache/rocketmq/exporter/model/metrics/brokerruntime/BrokerRuntimeMetric.java b/src/main/java/org/apache/rocketmq/exporter/model/metrics/brokerruntime/BrokerRuntimeMetric.java
new file mode 100644
index 0000000..beb583e
--- /dev/null
+++ b/src/main/java/org/apache/rocketmq/exporter/model/metrics/brokerruntime/BrokerRuntimeMetric.java
@@ -0,0 +1,91 @@
+package org.apache.rocketmq.exporter.model.metrics.brokerruntime;
+
+public class BrokerRuntimeMetric {
+ private String clusterName;
+ private String brokerAddress;
+ private String brokerHost;
+ private String brokerDes;
+ private long bootTimestamp;
+ private int brokerVersion;
+
+ public BrokerRuntimeMetric(String clusterName, String brokerAddress, String brokerHost, String brokerDes, long bootTimestamp, int brokerVersion) {
+ this.clusterName = clusterName;
+ this.brokerAddress = brokerAddress;
+ this.brokerHost = brokerHost;
+ this.brokerDes = brokerDes;
+ this.bootTimestamp = bootTimestamp;
+ this.brokerVersion = brokerVersion;
+ }
+
+ public String getClusterName() {
+ return clusterName;
+ }
+
+ public void setClusterName(String clusterName) {
+ this.clusterName = clusterName;
+ }
+
+ public String getBrokerAddress() {
+ return brokerAddress;
+ }
+
+ public void setBrokerAddress(String brokerAddress) {
+ this.brokerAddress = brokerAddress;
+ }
+
+ public String getBrokerHost() {
+ return brokerHost;
+ }
+
+ public void setBrokerHost(String brokerHost) {
+ this.brokerHost = brokerHost;
+ }
+
+ public String getBrokerDes() {
+ return brokerDes;
+ }
+
+ public void setBrokerDes(String brokerDes) {
+ this.brokerDes = brokerDes;
+ }
+
+ public long getBootTimestamp() {
+ return bootTimestamp;
+ }
+
+ public void setBootTimestamp(long bootTimestamp) {
+ this.bootTimestamp = bootTimestamp;
+ }
+
+ public int getBrokerVersion() {
+ return brokerVersion;
+ }
+
+ public void setBrokerVersion(int brokerVersion) {
+ this.brokerVersion = brokerVersion;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (!(obj instanceof BrokerRuntimeMetric)) {
+ return false;
+ }
+ BrokerRuntimeMetric other = (BrokerRuntimeMetric) obj;
+
+ return other.clusterName.equals(clusterName) &&
+ other.brokerAddress.equals(brokerAddress);
+ }
+
+ @Override
+ public int hashCode() {
+ int hash = 1;
+ hash = 37 * hash + clusterName.hashCode();
+ hash = 37 * hash + brokerAddress.hashCode();
+ return hash;
+ }
+
+ @Override
+ public String toString() {
+ return "ClusterName: " + clusterName + " brokerAddress: " + brokerAddress + " brokerHost: " + brokerHost;
+ }
+}
diff --git a/src/main/java/org/apache/rocketmq/exporter/service/AbstractCommonService.java b/src/main/java/org/apache/rocketmq/exporter/service/AbstractCommonService.java
deleted file mode 100644
index 50f5b0a..0000000
--- a/src/main/java/org/apache/rocketmq/exporter/service/AbstractCommonService.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.rocketmq.exporter.service;
-
-import com.google.common.base.Throwables;
-import com.google.common.collect.Sets;
-import org.apache.commons.collections.CollectionUtils;
-import org.apache.rocketmq.tools.admin.MQAdminExt;
-
-import javax.annotation.Resource;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Set;
-
-public abstract class AbstractCommonService {
- @Resource
- protected MQAdminExt mqAdminExt;
- protected final Set<String> changeToBrokerNameSet(HashMap<String, Set<String>> clusterAddrTable,
- List<String> clusterNameList, List<String> brokerNameList) {
- Set<String> finalBrokerNameList = Sets.newHashSet();
- if (CollectionUtils.isNotEmpty(clusterNameList)) {
- try {
- for (String clusterName : clusterNameList) {
- finalBrokerNameList.addAll(clusterAddrTable.get(clusterName));
- }
- }
- catch (Exception e) {
- throw Throwables.propagate(e);
- }
- }
- if (CollectionUtils.isNotEmpty(brokerNameList)) {
- finalBrokerNameList.addAll(brokerNameList);
- }
- return finalBrokerNameList;
- }
-}
diff --git a/src/main/java/org/apache/rocketmq/exporter/service/RMQMetricsService.java b/src/main/java/org/apache/rocketmq/exporter/service/RMQMetricsService.java
index c1f8802..5e2c0d4 100644
--- a/src/main/java/org/apache/rocketmq/exporter/service/RMQMetricsService.java
+++ b/src/main/java/org/apache/rocketmq/exporter/service/RMQMetricsService.java
@@ -18,12 +18,12 @@ package org.apache.rocketmq.exporter.service;
import org.apache.rocketmq.exporter.collector.RMQMetricsCollector;
-
import java.io.IOException;
import java.io.StringWriter;
-public interface RMQMetricsService {
+public interface RMQMetricsService {
RMQMetricsCollector getCollector();
- void Metrics(StringWriter writer) throws IOException;
+
+ void metrics(StringWriter writer) throws IOException;
}
diff --git a/src/main/java/org/apache/rocketmq/exporter/service/client/MQAdminExtImpl.java b/src/main/java/org/apache/rocketmq/exporter/service/client/MQAdminExtImpl.java
index 17ec7d5..f24490c 100644
--- a/src/main/java/org/apache/rocketmq/exporter/service/client/MQAdminExtImpl.java
+++ b/src/main/java/org/apache/rocketmq/exporter/service/client/MQAdminExtImpl.java
@@ -18,10 +18,12 @@ package org.apache.rocketmq.exporter.service.client;
import com.google.common.base.Throwables;
import org.apache.rocketmq.client.QueryResult;
+import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
import org.apache.rocketmq.client.consumer.PullResult;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.impl.MQAdminImpl;
+import org.apache.rocketmq.client.impl.factory.MQClientInstance;
import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.common.admin.ConsumeStats;
import org.apache.rocketmq.common.admin.RollbackStats;
@@ -55,11 +57,14 @@ import org.apache.rocketmq.remoting.exception.RemotingException;
import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
+import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
import org.apache.rocketmq.tools.admin.MQAdminExt;
import org.apache.rocketmq.tools.admin.api.MessageTrack;
import org.joor.Reflect;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Service;
import java.io.UnsupportedEncodingException;
@@ -70,8 +75,21 @@ import java.util.Set;
import static org.apache.rocketmq.remoting.protocol.RemotingSerializable.decode;
-@Service
+@Service("mqAdminExtImpl")
public class MQAdminExtImpl implements MQAdminExt {
+ @Autowired
+ @Qualifier("defaultMQAdminExt")
+ private DefaultMQAdminExt defaultMQAdminExt;
+
+ @Autowired
+ private DefaultMQPullConsumer pullConsumer;
+
+ @Autowired
+ private RemotingClient remotingClient;
+
+ @Autowired
+ private MQClientInstance mqClientInstance;
+
private Logger logger = LoggerFactory.getLogger(MQAdminExtImpl.class);
public MQAdminExtImpl() {
@@ -79,37 +97,35 @@ public class MQAdminExtImpl implements MQAdminExt {
public PullResult queryMsgByOffset(MessageQueue mq, long offset) throws Exception {
- return MQAdminInstance.threadLocalMQPullConsumer().pull(mq, "*", offset, 1);
+ return pullConsumer.pull(mq, "*", offset, 1);
}
@Override
public void updateBrokerConfig(String brokerAddr, Properties properties)
- throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException,
- UnsupportedEncodingException, InterruptedException, MQBrokerException {
- MQAdminInstance.threadLocalMQAdminExt().updateBrokerConfig(brokerAddr, properties);
+ throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException,
+ UnsupportedEncodingException, InterruptedException, MQBrokerException {
+ defaultMQAdminExt.updateBrokerConfig(brokerAddr, properties);
}
@Override
public void createAndUpdateTopicConfig(String addr, TopicConfig config)
- throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
- MQAdminInstance.threadLocalMQAdminExt().createAndUpdateTopicConfig(addr, config);
+ throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
+ defaultMQAdminExt.createAndUpdateTopicConfig(addr, config);
}
@Override
public void createAndUpdateSubscriptionGroupConfig(String addr, SubscriptionGroupConfig config)
- throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
- MQAdminInstance.threadLocalMQAdminExt().createAndUpdateSubscriptionGroupConfig(addr, config);
+ throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
+ defaultMQAdminExt.createAndUpdateSubscriptionGroupConfig(addr, config);
}
@Override
public SubscriptionGroupConfig examineSubscriptionGroupConfig(String addr, String group) {
- RemotingClient remotingClient = MQAdminInstance.threadLocalRemotingClient();
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_ALL_SUBSCRIPTIONGROUP_CONFIG, null);
RemotingCommand response = null;
try {
response = remotingClient.invokeSync(addr, request, 3000);
- }
- catch (Exception err) {
+ } catch (Exception err) {
throw Throwables.propagate(err);
}
assert response != null;
@@ -125,13 +141,11 @@ public class MQAdminExtImpl implements MQAdminExt {
@Override
public TopicConfig examineTopicConfig(String addr, String topic) {
- RemotingClient remotingClient = MQAdminInstance.threadLocalRemotingClient();
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_ALL_TOPIC_CONFIG, null);
RemotingCommand response = null;
try {
response = remotingClient.invokeSync(addr, request, 3000);
- }
- catch (Exception err) {
+ } catch (Exception err) {
throw Throwables.propagate(err);
}
switch (response.getCode()) {
@@ -146,238 +160,238 @@ public class MQAdminExtImpl implements MQAdminExt {
@Override
public TopicStatsTable examineTopicStats(String topic)
- throws RemotingException, MQClientException, InterruptedException, MQBrokerException {
- return MQAdminInstance.threadLocalMQAdminExt().examineTopicStats(topic);
+ throws RemotingException, MQClientException, InterruptedException, MQBrokerException {
+ return defaultMQAdminExt.examineTopicStats(topic);
}
@Override
public TopicList fetchAllTopicList() throws RemotingException, MQClientException, InterruptedException {
- TopicList topicList = MQAdminInstance.threadLocalMQAdminExt().fetchAllTopicList();
+ TopicList topicList = defaultMQAdminExt.fetchAllTopicList();
logger.debug("op=look={}", JsonUtil.obj2String(topicList.getTopicList()));
return topicList;
}
@Override
public KVTable fetchBrokerRuntimeStats(String brokerAddr)
- throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException,
- InterruptedException, MQBrokerException {
- return MQAdminInstance.threadLocalMQAdminExt().fetchBrokerRuntimeStats(brokerAddr);
+ throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException,
+ InterruptedException, MQBrokerException {
+ return defaultMQAdminExt.fetchBrokerRuntimeStats(brokerAddr);
}
@Override
public ConsumeStats examineConsumeStats(String consumerGroup)
- throws RemotingException, MQClientException, InterruptedException, MQBrokerException {
- return MQAdminInstance.threadLocalMQAdminExt().examineConsumeStats(consumerGroup);
+ throws RemotingException, MQClientException, InterruptedException, MQBrokerException {
+ return defaultMQAdminExt.examineConsumeStats(consumerGroup);
}
@Override
public ConsumeStats examineConsumeStats(String consumerGroup, String topic)
- throws RemotingException, MQClientException, InterruptedException, MQBrokerException {
- return MQAdminInstance.threadLocalMQAdminExt().examineConsumeStats(consumerGroup, topic);
+ throws RemotingException, MQClientException, InterruptedException, MQBrokerException {
+ return defaultMQAdminExt.examineConsumeStats(consumerGroup, topic);
}
@Override
public ClusterInfo examineBrokerClusterInfo()
- throws InterruptedException, MQBrokerException, RemotingTimeoutException, RemotingSendRequestException,
- RemotingConnectException {
- return MQAdminInstance.threadLocalMQAdminExt().examineBrokerClusterInfo();
+ throws InterruptedException, MQBrokerException, RemotingTimeoutException, RemotingSendRequestException,
+ RemotingConnectException {
+ return defaultMQAdminExt.examineBrokerClusterInfo();
}
@Override
public TopicRouteData examineTopicRouteInfo(String topic)
- throws RemotingException, MQClientException, InterruptedException {
- return MQAdminInstance.threadLocalMQAdminExt().examineTopicRouteInfo(topic);
+ throws RemotingException, MQClientException, InterruptedException {
+ return defaultMQAdminExt.examineTopicRouteInfo(topic);
}
@Override
public ConsumerConnection examineConsumerConnectionInfo(String consumerGroup)
- throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException,
- InterruptedException, MQBrokerException, RemotingException, MQClientException {
- return MQAdminInstance.threadLocalMQAdminExt().examineConsumerConnectionInfo(consumerGroup);
+ throws
+ InterruptedException, MQBrokerException, RemotingException, MQClientException {
+ return defaultMQAdminExt.examineConsumerConnectionInfo(consumerGroup);
}
@Override
public ProducerConnection examineProducerConnectionInfo(String producerGroup, String topic)
- throws RemotingException, MQClientException, InterruptedException, MQBrokerException {
- return MQAdminInstance.threadLocalMQAdminExt().examineProducerConnectionInfo(producerGroup, topic);
+ throws RemotingException, MQClientException, InterruptedException, MQBrokerException {
+ return defaultMQAdminExt.examineProducerConnectionInfo(producerGroup, topic);
}
@Override
public List<String> getNameServerAddressList() {
- return MQAdminInstance.threadLocalMQAdminExt().getNameServerAddressList();
+ return defaultMQAdminExt.getNameServerAddressList();
}
@Override
public int wipeWritePermOfBroker(String namesrvAddr, String brokerName)
- throws RemotingCommandException, RemotingConnectException, RemotingSendRequestException,
- RemotingTimeoutException, InterruptedException, MQClientException {
- return MQAdminInstance.threadLocalMQAdminExt().wipeWritePermOfBroker(namesrvAddr, brokerName);
+ throws RemotingCommandException, RemotingConnectException, RemotingSendRequestException,
+ RemotingTimeoutException, InterruptedException, MQClientException {
+ return defaultMQAdminExt.wipeWritePermOfBroker(namesrvAddr, brokerName);
}
@Override
public void putKVConfig(String namespace, String key, String value) {
- MQAdminInstance.threadLocalMQAdminExt().putKVConfig(namespace, key, value);
+ defaultMQAdminExt.putKVConfig(namespace, key, value);
}
@Override
public String getKVConfig(String namespace, String key)
- throws RemotingException, MQClientException, InterruptedException {
- return MQAdminInstance.threadLocalMQAdminExt().getKVConfig(namespace, key);
+ throws RemotingException, MQClientException, InterruptedException {
+ return defaultMQAdminExt.getKVConfig(namespace, key);
}
@Override
public KVTable getKVListByNamespace(String namespace)
- throws RemotingException, MQClientException, InterruptedException {
- return MQAdminInstance.threadLocalMQAdminExt().getKVListByNamespace(namespace);
+ throws RemotingException, MQClientException, InterruptedException {
+ return defaultMQAdminExt.getKVListByNamespace(namespace);
}
@Override
public void deleteTopicInBroker(Set<String> addrs, String topic)
- throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
+ throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
logger.info("addrs={} topic={}", JsonUtil.obj2String(addrs), topic);
- MQAdminInstance.threadLocalMQAdminExt().deleteTopicInBroker(addrs, topic);
+ defaultMQAdminExt.deleteTopicInBroker(addrs, topic);
}
@Override
public void deleteTopicInNameServer(Set<String> addrs, String topic)
- throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
- MQAdminInstance.threadLocalMQAdminExt().deleteTopicInNameServer(addrs, topic);
+ throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
+ defaultMQAdminExt.deleteTopicInNameServer(addrs, topic);
}
@Override
public void deleteSubscriptionGroup(String addr, String groupName)
- throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
- MQAdminInstance.threadLocalMQAdminExt().deleteSubscriptionGroup(addr, groupName);
+ throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
+ defaultMQAdminExt.deleteSubscriptionGroup(addr, groupName);
}
@Override
public void createAndUpdateKvConfig(String namespace, String key, String value)
- throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
- MQAdminInstance.threadLocalMQAdminExt().createAndUpdateKvConfig(namespace, key, value);
+ throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
+ defaultMQAdminExt.createAndUpdateKvConfig(namespace, key, value);
}
@Override
public void deleteKvConfig(String namespace, String key)
- throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
- MQAdminInstance.threadLocalMQAdminExt().deleteKvConfig(namespace, key);
+ throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
+ defaultMQAdminExt.deleteKvConfig(namespace, key);
}
@Override
public List<RollbackStats> resetOffsetByTimestampOld(String consumerGroup, String topic, long timestamp,
- boolean force) throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
- return MQAdminInstance.threadLocalMQAdminExt().resetOffsetByTimestampOld(consumerGroup, topic, timestamp, force);
+ boolean force) throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
+ return defaultMQAdminExt.resetOffsetByTimestampOld(consumerGroup, topic, timestamp, force);
}
@Override
public Map<MessageQueue, Long> resetOffsetByTimestamp(String topic, String group, long timestamp,
- boolean isForce) throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
- return MQAdminInstance.threadLocalMQAdminExt().resetOffsetByTimestamp(topic, group, timestamp, isForce);
+ boolean isForce) throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
+ return defaultMQAdminExt.resetOffsetByTimestamp(topic, group, timestamp, isForce);
}
@Override
public void resetOffsetNew(String consumerGroup, String topic, long timestamp)
- throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
- MQAdminInstance.threadLocalMQAdminExt().resetOffsetNew(consumerGroup, topic, timestamp);
+ throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
+ defaultMQAdminExt.resetOffsetNew(consumerGroup, topic, timestamp);
}
@Override
public Map<String, Map<MessageQueue, Long>> getConsumeStatus(String topic, String group,
- String clientAddr) throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
- return MQAdminInstance.threadLocalMQAdminExt().getConsumeStatus(topic, group, clientAddr);
+ String clientAddr) throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
+ return defaultMQAdminExt.getConsumeStatus(topic, group, clientAddr);
}
@Override
public void createOrUpdateOrderConf(String key, String value, boolean isCluster)
- throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
- MQAdminInstance.threadLocalMQAdminExt().createOrUpdateOrderConf(key, value, isCluster);
+ throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
+ defaultMQAdminExt.createOrUpdateOrderConf(key, value, isCluster);
}
@Override
public GroupList queryTopicConsumeByWho(String topic)
- throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException,
- InterruptedException, MQBrokerException, RemotingException, MQClientException {
- return MQAdminInstance.threadLocalMQAdminExt().queryTopicConsumeByWho(topic);
+ throws
+ InterruptedException, MQBrokerException, RemotingException, MQClientException {
+ return defaultMQAdminExt.queryTopicConsumeByWho(topic);
}
@Override
public boolean cleanExpiredConsumerQueue(String cluster)
- throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, MQClientException,
- InterruptedException {
- return MQAdminInstance.threadLocalMQAdminExt().cleanExpiredConsumerQueue(cluster);
+ throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, MQClientException,
+ InterruptedException {
+ return defaultMQAdminExt.cleanExpiredConsumerQueue(cluster);
}
@Override
public boolean cleanExpiredConsumerQueueByAddr(String addr)
- throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, MQClientException,
- InterruptedException {
- return MQAdminInstance.threadLocalMQAdminExt().cleanExpiredConsumerQueueByAddr(addr);
+ throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, MQClientException,
+ InterruptedException {
+ return defaultMQAdminExt.cleanExpiredConsumerQueueByAddr(addr);
}
@Override
public ConsumerRunningInfo getConsumerRunningInfo(String consumerGroup, String clientId, boolean jstack)
- throws RemotingException, MQClientException, InterruptedException {
- return MQAdminInstance.threadLocalMQAdminExt().getConsumerRunningInfo(consumerGroup, clientId, jstack);
+ throws RemotingException, MQClientException, InterruptedException {
+ return defaultMQAdminExt.getConsumerRunningInfo(consumerGroup, clientId, jstack);
}
@Override
public ConsumeMessageDirectlyResult consumeMessageDirectly(String consumerGroup, String clientId,
- String msgId) throws RemotingException, MQClientException, InterruptedException, MQBrokerException {
- return MQAdminInstance.threadLocalMQAdminExt().consumeMessageDirectly(consumerGroup, clientId, msgId);
+ String msgId) throws RemotingException, MQClientException, InterruptedException, MQBrokerException {
+ return defaultMQAdminExt.consumeMessageDirectly(consumerGroup, clientId, msgId);
}
@Override
public List<MessageTrack> messageTrackDetail(MessageExt msg)
- throws RemotingException, MQClientException, InterruptedException, MQBrokerException {
- return MQAdminInstance.threadLocalMQAdminExt().messageTrackDetail(msg);
+ throws RemotingException, MQClientException, InterruptedException, MQBrokerException {
+ return defaultMQAdminExt.messageTrackDetail(msg);
}
@Override
public void cloneGroupOffset(String srcGroup, String destGroup, String topic, boolean isOffline)
- throws RemotingException, MQClientException, InterruptedException, MQBrokerException {
- MQAdminInstance.threadLocalMQAdminExt().cloneGroupOffset(srcGroup, destGroup, topic, isOffline);
+ throws RemotingException, MQClientException, InterruptedException, MQBrokerException {
+ defaultMQAdminExt.cloneGroupOffset(srcGroup, destGroup, topic, isOffline);
}
@Override
public void createTopic(String key, String newTopic, int queueNum) throws MQClientException {
- MQAdminInstance.threadLocalMQAdminExt().createTopic(key, newTopic, queueNum);
+ defaultMQAdminExt.createTopic(key, newTopic, queueNum);
}
@Override
public void createTopic(String key, String newTopic, int queueNum, int topicSysFlag)
- throws MQClientException {
- MQAdminInstance.threadLocalMQAdminExt().createTopic(key, newTopic, queueNum, topicSysFlag);
+ throws MQClientException {
+ defaultMQAdminExt.createTopic(key, newTopic, queueNum, topicSysFlag);
}
@Override
public long searchOffset(MessageQueue mq, long timestamp) throws MQClientException {
- return MQAdminInstance.threadLocalMQAdminExt().searchOffset(mq, timestamp);
+ return defaultMQAdminExt.searchOffset(mq, timestamp);
}
@Override
public long maxOffset(MessageQueue mq) throws MQClientException {
- return MQAdminInstance.threadLocalMQAdminExt().maxOffset(mq);
+ return defaultMQAdminExt.maxOffset(mq);
}
@Override
public long minOffset(MessageQueue mq) throws MQClientException {
- return MQAdminInstance.threadLocalMQAdminExt().minOffset(mq);
+ return defaultMQAdminExt.minOffset(mq);
}
@Override
public long earliestMsgStoreTime(MessageQueue mq) throws MQClientException {
- return MQAdminInstance.threadLocalMQAdminExt().earliestMsgStoreTime(mq);
+ return defaultMQAdminExt.earliestMsgStoreTime(mq);
}
@Override
public MessageExt viewMessage(String msgId)
- throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
- return MQAdminInstance.threadLocalMQAdminExt().viewMessage(msgId);
+ throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
+ return defaultMQAdminExt.viewMessage(msgId);
}
@Override
public QueryResult queryMessage(String topic, String key, int maxNum, long begin, long end)
- throws MQClientException, InterruptedException {
- return MQAdminInstance.threadLocalMQAdminExt().queryMessage(topic, key, maxNum, begin, end);
+ throws MQClientException, InterruptedException {
+ return defaultMQAdminExt.queryMessage(topic, key, maxNum, begin, end);
}
@Override
@@ -396,8 +410,8 @@ public class MQAdminExtImpl implements MQAdminExt {
@Override
public List<QueueTimeSpan> queryConsumeTimeSpan(String topic,
- String group) throws InterruptedException, MQBrokerException, RemotingException, MQClientException {
- return MQAdminInstance.threadLocalMQAdminExt().queryConsumeTimeSpan(topic, group);
+ String group) throws InterruptedException, MQBrokerException, RemotingException, MQClientException {
+ return defaultMQAdminExt.queryConsumeTimeSpan(topic, group);
}
//MessageClientIDSetter.getNearlyTimeFromID has bug,so we subtract half a day
@@ -406,110 +420,111 @@ public class MQAdminExtImpl implements MQAdminExt {
//https://github.com/apache/incubator-rocketmq/pull/69
@Override
public MessageExt viewMessage(String topic,
- String msgId) throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
+ String msgId) throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
logger.info("MessageClientIDSetter.getNearlyTimeFromID(msgId)={} msgId={}", MessageClientIDSetter.getNearlyTimeFromID(msgId), msgId);
try {
return viewMessage(msgId);
+ } catch (Exception e) {
}
- catch (Exception e) {
- }
- MQAdminImpl mqAdminImpl = MQAdminInstance.threadLocalMqClientInstance().getMQAdminImpl();
+ MQAdminImpl mqAdminImpl = mqClientInstance.getMQAdminImpl();
QueryResult qr = Reflect.on(mqAdminImpl).call("queryMessage", topic, msgId, 32,
- MessageClientIDSetter.getNearlyTimeFromID(msgId).getTime() - 1000 * 60 * 60 * 13L, Long.MAX_VALUE, true).get();
+ MessageClientIDSetter.getNearlyTimeFromID(msgId).getTime() - 1000 * 60 * 60 * 13L, Long.MAX_VALUE, true).get();
if (qr != null && qr.getMessageList() != null && qr.getMessageList().size() > 0) {
return qr.getMessageList().get(0);
- }
- else {
+ } else {
return null;
}
}
@Override
public ConsumeMessageDirectlyResult consumeMessageDirectly(String consumerGroup, String clientId, String topic,
- String msgId) throws RemotingException, MQClientException, InterruptedException, MQBrokerException {
- return MQAdminInstance.threadLocalMQAdminExt().consumeMessageDirectly(consumerGroup, clientId, topic, msgId);
+ String msgId) throws RemotingException, MQClientException, InterruptedException, MQBrokerException {
+ return defaultMQAdminExt.consumeMessageDirectly(consumerGroup, clientId, topic, msgId);
}
@Override
public Properties getBrokerConfig(
- String brokerAddr) throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, UnsupportedEncodingException, InterruptedException, MQBrokerException {
- return MQAdminInstance.threadLocalMQAdminExt().getBrokerConfig(brokerAddr);
+ String brokerAddr) throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, UnsupportedEncodingException, InterruptedException, MQBrokerException {
+ return defaultMQAdminExt.getBrokerConfig(brokerAddr);
}
@Override
public TopicList fetchTopicsByCLuster(
- String clusterName) throws RemotingException, MQClientException, InterruptedException {
- return MQAdminInstance.threadLocalMQAdminExt().fetchTopicsByCLuster(clusterName);
+ String clusterName) throws RemotingException, MQClientException, InterruptedException {
+ return defaultMQAdminExt.fetchTopicsByCLuster(clusterName);
}
@Override
public boolean cleanUnusedTopic(
- String cluster) throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, MQClientException, InterruptedException {
- return MQAdminInstance.threadLocalMQAdminExt().cleanUnusedTopic(cluster);
+ String cluster) throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, MQClientException, InterruptedException {
+ return defaultMQAdminExt.cleanUnusedTopic(cluster);
}
@Override
public boolean cleanUnusedTopicByAddr(
- String addr) throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, MQClientException, InterruptedException {
- return MQAdminInstance.threadLocalMQAdminExt().cleanUnusedTopicByAddr(addr);
+ String addr) throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, MQClientException, InterruptedException {
+ return defaultMQAdminExt.cleanUnusedTopicByAddr(addr);
}
@Override
public BrokerStatsData viewBrokerStatsData(String brokerAddr, String statsName,
- String statsKey) throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, MQClientException, InterruptedException {
- return MQAdminInstance.threadLocalMQAdminExt().viewBrokerStatsData(brokerAddr, statsName, statsKey);
+ String statsKey) throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, MQClientException, InterruptedException {
+ return defaultMQAdminExt.viewBrokerStatsData(brokerAddr, statsName, statsKey);
}
@Override
public Set<String> getClusterList(
- String topic) throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, MQClientException, InterruptedException {
- return MQAdminInstance.threadLocalMQAdminExt().getClusterList(topic);
+ String topic) throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, MQClientException, InterruptedException {
+ return defaultMQAdminExt.getClusterList(topic);
}
@Override
public ConsumeStatsList fetchConsumeStatsInBroker(String brokerAddr, boolean isOrder,
- long timeoutMillis) throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, MQClientException, InterruptedException {
- return MQAdminInstance.threadLocalMQAdminExt().fetchConsumeStatsInBroker(brokerAddr, isOrder, timeoutMillis);
+ long timeoutMillis) throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, MQClientException, InterruptedException {
+ return defaultMQAdminExt.fetchConsumeStatsInBroker(brokerAddr, isOrder, timeoutMillis);
}
@Override
public Set<String> getTopicClusterList(
- String topic) throws InterruptedException, MQBrokerException, MQClientException, RemotingException {
- return MQAdminInstance.threadLocalMQAdminExt().getTopicClusterList(topic);
+ String topic) throws InterruptedException, MQBrokerException, MQClientException, RemotingException {
+ return defaultMQAdminExt.getTopicClusterList(topic);
}
@Override
public SubscriptionGroupWrapper getAllSubscriptionGroup(String brokerAddr,
- long timeoutMillis) throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException, MQBrokerException {
- return MQAdminInstance.threadLocalMQAdminExt().getAllSubscriptionGroup(brokerAddr, timeoutMillis);
+ long timeoutMillis) throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException, MQBrokerException {
+ return defaultMQAdminExt.getAllSubscriptionGroup(brokerAddr, timeoutMillis);
}
@Override
public TopicConfigSerializeWrapper getAllTopicGroup(String brokerAddr,
- long timeoutMillis) throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException, MQBrokerException {
- return MQAdminInstance.threadLocalMQAdminExt().getAllTopicGroup(brokerAddr, timeoutMillis);
+ long timeoutMillis) throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException, MQBrokerException {
+ return defaultMQAdminExt.getAllTopicGroup(brokerAddr, timeoutMillis);
}
@Override
public void updateConsumeOffset(String brokerAddr, String consumeGroup, MessageQueue mq,
- long offset) throws RemotingException, InterruptedException, MQBrokerException {
- MQAdminInstance.threadLocalMQAdminExt().updateConsumeOffset(brokerAddr, consumeGroup, mq, offset);
+ long offset) throws RemotingException, InterruptedException, MQBrokerException {
+ defaultMQAdminExt.updateConsumeOffset(brokerAddr, consumeGroup, mq, offset);
}
// 4.0.0 added
- @Override public void updateNameServerConfig(Properties properties,
- List<String> list) throws InterruptedException, RemotingConnectException, UnsupportedEncodingException, RemotingSendRequestException, RemotingTimeoutException, MQClientException, MQBrokerException {
+ @Override
+ public void updateNameServerConfig(Properties properties,
+ List<String> list) throws InterruptedException, RemotingConnectException, UnsupportedEncodingException, RemotingSendRequestException, RemotingTimeoutException, MQClientException, MQBrokerException {
}
- @Override public Map<String, Properties> getNameServerConfig(
- List<String> list) throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException, MQClientException, UnsupportedEncodingException {
+ @Override
+ public Map<String, Properties> getNameServerConfig(
+ List<String> list) throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException, MQClientException, UnsupportedEncodingException {
return null;
}
- @Override public QueryConsumeQueueResponseBody queryConsumeQueue(String brokerAddr, String topic,
- int queueId, long index, int count,
- String consumerGroup) throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException, MQClientException {
+ @Override
+ public QueryConsumeQueueResponseBody queryConsumeQueue(String brokerAddr, String topic,
+ int queueId, long index, int count,
+ String consumerGroup) throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException, MQClientException {
return null;
}
}
diff --git a/src/main/java/org/apache/rocketmq/exporter/service/client/MQAdminInstance.java b/src/main/java/org/apache/rocketmq/exporter/service/client/MQAdminInstance.java
index 6994c23..1b9b3df 100644
--- a/src/main/java/org/apache/rocketmq/exporter/service/client/MQAdminInstance.java
+++ b/src/main/java/org/apache/rocketmq/exporter/service/client/MQAdminInstance.java
@@ -16,104 +16,72 @@
*/
package org.apache.rocketmq.exporter.service.client;
+import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.impl.MQClientAPIImpl;
import org.apache.rocketmq.client.impl.factory.MQClientInstance;
import org.apache.rocketmq.common.MixAll;
+import org.apache.rocketmq.exporter.config.RMQConfigure;
import org.apache.rocketmq.remoting.RemotingClient;
import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
import org.apache.rocketmq.tools.admin.DefaultMQAdminExtImpl;
-import org.apache.rocketmq.tools.admin.MQAdminExt;
import org.joor.Reflect;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Qualifier;
+import org.springframework.context.annotation.Bean;
+import org.springframework.stereotype.Service;
import static org.apache.rocketmq.common.MixAll.TOOLS_CONSUMER_GROUP;
-
+@Service
public class MQAdminInstance {
-
- private static final ThreadLocal<DefaultMQAdminExt> MQ_ADMIN_EXT_THREAD_LOCAL = new ThreadLocal<DefaultMQAdminExt>();
-
- private static final ThreadLocal<DefaultMQPullConsumer> MQ_PULL_CONSUMER_THREAD_LOCAL = new ThreadLocal<DefaultMQPullConsumer>();
-
- private static final ThreadLocal<Integer> INIT_COUNTER = new ThreadLocal<Integer>();
-
- public static MQAdminExt threadLocalMQAdminExt() {
- DefaultMQAdminExt defaultMQAdminExt = MQ_ADMIN_EXT_THREAD_LOCAL.get();
- if (defaultMQAdminExt == null) {
- throw new IllegalStateException("defaultMQAdminExt should be init before you get this");
+ private final static Logger log = LoggerFactory.getLogger(MQAdminInstance.class);
+ @Autowired
+ private RMQConfigure configure;
+
+ @Bean(destroyMethod = "shutdown", name = "defaultMQAdminExt")
+ private DefaultMQAdminExt buildDefaultMQAdminExt() {
+ DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt(5000L);
+ defaultMQAdminExt.setInstanceName("admin-" + System.currentTimeMillis());
+ try {
+ defaultMQAdminExt.start();
+ } catch (MQClientException ex) {
+ log.error(String.format("init default admin error, namesrv=%s", System.getProperty(MixAll.NAMESRV_ADDR_PROPERTY)), ex);
}
return defaultMQAdminExt;
}
-
- public static DefaultMQPullConsumer threadLocalMQPullConsumer() {
- DefaultMQPullConsumer pullConsumer = MQ_PULL_CONSUMER_THREAD_LOCAL.get();
- if (pullConsumer == null) {
- throw new IllegalStateException("pullConsumer should be init before you get this");
+ @Bean(destroyMethod = "shutdown")
+ private DefaultMQPullConsumer buildPullConsumer() throws Exception {
+ String namesrvAddress = configure.getNamesrvAddr();
+ if (StringUtils.isBlank(namesrvAddress)) {
+ log.error("init default pull consumer error, namesrv is null");
+ throw new Exception("init default pull consumer error, namesrv is null", null);
}
- return pullConsumer;
- }
-
-
- public static RemotingClient threadLocalRemotingClient() {
- MQClientInstance mqClientInstance = threadLocalMqClientInstance();
- MQClientAPIImpl mQClientAPIImpl = Reflect.on(mqClientInstance).get("mQClientAPIImpl");
- return Reflect.on(mQClientAPIImpl).get("remotingClient");
- }
-
- public static MQClientInstance threadLocalMqClientInstance() {
- DefaultMQAdminExtImpl defaultMQAdminExtImpl = Reflect.on(MQAdminInstance.threadLocalMQAdminExt()).get("defaultMQAdminExtImpl");
- return Reflect.on(defaultMQAdminExtImpl).get("mqClientInstance");
- }
-
- public static void initMQAdminInstance(long timeoutMillis) throws MQClientException {
- Integer nowCount = INIT_COUNTER.get();
- if (nowCount == null) {
- DefaultMQAdminExt defaultMQAdminExt;
- if (timeoutMillis > 0) {
- defaultMQAdminExt = new DefaultMQAdminExt(timeoutMillis);
- }
- else {
- defaultMQAdminExt = new DefaultMQAdminExt();
- }
- defaultMQAdminExt.setInstanceName("admin-" + Long.toString(System.currentTimeMillis()));
- defaultMQAdminExt.start();
- MQ_ADMIN_EXT_THREAD_LOCAL.set(defaultMQAdminExt);
-
-
- DefaultMQPullConsumer pullConsumer;
- pullConsumer = new DefaultMQPullConsumer(TOOLS_CONSUMER_GROUP,null);
- pullConsumer.setInstanceName("consumer-" + Long.toString(System.currentTimeMillis()));
- pullConsumer.setNamesrvAddr(System.getProperty(MixAll.NAMESRV_ADDR_PROPERTY, System.getenv(MixAll.NAMESRV_ADDR_ENV)));
+ DefaultMQPullConsumer pullConsumer = new DefaultMQPullConsumer(TOOLS_CONSUMER_GROUP, null);
+ pullConsumer.setInstanceName("consumer-" + System.currentTimeMillis());
+ pullConsumer.setNamesrvAddr(namesrvAddress);
+ try {
pullConsumer.start();
pullConsumer.getDefaultMQPullConsumerImpl().getPullAPIWrapper().setConnectBrokerByUser(true);
-
- MQ_PULL_CONSUMER_THREAD_LOCAL.set(pullConsumer);
- INIT_COUNTER.set(1);
- }
- else {
- INIT_COUNTER.set(nowCount + 1);
+ } catch (MQClientException ex) {
+ log.error(String.format("init default pull consumer error, namesrv=%s", System.getProperty(MixAll.NAMESRV_ADDR_PROPERTY)), ex);
}
+ return pullConsumer;
+ }
+ @Bean(destroyMethod = "shutdown")
+ private MQClientInstance buildInstance(@Qualifier("defaultMQAdminExt") DefaultMQAdminExt defaultMQAdminExt) {
+ DefaultMQAdminExtImpl defaultMQAdminExtImpl = Reflect.on(defaultMQAdminExt).get("defaultMQAdminExtImpl");
+ return Reflect.on(defaultMQAdminExtImpl).get("mqClientInstance");
}
- public static void destroyMQAdminInstance() {
- Integer nowCount = INIT_COUNTER.get() - 1;
- if (nowCount > 0) {
- INIT_COUNTER.set(nowCount);
- return;
- }
- MQAdminExt mqAdminExt = MQ_ADMIN_EXT_THREAD_LOCAL.get();
- if (mqAdminExt != null) {
- DefaultMQPullConsumer consumer = MQ_PULL_CONSUMER_THREAD_LOCAL.get();
- if (consumer != null) {
- consumer.shutdown();
- MQ_PULL_CONSUMER_THREAD_LOCAL.remove();
- }
- mqAdminExt.shutdown();
- MQ_ADMIN_EXT_THREAD_LOCAL.remove();
- INIT_COUNTER.remove();
- }
+ @Bean
+ private RemotingClient client(MQClientInstance instance) {
+ MQClientAPIImpl mQClientAPIImpl = Reflect.on(instance).get("mQClientAPIImpl");
+ return Reflect.on(mQClientAPIImpl).get("remotingClient");
}
}
diff --git a/src/main/java/org/apache/rocketmq/exporter/service/impl/RMQMetricsServiceImpl.java b/src/main/java/org/apache/rocketmq/exporter/service/impl/RMQMetricsServiceImpl.java
index 5dd008b..6cf977f 100644
--- a/src/main/java/org/apache/rocketmq/exporter/service/impl/RMQMetricsServiceImpl.java
+++ b/src/main/java/org/apache/rocketmq/exporter/service/impl/RMQMetricsServiceImpl.java
@@ -19,25 +19,17 @@ package org.apache.rocketmq.exporter.service.impl;
import io.prometheus.client.CollectorRegistry;
import io.prometheus.client.exporter.common.TextFormat;
import org.apache.rocketmq.exporter.collector.RMQMetricsCollector;
-import org.apache.rocketmq.exporter.service.AbstractCommonService;
import org.apache.rocketmq.exporter.service.RMQMetricsService;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;
import java.io.IOException;
import java.io.StringWriter;
@Service
-public class RMQMetricsServiceImpl extends AbstractCommonService implements RMQMetricsService {
-
- private Logger logger = LoggerFactory.getLogger(RMQMetricsServiceImpl.class);
-
- private CollectorRegistry registry = new CollectorRegistry();
-
+public class RMQMetricsServiceImpl implements RMQMetricsService {
+ private CollectorRegistry registry = new CollectorRegistry();
private final RMQMetricsCollector rmqMetricsCollector;
-
public RMQMetricsCollector getCollector() {
return rmqMetricsCollector;
}
@@ -46,8 +38,8 @@ public class RMQMetricsServiceImpl extends AbstractCommonService implements RMQM
rmqMetricsCollector = new RMQMetricsCollector();
rmqMetricsCollector.register(registry);
}
- public void Metrics(StringWriter writer) throws IOException {
+
+ public void metrics(StringWriter writer) throws IOException {
TextFormat.write004(writer, registry.metricFamilySamples());
- logger.info(writer.toString());
}
}
diff --git a/src/main/java/org/apache/rocketmq/exporter/task/MetricsCollectTask.java b/src/main/java/org/apache/rocketmq/exporter/task/MetricsCollectTask.java
index c6e6a5e..f1777ec 100644
--- a/src/main/java/org/apache/rocketmq/exporter/task/MetricsCollectTask.java
+++ b/src/main/java/org/apache/rocketmq/exporter/task/MetricsCollectTask.java
@@ -16,290 +16,483 @@
*/
package org.apache.rocketmq.exporter.task;
-import com.google.common.base.Throwables;
-import org.apache.rocketmq.client.consumer.PullResult;
-import org.apache.rocketmq.client.consumer.PullStatus;
+import com.alibaba.fastjson.JSON;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.rocketmq.client.exception.MQBrokerException;
+import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.admin.ConsumeStats;
import org.apache.rocketmq.common.admin.OffsetWrapper;
import org.apache.rocketmq.common.admin.TopicOffset;
import org.apache.rocketmq.common.admin.TopicStatsTable;
import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.common.protocol.ResponseCode;
import org.apache.rocketmq.common.protocol.body.BrokerStatsData;
import org.apache.rocketmq.common.protocol.body.ClusterInfo;
+import org.apache.rocketmq.common.protocol.body.ConsumerConnection;
import org.apache.rocketmq.common.protocol.body.GroupList;
+import org.apache.rocketmq.common.protocol.body.KVTable;
import org.apache.rocketmq.common.protocol.body.TopicList;
import org.apache.rocketmq.common.protocol.route.BrokerData;
import org.apache.rocketmq.common.protocol.route.TopicRouteData;
-import org.apache.rocketmq.exporter.aspect.admin.annotation.MultiMQAdminCmdMethod;
import org.apache.rocketmq.exporter.config.RMQConfigure;
+import org.apache.rocketmq.exporter.model.BrokerRuntimeStats;
import org.apache.rocketmq.exporter.service.RMQMetricsService;
-import org.apache.rocketmq.exporter.service.client.MQAdminExtImpl;
import org.apache.rocketmq.exporter.util.Utils;
+import org.apache.rocketmq.remoting.exception.RemotingConnectException;
+import org.apache.rocketmq.remoting.exception.RemotingException;
+import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
+import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
import org.apache.rocketmq.store.stats.BrokerStatsManager;
import org.apache.rocketmq.tools.admin.MQAdminExt;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
+import javax.annotation.PostConstruct;
import javax.annotation.Resource;
-import java.util.Date;
-import java.util.HashMap;
import java.util.Map;
import java.util.Set;
@Component
public class MetricsCollectTask {
-
@Resource
+ @Qualifier("mqAdminExtImpl")
private MQAdminExt mqAdminExt;
@Resource
private RMQConfigure rmqConfigure;
-
@Resource
- private RMQMetricsService metricsService;
-
+ private RMQMetricsService metricsService;
private final static Logger log = LoggerFactory.getLogger(MetricsCollectTask.class);
- @Scheduled(cron = "15 0/1 * * * ?")
- @MultiMQAdminCmdMethod(timeoutMillis = 5000)
- public void collectOffset() {
+ @PostConstruct
+ public void init() throws InterruptedException, RemotingConnectException, RemotingTimeoutException, RemotingSendRequestException, MQBrokerException {
+ log.info("MetricsCollectTask init starting....");
+ long start = System.currentTimeMillis();
+ ClusterInfo clusterInfo = mqAdminExt.examineBrokerClusterInfo();
+ StringBuilder infoOut = new StringBuilder();
+ for (String clusterName : clusterInfo.getClusterAddrTable().keySet()) {
+ infoOut.append(String.format("cluster name= %s, broker name = %s%n", clusterName, clusterInfo.getClusterAddrTable().get(clusterName)));
+ }
+ for (String brokerName : clusterInfo.getBrokerAddrTable().keySet()) {
+ infoOut.append(String.format("broker name = %s,master broker address= %s%n", brokerName, clusterInfo.getBrokerAddrTable().get(brokerName).getBrokerAddrs().get(MixAll.MASTER_ID)));
+ }
+ log.info(infoOut.toString());
+ log.info(String.format("MetricsCollectTask init finished....cost:%d", System.currentTimeMillis() - start));
+ }
+
+ @Scheduled(cron = "${task.collectTopicOffset.cron}")
+ public void collectTopicOffset() {
if (!rmqConfigure.isEnableCollect()) {
return;
}
- Date date = new Date();
+ log.info("topic offset collection task starting....");
+ long start = System.currentTimeMillis();
+ TopicList topicList = null;
try {
- TopicList topicList = mqAdminExt.fetchAllTopicList();
- Set<String> topicSet = topicList.getTopicList();
- for (String topic : topicSet) {
- if (topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX) || topic.startsWith(MixAll.DLQ_GROUP_TOPIC_PREFIX)) {
- continue;
+ topicList = mqAdminExt.fetchAllTopicList();
+ } catch (Exception ex) {
+ log.error(String.format("collectTopicOffset-exception comes getting topic list from namesrv, address is %s",
+ JSON.toJSONString(mqAdminExt.getNameServerAddressList())));
+ return;
+ }
+ Set<String> topicSet = topicList != null ? topicList.getTopicList() : null;
+ if (topicSet == null || topicSet.isEmpty()) {
+ log.error(String.format("collectTopicOffset-the topic list is empty. the namesrv address is %s",
+ JSON.toJSONString(mqAdminExt.getNameServerAddressList())));
+ return;
+ }
+ for (String topic : topicSet) {
+ TopicStatsTable topicStats = null;
+ try {
+ topicStats = mqAdminExt.examineTopicStats(topic);
+ } catch (Exception ex) {
+ log.error(String.format("collectTopicOffset-getting topic(%s) stats error. the namesrv address is %s",
+ topic,
+ JSON.toJSONString(mqAdminExt.getNameServerAddressList())));
+ continue;
+ }
+
+ Set<Map.Entry<MessageQueue, TopicOffset>> topicStatusEntries = topicStats.getOffsetTable().entrySet();
+
+ double totalMaxOffset = 0L;
+ long lastUpdateTimestamp = 0L;
+ StringBuilder sb = new StringBuilder();
+
+ for (Map.Entry<MessageQueue, TopicOffset> topicStatusEntry : topicStatusEntries) {
+ MessageQueue q = topicStatusEntry.getKey();
+ TopicOffset offset = topicStatusEntry.getValue();
+ totalMaxOffset += offset.getMaxOffset();
+ if (offset.getLastUpdateTimestamp() > lastUpdateTimestamp) {
+ lastUpdateTimestamp = offset.getLastUpdateTimestamp();
+ }
+ sb.append(q.getBrokerName()).append(" ");
+ }
+ metricsService.getCollector().addTopicOffsetMetric("", sb.toString(), topic, lastUpdateTimestamp, totalMaxOffset);
+ }
+ log.info("topic offset collection task finished...." + (System.currentTimeMillis() - start));
+ }
+
+ @Scheduled(cron = "${task.collectConsumerOffset.cron}")
+ public void collectConsumerOffset() {
+ if (!rmqConfigure.isEnableCollect()) {
+ return;
+ }
+ log.info("consumer offset collection task starting....");
+ long start = System.currentTimeMillis();
+ TopicList topicList = null;
+ try {
+ topicList = mqAdminExt.fetchAllTopicList();
+ } catch (Exception ex) {
+ log.error(String.format("collectConsumerOffset-fetch topic list from namesrv error, the address is %s",
+ JSON.toJSONString(mqAdminExt.getNameServerAddressList())), ex);
+ return;
+ }
+
+
+ Set<String> topicSet = topicList.getTopicList();
+ for (String topic : topicSet) {
+ GroupList groupList = null;
+
+ boolean isDLQTopic = topic.startsWith(MixAll.DLQ_GROUP_TOPIC_PREFIX);
+ if (isDLQTopic) {
+ continue;
+ }
+ try {
+ groupList = mqAdminExt.queryTopicConsumeByWho(topic);
+ } catch (Exception ex) {
+ log.warn(String.format("collectConsumerOffset-topic's consumer is empty, %s", topic));
+ continue;
+ }
+
+ if (groupList == null || groupList.getGroupList() == null || groupList.getGroupList().isEmpty()) {
+ log.warn(String.format("no any consumer for topic(%s), ignore this topic", topic));
+ continue;
+ }
+
+
+ for (String group : groupList.getGroupList()) {
+ ConsumeStats consumeStats = null;
+ ConsumerConnection onlineConsumers = null;
+ long diff = 0L, totalConsumerOffset = 0L, totalBrokerOffset = 0L;
+ int countOfOnlineConsumers = 0;
+
+ double consumeTPS = 0F;
+ try {
+ onlineConsumers = mqAdminExt.examineConsumerConnectionInfo(group);
+ } catch (InterruptedException | RemotingException ex) {
+ log.error(String.format("get topic's(%s) online consumers(%s) exception", topic, group), ex);
+ } catch (MQClientException ex) {
+ handleTopicNotExistException(ex.getResponseCode(), ex, topic, group);
+ } catch (MQBrokerException ex) {
+ handleTopicNotExistException(ex.getResponseCode(), ex, topic, group);
}
- String clusterName = null;
- ClusterInfo clusterInfo = mqAdminExt.examineBrokerClusterInfo();
- Set<Map.Entry<String, BrokerData>> clusterEntries = clusterInfo.getBrokerAddrTable().entrySet();
- for (Map.Entry<String, BrokerData> clusterEntry : clusterEntries) {
- clusterName = clusterEntry.getValue().getCluster();
- break;
+ if (onlineConsumers == null || onlineConsumers.getConnectionSet() == null || onlineConsumers.getConnectionSet().isEmpty()) {
+ log.warn(String.format("no any consumer online. topic=%s, consumer group=%s. ignore this", topic, group));
+ countOfOnlineConsumers = 0;
+ } else {
+ countOfOnlineConsumers = onlineConsumers.getConnectionSet().size();
}
- if (clusterName != null) {
- HashMap<String,Long> brokerOffsetMap = new HashMap<>();
- TopicStatsTable topicStatus = mqAdminExt.examineTopicStats(topic);
- Set<Map.Entry<MessageQueue, TopicOffset>> topicStatusEntries = topicStatus.getOffsetTable().entrySet();
- for (Map.Entry<MessageQueue, TopicOffset> topicStatusEntry : topicStatusEntries) {
- MessageQueue q = topicStatusEntry.getKey();
- TopicOffset offset = topicStatusEntry.getValue();
- if (brokerOffsetMap.containsKey(q.getBrokerName())) {
- brokerOffsetMap.put(q.getBrokerName(),brokerOffsetMap.get(q.getBrokerName()) + offset.getMaxOffset());
- }
- else {
- brokerOffsetMap.put(q.getBrokerName(),offset.getMaxOffset());
- }
- }
- Set<Map.Entry<String, Long>> brokerOffsetEntries = brokerOffsetMap.entrySet();
- for (Map.Entry<String, Long> brokerOffsetEntry : brokerOffsetEntries) {
- metricsService.getCollector().AddTopicOffsetMetric(clusterName,brokerOffsetEntry.getKey(), topic, brokerOffsetEntry.getValue());
- }
+ try {
+ consumeStats = mqAdminExt.examineConsumeStats(group, topic);
+ } catch (InterruptedException | RemotingException ex) {
+ log.error(String.format("get topic's(%s) consumer-stats(%s) exception", topic, group), ex);
+ } catch (MQClientException ex) {
+ handleTopicNotExistException(ex.getResponseCode(), ex, topic, group);
+ } catch (MQBrokerException ex) {
+ handleTopicNotExistException(ex.getResponseCode(), ex, topic, group);
+ }
+ if (consumeStats == null || consumeStats.getOffsetTable() == null || consumeStats.getOffsetTable().isEmpty()) {
+ log.warn(String.format("no any offset for consumer(%s), topic(%s), ignore this", group, topic));
+ continue;
+ }
+ {
+ diff = consumeStats.computeTotalDiff();
+ consumeTPS = consumeStats.getConsumeTps();
+ metricsService.getCollector().addGroupDiffMetric(String.valueOf(countOfOnlineConsumers), group, topic, diff);
+ metricsService.getCollector().addGroupConsumeTPSMetric(topic, group, consumeTPS);
}
+ Set<Map.Entry<MessageQueue, OffsetWrapper>> consumeStatusEntries = consumeStats.getOffsetTable().entrySet();
+ for (Map.Entry<MessageQueue, OffsetWrapper> consumeStatusEntry : consumeStatusEntries) {
+ MessageQueue q = consumeStatusEntry.getKey();
+ OffsetWrapper offset = consumeStatusEntry.getValue();
- HashMap<String,Long> consumeOffsetMap = new HashMap<>();
- GroupList groupList = mqAdminExt.queryTopicConsumeByWho(topic);
- if (groupList != null && !groupList.getGroupList().isEmpty()) {
- for (String group : groupList.getGroupList()) {
- try {
- ConsumeStats consumeStatus = mqAdminExt.examineConsumeStats(group, topic);
- Set<Map.Entry<MessageQueue, OffsetWrapper>> consumeStatusEntries = consumeStatus.getOffsetTable().entrySet();
- for (Map.Entry<MessageQueue, OffsetWrapper> consumeStatusEntry : consumeStatusEntries) {
- MessageQueue q = consumeStatusEntry.getKey();
- OffsetWrapper offset = consumeStatusEntry.getValue();
- if (consumeOffsetMap.containsKey(q.getBrokerName())) {
- consumeOffsetMap.put(q.getBrokerName(), consumeOffsetMap.get(q.getBrokerName()) + offset.getConsumerOffset());
- } else {
- consumeOffsetMap.put(q.getBrokerName(), offset.getConsumerOffset());
- }
- }
- } catch (Exception e) {
- log.info("ignore this consumer", e.getMessage());
- }
- Set<Map.Entry<String, Long>> consumeOffsetEntries = consumeOffsetMap.entrySet();
- for (Map.Entry<String, Long> consumeOffsetEntry : consumeOffsetEntries) {
- metricsService.getCollector().AddGroupOffsetMetric(clusterName,consumeOffsetEntry.getKey(), topic, group, consumeOffsetEntry.getValue());
- }
- consumeOffsetMap.clear();
- }
+ //topic + consumer group 生产offset
+ totalBrokerOffset += totalBrokerOffset + offset.getBrokerOffset();
+ //topic + consumer group 消费offset
+ totalConsumerOffset += offset.getConsumerOffset();
}
+ metricsService.getCollector().addGroupBrokerTotalOffsetMetric(topic, group, totalBrokerOffset);
+ metricsService.getCollector().addGroupConsumerTotalOffsetMetric(topic, group, totalBrokerOffset);
}
- } catch (Exception e) {
- log.info("error is " + e.getMessage());
}
+ log.info("consumer offset collection task finished...." + (System.currentTimeMillis() - start));
}
- @Scheduled(cron = "15 0/1 * * * ?")
- @MultiMQAdminCmdMethod(timeoutMillis = 5000)
- public void collectTopic() {
+ @Scheduled(cron = "${task.collectBrokerStatsTopic.cron}")
+ public void collectBrokerStatsTopic() {
if (!rmqConfigure.isEnableCollect()) {
return;
}
- Date date = new Date();
+ log.info("broker topic stats collection task starting....");
+ long start = System.currentTimeMillis();
+ Set<String> topicSet = null;
try {
TopicList topicList = mqAdminExt.fetchAllTopicList();
- Set<String> topicSet = topicList.getTopicList();
- for (String topic : topicSet) {
- if (topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX) || topic.startsWith(MixAll.DLQ_GROUP_TOPIC_PREFIX)) {
- continue;
+ topicSet = topicList.getTopicList();
+ } catch (Exception ex) {
+ log.error(String.format("collectBrokerStatsTopic-fetch topic list from namesrv error, the address is %s",
+ JSON.toJSONString(mqAdminExt.getNameServerAddressList())), ex);
+ return;
+ }
+ if (topicSet == null || topicSet.isEmpty()) {
+ return;
+ }
+ ClusterInfo clusterInfo = null;
+ try {
+ clusterInfo = mqAdminExt.examineBrokerClusterInfo();
+ } catch (Exception ex) {
+ log.error(String.format("collectBrokerStatsTopic-fetch cluster info exception, the address is %s",
+ JSON.toJSONString(mqAdminExt.getNameServerAddressList())), ex);
+ return;
+ }
+
+ for (String topic : topicSet) {
+ if (topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX) || topic.startsWith(MixAll.DLQ_GROUP_TOPIC_PREFIX)) {
+ continue;
+ }
+ TopicRouteData topicRouteData = null;
+
+ try {
+ topicRouteData = mqAdminExt.examineTopicRouteInfo(topic);
+ } catch (Exception ex) {
+ log.error(String.format("fetch topic route error. ignore %s", topic), ex);
+ continue;
+ }
+ for (BrokerData bd : topicRouteData.getBrokerDatas()) {
+ String masterAddr = bd.getBrokerAddrs().get(MixAll.MASTER_ID);
+ if (!StringUtils.isBlank(masterAddr)) {
+ BrokerStatsData bsd = null;
+ try {
+ //topic发了多少条消息
+ bsd = mqAdminExt.viewBrokerStatsData(masterAddr, BrokerStatsManager.TOPIC_PUT_NUMS, topic);
+ String brokerIP = clusterInfo.getBrokerAddrTable().get(bd.getBrokerName()).getBrokerAddrs().get(MixAll.MASTER_ID);
+ metricsService.getCollector().addTopicPutNumsMetric(
+ bd.getCluster(),
+ bd.getBrokerName(),
+ brokerIP,
+ "",
+ topic,
+ Utils.getFixedDouble(bsd.getStatsMinute().getTps())
+ );
+ } catch (MQClientException ex) {
+ if (ex.getResponseCode() == ResponseCode.SYSTEM_ERROR) {
+ log.error(String.format("TOPIC_PUT_NUMS-error, topic=%s, master broker=%s, %s", topic, masterAddr, ex.getErrorMessage()));
+ } else {
+ log.error(String.format("TOPIC_PUT_NUMS-error, topic=%s, master broker=%s", topic, masterAddr), ex);
+ }
+ } catch (RemotingTimeoutException | InterruptedException | RemotingSendRequestException | RemotingConnectException ex1) {
+ log.error(String.format("TOPIC_PUT_NUMS-error, topic=%s, master broker=%s", topic, masterAddr), ex1);
+ }
+ try {
+ //topic总共发了多少字节
+ bsd = mqAdminExt.viewBrokerStatsData(masterAddr, BrokerStatsManager.TOPIC_PUT_SIZE, topic);
+ String brokerIP = clusterInfo.getBrokerAddrTable().get(bd.getBrokerName()).getBrokerAddrs().get(MixAll.MASTER_ID);
+ metricsService.getCollector().addTopicPutSizeMetric(
+ bd.getCluster(),
+ bd.getBrokerName(),
+ brokerIP,
+ "",
+ topic,
+ Utils.getFixedDouble(bsd.getStatsMinute().getTps())
+ );
+ } catch (MQClientException ex) {
+ if (ex.getResponseCode() == ResponseCode.SYSTEM_ERROR) {
+ log.error(String.format("TOPIC_PUT_SIZE-error, topic=%s, master broker=%s, %s", topic, masterAddr, ex.getErrorMessage()));
+ } else {
+ log.error(String.format("TOPIC_PUT_SIZE-error, topic=%s, master broker=%s", topic, masterAddr), ex);
+ }
+ } catch (InterruptedException | RemotingConnectException | RemotingTimeoutException | RemotingSendRequestException ex) {
+ log.error(String.format("TOPIC_PUT_SIZE-error, topic=%s, master broker=%s", topic, masterAddr), ex);
+ }
}
- TopicRouteData topicRouteData = mqAdminExt.examineTopicRouteInfo(topic);
- GroupList groupList = mqAdminExt.queryTopicConsumeByWho(topic);
+ }
+ GroupList groupList = null;
+ try {
+ groupList = mqAdminExt.queryTopicConsumeByWho(topic);
+ } catch (Exception ex) {
+ log.error(String.format("collectBrokerStatsTopic-fetch consumers for topic(%s) error, ignore this topic", topic), ex);
+ return;
+ }
+ if (groupList.getGroupList() == null || groupList.getGroupList().isEmpty()) {
+ log.warn(String.format("collectBrokerStatsTopic-topic's consumer is empty, %s", topic));
+ return;
+ }
+ for (String group : groupList.getGroupList())
for (BrokerData bd : topicRouteData.getBrokerDatas()) {
String masterAddr = bd.getBrokerAddrs().get(MixAll.MASTER_ID);
if (masterAddr != null) {
+ String statsKey = String.format("%s@%s", topic, group);
+ BrokerStatsData bsd = null;
try {
- BrokerStatsData bsd = null;
- try {
- bsd = mqAdminExt.viewBrokerStatsData(masterAddr, BrokerStatsManager.TOPIC_PUT_NUMS, topic);
- metricsService.getCollector().AddTopicPutNumsMetric(bd.getCluster(), bd.getBrokerName(), topic, Utils.getFixedDouble(bsd.getStatsMinute().getTps()));
- }
- catch (Exception e) {
- log.info("error is " + e.getMessage());
- }
- try {
- bsd = mqAdminExt.viewBrokerStatsData(masterAddr, BrokerStatsManager.TOPIC_PUT_SIZE, topic);
- metricsService.getCollector().AddTopicPutSizeMetric(bd.getCluster(), bd.getBrokerName(), topic, Utils.getFixedDouble(bsd.getStatsMinute().getTps()));
+ //消费者消费了多少条消息
+ bsd = mqAdminExt.viewBrokerStatsData(masterAddr, BrokerStatsManager.GROUP_GET_NUMS, statsKey);
+ metricsService.getCollector().addGroupGetNumsMetric(
+ topic,
+ group,
+ Utils.getFixedDouble(bsd.getStatsMinute().getTps()));
+ } catch (MQClientException ex) {
+ if (ex.getResponseCode() == ResponseCode.SYSTEM_ERROR) {
+ log.error(String.format("GROUP_GET_NUMS-error, topic=%s, group=%s,master broker=%s, %s", topic, group, masterAddr, ex.getErrorMessage()));
+ } else {
+ log.error(String.format("GROUP_GET_NUMS-error, topic=%s, group=%s,master broker=%s", topic, group, masterAddr), ex);
}
- catch (Exception e) {
- log.info("error is " + e.getMessage());
+ } catch (InterruptedException | RemotingConnectException | RemotingTimeoutException | RemotingSendRequestException ex) {
+ log.error(String.format("GROUP_GET_NUMS-error, topic=%s, group=%s,master broker=%s", topic, group, masterAddr), ex);
+ }
+ try {
+ //消费者消费了多少字节
+ bsd = mqAdminExt.viewBrokerStatsData(masterAddr, BrokerStatsManager.GROUP_GET_SIZE, statsKey);
+ metricsService.getCollector().addGroupGetSizeMetric(
+ topic,
+ group,
+ Utils.getFixedDouble(bsd.getStatsMinute().getTps()));
+ } catch (MQClientException ex) {
+ if (ex.getResponseCode() == ResponseCode.SYSTEM_ERROR) {
+ log.error(String.format("GROUP_GET_SIZE-error, topic=%s, group=%s, master broker=%s, %s", topic, group, masterAddr, ex.getErrorMessage()));
+ } else {
+ log.error(String.format("GROUP_GET_SIZE-error, topic=%s, group=%s, master broker=%s", topic, group, masterAddr), ex);
}
- } catch (Exception e) {
- log.info("error is " + e.getMessage());
+ } catch (InterruptedException | RemotingConnectException | RemotingTimeoutException | RemotingSendRequestException ex) {
+ log.error(String.format("GROUP_GET_SIZE-error, topic=%s, group=%s, master broker=%s", topic, group, masterAddr), ex);
}
- }
- }
- if (groupList != null && !groupList.getGroupList().isEmpty()) {
- for (String group : groupList.getGroupList()) {
- for (BrokerData bd : topicRouteData.getBrokerDatas()) {
- String masterAddr = bd.getBrokerAddrs().get(MixAll.MASTER_ID);
- if (masterAddr != null) {
- try {
- String statsKey = String.format("%s@%s", topic, group);
- BrokerStatsData bsd = null;
- try {
- bsd = mqAdminExt.viewBrokerStatsData(masterAddr, BrokerStatsManager.GROUP_GET_NUMS, statsKey);
- metricsService.getCollector().AddGroupGetNumsMetric(bd.getCluster(), bd.getBrokerName(), topic, group, Utils.getFixedDouble(bsd.getStatsMinute().getTps()));
- } catch (Exception e) {
- log.info("error is " + e.getMessage());
- }
- try {
- bsd = mqAdminExt.viewBrokerStatsData(masterAddr, BrokerStatsManager.GROUP_GET_SIZE, statsKey);
- metricsService.getCollector().AddGroupGetSizeMetric(bd.getCluster(), bd.getBrokerName(), topic, group, Utils.getFixedDouble(bsd.getStatsMinute().getTps()));
- } catch (Exception e) {
- log.info("error is " + e.getMessage());
- }
- try {
-
- bsd = mqAdminExt.viewBrokerStatsData(masterAddr, BrokerStatsManager.SNDBCK_PUT_NUMS, statsKey);
- metricsService.getCollector().AddsendBackNumsMetric(bd.getCluster(), bd.getBrokerName(), topic, group, Utils.getFixedDouble(bsd.getStatsMinute().getTps()));
- } catch (Exception e) {
- log.info("error is " + e.getMessage());
- }
- try {
- collectLatencyMetrcisInner(topic, group, masterAddr, bd);
- } catch (Exception e) {
- log.info("error is " + e.getMessage());
- }
- } catch (Exception e) {
- log.info("error is " + e.getMessage());
- }
+ try {
+ //消费者重新消费topic的次数
+ bsd = mqAdminExt.viewBrokerStatsData(masterAddr, BrokerStatsManager.SNDBCK_PUT_NUMS, statsKey);
+ metricsService.getCollector().addSendBackNumsMetric(
+ topic,
+ group,
+ Utils.getFixedDouble(bsd.getStatsMinute().getTps()));
+ } catch (MQClientException ex) {
+ if (ex.getResponseCode() == ResponseCode.SYSTEM_ERROR) {
+ log.error(String.format("SNDBCK_PUT_NUMS-error, topic=%s, group=%s, master broker=%s, %s", topic, group, masterAddr, ex.getErrorMessage()));
+ } else {
+ log.error(String.format("SNDBCK_PUT_NUMS-error, topic=%s, group=%s, master broker=%s", topic, group, masterAddr), ex);
}
+ } catch (InterruptedException | RemotingConnectException | RemotingTimeoutException | RemotingSendRequestException ex) {
+ log.error(String.format("SNDBCK_PUT_NUMS-error, topic=%s, group=%s, master broker=%s", topic, group, masterAddr), ex);
}
}
}
- }
- }
- catch (Exception err) {
- throw Throwables.propagate(err);
}
+ log.info("broker topic stats collection task finished...." + (System.currentTimeMillis() - start));
}
- @Scheduled(cron = "15 0/1 * * * ?")
- @MultiMQAdminCmdMethod(timeoutMillis = 5000)
- public void collectBroker() {
+ @Scheduled(cron = "${task.collectBrokerStats.cron}")
+ public void collectBrokerStats() {
if (!rmqConfigure.isEnableCollect()) {
return;
}
+ log.info("broker stats collection task starting....");
+ long start = System.currentTimeMillis();
+ ClusterInfo clusterInfo = null;
try {
- Date date = new Date();
- ClusterInfo clusterInfo = mqAdminExt.examineBrokerClusterInfo();
- Set<Map.Entry<String, BrokerData>> clusterEntries = clusterInfo.getBrokerAddrTable().entrySet();
- for (Map.Entry<String, BrokerData> clusterEntry : clusterEntries) {
- String masterAddr = clusterEntry.getValue().getBrokerAddrs().get(MixAll.MASTER_ID);
- if (masterAddr != null) {
- try {
- BrokerStatsData bsd = null;
- try {
- bsd = mqAdminExt.viewBrokerStatsData(masterAddr, BrokerStatsManager.BROKER_PUT_NUMS,clusterEntry.getValue().getCluster());
- metricsService.getCollector().AddBrokerPutNumsMetric(clusterEntry.getValue().getCluster(), clusterEntry.getValue().getBrokerName(), Utils.getFixedDouble(bsd.getStatsMinute().getTps()));
- }
- catch (Exception e) {
- log.info("error is " + e.getMessage());
- }
- try {
- bsd = mqAdminExt.viewBrokerStatsData(masterAddr, BrokerStatsManager.BROKER_GET_NUMS, clusterEntry.getValue().getCluster());
- metricsService.getCollector().AddBrokerGetNumsMetric(clusterEntry.getValue().getCluster(), clusterEntry.getValue().getBrokerName(), Utils.getFixedDouble(bsd.getStatsMinute().getTps()));
- }
- catch (Exception e) {
- log.info("error is " + e.getMessage());
- }
- } catch (Exception e) {
- log.info("error is " + e.getMessage());
- }
- }
- }
- }
- catch (Exception err) {
- throw Throwables.propagate(err);
+ clusterInfo = mqAdminExt.examineBrokerClusterInfo();
+ } catch (Exception ex) {
+ log.error(String.format("collectBrokerStats-get cluster info from namesrv error. address is %s", JSON.toJSONString(mqAdminExt.getNameServerAddressList())), ex);
+ return;
}
- }
- private void collectLatencyMetrcisInner(String topic,String group,String masterAddr, BrokerData bd) throws Exception {
- long maxLagTime = 0;
- String statsKey;
- BrokerStatsData bsd = null;
- ConsumeStats consumeStatus = mqAdminExt.examineConsumeStats(group, topic);
- Set<Map.Entry<MessageQueue, OffsetWrapper>> consumeStatusEntries = consumeStatus.getOffsetTable().entrySet();
- for (Map.Entry<MessageQueue, OffsetWrapper> consumeStatusEntry : consumeStatusEntries) {
- MessageQueue q = consumeStatusEntry.getKey();
- OffsetWrapper offset = consumeStatusEntry.getValue();
- int queueId = q.getQueueId();
- statsKey = String.format("%d@%s@%s", queueId, topic, group);
+
+ Set<Map.Entry<String, BrokerData>> clusterEntries = clusterInfo.getBrokerAddrTable().entrySet();
+ for (Map.Entry<String, BrokerData> clusterEntry : clusterEntries) {
+ String masterAddr = clusterEntry.getValue().getBrokerAddrs().get(MixAll.MASTER_ID);
+ if (StringUtils.isBlank(masterAddr)) {
+ continue;
+ }
+ BrokerStatsData bsd = null;
try {
- bsd = mqAdminExt.viewBrokerStatsData(masterAddr, BrokerStatsManager.GROUP_GET_LATENCY, statsKey);
- metricsService.getCollector().AddGroupGetLatencyMetric(bd.getCluster(), bd.getBrokerName(), topic, group, String.format("%d", queueId), Utils.getFixedDouble(bsd.getStatsMinute().getTps()));
- } catch (Exception e) {
- log.info("error is " + e.getMessage());
+ bsd = mqAdminExt.viewBrokerStatsData(masterAddr, BrokerStatsManager.BROKER_PUT_NUMS, clusterEntry.getValue().getCluster());
+ String brokerIP = clusterEntry.getValue().getBrokerAddrs().get(MixAll.MASTER_ID);
+ metricsService.getCollector().addBrokerPutNumsMetric(
+ clusterEntry.getValue().getCluster(),
+ brokerIP,
+ "",
+ Utils.getFixedDouble(bsd.getStatsMinute().getTps()));
+ } catch (Exception ex) {
+ log.error(String.format("BROKER_PUT_NUMS-error, master broker=%s", masterAddr), ex);
}
- MQAdminExtImpl mqAdminImpl = (MQAdminExtImpl) mqAdminExt;
- PullResult consumePullResult = mqAdminImpl.queryMsgByOffset(q, offset.getConsumerOffset());
- long lagTime = 0;
- if (consumePullResult != null && consumePullResult.getPullStatus() == PullStatus.FOUND) {
- lagTime = System.currentTimeMillis() - consumePullResult.getMsgFoundList().get(0).getStoreTimestamp();
- if (offset.getBrokerOffset() == offset.getConsumerOffset()) {
- lagTime = 0;
- }
- } else if (consumePullResult.getPullStatus() == PullStatus.NO_MATCHED_MSG) {
- lagTime = 0;
- } else if (consumePullResult.getPullStatus() == PullStatus.OFFSET_ILLEGAL) {
- PullResult pullResult = mqAdminImpl.queryMsgByOffset(q, consumePullResult.getMinOffset());
- if (pullResult != null && pullResult.getPullStatus() == PullStatus.FOUND) {
- lagTime = System.currentTimeMillis() - consumePullResult.getMsgFoundList().get(0).getStoreTimestamp();
+ try {
+ bsd = mqAdminExt.viewBrokerStatsData(masterAddr, BrokerStatsManager.BROKER_GET_NUMS, clusterEntry.getValue().getCluster());
+ String brokerIP = clusterEntry.getValue().getBrokerAddrs().get(MixAll.MASTER_ID);
+ metricsService.getCollector().addBrokerGetNumsMetric(
+ clusterEntry.getValue().getCluster(),
+ brokerIP,
+ "",
+ Utils.getFixedDouble(bsd.getStatsMinute().getTps()));
+ } catch (Exception ex) {
+ log.error(String.format("BROKER_GET_NUMS-error, master broker=%s", masterAddr), ex);
+ }
+ }
+ log.info("broker stats collection task finished...." + (System.currentTimeMillis() - start));
+ }
+
+ @Scheduled(cron = "${task.collectBrokerRuntimeStats.cron}")
+ public void collectBrokerRuntimeStats() {
+ if (!rmqConfigure.isEnableCollect()) {
+ return;
+ }
+ log.info("broker runtime stats collection task starting....");
+ long start = System.currentTimeMillis();
+ ClusterInfo clusterInfo = null;
+ try {
+ clusterInfo = mqAdminExt.examineBrokerClusterInfo();
+ } catch (Exception ex) {
+ log.error(String.format("collectBrokerRuntimeStats-get cluster info from namesrv error. address is %s", JSON.toJSONString(mqAdminExt.getNameServerAddressList())), ex);
+ return;
+ }
+
+ Set<Map.Entry<String, BrokerData>> clusterEntries = clusterInfo.getBrokerAddrTable().entrySet();
+ for (Map.Entry<String, BrokerData> clusterEntry : clusterEntries) {
+ String masterAddr = clusterEntry.getValue().getBrokerAddrs().get(MixAll.MASTER_ID);
+ String clusterName = clusterEntry.getValue().getCluster();
+
+ KVTable kvTable = null;
+ if (!StringUtils.isBlank(masterAddr)) {
+ try {
+ kvTable = mqAdminExt.fetchBrokerRuntimeStats(masterAddr);
+ } catch (RemotingConnectException | RemotingSendRequestException | RemotingTimeoutException | InterruptedException ex) {
+ log.error(String.format("collectBrokerRuntimeStats-get fetch broker runtime stats error, address=%s", masterAddr), ex);
+ } catch (MQBrokerException ex) {
+ if (ex.getResponseCode() == ResponseCode.SYSTEM_ERROR) {
+ log.error(String.format("collectBrokerRuntimeStats-get fetch broker runtime stats error, address=%s, error=%s", masterAddr, ex.getErrorMessage()));
+ } else {
+ log.error(String.format("collectBrokerRuntimeStats-get fetch broker runtime stats error, address=%s", masterAddr), ex);
+ }
}
- } else {
- lagTime = 0;
}
- if (lagTime > maxLagTime) {
- maxLagTime = lagTime;
+ if (kvTable == null || kvTable.getTable() == null || kvTable.getTable().isEmpty()) {
+ continue;
}
+ try {
+ BrokerRuntimeStats brokerRuntimeStats = new BrokerRuntimeStats(kvTable);
+ metricsService.getCollector().addBrokerRuntimeStatsMetric(brokerRuntimeStats, clusterName, masterAddr, "");
+ } catch (Exception ex) {
+ log.error(String.format("collectBrokerRuntimeStats-parse or report broker runtime stats error, %s", JSON.toJSONString(kvTable)), ex);
+ }
+
+ }
+
+ log.info("broker runtime stats collection task finished...." + (System.currentTimeMillis() - start));
+ }
+
+ private void handleTopicNotExistException(int responseCode, Exception ex, String topic, String group) {
+ if (responseCode == ResponseCode.TOPIC_NOT_EXIST || responseCode == ResponseCode.CONSUMER_NOT_ONLINE) {
+ log.error(String.format("get topic's(%s) consumer-stats(%s) exception, detail: %s", topic, group, ex.getMessage()));
+ } else {
+ log.error(String.format("get topic's(%s) consumer-stats(%s) exception", topic, group), ex);
}
- metricsService.getCollector().AddGroupGetLatencyByStoreTimeMetric(bd.getCluster(), bd.getBrokerName(), topic, group, maxLagTime);
}
}
diff --git a/src/main/java/org/apache/rocketmq/exporter/util/JsonUtil.java b/src/main/java/org/apache/rocketmq/exporter/util/JsonUtil.java
index 29317f9..1cb147c 100644
--- a/src/main/java/org/apache/rocketmq/exporter/util/JsonUtil.java
+++ b/src/main/java/org/apache/rocketmq/exporter/util/JsonUtil.java
@@ -55,8 +55,7 @@ public class JsonUtil {
public static void writeValue(Writer writer, Object obj) {
try {
objectMapper.writeValue(writer, obj);
- }
- catch (IOException e) {
+ } catch (IOException e) {
Throwables.propagateIfPossible(e);
}
}
@@ -67,9 +66,8 @@ public class JsonUtil {
}
try {
- return src instanceof String ? (String)src : objectMapper.writeValueAsString(src);
- }
- catch (Exception e) {
+ return src instanceof String ? (String) src : objectMapper.writeValueAsString(src);
+ } catch (Exception e) {
logger.error("Parse Object to String error src=" + src, e);
return null;
}
@@ -81,9 +79,8 @@ public class JsonUtil {
}
try {
- return src instanceof byte[] ? (byte[])src : objectMapper.writeValueAsBytes(src);
- }
- catch (Exception e) {
+ return src instanceof byte[] ? (byte[]) src : objectMapper.writeValueAsBytes(src);
+ } catch (Exception e) {
logger.error("Parse Object to byte[] error", e);
return null;
}
@@ -95,9 +92,8 @@ public class JsonUtil {
}
str = escapesSpecialChar(str);
try {
- return clazz.equals(String.class) ? (T)str : objectMapper.readValue(str, clazz);
- }
- catch (Exception e) {
+ return clazz.equals(String.class) ? (T) str : objectMapper.readValue(str, clazz);
+ } catch (Exception e) {
logger.error("Parse String to Object error\nString: {}\nClass<T>: {}\nError: {}", str, clazz.getName(), e);
return null;
}
@@ -108,9 +104,8 @@ public class JsonUtil {
return null;
}
try {
- return clazz.equals(byte[].class) ? (T)bytes : objectMapper.readValue(bytes, clazz);
- }
- catch (Exception e) {
+ return clazz.equals(byte[].class) ? (T) bytes : objectMapper.readValue(bytes, clazz);
+ } catch (Exception e) {
logger.error("Parse byte[] to Object error\nbyte[]: {}\nClass<T>: {}\nError: {}", bytes, clazz.getName(), e);
return null;
}
@@ -122,11 +117,10 @@ public class JsonUtil {
}
str = escapesSpecialChar(str);
try {
- return (T)(typeReference.getType().equals(String.class) ? str : objectMapper.readValue(str, typeReference));
- }
- catch (Exception e) {
+ return (T) (typeReference.getType().equals(String.class) ? str : objectMapper.readValue(str, typeReference));
+ } catch (Exception e) {
logger.error("Parse String to Object error\nString: {}\nTypeReference<T>: {}\nError: {}", str,
- typeReference.getType(), e);
+ typeReference.getType(), e);
return null;
}
}
@@ -136,12 +130,11 @@ public class JsonUtil {
return null;
}
try {
- return (T)(typeReference.getType().equals(byte[].class) ? bytes : objectMapper.readValue(bytes,
- typeReference));
- }
- catch (Exception e) {
+ return (T) (typeReference.getType().equals(byte[].class) ? bytes : objectMapper.readValue(bytes,
+ typeReference));
+ } catch (Exception e) {
logger.error("Parse byte[] to Object error\nbyte[]: {}\nTypeReference<T>: {}\nError: {}", bytes,
- typeReference.getType(), e);
+ typeReference.getType(), e);
return null;
}
}
diff --git a/src/main/resources/application.properties b/src/main/resources/application.properties
deleted file mode 100644
index 083738e..0000000
--- a/src/main/resources/application.properties
+++ /dev/null
@@ -1,14 +0,0 @@
-server.port=5557
-
-spring.application.name=rocketmq-exporter
-spring.http.encoding.charset=UTF-8
-spring.http.encoding.enabled=true
-spring.http.encoding.force=true
-logging.config=classpath:logback.xml
-#if this value is empty,use env value rocketmq.config.namesrvAddr NAMESRV_ADDR
-rocketmq.config.namesrvAddr=127.0.0.1:9876
-
-
-rocketmq.config.enableCollect=true
-rocketmq.config.webTelemetryPath=/metrics
-rocketmq.config.rocketmqVersion=4_3_2
\ No newline at end of file
diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml
new file mode 100644
index 0000000..8ff9e07
--- /dev/null
+++ b/src/main/resources/application.yml
@@ -0,0 +1,32 @@
+server:
+ port: 5557
+spring:
+ application:
+ name: rocketmq-exporter
+ http:
+ encoding:
+ charset: UTF-8
+ enabled: true
+ force: true
+logging:
+ config: classpath:logback.xml
+
+rocketmq:
+ config:
+ webTelemetryPath: /metrics
+ rocketmqVersion: 4_2_0
+ namesrvAddr: 127.0.0.1:9876 #
+ enableCollect: true
+
+task:
+ count: 5
+ collectTopicOffset:
+ cron: 15 0/1 * * * ?
+ collectConsumerOffset:
+ cron: 15 0/1 * * * ?
+ collectBrokerStatsTopic:
+ cron: 15 0/1 * * * ?
+ collectBrokerStats:
+ cron: 15 0/1 * * * ?
+ collectBrokerRuntimeStats:
+ cron: 15 0/1 * * * ?
diff --git a/src/main/resources/logback.xml b/src/main/resources/logback.xml
index b5291d3..8032516 100644
--- a/src/main/resources/logback.xml
+++ b/src/main/resources/logback.xml
@@ -1,33 +1,32 @@
<?xml version="1.0" encoding="UTF-8"?>
<configuration>
- <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
- <encoder charset="UTF-8">
- <pattern>[%d{yyyy-MM-dd HH:mm:ss.SSS}] %5p %m%n</pattern>
- </encoder>
- </appender>
+ <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
+ <encoder charset="UTF-8">
+ <pattern>[%d{yyyy-MM-dd HH:mm:ss.SSS}] %5p %m%n</pattern>
+ </encoder>
+ </appender>
- <appender name="FILE"
- class="ch.qos.logback.core.rolling.RollingFileAppender">
- <file>${user.home}/logs/exporterlogs/rocketmq-exporter.log</file>
- <append>true</append>
- <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
- <fileNamePattern>${user.home}/logs/exporterlogs/rocketmq-exporter-%d{yyyy-MM-dd}.%i.log
- </fileNamePattern>
- <timeBasedFileNamingAndTriggeringPolicy
- class="ch.qos.logback.core.rolling.SizeAndTimeBasedFNATP">
- <maxFileSize>104857600</maxFileSize>
- </timeBasedFileNamingAndTriggeringPolicy>
- <MaxHistory>10</MaxHistory>
- </rollingPolicy>
- <encoder>
- <pattern>[%d{yyyy-MM-dd HH:mm:ss.SSS}] %5p %m%n</pattern>
- <charset class="java.nio.charset.Charset">UTF-8</charset>
- </encoder>
- </appender>
+ <appender name="FILE"
+ class="ch.qos.logback.core.rolling.RollingFileAppender">
+ <file>./logs/exporterlogs/rocketmq-exporter.log</file>
+ <append>true</append>
+ <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
+ <fileNamePattern>./logs/exporterlogs/rocketmq-exporter-%d{yyyy-MM-dd}.%i.log
+ </fileNamePattern>
+ <timeBasedFileNamingAndTriggeringPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedFNATP">
+ <maxFileSize>104857600</maxFileSize>
+ </timeBasedFileNamingAndTriggeringPolicy>
+ <totalSizeCap>20gb</totalSizeCap>
+ </rollingPolicy>
+ <encoder>
+ <pattern>[%d{yyyy-MM-dd HH:mm:ss.SSS}] %5p %m%n</pattern>
+ <charset class="java.nio.charset.Charset">UTF-8</charset>
+ </encoder>
+ </appender>
- <root level="INFO">
- <appender-ref ref="STDOUT" />
- <appender-ref ref="FILE" />
- </root>
+ <root level="INFO">
+ <appender-ref ref="STDOUT"/>
+ <appender-ref ref="FILE"/>
+ </root>
-</configuration>
\ No newline at end of file
+</configuration>