You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by wl...@apache.org on 2020/03/08 13:16:59 UTC

[rocketmq-exporter] 39/43: add all metrics

This is an automated email from the ASF dual-hosted git repository.

wlliqipeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-exporter.git

commit f0e1125a8ca54e65edf299aab66c38e18564355b
Author: liwei5 <li...@vipkid.com.cn>
AuthorDate: Thu Dec 12 00:17:32 2019 +0800

    add all metrics
---
 pom.xml                                            |   10 +-
 .../exporter/aspect/admin/MQAdminAspect.java       |   71 --
 .../admin/annotation/MultiMQAdminCmdMethod.java    |   30 -
 .../exporter/collector/RMQMetricsCollector.java    | 1178 ++++++++++++++++++--
 .../rocketmq/exporter/config/RMQConfigure.java     |    4 +-
 .../rocketmq/exporter/config/ScheduleConfig.java   |   26 +
 .../exporter/controller/RMQMetricsController.java  |    9 +-
 .../exporter/exception/ServiceException.java       |   31 -
 .../exporter/model/BrokerRuntimeStats.java         |  609 ++++++++++
 .../exporter/model/metrics/BrokerMetric.java       |   43 +-
 .../model/metrics/ConsumerCountMetric.java         |   58 +
 .../exporter/model/metrics/ConsumerMetric.java     |   45 +-
 .../model/metrics/ConsumerQueueMetric.java         |   93 --
 .../model/metrics/ConsumerTopicDiffMetric.java     |   61 +
 .../model/metrics/DLQTopicOffsetMetric.java        |   72 ++
 .../exporter/model/metrics/ProducerMetric.java     |   61 +-
 .../exporter/model/metrics/TopicPutNumMetric.java  |   83 ++
 .../metrics/brokerruntime/BrokerRuntimeMetric.java |   91 ++
 .../exporter/service/AbstractCommonService.java    |   50 -
 .../exporter/service/RMQMetricsService.java        |    6 +-
 .../exporter/service/client/MQAdminExtImpl.java    |  279 ++---
 .../exporter/service/client/MQAdminInstance.java   |  118 +-
 .../service/impl/RMQMetricsServiceImpl.java        |   16 +-
 .../rocketmq/exporter/task/MetricsCollectTask.java |  603 ++++++----
 .../apache/rocketmq/exporter/util/JsonUtil.java    |   39 +-
 src/main/resources/application.properties          |   14 -
 src/main/resources/application.yml                 |   32 +
 src/main/resources/logback.xml                     |   55 +-
 28 files changed, 2849 insertions(+), 938 deletions(-)

diff --git a/pom.xml b/pom.xml
index cda549d..8a84c73 100644
--- a/pom.xml
+++ b/pom.xml
@@ -130,7 +130,15 @@
 					</resources>
 				</configuration>
 			</plugin>
-		</plugins>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-compiler-plugin</artifactId>
+                <configuration>
+                    <source>8</source>
+                    <target>8</target>
+                </configuration>
+            </plugin>
+        </plugins>
 
 	</build>
 
diff --git a/src/main/java/org/apache/rocketmq/exporter/aspect/admin/MQAdminAspect.java b/src/main/java/org/apache/rocketmq/exporter/aspect/admin/MQAdminAspect.java
deleted file mode 100644
index 6b5435f..0000000
--- a/src/main/java/org/apache/rocketmq/exporter/aspect/admin/MQAdminAspect.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-package org.apache.rocketmq.exporter.aspect.admin;
-
-import java.lang.reflect.Method;
-import org.apache.rocketmq.exporter.aspect.admin.annotation.MultiMQAdminCmdMethod;
-import org.apache.rocketmq.exporter.service.client.MQAdminInstance;
-import org.aspectj.lang.ProceedingJoinPoint;
-import org.aspectj.lang.annotation.Around;
-import org.aspectj.lang.annotation.Aspect;
-import org.aspectj.lang.annotation.Pointcut;
-import org.aspectj.lang.reflect.MethodSignature;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.stereotype.Service;
-
-@Aspect
-@Service
-public class MQAdminAspect {
-    private Logger logger = LoggerFactory.getLogger(MQAdminAspect.class);
-
-    public MQAdminAspect() {
-    }
-
-    @Pointcut("execution(* org.apache.rocketmq.exporter.service.client.MQAdminExtImpl..*(..))")
-    public void mQAdminMethodPointCut() {
-
-    }
-
-    @Pointcut("@annotation(org.apache.rocketmq.exporter.aspect.admin.annotation.MultiMQAdminCmdMethod)")
-    public void multiMQAdminMethodPointCut() {
-
-    }
-
-    @Around(value = "mQAdminMethodPointCut() || multiMQAdminMethodPointCut()")
-    public Object aroundMQAdminMethod(ProceedingJoinPoint joinPoint) throws Throwable {
-        long start = System.currentTimeMillis();
-        Object obj;
-        try {
-            MethodSignature signature = (MethodSignature)joinPoint.getSignature();
-            Method method = signature.getMethod();
-            MultiMQAdminCmdMethod multiMQAdminCmdMethod = method.getAnnotation(MultiMQAdminCmdMethod.class);
-            if (multiMQAdminCmdMethod != null && multiMQAdminCmdMethod.timeoutMillis() > 0) {
-                MQAdminInstance.initMQAdminInstance(multiMQAdminCmdMethod.timeoutMillis());
-            }
-            else {
-                MQAdminInstance.initMQAdminInstance(0);
-            }
-            obj = joinPoint.proceed();
-        }
-        finally {
-            MQAdminInstance.destroyMQAdminInstance();
-            logger.debug("op=look method={} cost={}", joinPoint.getSignature().getName(), System.currentTimeMillis() - start);
-        }
-        return obj;
-    }
-}
diff --git a/src/main/java/org/apache/rocketmq/exporter/aspect/admin/annotation/MultiMQAdminCmdMethod.java b/src/main/java/org/apache/rocketmq/exporter/aspect/admin/annotation/MultiMQAdminCmdMethod.java
deleted file mode 100644
index 7953fb3..0000000
--- a/src/main/java/org/apache/rocketmq/exporter/aspect/admin/annotation/MultiMQAdminCmdMethod.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-package org.apache.rocketmq.exporter.aspect.admin.annotation;
-
-import java.lang.annotation.Documented;
-import java.lang.annotation.ElementType;
-import java.lang.annotation.Retention;
-import java.lang.annotation.RetentionPolicy;
-import java.lang.annotation.Target;
-
-@Target({ElementType.METHOD})
-@Retention(RetentionPolicy.RUNTIME)
-@Documented
-public @interface MultiMQAdminCmdMethod {
-    long timeoutMillis() default 0;
-}
diff --git a/src/main/java/org/apache/rocketmq/exporter/collector/RMQMetricsCollector.java b/src/main/java/org/apache/rocketmq/exporter/collector/RMQMetricsCollector.java
index 2854712..253e9f9 100644
--- a/src/main/java/org/apache/rocketmq/exporter/collector/RMQMetricsCollector.java
+++ b/src/main/java/org/apache/rocketmq/exporter/collector/RMQMetricsCollector.java
@@ -17,12 +17,17 @@
 package org.apache.rocketmq.exporter.collector;
 
 import io.prometheus.client.Collector;
-import io.prometheus.client.CounterMetricFamily;
 import io.prometheus.client.GaugeMetricFamily;
+import org.apache.rocketmq.common.MixAll;
+import org.apache.rocketmq.exporter.model.BrokerRuntimeStats;
 import org.apache.rocketmq.exporter.model.metrics.BrokerMetric;
+import org.apache.rocketmq.exporter.model.metrics.ConsumerCountMetric;
 import org.apache.rocketmq.exporter.model.metrics.ConsumerMetric;
-import org.apache.rocketmq.exporter.model.metrics.ConsumerQueueMetric;
+import org.apache.rocketmq.exporter.model.metrics.ConsumerTopicDiffMetric;
+import org.apache.rocketmq.exporter.model.metrics.DLQTopicOffsetMetric;
 import org.apache.rocketmq.exporter.model.metrics.ProducerMetric;
+import org.apache.rocketmq.exporter.model.metrics.TopicPutNumMetric;
+import org.apache.rocketmq.exporter.model.metrics.brokerruntime.BrokerRuntimeMetric;
 
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -31,160 +36,1159 @@ import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 
 public class RMQMetricsCollector extends Collector {
+    //max offset of normal consume queue
+    private ConcurrentHashMap<ProducerMetric, Double> topicOffset = new ConcurrentHashMap<>();
+    //max offset of retry topic consume queue
+    private ConcurrentHashMap<ProducerMetric, Double> topicRetryOffset = new ConcurrentHashMap<>();
+    //max offset of dlq consume queue
+    private ConcurrentHashMap<DLQTopicOffsetMetric, Double> topicDLQOffset = new ConcurrentHashMap<>();
 
-    private ConcurrentHashMap<ProducerMetric, Double>   topicPutNums            = new ConcurrentHashMap<>();
-    private ConcurrentHashMap<ProducerMetric, Double>   topicPutSize            = new ConcurrentHashMap<>();
+    //total put numbers for topics
+    private ConcurrentHashMap<TopicPutNumMetric, Double> topicPutNums = new ConcurrentHashMap<>();
+    //total get numbers for topics
+    private ConcurrentHashMap<TopicPutNumMetric, Double> topicPutSize = new ConcurrentHashMap<>();
 
-    private ConcurrentHashMap<ProducerMetric, Double>   topicOffset              = new ConcurrentHashMap<>();
+    //diff for consumer group
+    private ConcurrentHashMap<ConsumerTopicDiffMetric, Long> consumerDiff = new ConcurrentHashMap<>();
+    //retry diff for consumer group
+    private ConcurrentHashMap<ConsumerTopicDiffMetric, Long> consumerRetryDiff = new ConcurrentHashMap<>();
+    //死信堆积 todo 检查是否存在这个数据 应该不存在
+    private ConcurrentHashMap<ConsumerTopicDiffMetric, Long> consumerDLQDiff = new ConcurrentHashMap<>();
+    //consumer count
+    private ConcurrentHashMap<ConsumerCountMetric, Integer> consumerCounts = new ConcurrentHashMap<>();
 
-    private ConcurrentHashMap<BrokerMetric, Double>     brokerPutNums           = new ConcurrentHashMap<>();
-    private ConcurrentHashMap<BrokerMetric, Double>     brokerGetNums           = new ConcurrentHashMap<>();
+    //broker offset for consumer-topic
+    private ConcurrentHashMap<ConsumerMetric, Long> groupBrokerTotalOffset = new ConcurrentHashMap<>();
+    //consumer offset for consumer-topic
+    private ConcurrentHashMap<ConsumerMetric, Long> groupConsumeTotalOffset = new ConcurrentHashMap<>();
+    //consume tps
+    private ConcurrentHashMap<ConsumerMetric, Double> groupConsumeTPS = new ConcurrentHashMap<>();
+    //consumed message count for consumer-topic
+    private ConcurrentHashMap<ConsumerMetric, Double> groupGetNums = new ConcurrentHashMap<>();
+    //consumed message size(byte) for consumer-topic
+    private ConcurrentHashMap<ConsumerMetric, Double> groupGetSize = new ConcurrentHashMap<>();
+    //re-consumed message count for consumer-topic
+    private ConcurrentHashMap<ConsumerMetric, Double> sendBackNums = new ConcurrentHashMap<>();
 
-    private ConcurrentHashMap<ConsumerMetric, Double>   groupGetNums            = new ConcurrentHashMap<>();
-    private ConcurrentHashMap<ConsumerMetric, Double>   groupGetSize            = new ConcurrentHashMap<>();
+    //total put message count for one broker
+    private ConcurrentHashMap<BrokerMetric, Double> brokerPutNums = new ConcurrentHashMap<>();
+    //total get message count for one broker
+    private ConcurrentHashMap<BrokerMetric, Double> brokerGetNums = new ConcurrentHashMap<>();
 
-    private ConcurrentHashMap<ConsumerMetric, Double>       sendBackNums        = new ConcurrentHashMap<>();
-    private ConcurrentHashMap<ConsumerMetric, Double>       groupOffset         = new ConcurrentHashMap<>();
+    private ConcurrentHashMap<BrokerRuntimeMetric, Long> brokerRuntimeMsgPutTotalTodayNow = new ConcurrentHashMap<>();
+    private ConcurrentHashMap<BrokerRuntimeMetric, Long> brokerRuntimeMsgGetTotalTodayNow = new ConcurrentHashMap<>();
 
-    private ConcurrentHashMap<ConsumerQueueMetric, Double>  groupGetLatency     = new ConcurrentHashMap<>();
+    private ConcurrentHashMap<BrokerRuntimeMetric, Long> brokerRuntimeMsgGetTotalYesterdayMorning = new ConcurrentHashMap<>();
+    private ConcurrentHashMap<BrokerRuntimeMetric, Long> brokerRuntimeMsgPutTotalYesterdayMorning = new ConcurrentHashMap<>();
+    private ConcurrentHashMap<BrokerRuntimeMetric, Long> brokerRuntimeMsgGetTotalTodayMorning = new ConcurrentHashMap<>();
+    private ConcurrentHashMap<BrokerRuntimeMetric, Long> brokerRuntimeMsgPutTotalTodayMorning = new ConcurrentHashMap<>();
 
-    private ConcurrentHashMap<ConsumerMetric, Double>  groupGetLatencyByStoreTime     = new ConcurrentHashMap<>();
+    private ConcurrentHashMap<BrokerRuntimeMetric, Long> brokerRuntimeDispatchBehindBytes = new ConcurrentHashMap<>();
+    private ConcurrentHashMap<BrokerRuntimeMetric, Long> brokerRuntimePutMessageSizeTotal = new ConcurrentHashMap<>();
+
+    private ConcurrentHashMap<BrokerRuntimeMetric, Double> brokerRuntimePutMessageAverageSize = new ConcurrentHashMap<>();
+    private ConcurrentHashMap<BrokerRuntimeMetric, Long> brokerRuntimeQueryThreadPoolQueueCapacity = new ConcurrentHashMap<>();
+    private ConcurrentHashMap<BrokerRuntimeMetric, Long> brokerRuntimeRemainTransientStoreBufferNumbs = new ConcurrentHashMap<>();
+    private ConcurrentHashMap<BrokerRuntimeMetric, Long> brokerRuntimeEarliestMessageTimeStamp = new ConcurrentHashMap<>();
+    private ConcurrentHashMap<BrokerRuntimeMetric, Long> brokerRuntimePutMessageEntireTimeMax = new ConcurrentHashMap<>();
+    private ConcurrentHashMap<BrokerRuntimeMetric, Long> brokerRuntimeStartAcceptSendRequestTimeStamp = new ConcurrentHashMap<>();
+    private ConcurrentHashMap<BrokerRuntimeMetric, Long> brokerRuntimeSendThreadPoolQueueSize = new ConcurrentHashMap<>();
+    private ConcurrentHashMap<BrokerRuntimeMetric, Long> brokerRuntimePutMessageTimesTotal = new ConcurrentHashMap<>();
+    private ConcurrentHashMap<BrokerRuntimeMetric, Long> brokerRuntimeGetMessageEntireTimeMax = new ConcurrentHashMap<>();
+    private ConcurrentHashMap<BrokerRuntimeMetric, Long> brokerRuntimePageCacheLockTimeMills = new ConcurrentHashMap<>();
+    private ConcurrentHashMap<BrokerRuntimeMetric, Double> brokerRuntimeCommitLogDiskRatio = new ConcurrentHashMap<>();
+    private ConcurrentHashMap<BrokerRuntimeMetric, Double> brokerRuntimeConsumeQueueDiskRatio = new ConcurrentHashMap<>();
+
+    private ConcurrentHashMap<BrokerRuntimeMetric, Double> brokerRuntimeGetFoundTps600 = new ConcurrentHashMap<>();
+    private ConcurrentHashMap<BrokerRuntimeMetric, Double> brokerRuntimeGetFoundTps60 = new ConcurrentHashMap<>();
+    private ConcurrentHashMap<BrokerRuntimeMetric, Double> brokerRuntimeGetFoundTps10 = new ConcurrentHashMap<>();
+    private ConcurrentHashMap<BrokerRuntimeMetric, Double> brokerRuntimeGetTotalTps600 = new ConcurrentHashMap<>();
+    private ConcurrentHashMap<BrokerRuntimeMetric, Double> brokerRuntimeGetTotalTps60 = new ConcurrentHashMap<>();
+    private ConcurrentHashMap<BrokerRuntimeMetric, Double> brokerRuntimeGetTotalTps10 = new ConcurrentHashMap<>();
+    private ConcurrentHashMap<BrokerRuntimeMetric, Double> brokerRuntimeGetTransferedTps600 = new ConcurrentHashMap<>();
+    private ConcurrentHashMap<BrokerRuntimeMetric, Double> brokerRuntimeGetTransferedTps60 = new ConcurrentHashMap<>();
+    private ConcurrentHashMap<BrokerRuntimeMetric, Double> brokerRuntimeGetTransferedTps10 = new ConcurrentHashMap<>();
+    private ConcurrentHashMap<BrokerRuntimeMetric, Double> brokerRuntimeGetMissTps600 = new ConcurrentHashMap<>();
+    private ConcurrentHashMap<BrokerRuntimeMetric, Double> brokerRuntimeGetMissTps60 = new ConcurrentHashMap<>();
+    private ConcurrentHashMap<BrokerRuntimeMetric, Double> brokerRuntimeGetMissTps10 = new ConcurrentHashMap<>();
+    private ConcurrentHashMap<BrokerRuntimeMetric, Double> brokerRuntimePutTps600 = new ConcurrentHashMap<>();
+    private ConcurrentHashMap<BrokerRuntimeMetric, Double> brokerRuntimePutTps60 = new ConcurrentHashMap<>();
+    private ConcurrentHashMap<BrokerRuntimeMetric, Double> brokerRuntimePutTps10 = new ConcurrentHashMap<>();
+
+    private ConcurrentHashMap<BrokerRuntimeMetric, Long> brokerRuntimeDispatchMaxBuffer = new ConcurrentHashMap<>();
+
+    private ConcurrentHashMap<BrokerRuntimeMetric, Integer> brokerRuntimePutMessageDistributeTimeMap10toMore = new ConcurrentHashMap<>();
+    private ConcurrentHashMap<BrokerRuntimeMetric, Integer> brokerRuntimePutMessageDistributeTimeMap5to10s = new ConcurrentHashMap<>();
+    private ConcurrentHashMap<BrokerRuntimeMetric, Integer> brokerRuntimePutMessageDistributeTimeMap4to5s = new ConcurrentHashMap<>();
+    private ConcurrentHashMap<BrokerRuntimeMetric, Integer> brokerRuntimePutMessageDistributeTimeMap3to4s = new ConcurrentHashMap<>();
+    private ConcurrentHashMap<BrokerRuntimeMetric, Integer> brokerRuntimePutMessageDistributeTimeMap2to3s = new ConcurrentHashMap<>();
+    private ConcurrentHashMap<BrokerRuntimeMetric, Integer> brokerRuntimePutMessageDistributeTimeMap1to2s = new ConcurrentHashMap<>();
+    private ConcurrentHashMap<BrokerRuntimeMetric, Integer> brokerRuntimePutMessageDistributeTimeMap500to1s = new ConcurrentHashMap<>();
+    private ConcurrentHashMap<BrokerRuntimeMetric, Integer> brokerRuntimePutMessageDistributeTimeMap200to500ms = new ConcurrentHashMap<>();
+    private ConcurrentHashMap<BrokerRuntimeMetric, Integer> brokerRuntimePutMessageDistributeTimeMap100to200ms = new ConcurrentHashMap<>();
+    private ConcurrentHashMap<BrokerRuntimeMetric, Integer> brokerRuntimePutMessageDistributeTimeMap50to100ms = new ConcurrentHashMap<>();
+    private ConcurrentHashMap<BrokerRuntimeMetric, Integer> brokerRuntimePutMessageDistributeTimeMap10to50ms = new ConcurrentHashMap<>();
+    private ConcurrentHashMap<BrokerRuntimeMetric, Integer> brokerRuntimePutMessageDistributeTimeMap0to10ms = new ConcurrentHashMap<>();
+    private ConcurrentHashMap<BrokerRuntimeMetric, Integer> brokerRuntimePutMessageDistributeTimeMap0ms = new ConcurrentHashMap<>();
+
+    private ConcurrentHashMap<BrokerRuntimeMetric, Long> brokerRuntimePullThreadPoolQueueCapacity = new ConcurrentHashMap<>();
+    private ConcurrentHashMap<BrokerRuntimeMetric, Long> brokerRuntimeSendThreadPoolQueueCapacity = new ConcurrentHashMap<>();
+    private ConcurrentHashMap<BrokerRuntimeMetric, Long> brokerRuntimePullThreadPoolQueueSize = new ConcurrentHashMap<>();
+    private ConcurrentHashMap<BrokerRuntimeMetric, Long> brokerRuntimeQueryThreadPoolQueueSize = new ConcurrentHashMap<>();
+
+    private ConcurrentHashMap<BrokerRuntimeMetric, Long> brokerRuntimePullThreadPoolQueueHeadWaitTimeMills = new ConcurrentHashMap<>();
+    private ConcurrentHashMap<BrokerRuntimeMetric, Long> brokerRuntimeQueryThreadPoolQueueHeadWaitTimeMills = new ConcurrentHashMap<>();
+    private ConcurrentHashMap<BrokerRuntimeMetric, Long> brokerRuntimeSendThreadPoolQueueHeadWaitTimeMills = new ConcurrentHashMap<>();
+
+    private ConcurrentHashMap<BrokerRuntimeMetric, Double> brokerRuntimeCommitLogDirCapacityFree = new ConcurrentHashMap<>();
+    private ConcurrentHashMap<BrokerRuntimeMetric, Double> brokerRuntimeCommitLogDirCapacityTotal = new ConcurrentHashMap<>();
+
+    private ConcurrentHashMap<BrokerRuntimeMetric, Long> brokerRuntimeCommitLogMaxOffset = new ConcurrentHashMap<>();
+    private ConcurrentHashMap<BrokerRuntimeMetric, Long> brokerRuntimeCommitLogMinOffset = new ConcurrentHashMap<>();
+    private ConcurrentHashMap<BrokerRuntimeMetric, Double> brokerRuntimeRemainHowManyDataToFlush = new ConcurrentHashMap<>();
+
+    private static List<String> GROUP_DIFF_LABEL_NAMES = Arrays.asList("group", "topic", "countOfOnlineConsumers");
+
+    private static <T extends Number> void loadGroupDiffMetric(GaugeMetricFamily family, Map.Entry<ConsumerTopicDiffMetric, T> entry) {
+        family.addMetric(
+                Arrays.asList(
+                        entry.getKey().getGroup(),
+                        entry.getKey().getTopic(),
+                        entry.getKey().getCountOfOnlineConsumers()
+                ),
+                entry.getValue().doubleValue());
+    }
+
+    private static List<String> GROUP_COUNT_LABEL_NAMES = Arrays.asList("group", "caddr", "localaddr");
+
+    private void collectConsumerMetric(List<MetricFamilySamples> mfs) {
+        GaugeMetricFamily groupGetLatencyByConsumerDiff = new GaugeMetricFamily("rocketmq_group_diff", "GroupDiff", GROUP_DIFF_LABEL_NAMES);
+        for (Map.Entry<ConsumerTopicDiffMetric, Long> entry : consumerDiff.entrySet()) {
+            loadGroupDiffMetric(groupGetLatencyByConsumerDiff, entry);
+        }
+        mfs.add(groupGetLatencyByConsumerDiff);
+
+        GaugeMetricFamily groupGetLatencyByConsumerRetryDiff = new GaugeMetricFamily("rocketmq_group_retrydiff", "GroupRetryDiff", GROUP_DIFF_LABEL_NAMES);
+        for (Map.Entry<ConsumerTopicDiffMetric, Long> entry : consumerRetryDiff.entrySet()) {
+            loadGroupDiffMetric(groupGetLatencyByConsumerRetryDiff, entry);
+        }
+        mfs.add(groupGetLatencyByConsumerRetryDiff);
+
+        GaugeMetricFamily groupGetLatencyByConsumerDLQDiff = new GaugeMetricFamily("rocketmq_group_dlqdiff", "GroupDLQDiff", GROUP_DIFF_LABEL_NAMES);
+        for (Map.Entry<ConsumerTopicDiffMetric, Long> entry : consumerDLQDiff.entrySet()) {
+            loadGroupDiffMetric(groupGetLatencyByConsumerDLQDiff, entry);
+        }
+        mfs.add(groupGetLatencyByConsumerDLQDiff);
+
+        GaugeMetricFamily consumerCountsF = new GaugeMetricFamily("rocketmq_group_count", "GroupCount", GROUP_COUNT_LABEL_NAMES);
+        for (Map.Entry<ConsumerCountMetric, Integer> entry : consumerCounts.entrySet()) {
+            consumerCountsF.addMetric(
+                    Arrays.asList(
+                            entry.getKey().getGroup(),
+                            entry.getKey().getCaddr(),
+                            entry.getKey().getLocaladdr()
+                    ),
+                    entry.getValue().doubleValue());
+        }
+        mfs.add(consumerCountsF);
+
+    }
+
+
+    private static List<String> TOPIC_OFFSET_LABEL_NAMES = Arrays.asList(
+            "cluster", "brokerNames", "topic", "lastUpdateTimestamp"
+    );
+
+    private static List<String> DLQ_TOPIC_OFFSET_LABEL_NAMES = Arrays.asList(
+            "cluster", "brokerNames", "group", "lastUpdateTimestamp"
+    );
+
+    private void loadTopicOffsetMetric(GaugeMetricFamily family, Map.Entry<ProducerMetric, Double> entry) {
+        family.addMetric(
+                Arrays.asList(
+                        entry.getKey().getClusterName(),
+                        entry.getKey().getBrokerNames(),
+                        entry.getKey().getTopicName(),
+                        String.valueOf(entry.getKey().getLastUpdateTimestamp())
+                ),
+                entry.getValue());
+    }
+
+    private void collectTopicOffsetMetric(List<MetricFamilySamples> mfs) {
+        GaugeMetricFamily topicOffsetF = new GaugeMetricFamily("rocketmq_topic_offset", "TopicOffset", TOPIC_OFFSET_LABEL_NAMES);
+        for (Map.Entry<ProducerMetric, Double> entry : topicOffset.entrySet()) {
+            loadTopicOffsetMetric(topicOffsetF, entry);
+        }
+        mfs.add(topicOffsetF);
+
+        GaugeMetricFamily topicRetryOffsetF = new GaugeMetricFamily("rocketmq_topic_retry_offset", "TopicRetryOffset", TOPIC_OFFSET_LABEL_NAMES);
+        for (Map.Entry<ProducerMetric, Double> entry : topicRetryOffset.entrySet()) {
+            loadTopicOffsetMetric(topicRetryOffsetF, entry);
+        }
+        mfs.add(topicRetryOffsetF);
+
+        GaugeMetricFamily topicDLQOffsetF = new GaugeMetricFamily("rocketmq_topic_dlq_offset", "TopicRetryOffset", DLQ_TOPIC_OFFSET_LABEL_NAMES);
+        for (Map.Entry<DLQTopicOffsetMetric, Double> entry : topicDLQOffset.entrySet()) {
+            topicDLQOffsetF.addMetric(
+                    Arrays.asList(
+                            entry.getKey().getClusterName(),
+                            entry.getKey().getBrokerNames(),
+                            entry.getKey().getGroup(),
+                            String.valueOf(entry.getKey().getLastUpdateTimestamp())
+                    ),
+                    entry.getValue());
+        }
+        mfs.add(topicDLQOffsetF);
+
+    }
 
     @Override
     public List<MetricFamilySamples> collect() {
 
         List<MetricFamilySamples> mfs = new ArrayList<MetricFamilySamples>();
 
-        GaugeMetricFamily topicPutNumsGauge = new GaugeMetricFamily("rocketmq_producer_tps", "TopicPutNums", Arrays.asList("cluster","broker","topic"));
-        for (Map.Entry<ProducerMetric,Double> entry:topicPutNums.entrySet()) {
-            topicPutNumsGauge.addMetric(Arrays.asList(entry.getKey().getClusterName(),entry.getKey().getBrokerName(),entry.getKey().getTopicName()), entry.getValue());
+        collectConsumerMetric(mfs);
+
+        collectTopicOffsetMetric(mfs);
+
+        collectTopicNums(mfs);
+
+        collectGroupNums(mfs);
+
+        collectBrokerNums(mfs);
+
+        collectBrokerRuntimeStats(mfs);
+
+        return mfs;
+    }
+
+    private static List<String> GROUP_PULL_LATENCY_LABEL_NAMES = Arrays.asList(
+            "cluster", "broker", "topic", "group", "queueid"
+    );
+    private static List<String> GROUP_LATENCY_BY_STORETIME_LABEL_NAMES = Arrays.asList(
+            "topic", "group"
+    );
+
+    private static List<String> BROKER_NUMS_LABEL_NAMES = Arrays.asList("cluster", "brokerIP", "brokerHost");
+
+    private static void loadBrokerNums(GaugeMetricFamily family, Map.Entry<BrokerMetric, Double> entry) {
+        family.addMetric(Arrays.asList(
+                entry.getKey().getClusterName(),
+                entry.getKey().getBrokerIP(),
+                entry.getKey().getBrokerHost()),
+                entry.getValue()
+        );
+    }
+
+    private void collectBrokerNums(List<MetricFamilySamples> mfs) {
+        GaugeMetricFamily brokerPutNumsGauge = new GaugeMetricFamily("rocketmq_broker_put_nums", "BrokerPutNums", BROKER_NUMS_LABEL_NAMES);
+        for (Map.Entry<BrokerMetric, Double> entry : brokerPutNums.entrySet()) {
+            loadBrokerNums(brokerPutNumsGauge, entry);
         }
-        mfs.add(topicPutNumsGauge);
+        mfs.add(brokerPutNumsGauge);
 
+        GaugeMetricFamily brokerGetNumsGauge = new GaugeMetricFamily("rocketmq_broker_get_nums", "BrokerGetNums", BROKER_NUMS_LABEL_NAMES);
+        for (Map.Entry<BrokerMetric, Double> entry : brokerGetNums.entrySet()) {
+            loadBrokerNums(brokerGetNumsGauge, entry);
+        }
+        mfs.add(brokerGetNumsGauge);
+    }
+
+
+    private static List<String> GROUP_NUMS_LABEL_NAMES = Arrays.asList(
+            "topic", "group"
+    );
 
-        GaugeMetricFamily topicPutSizeGauge = new GaugeMetricFamily("rocketmq_producer_message_size", "TopicPutMessageSize", Arrays.asList("cluster","broker","topic"));
-        for (Map.Entry<ProducerMetric, Double> entry: topicPutSize.entrySet()) {
-            topicPutSizeGauge.addMetric(Arrays.asList(entry.getKey().getClusterName(),entry.getKey().getBrokerName(),entry.getKey().getTopicName()), entry.getValue());
+    private static <T extends Number> void loadGroupNumsMetric(GaugeMetricFamily family, Map.Entry<ConsumerMetric, T> entry) {
+        family.addMetric(Arrays.asList(
+                entry.getKey().getTopicName(),
+                entry.getKey().getConsumerGroupName()),
+                entry.getValue().doubleValue()
+        );
+    }
+
+    private void collectBrokerRuntimeStatsPutMessageDistributeTime(List<MetricFamilySamples> mfs) {
+        GaugeMetricFamily pmdt0 = new GaugeMetricFamily("rocketmq_brokeruntime_pmdt_0ms", "PutMessageDistributeTimeMap0ms", BROKER_RUNTIME_METRIC_LABEL_NAMES);
+        for (Map.Entry<BrokerRuntimeMetric, Integer> entry : brokerRuntimePutMessageDistributeTimeMap0ms.entrySet()) {
+            loadBrokerRuntimeStatsMetric(pmdt0, entry);
         }
-        mfs.add(topicPutSizeGauge);
+        mfs.add(pmdt0);
 
+        GaugeMetricFamily pmdt0to10ms = new GaugeMetricFamily("rocketmq_brokeruntime_pmdt_0to10ms", "PutMessageDistributeTimeMap0to10ms", BROKER_RUNTIME_METRIC_LABEL_NAMES);
+        for (Map.Entry<BrokerRuntimeMetric, Integer> entry : brokerRuntimePutMessageDistributeTimeMap0to10ms.entrySet()) {
+            loadBrokerRuntimeStatsMetric(pmdt0to10ms, entry);
+        }
+        mfs.add(pmdt0to10ms);
 
-        CounterMetricFamily topicOffsetGauge = new CounterMetricFamily("rocketmq_producer_offset", "TopicOffset", Arrays.asList("cluster","broker","topic"));
-        for (Map.Entry<ProducerMetric, Double> entry: topicOffset.entrySet()) {
-            topicOffsetGauge.addMetric(Arrays.asList(entry.getKey().getClusterName(),entry.getKey().getBrokerName(),entry.getKey().getTopicName()), entry.getValue());
+        GaugeMetricFamily pmdt10to50ms = new GaugeMetricFamily("rocketmq_brokeruntime_pmdt_10to50ms", "PutMessageDistributeTimeMap10to50ms", BROKER_RUNTIME_METRIC_LABEL_NAMES);
+        for (Map.Entry<BrokerRuntimeMetric, Integer> entry : brokerRuntimePutMessageDistributeTimeMap10to50ms.entrySet()) {
+            loadBrokerRuntimeStatsMetric(pmdt10to50ms, entry);
         }
-        mfs.add(topicOffsetGauge);
+        mfs.add(pmdt10to50ms);
 
+        GaugeMetricFamily pmdt50to100ms = new GaugeMetricFamily("rocketmq_brokeruntime_pmdt_50to100ms", "PutMessageDistributeTimeMap50to100ms", BROKER_RUNTIME_METRIC_LABEL_NAMES);
+        for (Map.Entry<BrokerRuntimeMetric, Integer> entry : brokerRuntimePutMessageDistributeTimeMap50to100ms.entrySet()) {
+            loadBrokerRuntimeStatsMetric(pmdt50to100ms, entry);
+        }
+        mfs.add(pmdt50to100ms);
 
-        GaugeMetricFamily brokerPutNumsGauge = new GaugeMetricFamily("rocketmq_broker_tps", "BrokerPutNums", Arrays.asList("cluster","broker"));
-        for (Map.Entry<BrokerMetric, Double> entry: brokerPutNums.entrySet()) {
-            brokerPutNumsGauge.addMetric(Arrays.asList(entry.getKey().getClusterName(),entry.getKey().getBrokerName()), entry.getValue());
+        GaugeMetricFamily pmdt100to200ms = new GaugeMetricFamily("rocketmq_brokeruntime_pmdt_100to200ms", "PutMessageDistributeTimeMap100to200ms", BROKER_RUNTIME_METRIC_LABEL_NAMES);
+        for (Map.Entry<BrokerRuntimeMetric, Integer> entry : brokerRuntimePutMessageDistributeTimeMap100to200ms.entrySet()) {
+            loadBrokerRuntimeStatsMetric(pmdt100to200ms, entry);
         }
-        mfs.add(brokerPutNumsGauge);
+        mfs.add(pmdt100to200ms);
 
+        GaugeMetricFamily pmdt200to500ms = new GaugeMetricFamily("rocketmq_brokeruntime_pmdt_200to500ms", "PutMessageDistributeTimeMap200to500ms", BROKER_RUNTIME_METRIC_LABEL_NAMES);
+        for (Map.Entry<BrokerRuntimeMetric, Integer> entry : brokerRuntimePutMessageDistributeTimeMap200to500ms.entrySet()) {
+            loadBrokerRuntimeStatsMetric(pmdt200to500ms, entry);
+        }
+        mfs.add(pmdt200to500ms);
 
-        GaugeMetricFamily brokerGetNumsGauge = new GaugeMetricFamily("rocketmq_broker_qps", "BrokerGetNums", Arrays.asList("cluster","broker"));
-        for (Map.Entry<BrokerMetric, Double> entry: brokerGetNums.entrySet()) {
-            brokerGetNumsGauge.addMetric(Arrays.asList(entry.getKey().getClusterName(),entry.getKey().getBrokerName()), entry.getValue());
+        GaugeMetricFamily pmdt500to1s = new GaugeMetricFamily("rocketmq_brokeruntime_pmdt_500to1s", "PutMessageDistributeTimeMap500to1s", BROKER_RUNTIME_METRIC_LABEL_NAMES);
+        for (Map.Entry<BrokerRuntimeMetric, Integer> entry : brokerRuntimePutMessageDistributeTimeMap500to1s.entrySet()) {
+            loadBrokerRuntimeStatsMetric(pmdt500to1s, entry);
         }
-        mfs.add(brokerGetNumsGauge);
+        mfs.add(pmdt500to1s);
 
+        GaugeMetricFamily pmdt1to2s = new GaugeMetricFamily("rocketmq_brokeruntime_pmdt_1to2s", "PutMessageDistributeTimeMap1to2s", BROKER_RUNTIME_METRIC_LABEL_NAMES);
+        for (Map.Entry<BrokerRuntimeMetric, Integer> entry : brokerRuntimePutMessageDistributeTimeMap1to2s.entrySet()) {
+            loadBrokerRuntimeStatsMetric(pmdt1to2s, entry);
+        }
+        mfs.add(pmdt1to2s);
 
-        GaugeMetricFamily groupGetNumsGauge = new GaugeMetricFamily("rocketmq_consumer_tps", "GroupGetNums", Arrays.asList("cluster","broker","topic","group"));
-        for (Map.Entry<ConsumerMetric, Double> entry: groupGetNums.entrySet()) {
-            groupGetNumsGauge.addMetric(Arrays.asList(entry.getKey().getClusterName(),entry.getKey().getBrokerName(),entry.getKey().getTopicName(),entry.getKey().getConsumerGroupName()), entry.getValue());
+        GaugeMetricFamily pmdt2to3s = new GaugeMetricFamily("rocketmq_brokeruntime_pmdt_2to3s", "PutMessageDistributeTimeMap2to3s", BROKER_RUNTIME_METRIC_LABEL_NAMES);
+        for (Map.Entry<BrokerRuntimeMetric, Integer> entry : brokerRuntimePutMessageDistributeTimeMap2to3s.entrySet()) {
+            loadBrokerRuntimeStatsMetric(pmdt2to3s, entry);
         }
+        mfs.add(pmdt2to3s);
 
+        GaugeMetricFamily pmdt3to4s = new GaugeMetricFamily("rocketmq_brokeruntime_pmdt_3to4s", "PutMessageDistributeTimeMap3to4s", BROKER_RUNTIME_METRIC_LABEL_NAMES);
+        for (Map.Entry<BrokerRuntimeMetric, Integer> entry : brokerRuntimePutMessageDistributeTimeMap3to4s.entrySet()) {
+            loadBrokerRuntimeStatsMetric(pmdt3to4s, entry);
+        }
+        mfs.add(pmdt3to4s);
+
+        GaugeMetricFamily pmdt4to5s = new GaugeMetricFamily("rocketmq_brokeruntime_pmdt_4to5s", "PutMessageDistributeTimeMap4to5s", BROKER_RUNTIME_METRIC_LABEL_NAMES);
+        for (Map.Entry<BrokerRuntimeMetric, Integer> entry : brokerRuntimePutMessageDistributeTimeMap4to5s.entrySet()) {
+            loadBrokerRuntimeStatsMetric(pmdt4to5s, entry);
+        }
+        mfs.add(pmdt4to5s);
+
+        GaugeMetricFamily pmdt5to10s = new GaugeMetricFamily("rocketmq_brokeruntime_pmdt_5to10s", "PutMessageDistributeTimeMap5to10s", BROKER_RUNTIME_METRIC_LABEL_NAMES);
+        for (Map.Entry<BrokerRuntimeMetric, Integer> entry : brokerRuntimePutMessageDistributeTimeMap5to10s.entrySet()) {
+            loadBrokerRuntimeStatsMetric(pmdt5to10s, entry);
+        }
+        mfs.add(pmdt5to10s);
+
+        GaugeMetricFamily pmdt10stoMore = new GaugeMetricFamily("rocketmq_brokeruntime_pmdt_10stomore", "PutMessageDistributeTimeMap10toMore", BROKER_RUNTIME_METRIC_LABEL_NAMES);
+        for (Map.Entry<BrokerRuntimeMetric, Integer> entry : brokerRuntimePutMessageDistributeTimeMap10toMore.entrySet()) {
+            loadBrokerRuntimeStatsMetric(pmdt10stoMore, entry);
+        }
+        mfs.add(pmdt10stoMore);
+    }
+
+    private void collectGroupNums(List<MetricFamilySamples> mfs) {
+        GaugeMetricFamily groupGetNumsGauge = new GaugeMetricFamily("rocketmq_group_get_nums", "GroupGetNums", GROUP_NUMS_LABEL_NAMES);
+        for (Map.Entry<ConsumerMetric, Double> entry : groupGetNums.entrySet()) {
+            loadGroupNumsMetric(groupGetNumsGauge, entry);
+        }
         mfs.add(groupGetNumsGauge);
 
+        GaugeMetricFamily groupConsumeTPSF = new GaugeMetricFamily("rocketmq_group_consume_tps", "GroupConsumeTPS", GROUP_NUMS_LABEL_NAMES);
+        for (Map.Entry<ConsumerMetric, Double> entry : groupConsumeTPS.entrySet()) {
+            loadGroupNumsMetric(groupConsumeTPSF, entry);
+        }
+        mfs.add(groupConsumeTPSF);
 
-        GaugeMetricFamily groupGetSizeGauge = new GaugeMetricFamily("rocketmq_consumer_message_size", "GroupGetMessageSize", Arrays.asList("cluster","broker","topic","group"));
-        for (Map.Entry<ConsumerMetric, Double> entry: groupGetSize.entrySet()) {
-            groupGetSizeGauge.addMetric(Arrays.asList(entry.getKey().getClusterName(),entry.getKey().getBrokerName(),entry.getKey().getTopicName(),entry.getKey().getConsumerGroupName()), entry.getValue());
+        GaugeMetricFamily groupBrokerTotalOffsetF = new GaugeMetricFamily("rocketmq_group_broker_total_offset", "GroupBrokerTotalOffset", GROUP_NUMS_LABEL_NAMES);
+        for (Map.Entry<ConsumerMetric, Long> entry : groupBrokerTotalOffset.entrySet()) {
+            loadGroupNumsMetric(groupBrokerTotalOffsetF, entry);
         }
-        mfs.add(groupGetSizeGauge);
+        mfs.add(groupBrokerTotalOffsetF);
 
-        CounterMetricFamily groupOffsetGauge = new CounterMetricFamily("rocketmq_consumer_offset", "GroupOffset", Arrays.asList("cluster","broker","topic","group"));
-        for (Map.Entry<ConsumerMetric, Double> entry: groupOffset.entrySet()) {
-            groupOffsetGauge.addMetric(Arrays.asList(entry.getKey().getClusterName(),entry.getKey().getBrokerName(),entry.getKey().getTopicName(),entry.getKey().getConsumerGroupName()), entry.getValue());
+        GaugeMetricFamily groupConsumeTotalOffsetF = new GaugeMetricFamily("rocketmq_group_consume_total_offset", "GroupConsumeTotalOffset", GROUP_NUMS_LABEL_NAMES);
+        for (Map.Entry<ConsumerMetric, Long> entry : groupBrokerTotalOffset.entrySet()) {
+            loadGroupNumsMetric(groupConsumeTotalOffsetF, entry);
         }
-        mfs.add(groupOffsetGauge);
+        mfs.add(groupConsumeTotalOffsetF);
 
+        GaugeMetricFamily groupGetSizeGauge = new GaugeMetricFamily("rocketmq_group_get_messagesize", "GroupGetMessageSize", GROUP_NUMS_LABEL_NAMES);
+        for (Map.Entry<ConsumerMetric, Double> entry : groupGetSize.entrySet()) {
+            loadGroupNumsMetric(groupGetSizeGauge, entry);
+        }
+        mfs.add(groupGetSizeGauge);
 
-        GaugeMetricFamily sendBackNumsGauge = new GaugeMetricFamily("rocketmq_send_back_nums", "SendBackNums", Arrays.asList("cluster","broker","topic","group"));
-        for (Map.Entry<ConsumerMetric, Double> entry: sendBackNums.entrySet()) {
-            sendBackNumsGauge.addMetric(Arrays.asList(entry.getKey().getClusterName(),entry.getKey().getBrokerName(),entry.getKey().getTopicName(),entry.getKey().getConsumerGroupName()), entry.getValue());
+        GaugeMetricFamily sendBackNumsGauge = new GaugeMetricFamily("rocketmq_send_back_nums", "SendBackNums", GROUP_NUMS_LABEL_NAMES);
+        for (Map.Entry<ConsumerMetric, Double> entry : sendBackNums.entrySet()) {
+            loadGroupNumsMetric(sendBackNumsGauge, entry);
         }
         mfs.add(sendBackNumsGauge);
+    }
 
+    private void collectTopicNums(List<MetricFamilySamples> mfs) {
+        GaugeMetricFamily topicPutNumsGauge = new GaugeMetricFamily("rocketmq_topic_put_nums", "TopicPutNums", TOPIC_NUMS_LABEL_NAMES);
+        for (Map.Entry<TopicPutNumMetric, Double> entry : topicPutNums.entrySet()) {
+            loadTopicNumsMetric(topicPutNumsGauge, entry);
+        }
+        mfs.add(topicPutNumsGauge);
 
-        GaugeMetricFamily groupGetLatencyGauge = new GaugeMetricFamily("rocketmq_group_get_latency", "GroupGetLatency", Arrays.asList("cluster","broker","topic","group","queueid"));
-        for (Map.Entry<ConsumerQueueMetric, Double> entry: groupGetLatency.entrySet()) {
-            groupGetLatencyGauge.addMetric(Arrays.asList(entry.getKey().getClusterName(),entry.getKey().getBrokerName(),entry.getKey().getTopicName(),entry.getKey().getConsumerGroupName(),entry.getKey().getQueueId()), entry.getValue());
+        GaugeMetricFamily topicPutSizeGauge = new GaugeMetricFamily("rocketmq_topic_put_messagesize", "TopicPutMessageSize", TOPIC_NUMS_LABEL_NAMES);
+        for (Map.Entry<TopicPutNumMetric, Double> entry : topicPutSize.entrySet()) {
+            loadTopicNumsMetric(topicPutSizeGauge, entry);
         }
-        mfs.add(groupGetLatencyGauge);
+        mfs.add(topicPutSizeGauge);
+    }
 
-        GaugeMetricFamily groupGetLatencyByStoretimeGauge = new GaugeMetricFamily("rocketmq_group_get_latency_by_storetime", "GroupGetLatencyByStoreTime", Arrays.asList("cluster","broker","topic","group"));
-        for (Map.Entry<ConsumerMetric, Double> entry: groupGetLatencyByStoreTime.entrySet()) {
-            groupGetLatencyByStoretimeGauge.addMetric(Arrays.asList(entry.getKey().getClusterName(),entry.getKey().getBrokerName(),entry.getKey().getTopicName(),entry.getKey().getConsumerGroupName()), entry.getValue());
+    private static List<String> TOPIC_NUMS_LABEL_NAMES = Arrays.asList("cluster", "brokers", "topic");
+
+    private void loadTopicNumsMetric(GaugeMetricFamily family, Map.Entry<TopicPutNumMetric, Double> entry) {
+        family.addMetric(
+                Arrays.asList(
+                        entry.getKey().getClusterName(),
+                        entry.getKey().getBrokerNames(),
+                        entry.getKey().getTopicName()
+                ),
+                entry.getValue()
+        );
+    }
+
+    public void addTopicOffsetMetric(String clusterName, String brokerNames, String topic, long lastUpdateTimestamp, double value) {
+        if (topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
+            topicRetryOffset.put(new ProducerMetric(clusterName, brokerNames, topic, lastUpdateTimestamp), value);
+        } else if (topic.startsWith(MixAll.DLQ_GROUP_TOPIC_PREFIX)) {
+            topicDLQOffset.put(new DLQTopicOffsetMetric(clusterName, brokerNames, topic.replace(MixAll.DLQ_GROUP_TOPIC_PREFIX, ""), lastUpdateTimestamp), value);
+        } else {
+            topicOffset.put(new ProducerMetric(clusterName, brokerNames, topic, lastUpdateTimestamp), value);
         }
-        mfs.add(groupGetLatencyByStoretimeGauge);
+    }
 
-        return mfs;
+    public void addGroupCountMetric(String group, String caddr, String localaddr, int count) {
+        this.consumerCounts.put(new ConsumerCountMetric(group, caddr, localaddr), count);
     }
-    public void AddTopicPutNumsMetric(String clusterName, String brokerName, String topic,  double value)
-    {
-        topicPutNums.put(new ProducerMetric(clusterName,brokerName,topic),value);
+
+    public void addGroupDiffMetric(String countOfOnlineConsumers, String group, String topic, long value) {
+        if (topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
+            this.consumerRetryDiff.put(new ConsumerTopicDiffMetric(group, topic, countOfOnlineConsumers), value);
+        } else if (topic.startsWith(MixAll.DLQ_GROUP_TOPIC_PREFIX)) {
+            this.consumerDLQDiff.put(new ConsumerTopicDiffMetric(group, topic, countOfOnlineConsumers), value);
+        } else {
+            this.consumerDiff.put(new ConsumerTopicDiffMetric(group, topic, countOfOnlineConsumers), value);
+        }
+    }
+
+    public void addTopicPutNumsMetric(String cluster, String brokerNames, String brokerIP, String brokerHost,
+                                      String topic, double value) {
+        topicPutNums.put(new TopicPutNumMetric(cluster, brokerNames, brokerIP, brokerHost, topic), value);
+    }
+
+    public void addTopicPutSizeMetric(String cluster, String brokerName, String brokerIP, String brokerHost,
+                                      String topic, double value) {
+        topicPutSize.put(new TopicPutNumMetric(cluster, brokerName, brokerIP, brokerHost, topic), value);
+    }
+
+    public void addGroupBrokerTotalOffsetMetric(String topic, String group, long value) {
+        groupBrokerTotalOffset.put(new ConsumerMetric(topic, group), value);
+    }
+
+    public void addGroupConsumerTotalOffsetMetric(String topic, String group, long value) {
+        groupConsumeTotalOffset.put(new ConsumerMetric(topic, group), value);
     }
 
-    public void AddTopicPutSizeMetric(String clusterName, String brokerName, String topic,  double value)
-    {
-        topicPutSize.put(new ProducerMetric(clusterName,brokerName,topic),value);
+    public void addGroupConsumeTPSMetric(String topic, String group, double value) {
+        groupConsumeTPS.put(new ConsumerMetric(topic, group), value);
     }
 
-    public void AddTopicOffsetMetric(String clusterName, String brokerName, String topic,  double value)
-    {
-        topicOffset.put(new ProducerMetric(clusterName,brokerName,topic),value);
+    public void addGroupGetNumsMetric(String topic, String group, double value) {
+        groupGetNums.put(new ConsumerMetric(topic, group), value);
     }
 
-    public void AddBrokerPutNumsMetric(String clusterName, String brokerName,  double value)
-    {
-        brokerPutNums.put(new BrokerMetric(clusterName,brokerName),value);
+    public void addGroupGetSizeMetric(String topic, String group, double value) {
+        groupGetSize.put(new ConsumerMetric(topic, group), value);
     }
 
-    public void AddBrokerGetNumsMetric(String clusterName, String brokerName,  double value)
-    {
-        brokerGetNums.put(new BrokerMetric(clusterName,brokerName),value);
+    public void addSendBackNumsMetric(String topic, String group, double value) {
+        sendBackNums.put(new ConsumerMetric(topic, group), value);
     }
 
-    public void AddGroupGetNumsMetric(String clusterName, String brokerName, String topic, String group,  double value)
-    {
-        groupGetNums.put(new ConsumerMetric(clusterName,brokerName,topic,group),value);
+    public void addBrokerPutNumsMetric(String clusterName, String brokerIP, String brokerHost, double value) {
+        brokerPutNums.put(new BrokerMetric(clusterName, brokerIP, brokerHost), value);
     }
 
-    public void AddGroupGetSizeMetric(String clusterName, String brokerName, String topic, String group,  double value)
-    {
-        groupGetSize.put(new ConsumerMetric(clusterName,brokerName,topic,group),value);
+    public void addBrokerGetNumsMetric(String clusterName, String brokerIP, String brokerHost, double value) {
+        brokerGetNums.put(new BrokerMetric(clusterName, brokerIP, brokerHost), value);
     }
 
-    public void AddGroupOffsetMetric(String clusterName, String brokerName, String topic, String group,  double value)
-    {
-        groupOffset.put(new ConsumerMetric(clusterName,brokerName,topic,group),value);
+    public void addBrokerRuntimeStatsMetric(BrokerRuntimeStats stats, String clusterName, String brokerAddress, String brokerHost) {
+        addBrokerRuntimePutMessageDistributeTimeMap(
+                clusterName, brokerAddress, brokerHost,
+                stats.getBrokerVersionDesc(),
+                stats.getBootTimestamp(),
+                stats.getBrokerVersion(), stats);
+        addCommitLogDirCapacity(clusterName, brokerAddress, brokerHost, stats);
+        addAllKindOfTps(clusterName, brokerAddress, brokerHost, stats);
+
+        brokerRuntimeMsgPutTotalTodayNow.put(new BrokerRuntimeMetric(
+                clusterName, brokerAddress, brokerHost,
+                stats.getBrokerVersionDesc(),
+                stats.getBootTimestamp(),
+                stats.getBrokerVersion()), stats.getMsgPutTotalTodayNow());
+
+        brokerRuntimeMsgGetTotalTodayNow.put(new BrokerRuntimeMetric(
+                clusterName, brokerAddress, brokerHost,
+                stats.getBrokerVersionDesc(),
+                stats.getBootTimestamp(),
+                stats.getBrokerVersion()), stats.getMsgGetTotalTodayNow());
+
+        brokerRuntimeMsgPutTotalTodayMorning.put(new BrokerRuntimeMetric(
+                clusterName, brokerAddress, brokerHost,
+                stats.getBrokerVersionDesc(),
+                stats.getBootTimestamp(),
+                stats.getBrokerVersion()), stats.getMsgPutTotalTodayMorning());
+        brokerRuntimeMsgGetTotalTodayMorning.put(new BrokerRuntimeMetric(
+                clusterName, brokerAddress, brokerHost,
+                stats.getBrokerVersionDesc(),
+                stats.getBootTimestamp(),
+                stats.getBrokerVersion()), stats.getMsgGetTotalTodayMorning());
+        brokerRuntimeMsgPutTotalYesterdayMorning.put(new BrokerRuntimeMetric(
+                clusterName, brokerAddress, brokerHost,
+                stats.getBrokerVersionDesc(),
+                stats.getBootTimestamp(),
+                stats.getBrokerVersion()), stats.getMsgPutTotalYesterdayMorning());
+        brokerRuntimeMsgGetTotalYesterdayMorning.put(new BrokerRuntimeMetric(
+                clusterName, brokerAddress, brokerHost,
+                stats.getBrokerVersionDesc(),
+                stats.getBootTimestamp(),
+                stats.getBrokerVersion()), stats.getMsgGetTotalYesterdayMorning());
+        brokerRuntimeSendThreadPoolQueueHeadWaitTimeMills.put(new BrokerRuntimeMetric(
+                clusterName, brokerAddress, brokerHost,
+                stats.getBrokerVersionDesc(),
+                stats.getBootTimestamp(),
+                stats.getBrokerVersion()), stats.getSendThreadPoolQueueHeadWaitTimeMills());
+        brokerRuntimeQueryThreadPoolQueueHeadWaitTimeMills.put(new BrokerRuntimeMetric(
+                clusterName, brokerAddress, brokerHost,
+                stats.getBrokerVersionDesc(),
+                stats.getBootTimestamp(),
+                stats.getBrokerVersion()), stats.getQueryThreadPoolQueueHeadWaitTimeMills());
+        brokerRuntimePullThreadPoolQueueHeadWaitTimeMills.put(new BrokerRuntimeMetric(
+                clusterName, brokerAddress, brokerHost,
+                stats.getBrokerVersionDesc(),
+                stats.getBootTimestamp(),
+                stats.getBrokerVersion()), stats.getPullThreadPoolQueueHeadWaitTimeMills());
+        brokerRuntimeQueryThreadPoolQueueSize.put(new BrokerRuntimeMetric(
+                clusterName, brokerAddress, brokerHost,
+                stats.getBrokerVersionDesc(),
+                stats.getBootTimestamp(),
+                stats.getBrokerVersion()), stats.getQueryThreadPoolQueueSize());
+        brokerRuntimePullThreadPoolQueueSize.put(new BrokerRuntimeMetric(
+                clusterName, brokerAddress, brokerHost,
+                stats.getBrokerVersionDesc(),
+                stats.getBootTimestamp(),
+                stats.getBrokerVersion()), stats.getPullThreadPoolQueueSize());
+        brokerRuntimeSendThreadPoolQueueCapacity.put(new BrokerRuntimeMetric(
+                clusterName, brokerAddress, brokerHost,
+                stats.getBrokerVersionDesc(),
+                stats.getBootTimestamp(),
+                stats.getBrokerVersion()), stats.getSendThreadPoolQueueCapacity());
+        brokerRuntimePullThreadPoolQueueCapacity.put(new BrokerRuntimeMetric(
+                clusterName, brokerAddress, brokerHost,
+                stats.getBrokerVersionDesc(),
+                stats.getBootTimestamp(),
+                stats.getBrokerVersion()), stats.getPullThreadPoolQueueCapacity());
+
+        brokerRuntimeRemainHowManyDataToFlush.put(new BrokerRuntimeMetric(
+                clusterName, brokerAddress, brokerHost,
+                stats.getBrokerVersionDesc(),
+                stats.getBootTimestamp(),
+                stats.getBrokerVersion()), stats.getRemainHowManyDataToFlush());
+        brokerRuntimeCommitLogMinOffset.put(new BrokerRuntimeMetric(
+                clusterName, brokerAddress, brokerHost,
+                stats.getBrokerVersionDesc(),
+                stats.getBootTimestamp(),
+                stats.getBrokerVersion()), stats.getCommitLogMinOffset());
+        brokerRuntimeCommitLogMaxOffset.put(new BrokerRuntimeMetric(
+                clusterName, brokerAddress, brokerHost,
+                stats.getBrokerVersionDesc(),
+                stats.getBootTimestamp(),
+                stats.getBrokerVersion()), stats.getCommitLogMaxOffset());
+
+
+        brokerRuntimeDispatchMaxBuffer.put(new BrokerRuntimeMetric(
+                clusterName, brokerAddress, brokerHost,
+                stats.getBrokerVersionDesc(),
+                stats.getBootTimestamp(),
+                stats.getBrokerVersion()), stats.getDispatchMaxBuffer());
+        brokerRuntimeConsumeQueueDiskRatio.put(new BrokerRuntimeMetric(
+                clusterName, brokerAddress, brokerHost,
+                stats.getBrokerVersionDesc(),
+                stats.getBootTimestamp(),
+                stats.getBrokerVersion()), stats.getConsumeQueueDiskRatio());
+        brokerRuntimeCommitLogDiskRatio.put(new BrokerRuntimeMetric(
+                clusterName, brokerAddress, brokerHost,
+                stats.getBrokerVersionDesc(),
+                stats.getBootTimestamp(),
+                stats.getBrokerVersion()), stats.getCommitLogDiskRatio());
+        brokerRuntimePageCacheLockTimeMills.put(new BrokerRuntimeMetric(
+                clusterName, brokerAddress, brokerHost,
+                stats.getBrokerVersionDesc(),
+                stats.getBootTimestamp(),
+                stats.getBrokerVersion()), stats.getPageCacheLockTimeMills());
+        brokerRuntimeGetMessageEntireTimeMax.put(new BrokerRuntimeMetric(
+                clusterName, brokerAddress, brokerHost,
+                stats.getBrokerVersionDesc(),
+                stats.getBootTimestamp(),
+                stats.getBrokerVersion()), stats.getGetMessageEntireTimeMax());
+        brokerRuntimePutMessageTimesTotal.put(new BrokerRuntimeMetric(
+                clusterName, brokerAddress, brokerHost,
+                stats.getBrokerVersionDesc(),
+                stats.getBootTimestamp(),
+                stats.getBrokerVersion()), stats.getPutMessageTimesTotal());
+        brokerRuntimeSendThreadPoolQueueSize.put(new BrokerRuntimeMetric(
+                clusterName, brokerAddress, brokerHost,
+                stats.getBrokerVersionDesc(),
+                stats.getBootTimestamp(),
+                stats.getBrokerVersion()), stats.getSendThreadPoolQueueSize());
+        brokerRuntimeStartAcceptSendRequestTimeStamp.put(new BrokerRuntimeMetric(
+                clusterName, brokerAddress, brokerHost,
+                stats.getBrokerVersionDesc(),
+                stats.getBootTimestamp(),
+                stats.getBrokerVersion()), stats.getStartAcceptSendRequestTimeStamp());
+        brokerRuntimePutMessageEntireTimeMax.put(new BrokerRuntimeMetric(
+                clusterName, brokerAddress, brokerHost,
+                stats.getBrokerVersionDesc(),
+                stats.getBootTimestamp(),
+                stats.getBrokerVersion()), stats.getPutMessageEntireTimeMax());
+        brokerRuntimeEarliestMessageTimeStamp.put(new BrokerRuntimeMetric(
+                clusterName, brokerAddress, brokerHost,
+                stats.getBrokerVersionDesc(),
+                stats.getBootTimestamp(),
+                stats.getBrokerVersion()), stats.getEarliestMessageTimeStamp());
+        brokerRuntimeRemainTransientStoreBufferNumbs.put(new BrokerRuntimeMetric(
+                clusterName, brokerAddress, brokerHost,
+                stats.getBrokerVersionDesc(),
+                stats.getBootTimestamp(),
+                stats.getBrokerVersion()), stats.getRemainTransientStoreBufferNumbs());
+        brokerRuntimeQueryThreadPoolQueueCapacity.put(new BrokerRuntimeMetric(
+                clusterName, brokerAddress, brokerHost,
+                stats.getBrokerVersionDesc(),
+                stats.getBootTimestamp(),
+                stats.getBrokerVersion()), stats.getQueryThreadPoolQueueCapacity());
+        brokerRuntimePutMessageAverageSize.put(new BrokerRuntimeMetric(
+                clusterName, brokerAddress, brokerHost,
+                stats.getBrokerVersionDesc(),
+                stats.getBootTimestamp(),
+                stats.getBrokerVersion()), stats.getPutMessageAverageSize());
+        brokerRuntimePutMessageSizeTotal.put(new BrokerRuntimeMetric(
+                clusterName, brokerAddress, brokerHost,
+                stats.getBrokerVersionDesc(),
+                stats.getBootTimestamp(),
+                stats.getBrokerVersion()), stats.getQueryThreadPoolQueueCapacity());
+        brokerRuntimeDispatchBehindBytes.put(new BrokerRuntimeMetric(
+                clusterName, brokerAddress, brokerHost,
+                stats.getBrokerVersionDesc(),
+                stats.getBootTimestamp(),
+                stats.getBrokerVersion()), stats.getDispatchBehindBytes());
     }
 
+    private void addAllKindOfTps(String brokerAddress, String clusterName, String brokerHost, BrokerRuntimeStats stats) {
+        brokerRuntimePutTps10.put(new BrokerRuntimeMetric(
+                clusterName,
+                brokerAddress, brokerHost,
+                stats.getBrokerVersionDesc(),
+                stats.getBootTimestamp(),
+                stats.getBrokerVersion()), stats.getPutTps().getTen());
+        brokerRuntimePutTps60.put(new BrokerRuntimeMetric(
+                clusterName,
+                brokerAddress, brokerHost,
+                stats.getBrokerVersionDesc(),
+                stats.getBootTimestamp(),
+                stats.getBrokerVersion()), stats.getPutTps().getSixty());
+        brokerRuntimePutTps600.put(new BrokerRuntimeMetric(
+                clusterName,
+                brokerAddress, brokerHost,
+                stats.getBrokerVersionDesc(),
+                stats.getBootTimestamp(),
+                stats.getBrokerVersion()), stats.getPutTps().getSixHundred());
+        brokerRuntimeGetMissTps10.put(new BrokerRuntimeMetric(
+                clusterName,
+                brokerAddress, brokerHost,
+                stats.getBrokerVersionDesc(),
+                stats.getBootTimestamp(),
+                stats.getBrokerVersion()), stats.getGetMissTps().getTen());
+        brokerRuntimeGetMissTps60.put(new BrokerRuntimeMetric(
+                clusterName,
+                brokerAddress, brokerHost,
+                stats.getBrokerVersionDesc(),
+                stats.getBootTimestamp(),
+                stats.getBrokerVersion()), stats.getGetMissTps().getSixty());
+        brokerRuntimeGetMissTps600.put(new BrokerRuntimeMetric(
+                clusterName,
+                brokerAddress, brokerHost,
+                stats.getBrokerVersionDesc(),
+                stats.getBootTimestamp(),
+                stats.getBrokerVersion()), stats.getGetMissTps().getSixHundred());
+        brokerRuntimeGetTransferedTps10.put(new BrokerRuntimeMetric(
+                clusterName,
+                brokerAddress, brokerHost,
+                stats.getBrokerVersionDesc(),
+                stats.getBootTimestamp(),
+                stats.getBrokerVersion()), stats.getGetTransferedTps().getTen());
+        brokerRuntimeGetTransferedTps60.put(new BrokerRuntimeMetric(
+                clusterName,
+                brokerAddress, brokerHost,
+                stats.getBrokerVersionDesc(),
+                stats.getBootTimestamp(),
+                stats.getBrokerVersion()), stats.getGetTransferedTps().getSixty());
+        brokerRuntimeGetTransferedTps600.put(new BrokerRuntimeMetric(
+                clusterName,
+                brokerAddress, brokerHost,
+                stats.getBrokerVersionDesc(),
+                stats.getBootTimestamp(),
+                stats.getBrokerVersion()), stats.getGetTransferedTps().getSixHundred());
+        brokerRuntimeGetTotalTps10.put(new BrokerRuntimeMetric(
+                clusterName,
+                brokerAddress, brokerHost,
+                stats.getBrokerVersionDesc(),
+                stats.getBootTimestamp(),
+                stats.getBrokerVersion()), stats.getGetTotalTps().getTen());
+        brokerRuntimeGetTotalTps60.put(new BrokerRuntimeMetric(
+                clusterName,
+                brokerAddress, brokerHost,
+                stats.getBrokerVersionDesc(),
+                stats.getBootTimestamp(),
+                stats.getBrokerVersion()), stats.getGetTotalTps().getSixty());
+        brokerRuntimeGetTotalTps600.put(new BrokerRuntimeMetric(
+                clusterName,
+                brokerAddress, brokerHost,
+                stats.getBrokerVersionDesc(),
+                stats.getBootTimestamp(),
+                stats.getBrokerVersion()), stats.getGetTotalTps().getSixHundred());
+        brokerRuntimeGetFoundTps10.put(new BrokerRuntimeMetric(
+                clusterName,
+                brokerAddress, brokerHost,
+                stats.getBrokerVersionDesc(),
+                stats.getBootTimestamp(),
+                stats.getBrokerVersion()), stats.getGetFoundTps().getTen());
+        brokerRuntimeGetFoundTps60.put(new BrokerRuntimeMetric(
+                clusterName,
+                brokerAddress, brokerHost,
+                stats.getBrokerVersionDesc(),
+                stats.getBootTimestamp(),
+                stats.getBrokerVersion()), stats.getGetFoundTps().getSixty());
+        brokerRuntimeGetFoundTps600.put(new BrokerRuntimeMetric(
+                clusterName,
+                brokerAddress, brokerHost,
+                stats.getBrokerVersionDesc(),
+                stats.getBootTimestamp(),
+                stats.getBrokerVersion()), stats.getGetFoundTps().getSixHundred());
+        brokerRuntimeGetFoundTps600.put(new BrokerRuntimeMetric(
+                clusterName,
+                brokerAddress, brokerHost,
+                stats.getBrokerVersionDesc(),
+                stats.getBootTimestamp(),
+                stats.getBrokerVersion()), stats.getGetFoundTps().getSixHundred());
+    }
 
-    public void AddsendBackNumsMetric(String clusterName, String brokerName, String topic, String group,  double value)
-    {
-        sendBackNums.put(new ConsumerMetric(clusterName,brokerName,topic,group),value);
+    private void addCommitLogDirCapacity(String clusterName, String brokerAddress, String brokerHost, BrokerRuntimeStats stats) {
+        brokerRuntimeCommitLogDirCapacityTotal.put(new BrokerRuntimeMetric(
+                clusterName,
+                brokerAddress, brokerHost,
+                stats.getBrokerVersionDesc(),
+                stats.getBootTimestamp(),
+                stats.getBrokerVersion()), stats.getCommitLogDirCapacityTotal());
+        brokerRuntimeCommitLogDirCapacityFree.put(new BrokerRuntimeMetric(
+                clusterName,
+                brokerAddress, brokerHost,
+                stats.getBrokerVersionDesc(),
+                stats.getBootTimestamp(),
+                stats.getBrokerVersion()), stats.getCommitLogDirCapacityFree());
     }
 
-    public void AddGroupGetLatencyMetric(String clusterName, String brokerName, String topic, String group, String queueId,double value) {
+    private void addBrokerRuntimePutMessageDistributeTimeMap(
+            String clusterName, String brokerAddress, String brokerHost,
+            String brokerDes, long bootTimestamp, int brokerVersion,
+            BrokerRuntimeStats stats) {
+        brokerRuntimePutMessageDistributeTimeMap0ms.put(new BrokerRuntimeMetric(
+                clusterName,
+                brokerAddress, brokerHost,
+                brokerDes,
+                bootTimestamp,
+                brokerVersion), stats.getPutMessageDistributeTimeMap().get("<=0ms"));
+        brokerRuntimePutMessageDistributeTimeMap0to10ms.put(new BrokerRuntimeMetric(
+                clusterName,
+                brokerAddress, brokerHost,
+                brokerDes,
+                bootTimestamp,
+                brokerVersion), stats.getPutMessageDistributeTimeMap().get("0~10ms"));
+        brokerRuntimePutMessageDistributeTimeMap10to50ms.put(new BrokerRuntimeMetric(
+                clusterName,
+                brokerAddress, brokerHost,
+                brokerDes,
+                bootTimestamp,
+                brokerVersion), stats.getPutMessageDistributeTimeMap().get("10~50ms"));
+        brokerRuntimePutMessageDistributeTimeMap50to100ms.put(new BrokerRuntimeMetric(
+                clusterName,
+                brokerAddress, brokerHost,
+                brokerDes,
+                bootTimestamp,
+                brokerVersion), stats.getPutMessageDistributeTimeMap().get("50~100ms"));
+        brokerRuntimePutMessageDistributeTimeMap100to200ms.put(new BrokerRuntimeMetric(
+                clusterName,
+                brokerAddress, brokerHost,
+                brokerDes,
+                bootTimestamp,
+                brokerVersion), stats.getPutMessageDistributeTimeMap().get("100~200ms"));
+        brokerRuntimePutMessageDistributeTimeMap200to500ms.put(new BrokerRuntimeMetric(
+                clusterName,
+                brokerAddress, brokerHost,
+                brokerDes,
+                bootTimestamp,
+                brokerVersion), stats.getPutMessageDistributeTimeMap().get("200~500ms"));
+        brokerRuntimePutMessageDistributeTimeMap500to1s.put(new BrokerRuntimeMetric(
+                clusterName,
+                brokerAddress, brokerHost,
+                brokerDes,
+                bootTimestamp,
+                brokerVersion), stats.getPutMessageDistributeTimeMap().get("500ms~1s"));
+        brokerRuntimePutMessageDistributeTimeMap1to2s.put(new BrokerRuntimeMetric(
+                clusterName,
+                brokerAddress, brokerHost,
+                brokerDes,
+                bootTimestamp,
+                brokerVersion), stats.getPutMessageDistributeTimeMap().get("1~2s"));
+        brokerRuntimePutMessageDistributeTimeMap2to3s.put(new BrokerRuntimeMetric(
+                clusterName,
+                brokerAddress, brokerHost,
+                brokerDes,
+                bootTimestamp,
+                brokerVersion), stats.getPutMessageDistributeTimeMap().get("2~3s"));
+        brokerRuntimePutMessageDistributeTimeMap3to4s.put(new BrokerRuntimeMetric(
+                clusterName,
+                brokerAddress, brokerHost,
+                brokerDes,
+                bootTimestamp,
+                brokerVersion), stats.getPutMessageDistributeTimeMap().get("3~4s"));
+        brokerRuntimePutMessageDistributeTimeMap4to5s.put(new BrokerRuntimeMetric(
+                clusterName,
+                brokerAddress, brokerHost,
+                brokerDes,
+                bootTimestamp,
+                brokerVersion), stats.getPutMessageDistributeTimeMap().get("4~5s"));
+        brokerRuntimePutMessageDistributeTimeMap5to10s.put(new BrokerRuntimeMetric(
+                clusterName,
+                brokerAddress, brokerHost,
+                brokerDes,
+                bootTimestamp,
+                brokerVersion), stats.getPutMessageDistributeTimeMap().get("5~10s"));
+        brokerRuntimePutMessageDistributeTimeMap10toMore.put(new BrokerRuntimeMetric(
+                clusterName,
+                brokerAddress, brokerHost,
+                brokerDes,
+                bootTimestamp,
+                brokerVersion), stats.getPutMessageDistributeTimeMap().get("10s~"));
+    }
 
-        groupGetLatency.put(new ConsumerQueueMetric(clusterName,brokerName,topic,group,queueId),value);
+    private static <T extends Number> void loadBrokerRuntimeStatsMetric(GaugeMetricFamily family, Map.Entry<BrokerRuntimeMetric, T> entry) {
+        family.addMetric(Arrays.asList(
+                entry.getKey().getClusterName(),
+                entry.getKey().getBrokerAddress(),
+                entry.getKey().getBrokerHost(),
+                entry.getKey().getBrokerDes(),
+                String.valueOf(entry.getKey().getBootTimestamp()),
+                String.valueOf(entry.getKey().getBrokerVersion())
+        ), entry.getValue().doubleValue());
     }
 
-    public void AddGroupGetLatencyByStoreTimeMetric(String clusterName, String brokerName, String topic, String group,double value) {
+    private static List<String> BROKER_RUNTIME_METRIC_LABEL_NAMES = Arrays.asList("cluster", "brokerIP", "brokerHost", "des", "boottime", "broker_version");
+
+    private void collectBrokerRuntimeStats(List<MetricFamilySamples> mfs) {
+        collectBrokerRuntimeStatsPutMessageDistributeTime(mfs);
+
+        GaugeMetricFamily brokerRuntimeMsgPutTotalTodayNowF = new GaugeMetricFamily("rocketmq_brokeruntime_msg_put_total_today_now", "brokerRuntimeMsgPutTotalTodayNow", BROKER_RUNTIME_METRIC_LABEL_NAMES);
+        for (Map.Entry<BrokerRuntimeMetric, Long> entry : brokerRuntimeMsgPutTotalTodayNow.entrySet()) {
+            loadBrokerRuntimeStatsMetric(brokerRuntimeMsgPutTotalTodayNowF, entry);
+        }
+        mfs.add(brokerRuntimeMsgPutTotalTodayNowF);
+
+        GaugeMetricFamily brokerRuntimeMsgGetTotalTodayNowF = new GaugeMetricFamily("rocketmq_brokeruntime_msg_gettotal_today_now", "brokerRuntimeMsgGetTotalTodayNow", BROKER_RUNTIME_METRIC_LABEL_NAMES);
+        for (Map.Entry<BrokerRuntimeMetric, Long> entry : brokerRuntimeMsgGetTotalTodayNow.entrySet()) {
+            loadBrokerRuntimeStatsMetric(brokerRuntimeMsgGetTotalTodayNowF, entry);
+        }
+        mfs.add(brokerRuntimeMsgGetTotalTodayNowF);
+
+        GaugeMetricFamily brokerRuntimeDispatchBehindBytesF = new GaugeMetricFamily("rocketmq_brokeruntime_dispatch_behind_bytes", "brokerRuntimeDispatchBehindBytes", BROKER_RUNTIME_METRIC_LABEL_NAMES);
+        for (Map.Entry<BrokerRuntimeMetric, Long> entry : brokerRuntimeDispatchBehindBytes.entrySet()) {
+            loadBrokerRuntimeStatsMetric(brokerRuntimeDispatchBehindBytesF, entry);
+        }
+        mfs.add(brokerRuntimeDispatchBehindBytesF);
+
+        GaugeMetricFamily brokerRuntimePutMessageSizeTotalF = new GaugeMetricFamily("rocketmq_brokeruntime_put_message_size_total", "brokerRuntimePutMessageSizeTotal", BROKER_RUNTIME_METRIC_LABEL_NAMES);
+        for (Map.Entry<BrokerRuntimeMetric, Long> entry : brokerRuntimePutMessageSizeTotal.entrySet()) {
+            loadBrokerRuntimeStatsMetric(brokerRuntimePutMessageSizeTotalF, entry);
+        }
+        mfs.add(brokerRuntimePutMessageSizeTotalF);
+
+        GaugeMetricFamily brokerRuntimePutMessageAverageSizeF = new GaugeMetricFamily("rocketmq_brokeruntime_put_message_average_size", "brokerRuntimePutMessageAverageSize", BROKER_RUNTIME_METRIC_LABEL_NAMES);
+        for (Map.Entry<BrokerRuntimeMetric, Double> entry : brokerRuntimePutMessageAverageSize.entrySet()) {
+            loadBrokerRuntimeStatsMetric(brokerRuntimePutMessageAverageSizeF, entry);
+        }
+        mfs.add(brokerRuntimePutMessageAverageSizeF);
+
+        GaugeMetricFamily brokerRuntimeQueryThreadPoolQueueCapacityF = new GaugeMetricFamily("rocketmq_brokeruntime_query_threadpool_queue_capacity", "brokerRuntimeQueryThreadPoolQueueCapacity", BROKER_RUNTIME_METRIC_LABEL_NAMES);
+        for (Map.Entry<BrokerRuntimeMetric, Long> entry : brokerRuntimeQueryThreadPoolQueueCapacity.entrySet()) {
+            loadBrokerRuntimeStatsMetric(brokerRuntimeQueryThreadPoolQueueCapacityF, entry);
+        }
+        mfs.add(brokerRuntimeQueryThreadPoolQueueCapacityF);
+
+        GaugeMetricFamily brokerRuntimeRemainTransientStoreBufferNumbsF = new GaugeMetricFamily("rocketmq_brokeruntime_remain_transientstore_buffer_numbs", "brokerRuntimeRemainTransientStoreBufferNumbs", BROKER_RUNTIME_METRIC_LABEL_NAMES);
+        for (Map.Entry<BrokerRuntimeMetric, Long> entry : brokerRuntimeRemainTransientStoreBufferNumbs.entrySet()) {
+            loadBrokerRuntimeStatsMetric(brokerRuntimeRemainTransientStoreBufferNumbsF, entry);
+        }
+        mfs.add(brokerRuntimeRemainTransientStoreBufferNumbsF);
+
+        GaugeMetricFamily brokerRuntimeEarliestMessageTimeStampF = new GaugeMetricFamily("rocketmq_brokeruntime_earliest_message_timestamp", "brokerRuntimeEarliestMessageTimeStamp", BROKER_RUNTIME_METRIC_LABEL_NAMES);
+        for (Map.Entry<BrokerRuntimeMetric, Long> entry : brokerRuntimeEarliestMessageTimeStamp.entrySet()) {
+            loadBrokerRuntimeStatsMetric(brokerRuntimeEarliestMessageTimeStampF, entry);
+        }
+        mfs.add(brokerRuntimeEarliestMessageTimeStampF);
+
+        GaugeMetricFamily brokerRuntimePutMessageEntireTimeMaxF = new GaugeMetricFamily("rocketmq_brokeruntime_putmessage_entire_time_max", "brokerRuntimePutMessageEntireTimeMax", BROKER_RUNTIME_METRIC_LABEL_NAMES);
+        for (Map.Entry<BrokerRuntimeMetric, Long> entry : brokerRuntimePutMessageEntireTimeMax.entrySet()) {
+            loadBrokerRuntimeStatsMetric(brokerRuntimePutMessageEntireTimeMaxF, entry);
+        }
+        mfs.add(brokerRuntimePutMessageEntireTimeMaxF);
+
+        GaugeMetricFamily brokerRuntimeStartAcceptSendRequestTimeStampF = new GaugeMetricFamily("rocketmq_brokeruntime_start_accept_sendrequest_time", "brokerRuntimeStartAcceptSendRequestTimeStamp", BROKER_RUNTIME_METRIC_LABEL_NAMES);
+        for (Map.Entry<BrokerRuntimeMetric, Long> entry : brokerRuntimeStartAcceptSendRequestTimeStamp.entrySet()) {
+            loadBrokerRuntimeStatsMetric(brokerRuntimeStartAcceptSendRequestTimeStampF, entry);
+        }
+        mfs.add(brokerRuntimeStartAcceptSendRequestTimeStampF);
 
-        groupGetLatencyByStoreTime.put(new ConsumerMetric(clusterName,brokerName,topic,group),value);
+        GaugeMetricFamily brokerRuntimeSendThreadPoolQueueSizeF = new GaugeMetricFamily("rocketmq_brokeruntime_send_threadpool_queue_size", "brokerRuntimeSendThreadPoolQueueSize", BROKER_RUNTIME_METRIC_LABEL_NAMES);
+        for (Map.Entry<BrokerRuntimeMetric, Long> entry : brokerRuntimeSendThreadPoolQueueSize.entrySet()) {
+            loadBrokerRuntimeStatsMetric(brokerRuntimeSendThreadPoolQueueSizeF, entry);
+        }
+        mfs.add(brokerRuntimeSendThreadPoolQueueSizeF);
+
+        GaugeMetricFamily brokerRuntimePutMessageTimesTotalF = new GaugeMetricFamily("rocketmq_brokeruntime_putmessage_times_total", "brokerRuntimePutMessageTimesTotal", BROKER_RUNTIME_METRIC_LABEL_NAMES);
+        for (Map.Entry<BrokerRuntimeMetric, Long> entry : brokerRuntimePutMessageTimesTotal.entrySet()) {
+            loadBrokerRuntimeStatsMetric(brokerRuntimePutMessageTimesTotalF, entry);
+        }
+        mfs.add(brokerRuntimePutMessageTimesTotalF);
+
+        GaugeMetricFamily brokerRuntimeGetMessageEntireTimeMaxF = new GaugeMetricFamily("rocketmq_brokeruntime_getmessage_entire_time_max", "brokerRuntimeGetMessageEntireTimeMax", BROKER_RUNTIME_METRIC_LABEL_NAMES);
+        for (Map.Entry<BrokerRuntimeMetric, Long> entry : brokerRuntimeGetMessageEntireTimeMax.entrySet()) {
+            loadBrokerRuntimeStatsMetric(brokerRuntimeGetMessageEntireTimeMaxF, entry);
+        }
+        mfs.add(brokerRuntimeGetMessageEntireTimeMaxF);
+
+        GaugeMetricFamily brokerRuntimePageCacheLockTimeMillsF = new GaugeMetricFamily("rocketmq_brokeruntime_pagecache_lock_time_mills", "brokerRuntimePageCacheLockTimeMills", BROKER_RUNTIME_METRIC_LABEL_NAMES);
+        for (Map.Entry<BrokerRuntimeMetric, Long> entry : brokerRuntimePageCacheLockTimeMills.entrySet()) {
+            loadBrokerRuntimeStatsMetric(brokerRuntimePageCacheLockTimeMillsF, entry);
+        }
+        mfs.add(brokerRuntimePageCacheLockTimeMillsF);
+
+        GaugeMetricFamily brokerRuntimeCommitLogDiskRatioF = new GaugeMetricFamily("rocketmq_brokeruntime_commitlog_disk_ratio", "brokerRuntimeCommitLogDiskRatio", BROKER_RUNTIME_METRIC_LABEL_NAMES);
+        for (Map.Entry<BrokerRuntimeMetric, Double> entry : brokerRuntimeCommitLogDiskRatio.entrySet()) {
+            loadBrokerRuntimeStatsMetric(brokerRuntimeCommitLogDiskRatioF, entry);
+        }
+        mfs.add(brokerRuntimeCommitLogDiskRatioF);
+
+        GaugeMetricFamily brokerRuntimeConsumeQueueDiskRatioF = new GaugeMetricFamily("rocketmq_brokeruntime_consumequeue_disk_ratio", "brokerRuntimeConsumeQueueDiskRatio", BROKER_RUNTIME_METRIC_LABEL_NAMES);
+        for (Map.Entry<BrokerRuntimeMetric, Double> entry : brokerRuntimeConsumeQueueDiskRatio.entrySet()) {
+            loadBrokerRuntimeStatsMetric(brokerRuntimeConsumeQueueDiskRatioF, entry);
+        }
+        mfs.add(brokerRuntimeConsumeQueueDiskRatioF);
+
+        GaugeMetricFamily brokerRuntimeGetFoundTps600F = new GaugeMetricFamily("rocketmq_brokeruntime_getfound_tps600", "brokerRuntimeGetFoundTps600", BROKER_RUNTIME_METRIC_LABEL_NAMES);
+        for (Map.Entry<BrokerRuntimeMetric, Double> entry : brokerRuntimeGetFoundTps600.entrySet()) {
+            loadBrokerRuntimeStatsMetric(brokerRuntimeGetFoundTps600F, entry);
+        }
+        mfs.add(brokerRuntimeGetFoundTps600F);
+
+        GaugeMetricFamily brokerRuntimeGetFoundTps60F = new GaugeMetricFamily("rocketmq_brokeruntime_getfound_tps60", "brokerRuntimeGetFoundTps60", BROKER_RUNTIME_METRIC_LABEL_NAMES);
+        for (Map.Entry<BrokerRuntimeMetric, Double> entry : brokerRuntimeGetFoundTps60.entrySet()) {
+            loadBrokerRuntimeStatsMetric(brokerRuntimeGetFoundTps60F, entry);
+        }
+        mfs.add(brokerRuntimeGetFoundTps60F);
+
+        GaugeMetricFamily brokerRuntimeGetFoundTps10F = new GaugeMetricFamily("rocketmq_brokeruntime_getfound_tps10", "brokerRuntimeGetFoundTps10", BROKER_RUNTIME_METRIC_LABEL_NAMES);
+        for (Map.Entry<BrokerRuntimeMetric, Double> entry : brokerRuntimeGetFoundTps10.entrySet()) {
+            loadBrokerRuntimeStatsMetric(brokerRuntimeGetFoundTps10F, entry);
+        }
+        mfs.add(brokerRuntimeGetFoundTps10F);
+
+        GaugeMetricFamily brokerRuntimeGetTotalTps600F = new GaugeMetricFamily("rocketmq_brokeruntime_gettotal_tps600", "brokerRuntimeGetTotalTps600", BROKER_RUNTIME_METRIC_LABEL_NAMES);
+        for (Map.Entry<BrokerRuntimeMetric, Double> entry : brokerRuntimeGetTotalTps600.entrySet()) {
+            loadBrokerRuntimeStatsMetric(brokerRuntimeGetTotalTps600F, entry);
+        }
+        mfs.add(brokerRuntimeGetTotalTps600F);
+
+        GaugeMetricFamily brokerRuntimeGetTotalTps60F = new GaugeMetricFamily("rocketmq_brokeruntime_gettotal_tps60", "brokerRuntimeGetTotalTps60", BROKER_RUNTIME_METRIC_LABEL_NAMES);
+        for (Map.Entry<BrokerRuntimeMetric, Double> entry : brokerRuntimeGetTotalTps60.entrySet()) {
+            loadBrokerRuntimeStatsMetric(brokerRuntimeGetTotalTps60F, entry);
+        }
+        mfs.add(brokerRuntimeGetTotalTps60F);
+
+        GaugeMetricFamily brokerRuntimeGetTotalTps10F = new GaugeMetricFamily("rocketmq_brokeruntime_gettotal_tps10", "brokerRuntimeGetTotalTps10", BROKER_RUNTIME_METRIC_LABEL_NAMES);
+        for (Map.Entry<BrokerRuntimeMetric, Double> entry : brokerRuntimeGetTotalTps10.entrySet()) {
+            loadBrokerRuntimeStatsMetric(brokerRuntimeGetTotalTps10F, entry);
+        }
+        mfs.add(brokerRuntimeGetTotalTps10F);
+
+        GaugeMetricFamily brokerRuntimeGetTransferedTps600F = new GaugeMetricFamily("rocketmq_brokeruntime_gettransfered_tps600", "brokerRuntimeGetTransferedTps600", BROKER_RUNTIME_METRIC_LABEL_NAMES);
+        for (Map.Entry<BrokerRuntimeMetric, Double> entry : brokerRuntimeGetTransferedTps600.entrySet()) {
+            loadBrokerRuntimeStatsMetric(brokerRuntimeGetTransferedTps600F, entry);
+        }
+        mfs.add(brokerRuntimeGetTransferedTps600F);
+
+        GaugeMetricFamily brokerRuntimeGetTransferedTps60F = new GaugeMetricFamily("rocketmq_brokeruntime_gettransfered_tps60", "brokerRuntimeGetTransferedTps60", BROKER_RUNTIME_METRIC_LABEL_NAMES);
+        for (Map.Entry<BrokerRuntimeMetric, Double> entry : brokerRuntimeGetTransferedTps60.entrySet()) {
+            loadBrokerRuntimeStatsMetric(brokerRuntimeGetTransferedTps60F, entry);
+        }
+        mfs.add(brokerRuntimeGetTransferedTps60F);
+
+        GaugeMetricFamily brokerRuntimeGetTransferedTps10F = new GaugeMetricFamily("rocketmq_brokeruntime_gettransfered_tps10", "brokerRuntimeGetTransferedTps10", BROKER_RUNTIME_METRIC_LABEL_NAMES);
+        for (Map.Entry<BrokerRuntimeMetric, Double> entry : brokerRuntimeGetTransferedTps10.entrySet()) {
+            loadBrokerRuntimeStatsMetric(brokerRuntimeGetTransferedTps10F, entry);
+        }
+        mfs.add(brokerRuntimeGetTransferedTps10F);
+
+        GaugeMetricFamily brokerRuntimeGetMissTps600F = new GaugeMetricFamily("rocketmq_brokeruntime_getmiss_tps600", "brokerRuntimeGetMissTps600", BROKER_RUNTIME_METRIC_LABEL_NAMES);
+        for (Map.Entry<BrokerRuntimeMetric, Double> entry : brokerRuntimeGetMissTps600.entrySet()) {
+            loadBrokerRuntimeStatsMetric(brokerRuntimeGetMissTps600F, entry);
+        }
+        mfs.add(brokerRuntimeGetMissTps600F);
+
+        GaugeMetricFamily brokerRuntimeGetMissTps60F = new GaugeMetricFamily("rocketmq_brokeruntime_getmiss_tps60", "brokerRuntimeGetMissTps60", BROKER_RUNTIME_METRIC_LABEL_NAMES);
+        for (Map.Entry<BrokerRuntimeMetric, Double> entry : brokerRuntimeGetMissTps60.entrySet()) {
+            loadBrokerRuntimeStatsMetric(brokerRuntimeGetMissTps60F, entry);
+        }
+        mfs.add(brokerRuntimeGetMissTps60F);
+
+        GaugeMetricFamily brokerRuntimeGetMissTps10F = new GaugeMetricFamily("rocketmq_brokeruntime_getmiss_tps10", "brokerRuntimeGetMissTps10", BROKER_RUNTIME_METRIC_LABEL_NAMES);
+        for (Map.Entry<BrokerRuntimeMetric, Double> entry : brokerRuntimeGetMissTps10.entrySet()) {
+            loadBrokerRuntimeStatsMetric(brokerRuntimeGetMissTps10F, entry);
+        }
+        mfs.add(brokerRuntimeGetMissTps10F);
+
+        GaugeMetricFamily brokerRuntimePutTps600F = new GaugeMetricFamily("rocketmq_brokeruntime_put_tps600", "brokerRuntimePutTps600", BROKER_RUNTIME_METRIC_LABEL_NAMES);
+        for (Map.Entry<BrokerRuntimeMetric, Double> entry : brokerRuntimePutTps600.entrySet()) {
+            loadBrokerRuntimeStatsMetric(brokerRuntimePutTps600F, entry);
+        }
+        mfs.add(brokerRuntimePutTps600F);
+
+        GaugeMetricFamily brokerRuntimePutTps60F = new GaugeMetricFamily("rocketmq_brokeruntime_put_tps60", "brokerRuntimePutTps60", BROKER_RUNTIME_METRIC_LABEL_NAMES);
+        for (Map.Entry<BrokerRuntimeMetric, Double> entry : brokerRuntimePutTps60.entrySet()) {
+            loadBrokerRuntimeStatsMetric(brokerRuntimePutTps60F, entry);
+        }
+        mfs.add(brokerRuntimePutTps60F);
+
+        GaugeMetricFamily brokerRuntimePutTps10F = new GaugeMetricFamily("rocketmq_brokeruntime_put_tps10", "brokerRuntimePutTps10", BROKER_RUNTIME_METRIC_LABEL_NAMES);
+        for (Map.Entry<BrokerRuntimeMetric, Double> entry : brokerRuntimePutTps10.entrySet()) {
+            loadBrokerRuntimeStatsMetric(brokerRuntimePutTps10F, entry);
+        }
+        mfs.add(brokerRuntimePutTps10F);
+
+        GaugeMetricFamily brokerRuntimeDispatchMaxBufferF = new GaugeMetricFamily("rocketmq_brokeruntime_dispatch_maxbuffer", "brokerRuntimeDispatchMaxBuffer", BROKER_RUNTIME_METRIC_LABEL_NAMES);
+        for (Map.Entry<BrokerRuntimeMetric, Long> entry : brokerRuntimeDispatchMaxBuffer.entrySet()) {
+            loadBrokerRuntimeStatsMetric(brokerRuntimeDispatchMaxBufferF, entry);
+        }
+        mfs.add(brokerRuntimeDispatchMaxBufferF);
+
+        GaugeMetricFamily brokerRuntimePullThreadPoolQueueCapacityF = new GaugeMetricFamily("rocketmq_brokeruntime_pull_threadpoolqueue_capacity", "brokerRuntimePullThreadPoolQueueCapacity", BROKER_RUNTIME_METRIC_LABEL_NAMES);
+        for (Map.Entry<BrokerRuntimeMetric, Long> entry : brokerRuntimePullThreadPoolQueueCapacity.entrySet()) {
+            loadBrokerRuntimeStatsMetric(brokerRuntimePullThreadPoolQueueCapacityF, entry);
+        }
+        mfs.add(brokerRuntimePullThreadPoolQueueCapacityF);
+
+        GaugeMetricFamily brokerRuntimeSendThreadPoolQueueCapacityF = new GaugeMetricFamily("rocketmq_brokeruntime_send_threadpoolqueue_capacity", "brokerRuntimeSendThreadPoolQueueCapacity", BROKER_RUNTIME_METRIC_LABEL_NAMES);
+        for (Map.Entry<BrokerRuntimeMetric, Long> entry : brokerRuntimeSendThreadPoolQueueCapacity.entrySet()) {
+            loadBrokerRuntimeStatsMetric(brokerRuntimeSendThreadPoolQueueCapacityF, entry);
+        }
+        mfs.add(brokerRuntimeSendThreadPoolQueueCapacityF);
+
+        GaugeMetricFamily brokerRuntimePullThreadPoolQueueSizeF = new GaugeMetricFamily("rocketmq_brokeruntime_pull_threadpoolqueue_size", "brokerRuntimePullThreadPoolQueueSizeF", BROKER_RUNTIME_METRIC_LABEL_NAMES);
+        for (Map.Entry<BrokerRuntimeMetric, Long> entry : brokerRuntimePullThreadPoolQueueSize.entrySet()) {
+            loadBrokerRuntimeStatsMetric(brokerRuntimePullThreadPoolQueueSizeF, entry);
+        }
+        mfs.add(brokerRuntimePullThreadPoolQueueSizeF);
+
+        GaugeMetricFamily brokerRuntimeQueryThreadPoolQueueSizeF = new GaugeMetricFamily("rocketmq_brokeruntime_query_threadpoolqueue_size", "brokerRuntimeQueryThreadPoolQueueSize", BROKER_RUNTIME_METRIC_LABEL_NAMES);
+        for (Map.Entry<BrokerRuntimeMetric, Long> entry : brokerRuntimeQueryThreadPoolQueueSize.entrySet()) {
+            loadBrokerRuntimeStatsMetric(brokerRuntimeQueryThreadPoolQueueSizeF, entry);
+        }
+        mfs.add(brokerRuntimeQueryThreadPoolQueueSizeF);
+
+        GaugeMetricFamily brokerRuntimePullThreadPoolQueueHeadWaitTimeMillsF = new GaugeMetricFamily("rocketmq_brokeruntime_pull_threadpoolqueue_headwait_timemills", "brokerRuntimePullThreadPoolQueueHeadWaitTimeMills", BROKER_RUNTIME_METRIC_LABEL_NAMES);
+        for (Map.Entry<BrokerRuntimeMetric, Long> entry : brokerRuntimePullThreadPoolQueueHeadWaitTimeMills.entrySet()) {
+            loadBrokerRuntimeStatsMetric(brokerRuntimePullThreadPoolQueueHeadWaitTimeMillsF, entry);
+        }
+        mfs.add(brokerRuntimePullThreadPoolQueueHeadWaitTimeMillsF);
+
+        GaugeMetricFamily brokerRuntimeQueryThreadPoolQueueHeadWaitTimeMillsF = new GaugeMetricFamily("rocketmq_brokeruntime_query_threadpoolqueue_headwait_timemills", "brokerRuntimeQueryThreadPoolQueueHeadWaitTimeMills", BROKER_RUNTIME_METRIC_LABEL_NAMES);
+        for (Map.Entry<BrokerRuntimeMetric, Long> entry : brokerRuntimeQueryThreadPoolQueueHeadWaitTimeMills.entrySet()) {
+            loadBrokerRuntimeStatsMetric(brokerRuntimeQueryThreadPoolQueueHeadWaitTimeMillsF, entry);
+        }
+        mfs.add(brokerRuntimeQueryThreadPoolQueueHeadWaitTimeMillsF);
+
+        GaugeMetricFamily brokerRuntimeSendThreadPoolQueueHeadWaitTimeMillsF = new GaugeMetricFamily("rocketmq_brokeruntime_send_threadpoolqueue_headwait_timemills", "brokerRuntimeSendThreadPoolQueueHeadWaitTimeMills", BROKER_RUNTIME_METRIC_LABEL_NAMES);
+        for (Map.Entry<BrokerRuntimeMetric, Long> entry : brokerRuntimeSendThreadPoolQueueHeadWaitTimeMills.entrySet()) {
+            loadBrokerRuntimeStatsMetric(brokerRuntimeSendThreadPoolQueueHeadWaitTimeMillsF, entry);
+        }
+        mfs.add(brokerRuntimeSendThreadPoolQueueHeadWaitTimeMillsF);
+
+        GaugeMetricFamily brokerRuntimeMsgGetTotalYesterdayMorningF = new GaugeMetricFamily("rocketmq_brokeruntime_msg_gettotal_yesterdaymorning", "brokerRuntimeMsgGetTotalYesterdayMorning", BROKER_RUNTIME_METRIC_LABEL_NAMES);
+        for (Map.Entry<BrokerRuntimeMetric, Long> entry : brokerRuntimeMsgGetTotalYesterdayMorning.entrySet()) {
+            loadBrokerRuntimeStatsMetric(brokerRuntimeMsgGetTotalYesterdayMorningF, entry);
+        }
+        mfs.add(brokerRuntimeMsgGetTotalYesterdayMorningF);
+
+        GaugeMetricFamily brokerRuntimeMsgPutTotalYesterdayMorningF = new GaugeMetricFamily("rocketmq_brokeruntime_msg_puttotal_yesterdaymorning", "brokerRuntimeMsgPutTotalYesterdayMorning", BROKER_RUNTIME_METRIC_LABEL_NAMES);
+        for (Map.Entry<BrokerRuntimeMetric, Long> entry : brokerRuntimeMsgPutTotalYesterdayMorning.entrySet()) {
+            loadBrokerRuntimeStatsMetric(brokerRuntimeMsgPutTotalYesterdayMorningF, entry);
+        }
+        mfs.add(brokerRuntimeMsgPutTotalYesterdayMorningF);
+
+        GaugeMetricFamily brokerRuntimeMsgGetTotalTodayMorningF = new GaugeMetricFamily("rocketmq_brokeruntime_msg_gettotal_todaymorning", "brokerRuntimeMsgGetTotalTodayMorning", BROKER_RUNTIME_METRIC_LABEL_NAMES);
+        for (Map.Entry<BrokerRuntimeMetric, Long> entry : brokerRuntimeMsgGetTotalTodayMorning.entrySet()) {
+            loadBrokerRuntimeStatsMetric(brokerRuntimeMsgGetTotalTodayMorningF, entry);
+        }
+        mfs.add(brokerRuntimeMsgGetTotalTodayMorningF);
+
+        GaugeMetricFamily brokerRuntimeMsgPutTotalTodayMorningF = new GaugeMetricFamily("rocketmq_brokeruntime_msg_puttotal_todaymorning", "brokerRuntimeMsgPutTotalTodayMorning", BROKER_RUNTIME_METRIC_LABEL_NAMES);
+        for (Map.Entry<BrokerRuntimeMetric, Long> entry : brokerRuntimeMsgPutTotalTodayMorning.entrySet()) {
+            loadBrokerRuntimeStatsMetric(brokerRuntimeMsgPutTotalTodayMorningF, entry);
+        }
+        mfs.add(brokerRuntimeMsgPutTotalTodayMorningF);
+
+        GaugeMetricFamily brokerRuntimeCommitLogDirCapacityFreeF = new GaugeMetricFamily("rocketmq_brokeruntime_commitlogdir_capacity_free", "brokerRuntimeCommitLogDirCapacityFree", BROKER_RUNTIME_METRIC_LABEL_NAMES);
+        for (Map.Entry<BrokerRuntimeMetric, Double> entry : brokerRuntimeCommitLogDirCapacityFree.entrySet()) {
+            loadBrokerRuntimeStatsMetric(brokerRuntimeCommitLogDirCapacityFreeF, entry);
+        }
+        mfs.add(brokerRuntimeCommitLogDirCapacityFreeF);
+
+        GaugeMetricFamily brokerRuntimeCommitLogDirCapacityTotalF = new GaugeMetricFamily("rocketmq_brokeruntime_commitlogdir_capacity_total", "brokerRuntimeCommitLogDirCapacityTotal", BROKER_RUNTIME_METRIC_LABEL_NAMES);
+        for (Map.Entry<BrokerRuntimeMetric, Double> entry : brokerRuntimeCommitLogDirCapacityTotal.entrySet()) {
+            loadBrokerRuntimeStatsMetric(brokerRuntimeCommitLogDirCapacityTotalF, entry);
+        }
+        mfs.add(brokerRuntimeCommitLogDirCapacityTotalF);
+
+        GaugeMetricFamily brokerRuntimeCommitLogMaxOffsetF = new GaugeMetricFamily("rocketmq_brokeruntime_commitlog_maxoffset", "brokerRuntimeCommitLogMaxOffset", BROKER_RUNTIME_METRIC_LABEL_NAMES);
+        for (Map.Entry<BrokerRuntimeMetric, Long> entry : brokerRuntimeCommitLogMaxOffset.entrySet()) {
+            loadBrokerRuntimeStatsMetric(brokerRuntimeCommitLogMaxOffsetF, entry);
+        }
+        mfs.add(brokerRuntimeCommitLogMaxOffsetF);
+
+        GaugeMetricFamily brokerRuntimeCommitLogMinOffsetF = new GaugeMetricFamily("rocketmq_brokeruntime_commitlog_minoffset", "brokerRuntimeCommitLogMinOffset", BROKER_RUNTIME_METRIC_LABEL_NAMES);
+        for (Map.Entry<BrokerRuntimeMetric, Long> entry : brokerRuntimeCommitLogMinOffset.entrySet()) {
+            loadBrokerRuntimeStatsMetric(brokerRuntimeCommitLogMinOffsetF, entry);
+        }
+        mfs.add(brokerRuntimeCommitLogMinOffsetF);
+
+        GaugeMetricFamily brokerRuntimeRemainHowManyDataToFlushF = new GaugeMetricFamily("rocketmq_brokeruntime_remain_howmanydata_toflush", "brokerRuntimeRemainHowManyDataToFlush", BROKER_RUNTIME_METRIC_LABEL_NAMES);
+        for (Map.Entry<BrokerRuntimeMetric, Double> entry : brokerRuntimeRemainHowManyDataToFlush.entrySet()) {
+            loadBrokerRuntimeStatsMetric(brokerRuntimeRemainHowManyDataToFlushF, entry);
+        }
+        mfs.add(brokerRuntimeRemainHowManyDataToFlushF);
     }
 }
\ No newline at end of file
diff --git a/src/main/java/org/apache/rocketmq/exporter/config/RMQConfigure.java b/src/main/java/org/apache/rocketmq/exporter/config/RMQConfigure.java
index 2b3956b..e090b68 100644
--- a/src/main/java/org/apache/rocketmq/exporter/config/RMQConfigure.java
+++ b/src/main/java/org/apache/rocketmq/exporter/config/RMQConfigure.java
@@ -23,8 +23,6 @@ import org.slf4j.LoggerFactory;
 import org.springframework.boot.context.properties.ConfigurationProperties;
 import org.springframework.context.annotation.Configuration;
 
-
-
 import static org.apache.rocketmq.client.ClientConfig.SEND_MESSAGE_WITH_VIP_CHANNEL_PROPERTY;
 
 @Configuration
@@ -69,9 +67,11 @@ public class RMQConfigure {
             logger.info("setIsVIPChannel isVIPChannel={}", isVIPChannel);
         }
     }
+
     public boolean isEnableCollect() {
         return enableCollect;
     }
+
     public void setEnableCollect(boolean enableCollect) {
         this.enableCollect = enableCollect;
     }
diff --git a/src/main/java/org/apache/rocketmq/exporter/config/ScheduleConfig.java b/src/main/java/org/apache/rocketmq/exporter/config/ScheduleConfig.java
new file mode 100644
index 0000000..9422272
--- /dev/null
+++ b/src/main/java/org/apache/rocketmq/exporter/config/ScheduleConfig.java
@@ -0,0 +1,26 @@
+package org.apache.rocketmq.exporter.config;
+
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.scheduling.annotation.SchedulingConfigurer;
+import org.springframework.scheduling.config.ScheduledTaskRegistrar;
+
+import java.util.concurrent.Executor;
+import java.util.concurrent.Executors;
+
+@Configuration
+public class ScheduleConfig implements SchedulingConfigurer {
+    @Override
+    public void configureTasks(ScheduledTaskRegistrar taskRegistrar) {
+        taskRegistrar.setScheduler(taskExecutor());
+    }
+
+    @Value("${task.count}")
+    private int taskCount;
+
+    @Bean(destroyMethod = "shutdown")
+    public Executor taskExecutor() {
+        return Executors.newScheduledThreadPool(this.taskCount);
+    }
+}
\ No newline at end of file
diff --git a/src/main/java/org/apache/rocketmq/exporter/controller/RMQMetricsController.java b/src/main/java/org/apache/rocketmq/exporter/controller/RMQMetricsController.java
index 4df69d1..e6fe31c 100644
--- a/src/main/java/org/apache/rocketmq/exporter/controller/RMQMetricsController.java
+++ b/src/main/java/org/apache/rocketmq/exporter/controller/RMQMetricsController.java
@@ -18,8 +18,6 @@ package org.apache.rocketmq.exporter.controller;
 
 
 import org.apache.rocketmq.exporter.service.RMQMetricsService;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
 import org.springframework.web.bind.annotation.RequestMapping;
 import org.springframework.web.bind.annotation.ResponseBody;
@@ -33,19 +31,14 @@ import java.io.StringWriter;
 @RestController
 @EnableAutoConfiguration
 public class RMQMetricsController {
-
-    private final static Logger log = LoggerFactory.getLogger(RMQMetricsController.class);
-
     @Resource
     RMQMetricsService metricsService;
 
     @RequestMapping(value = "${rocketmq.config.webTelemetryPath}")
     @ResponseBody
     private void metrics(HttpServletResponse response) throws IOException {
-
         StringWriter writer = new StringWriter();
-        metricsService.Metrics(writer);
-
+        metricsService.metrics(writer);
         response.setHeader("Content-Type", "text/plain; version=0.0.4; charset=utf-8");
         response.getOutputStream().print(writer.toString());
     }
diff --git a/src/main/java/org/apache/rocketmq/exporter/exception/ServiceException.java b/src/main/java/org/apache/rocketmq/exporter/exception/ServiceException.java
deleted file mode 100644
index ccf3fdc..0000000
--- a/src/main/java/org/apache/rocketmq/exporter/exception/ServiceException.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-package org.apache.rocketmq.exporter.exception;
-
-public class ServiceException extends RuntimeException {
-    private static final long serialVersionUID = 9213584003139969215L;
-    private int code;
-
-    public ServiceException(int code, String message) {
-        super(message);
-        this.code = code;
-    }
-
-    public int getCode() {
-        return code;
-    }
-}
diff --git a/src/main/java/org/apache/rocketmq/exporter/model/BrokerRuntimeStats.java b/src/main/java/org/apache/rocketmq/exporter/model/BrokerRuntimeStats.java
new file mode 100644
index 0000000..b6fe3dc
--- /dev/null
+++ b/src/main/java/org/apache/rocketmq/exporter/model/BrokerRuntimeStats.java
@@ -0,0 +1,609 @@
+package org.apache.rocketmq.exporter.model;
+
+import org.apache.rocketmq.common.protocol.body.KVTable;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class BrokerRuntimeStats {
+    //今天生产的消息总量
+    private long msgPutTotalTodayNow;
+    //今天消费的消息总量
+    private long msgGetTotalTodayNow;
+
+    //今天早上生产消息总量
+    private long msgPutTotalTodayMorning;
+    //今天早上消费消息总量
+    private long msgGetTotalTodayMorning;
+
+    //昨天早上生产的消息总量
+    private long msgPutTotalYesterdayMorning;
+    //昨天早上消费的消息总量
+    private long msgGetTotalYesterdayMorning;
+
+    //延迟消息位点
+    private List<ScheduleMessageOffsetTable> scheduleMessageOffsetTables = new ArrayList<>();
+
+    //发送线程最大等待时间
+    private long sendThreadPoolQueueHeadWaitTimeMills;
+    //拉取消息线程最大等待时间
+    private long queryThreadPoolQueueHeadWaitTimeMills;
+    //拉取线程最大等待时间
+    private long pullThreadPoolQueueHeadWaitTimeMills;
+
+    //查询线程任务个数
+    private long queryThreadPoolQueueSize;
+    //拉取线程任务个数
+    private long pullThreadPoolQueueSize;
+    //发送线程等待队列长度
+    private long sendThreadPoolQueueCapacity;
+    //拉取线程等待队列长度
+    private long pullThreadPoolQueueCapacity;
+
+    //刷pagecache时间统计
+    private Map<String, Integer> putMessageDistributeTimeMap = new HashMap<>();
+    //还有多少字节的数据没有刷盘
+    private double remainHowManyDataToFlush;
+
+    //commitlog 最小位点
+    private long commitLogMinOffset;
+    //commitlog 最大位点
+    private long commitLogMaxOffset;
+
+    //broker运行时间描述
+    private String runtime;
+    //broker 启动时间
+    private long bootTimestamp;
+    //broker 磁盘总量
+    private double commitLogDirCapacityTotal;
+    //broker 磁盘剩余
+    private double commitLogDirCapacityFree;
+    //broker 版本号
+    private int brokerVersion;
+    //
+    private long dispatchMaxBuffer;
+
+    private PutTps putTps = new PutTps();
+    private GetMissTps getMissTps = new GetMissTps();
+    private GetTransferedTps getTransferedTps = new GetTransferedTps();
+    private GetTotalTps getTotalTps = new GetTotalTps();
+    private GetFoundTps getFoundTps = new GetFoundTps();
+
+    private double consumeQueueDiskRatio;
+    private double commitLogDiskRatio;
+
+    //page cache锁定时间
+    private long pageCacheLockTimeMills;
+
+    private long getMessageEntireTimeMax;
+
+    private long putMessageTimesTotal;
+
+    private String brokerVersionDesc;
+    private long sendThreadPoolQueueSize;
+    private long startAcceptSendRequestTimeStamp;
+    private long putMessageEntireTimeMax;
+    private long earliestMessageTimeStamp;
+
+    private long remainTransientStoreBufferNumbs;
+    private long queryThreadPoolQueueCapacity;
+    //发送消息平均体积大小
+    private double putMessageAverageSize;
+    //全部发送消息数
+    private long putMessageSizeTotal;
+    private long dispatchBehindBytes;
+
+
+    public BrokerRuntimeStats(KVTable kvTable) {
+        this.msgPutTotalTodayNow = Long.parseLong(kvTable.getTable().get("msgPutTotalTodayNow"));
+
+        loadScheduleMessageOffsets(kvTable);
+        loadPutMessageDistributeTime(kvTable.getTable().get("putMessageDistributeTime"));
+
+        loadTps(this.putTps, kvTable.getTable().get("putTps"));
+        loadTps(this.getMissTps, kvTable.getTable().get("getMissTps"));
+        loadTps(this.getTransferedTps, kvTable.getTable().get("getTransferedTps"));
+        loadTps(this.getTotalTps, kvTable.getTable().get("getTotalTps"));
+        loadTps(this.getFoundTps, kvTable.getTable().get("getFoundTps"));
+
+        loadCommitLogDirCapacity(kvTable.getTable().get("commitLogDirCapacity"));
+
+        this.sendThreadPoolQueueHeadWaitTimeMills = Long.parseLong(kvTable.getTable().get("sendThreadPoolQueueHeadWaitTimeMills"));
+        this.queryThreadPoolQueueHeadWaitTimeMills = Long.parseLong(kvTable.getTable().get("queryThreadPoolQueueHeadWaitTimeMills"));
+
+        this.remainHowManyDataToFlush = Double.parseDouble(kvTable.getTable().get("remainHowManyDataToFlush").split(" ")[0]);//byte
+        this.msgGetTotalTodayNow = Long.parseLong(kvTable.getTable().get("msgGetTotalTodayNow"));
+        this.queryThreadPoolQueueSize = Long.parseLong(kvTable.getTable().get("queryThreadPoolQueueSize"));
+        this.bootTimestamp = Long.parseLong(kvTable.getTable().get("bootTimestamp"));
+        this.msgPutTotalYesterdayMorning = Long.parseLong(kvTable.getTable().get("msgPutTotalYesterdayMorning"));
+        this.msgGetTotalYesterdayMorning = Long.parseLong(kvTable.getTable().get("msgGetTotalYesterdayMorning"));
+        this.pullThreadPoolQueueSize = Long.parseLong(kvTable.getTable().get("pullThreadPoolQueueSize"));
+        this.commitLogMinOffset = Long.parseLong(kvTable.getTable().get("commitLogMinOffset"));
+        this.pullThreadPoolQueueHeadWaitTimeMills = Long.parseLong(kvTable.getTable().get("pullThreadPoolQueueHeadWaitTimeMills"));
+        this.runtime = kvTable.getTable().get("runtime");
+        this.dispatchMaxBuffer = Long.parseLong(kvTable.getTable().get("dispatchMaxBuffer"));
+        this.brokerVersion = Integer.parseInt(kvTable.getTable().get("brokerVersion"));
+        this.consumeQueueDiskRatio = Double.parseDouble(kvTable.getTable().get("consumeQueueDiskRatio"));
+        this.pageCacheLockTimeMills = Long.parseLong(kvTable.getTable().get("pageCacheLockTimeMills"));
+        this.commitLogDiskRatio = Double.parseDouble(kvTable.getTable().get("commitLogDiskRatio"));
+        this.commitLogMaxOffset = Long.parseLong(kvTable.getTable().get("commitLogMaxOffset"));
+        this.getMessageEntireTimeMax = Long.parseLong(kvTable.getTable().get("getMessageEntireTimeMax"));
+        this.msgPutTotalTodayMorning = Long.parseLong(kvTable.getTable().get("msgPutTotalTodayMorning"));
+        this.putMessageTimesTotal = Long.parseLong(kvTable.getTable().get("putMessageTimesTotal"));
+        this.msgGetTotalTodayMorning = Long.parseLong(kvTable.getTable().get("msgGetTotalTodayMorning"));
+        this.brokerVersionDesc = kvTable.getTable().get("brokerVersionDesc");
+        this.sendThreadPoolQueueSize = Long.parseLong(kvTable.getTable().get("sendThreadPoolQueueSize"));
+        this.startAcceptSendRequestTimeStamp = Long.parseLong(kvTable.getTable().get("startAcceptSendRequestTimeStamp"));
+        this.putMessageEntireTimeMax = Long.parseLong(kvTable.getTable().get("putMessageEntireTimeMax"));
+        this.earliestMessageTimeStamp = Long.parseLong(kvTable.getTable().get("earliestMessageTimeStamp"));
+        this.remainTransientStoreBufferNumbs = Long.parseLong(kvTable.getTable().get("remainTransientStoreBufferNumbs"));
+        this.queryThreadPoolQueueCapacity = Long.parseLong(kvTable.getTable().get("queryThreadPoolQueueCapacity"));
+        this.putMessageAverageSize = Double.parseDouble(kvTable.getTable().get("putMessageAverageSize"));
+        this.dispatchBehindBytes = Long.parseLong(kvTable.getTable().get("dispatchBehindBytes"));
+        this.putMessageSizeTotal = Long.parseLong(kvTable.getTable().get("putMessageSizeTotal"));
+        this.sendThreadPoolQueueCapacity = Long.parseLong(kvTable.getTable().get("sendThreadPoolQueueCapacity"));
+        this.pullThreadPoolQueueCapacity = Long.parseLong(kvTable.getTable().get("pullThreadPoolQueueCapacity"));
+
+    }
+
+    private void loadCommitLogDirCapacity(String commitLogDirCapacity) {
+        String[] arr = commitLogDirCapacity.split(" ");
+        double total = Double.parseDouble(arr[2]);
+        double free = Double.parseDouble(arr[6]);
+        this.commitLogDirCapacityTotal = total;
+        this.commitLogDirCapacityFree = free;
+    }
+
+    private void loadTps(PutTps putTps, String value) {
+        String[] arr = value.split(" ");
+        putTps.ten = Double.parseDouble(arr[0]);
+        putTps.sixty = Double.parseDouble(arr[1]);
+        putTps.sixHundred = Double.parseDouble(arr[2]);
+    }
+
+    private void loadPutMessageDistributeTime(String str) {
+        String[] arr = str.split(" ");
+        String key = "", value = "";
+        for (String ar : arr) {
+            String[] tarr = ar.split(":");
+            key = tarr[0].replace("[", "").replace("]", "");
+            value = tarr[1];
+            this.putMessageDistributeTimeMap.put(key, Integer.parseInt(value));
+        }
+    }
+
+    public void loadScheduleMessageOffsets(KVTable kvTable) {
+        for (String key : kvTable.getTable().keySet()) {
+            if (key.startsWith("scheduleMessageOffset")) {
+                String[] arr = kvTable.getTable().get(key).split(",");
+                ScheduleMessageOffsetTable table = new ScheduleMessageOffsetTable(
+                        Long.parseLong(arr[0]),
+                        Long.parseLong(arr[1])
+                );
+                this.scheduleMessageOffsetTables.add(table);
+            }
+        }
+    }
+
+    public static class ScheduleMessageOffsetTable {
+        private long delayOffset;
+        private long maxOffset;
+
+        public ScheduleMessageOffsetTable(long first, long second) {
+            this.delayOffset = first;
+            this.maxOffset = second;
+        }
+
+        public long getDelayOffset() {
+            return delayOffset;
+        }
+
+        public void setDelayOffset(long delayOffset) {
+            this.delayOffset = delayOffset;
+        }
+
+        public long getMaxOffset() {
+            return maxOffset;
+        }
+
+        public void setMaxOffset(long maxOffset) {
+            this.maxOffset = maxOffset;
+        }
+    }
+
+    public class PutTps {
+        private double ten;
+        private double sixty;
+        private double sixHundred;
+
+        public double getTen() {
+            return ten;
+        }
+
+        public void setTen(double ten) {
+            this.ten = ten;
+        }
+
+        public double getSixty() {
+            return sixty;
+        }
+
+        public void setSixty(double sixty) {
+            this.sixty = sixty;
+        }
+
+        public double getSixHundred() {
+            return sixHundred;
+        }
+
+        public void setSixHundred(double sixHundred) {
+            this.sixHundred = sixHundred;
+        }
+    }
+
+    public class GetMissTps extends PutTps {
+    }
+
+    public class GetTransferedTps extends PutTps {
+    }
+
+    public class GetTotalTps extends PutTps {
+    }
+
+    public class GetFoundTps extends PutTps {
+    }
+
+    public long getMsgPutTotalTodayNow() {
+        return msgPutTotalTodayNow;
+    }
+
+    public void setMsgPutTotalTodayNow(long msgPutTotalTodayNow) {
+        this.msgPutTotalTodayNow = msgPutTotalTodayNow;
+    }
+
+    public long getMsgGetTotalTodayNow() {
+        return msgGetTotalTodayNow;
+    }
+
+    public void setMsgGetTotalTodayNow(long msgGetTotalTodayNow) {
+        this.msgGetTotalTodayNow = msgGetTotalTodayNow;
+    }
+
+    public long getMsgPutTotalTodayMorning() {
+        return msgPutTotalTodayMorning;
+    }
+
+    public void setMsgPutTotalTodayMorning(long msgPutTotalTodayMorning) {
+        this.msgPutTotalTodayMorning = msgPutTotalTodayMorning;
+    }
+
+    public long getMsgGetTotalTodayMorning() {
+        return msgGetTotalTodayMorning;
+    }
+
+    public void setMsgGetTotalTodayMorning(long msgGetTotalTodayMorning) {
+        this.msgGetTotalTodayMorning = msgGetTotalTodayMorning;
+    }
+
+    public long getMsgPutTotalYesterdayMorning() {
+        return msgPutTotalYesterdayMorning;
+    }
+
+    public void setMsgPutTotalYesterdayMorning(long msgPutTotalYesterdayMorning) {
+        this.msgPutTotalYesterdayMorning = msgPutTotalYesterdayMorning;
+    }
+
+    public long getMsgGetTotalYesterdayMorning() {
+        return msgGetTotalYesterdayMorning;
+    }
+
+    public void setMsgGetTotalYesterdayMorning(long msgGetTotalYesterdayMorning) {
+        this.msgGetTotalYesterdayMorning = msgGetTotalYesterdayMorning;
+    }
+
+    public List<ScheduleMessageOffsetTable> getScheduleMessageOffsetTables() {
+        return scheduleMessageOffsetTables;
+    }
+
+    public void setScheduleMessageOffsetTables(List<ScheduleMessageOffsetTable> scheduleMessageOffsetTables) {
+        this.scheduleMessageOffsetTables = scheduleMessageOffsetTables;
+    }
+
+    public long getSendThreadPoolQueueHeadWaitTimeMills() {
+        return sendThreadPoolQueueHeadWaitTimeMills;
+    }
+
+    public void setSendThreadPoolQueueHeadWaitTimeMills(long sendThreadPoolQueueHeadWaitTimeMills) {
+        this.sendThreadPoolQueueHeadWaitTimeMills = sendThreadPoolQueueHeadWaitTimeMills;
+    }
+
+    public long getQueryThreadPoolQueueHeadWaitTimeMills() {
+        return queryThreadPoolQueueHeadWaitTimeMills;
+    }
+
+    public void setQueryThreadPoolQueueHeadWaitTimeMills(long queryThreadPoolQueueHeadWaitTimeMills) {
+        this.queryThreadPoolQueueHeadWaitTimeMills = queryThreadPoolQueueHeadWaitTimeMills;
+    }
+
+    public long getPullThreadPoolQueueHeadWaitTimeMills() {
+        return pullThreadPoolQueueHeadWaitTimeMills;
+    }
+
+    public void setPullThreadPoolQueueHeadWaitTimeMills(long pullThreadPoolQueueHeadWaitTimeMills) {
+        this.pullThreadPoolQueueHeadWaitTimeMills = pullThreadPoolQueueHeadWaitTimeMills;
+    }
+
+    public long getQueryThreadPoolQueueSize() {
+        return queryThreadPoolQueueSize;
+    }
+
+    public void setQueryThreadPoolQueueSize(long queryThreadPoolQueueSize) {
+        this.queryThreadPoolQueueSize = queryThreadPoolQueueSize;
+    }
+
+    public long getPullThreadPoolQueueSize() {
+        return pullThreadPoolQueueSize;
+    }
+
+    public void setPullThreadPoolQueueSize(long pullThreadPoolQueueSize) {
+        this.pullThreadPoolQueueSize = pullThreadPoolQueueSize;
+    }
+
+    public long getSendThreadPoolQueueCapacity() {
+        return sendThreadPoolQueueCapacity;
+    }
+
+    public void setSendThreadPoolQueueCapacity(long sendThreadPoolQueueCapacity) {
+        this.sendThreadPoolQueueCapacity = sendThreadPoolQueueCapacity;
+    }
+
+    public long getPullThreadPoolQueueCapacity() {
+        return pullThreadPoolQueueCapacity;
+    }
+
+    public void setPullThreadPoolQueueCapacity(long pullThreadPoolQueueCapacity) {
+        this.pullThreadPoolQueueCapacity = pullThreadPoolQueueCapacity;
+    }
+
+    public Map<String, Integer> getPutMessageDistributeTimeMap() {
+        return putMessageDistributeTimeMap;
+    }
+
+    public void setPutMessageDistributeTimeMap(Map<String, Integer> putMessageDistributeTimeMap) {
+        this.putMessageDistributeTimeMap = putMessageDistributeTimeMap;
+    }
+
+    public double getRemainHowManyDataToFlush() {
+        return remainHowManyDataToFlush;
+    }
+
+    public void setRemainHowManyDataToFlush(double remainHowManyDataToFlush) {
+        this.remainHowManyDataToFlush = remainHowManyDataToFlush;
+    }
+
+    public long getCommitLogMinOffset() {
+        return commitLogMinOffset;
+    }
+
+    public void setCommitLogMinOffset(long commitLogMinOffset) {
+        this.commitLogMinOffset = commitLogMinOffset;
+    }
+
+    public long getCommitLogMaxOffset() {
+        return commitLogMaxOffset;
+    }
+
+    public void setCommitLogMaxOffset(long commitLogMaxOffset) {
+        this.commitLogMaxOffset = commitLogMaxOffset;
+    }
+
+    public String getRuntime() {
+        return runtime;
+    }
+
+    public void setRuntime(String runtime) {
+        this.runtime = runtime;
+    }
+
+    public long getBootTimestamp() {
+        return bootTimestamp;
+    }
+
+    public void setBootTimestamp(long bootTimestamp) {
+        this.bootTimestamp = bootTimestamp;
+    }
+
+    public double getCommitLogDirCapacityTotal() {
+        return commitLogDirCapacityTotal;
+    }
+
+    public void setCommitLogDirCapacityTotal(double commitLogDirCapacityTotal) {
+        this.commitLogDirCapacityTotal = commitLogDirCapacityTotal;
+    }
+
+    public double getCommitLogDirCapacityFree() {
+        return commitLogDirCapacityFree;
+    }
+
+    public void setCommitLogDirCapacityFree(double commitLogDirCapacityFree) {
+        this.commitLogDirCapacityFree = commitLogDirCapacityFree;
+    }
+
+    public int getBrokerVersion() {
+        return brokerVersion;
+    }
+
+    public void setBrokerVersion(int brokerVersion) {
+        this.brokerVersion = brokerVersion;
+    }
+
+    public long getDispatchMaxBuffer() {
+        return dispatchMaxBuffer;
+    }
+
+    public void setDispatchMaxBuffer(long dispatchMaxBuffer) {
+        this.dispatchMaxBuffer = dispatchMaxBuffer;
+    }
+
+    public PutTps getPutTps() {
+        return putTps;
+    }
+
+    public void setPutTps(PutTps putTps) {
+        this.putTps = putTps;
+    }
+
+    public GetMissTps getGetMissTps() {
+        return getMissTps;
+    }
+
+    public void setGetMissTps(GetMissTps getMissTps) {
+        this.getMissTps = getMissTps;
+    }
+
+    public GetTransferedTps getGetTransferedTps() {
+        return getTransferedTps;
+    }
+
+    public void setGetTransferedTps(GetTransferedTps getTransferedTps) {
+        this.getTransferedTps = getTransferedTps;
+    }
+
+    public GetTotalTps getGetTotalTps() {
+        return getTotalTps;
+    }
+
+    public void setGetTotalTps(GetTotalTps getTotalTps) {
+        this.getTotalTps = getTotalTps;
+    }
+
+    public GetFoundTps getGetFoundTps() {
+        return getFoundTps;
+    }
+
+    public void setGetFoundTps(GetFoundTps getFoundTps) {
+        this.getFoundTps = getFoundTps;
+    }
+
+    public double getConsumeQueueDiskRatio() {
+        return consumeQueueDiskRatio;
+    }
+
+    public void setConsumeQueueDiskRatio(double consumeQueueDiskRatio) {
+        this.consumeQueueDiskRatio = consumeQueueDiskRatio;
+    }
+
+    public double getCommitLogDiskRatio() {
+        return commitLogDiskRatio;
+    }
+
+    public void setCommitLogDiskRatio(double commitLogDiskRatio) {
+        this.commitLogDiskRatio = commitLogDiskRatio;
+    }
+
+    public long getPageCacheLockTimeMills() {
+        return pageCacheLockTimeMills;
+    }
+
+    public void setPageCacheLockTimeMills(long pageCacheLockTimeMills) {
+        this.pageCacheLockTimeMills = pageCacheLockTimeMills;
+    }
+
+    public long getGetMessageEntireTimeMax() {
+        return getMessageEntireTimeMax;
+    }
+
+    public void setGetMessageEntireTimeMax(long getMessageEntireTimeMax) {
+        this.getMessageEntireTimeMax = getMessageEntireTimeMax;
+    }
+
+    public long getPutMessageTimesTotal() {
+        return putMessageTimesTotal;
+    }
+
+    public void setPutMessageTimesTotal(long putMessageTimesTotal) {
+        this.putMessageTimesTotal = putMessageTimesTotal;
+    }
+
+    public String getBrokerVersionDesc() {
+        return brokerVersionDesc;
+    }
+
+    public void setBrokerVersionDesc(String brokerVersionDesc) {
+        this.brokerVersionDesc = brokerVersionDesc;
+    }
+
+    public long getSendThreadPoolQueueSize() {
+        return sendThreadPoolQueueSize;
+    }
+
+    public void setSendThreadPoolQueueSize(long sendThreadPoolQueueSize) {
+        this.sendThreadPoolQueueSize = sendThreadPoolQueueSize;
+    }
+
+    public long getStartAcceptSendRequestTimeStamp() {
+        return startAcceptSendRequestTimeStamp;
+    }
+
+    public void setStartAcceptSendRequestTimeStamp(long startAcceptSendRequestTimeStamp) {
+        this.startAcceptSendRequestTimeStamp = startAcceptSendRequestTimeStamp;
+    }
+
+    public long getPutMessageEntireTimeMax() {
+        return putMessageEntireTimeMax;
+    }
+
+    public void setPutMessageEntireTimeMax(long putMessageEntireTimeMax) {
+        this.putMessageEntireTimeMax = putMessageEntireTimeMax;
+    }
+
+    public long getEarliestMessageTimeStamp() {
+        return earliestMessageTimeStamp;
+    }
+
+    public void setEarliestMessageTimeStamp(long earliestMessageTimeStamp) {
+        this.earliestMessageTimeStamp = earliestMessageTimeStamp;
+    }
+
+    public long getRemainTransientStoreBufferNumbs() {
+        return remainTransientStoreBufferNumbs;
+    }
+
+    public void setRemainTransientStoreBufferNumbs(long remainTransientStoreBufferNumbs) {
+        this.remainTransientStoreBufferNumbs = remainTransientStoreBufferNumbs;
+    }
+
+    public long getQueryThreadPoolQueueCapacity() {
+        return queryThreadPoolQueueCapacity;
+    }
+
+    public void setQueryThreadPoolQueueCapacity(long queryThreadPoolQueueCapacity) {
+        this.queryThreadPoolQueueCapacity = queryThreadPoolQueueCapacity;
+    }
+
+    public double getPutMessageAverageSize() {
+        return putMessageAverageSize;
+    }
+
+    public void setPutMessageAverageSize(double putMessageAverageSize) {
+        this.putMessageAverageSize = putMessageAverageSize;
+    }
+
+    public long getPutMessageSizeTotal() {
+        return putMessageSizeTotal;
+    }
+
+    public void setPutMessageSizeTotal(long putMessageSizeTotal) {
+        this.putMessageSizeTotal = putMessageSizeTotal;
+    }
+
+    public long getDispatchBehindBytes() {
+        return dispatchBehindBytes;
+    }
+
+    public void setDispatchBehindBytes(long dispatchBehindBytes) {
+        this.dispatchBehindBytes = dispatchBehindBytes;
+    }
+}
diff --git a/src/main/java/org/apache/rocketmq/exporter/model/metrics/BrokerMetric.java b/src/main/java/org/apache/rocketmq/exporter/model/metrics/BrokerMetric.java
index c7a0727..45caff2 100644
--- a/src/main/java/org/apache/rocketmq/exporter/model/metrics/BrokerMetric.java
+++ b/src/main/java/org/apache/rocketmq/exporter/model/metrics/BrokerMetric.java
@@ -17,32 +17,38 @@
 package org.apache.rocketmq.exporter.model.metrics;
 
 public class BrokerMetric {
+    private String clusterName;
+    private String brokerIP;
+    private String brokerHost;
 
-    private  String   clusterName;
-    private  String   brokerName;
-
-
-    public void setClusterName(String cluster) {
-
-        clusterName = cluster;
+    public BrokerMetric(String clusterName, String brokerIP, String brokerHost) {
+        this.clusterName = clusterName;
+        this.brokerIP = brokerIP;
+        this.brokerHost = brokerHost;
     }
-    public  String getClusterName() {
 
+    public String getClusterName() {
         return clusterName;
     }
-    void setBrokerName(String broker) {
 
-        brokerName = broker;
+    public void setClusterName(String clusterName) {
+        this.clusterName = clusterName;
     }
 
-    public String getBrokerName() {
+    public String getBrokerIP() {
+        return brokerIP;
+    }
+
+    public void setBrokerIP(String brokerIP) {
+        this.brokerIP = brokerIP;
+    }
 
-        return brokerName;
+    public String getBrokerHost() {
+        return brokerHost;
     }
 
-    public BrokerMetric(String cluster, String broker) {
-        clusterName = cluster;
-        brokerName  =   broker;
+    public void setBrokerHost(String brokerHost) {
+        this.brokerHost = brokerHost;
     }
 
     @Override
@@ -52,19 +58,20 @@ public class BrokerMetric {
         }
         BrokerMetric other = (BrokerMetric) obj;
 
-        return  other.clusterName.equals(clusterName) && other.brokerName.equals(brokerName);
+        return other.clusterName.equals(clusterName) &&
+                other.brokerIP.equals(brokerIP);
     }
 
     @Override
     public int hashCode() {
         int hash = 1;
         hash = 37 * hash + clusterName.hashCode();
-        hash = 37 * hash + brokerName.hashCode();
+        hash = 37 * hash + brokerIP.hashCode();
         return hash;
     }
 
     @Override
     public String toString() {
-        return "ClusterName: " + clusterName + " BrokerName: " + brokerName;
+        return "ClusterName: " + clusterName + " brokerIP: " + brokerIP + " brokerHost: " + brokerHost;
     }
 }
diff --git a/src/main/java/org/apache/rocketmq/exporter/model/metrics/ConsumerCountMetric.java b/src/main/java/org/apache/rocketmq/exporter/model/metrics/ConsumerCountMetric.java
new file mode 100644
index 0000000..67d9f20
--- /dev/null
+++ b/src/main/java/org/apache/rocketmq/exporter/model/metrics/ConsumerCountMetric.java
@@ -0,0 +1,58 @@
+package org.apache.rocketmq.exporter.model.metrics;
+
+public class ConsumerCountMetric {
+    private String caddr;
+    private String localaddr;
+    private String group;
+
+    public ConsumerCountMetric(String group, String caddr, String localaddr) {
+        this.group = group;
+        this.caddr = caddr;
+        this.localaddr = localaddr;
+    }
+
+    public String getCaddr() {
+        return caddr;
+    }
+
+    public void setCaddr(String caddr) {
+        this.caddr = caddr;
+    }
+
+    public String getLocaladdr() {
+        return localaddr;
+    }
+
+    public void setLocaladdr(String localaddr) {
+        this.localaddr = localaddr;
+    }
+
+    public String getGroup() {
+        return group;
+    }
+
+    public void setGroup(String group) {
+        this.group = group;
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+        if (!(obj instanceof ConsumerCountMetric)) {
+            return false;
+        }
+        ConsumerCountMetric other = (ConsumerCountMetric) obj;
+        return other.group.equals(group);
+    }
+
+    @Override
+    public int hashCode() {
+        int hash = 1;
+        hash = 37 * hash + group.hashCode();
+        return hash;
+    }
+
+    @Override
+    public String toString() {
+        return "group: " + group + " caddr: " + caddr + " localaddr: " + localaddr;
+    }
+}
diff --git a/src/main/java/org/apache/rocketmq/exporter/model/metrics/ConsumerMetric.java b/src/main/java/org/apache/rocketmq/exporter/model/metrics/ConsumerMetric.java
index 9530fff..a0036a2 100644
--- a/src/main/java/org/apache/rocketmq/exporter/model/metrics/ConsumerMetric.java
+++ b/src/main/java/org/apache/rocketmq/exporter/model/metrics/ConsumerMetric.java
@@ -17,32 +17,22 @@
 package org.apache.rocketmq.exporter.model.metrics;
 
 public class ConsumerMetric {
+    private String topicName;
+    private String consumerGroupName;
 
-    private  String   clusterName;
-    private  String   brokerName;
-    private  String   topicName;
-    private  String   consumerGroupName;
-
-    public void setClusterName(String cluster) {
-        clusterName = cluster;
-    }
-    public  String getClusterName() {
-        return clusterName;
-    }
-    void setBrokerName(String broker) {
-        brokerName = broker;
-    }
-
-    public String getBrokerName() {
-        return brokerName;
+    public ConsumerMetric(String topicName, String consumerGroupName) {
+        this.topicName = topicName;
+        this.consumerGroupName = consumerGroupName;
     }
 
-    public void setTopicName(String topic) {
-        topicName = topic;
-    }
     public String getTopicName() {
         return topicName;
     }
+
+    public void setTopicName(String topicName) {
+        this.topicName = topicName;
+    }
+
     public String getConsumerGroupName() {
         return consumerGroupName;
     }
@@ -51,13 +41,6 @@ public class ConsumerMetric {
         this.consumerGroupName = consumerGroupName;
     }
 
-    public ConsumerMetric(String cluster, String broker, String topic,String consumerGroup) {
-        clusterName = cluster;
-        brokerName  =   broker;
-        topicName   =   topic;
-        consumerGroupName   =   consumerGroup;
-    }
-
     @Override
     public boolean equals(Object obj) {
         if (!(obj instanceof ConsumerMetric)) {
@@ -65,15 +48,13 @@ public class ConsumerMetric {
         }
         ConsumerMetric other = (ConsumerMetric) obj;
 
-        return  other.clusterName.equals(clusterName) && other.brokerName.equals(brokerName)
-                && other.topicName.equals(topicName)  && other.consumerGroupName.equals(consumerGroupName);
+        return other.topicName.equals(topicName) &&
+                other.consumerGroupName.equals(consumerGroupName);
     }
 
     @Override
     public int hashCode() {
         int hash = 1;
-        hash = 37 * hash + clusterName.hashCode();
-        hash = 37 * hash + brokerName.hashCode();
         hash = 37 * hash + topicName.hashCode();
         hash = 37 * hash + consumerGroupName.hashCode();
         return hash;
@@ -81,6 +62,6 @@ public class ConsumerMetric {
 
     @Override
     public String toString() {
-        return "ClusterName: " + clusterName + " BrokerName: " + brokerName + " topicName: " + topicName + " ConsumeGroupName: " + consumerGroupName;
+        return "topicName: " + topicName + " ConsumeGroupName: " + consumerGroupName;
     }
 }
diff --git a/src/main/java/org/apache/rocketmq/exporter/model/metrics/ConsumerQueueMetric.java b/src/main/java/org/apache/rocketmq/exporter/model/metrics/ConsumerQueueMetric.java
deleted file mode 100644
index a6453fc..0000000
--- a/src/main/java/org/apache/rocketmq/exporter/model/metrics/ConsumerQueueMetric.java
+++ /dev/null
@@ -1,93 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-package org.apache.rocketmq.exporter.model.metrics;
-
-public class ConsumerQueueMetric {
-
-    private  String   clusterName;
-    private  String   brokerName;
-    private  String   topicName;
-    private  String   consumerGroupName;
-    private  String   queueId;
-
-    public void setClusterName(String cluster) {
-        clusterName = cluster;
-    }
-    public  String getClusterName() {
-        return clusterName;
-    }
-    void setBrokerName(String broker) {
-        brokerName = broker;
-    }
-    public String getBrokerName() {
-        return brokerName;
-    }
-    public void setTopicName(String topic) {
-        topicName = topic;
-    }
-    public String  getTopicName() {
-        return topicName;
-    }
-    public String getConsumerGroupName() {
-        return consumerGroupName;
-    }
-
-    public void setConsumerGroupName(String consumerGroupName) {
-        this.consumerGroupName = consumerGroupName;
-    }
-    public String getQueueId() {
-        return queueId;
-    }
-    public void setQueueId(String queueId) {
-        this.queueId = queueId;
-    }
-    public ConsumerQueueMetric(String cluster, String broker, String topic, String consumerGroup,String queue) {
-        clusterName = cluster;
-        brokerName  =   broker;
-        topicName   =   topic;
-        consumerGroupName   =   consumerGroup;
-        queueId             =   queue;
-    }
-
-    @Override
-    public boolean equals(Object obj) {
-        if (!(obj instanceof ConsumerQueueMetric)) {
-            return false;
-        }
-        ConsumerQueueMetric other = (ConsumerQueueMetric) obj;
-
-        return  other.clusterName.equals(clusterName) && other.brokerName.equals(brokerName)
-                && other.topicName.equals(topicName)  && other.consumerGroupName.equals(consumerGroupName)
-                && other.queueId.equals(queueId);
-    }
-
-    @Override
-    public int hashCode() {
-        int hash = 1;
-        hash = 37 * hash + clusterName.hashCode();
-        hash = 37 * hash + brokerName.hashCode();
-        hash = 37 * hash + topicName.hashCode();
-        hash = 37 * hash + consumerGroupName.hashCode();
-        hash = 37 * hash + queueId.hashCode();
-        return hash;
-    }
-
-    @Override
-    public String toString() {
-        return "ClusterName: " + clusterName + " BrokerName: " + brokerName + " topicName: " + topicName + " ConsumeGroupName: " + consumerGroupName  +  "queueId: " + queueId;
-    }
-}
diff --git a/src/main/java/org/apache/rocketmq/exporter/model/metrics/ConsumerTopicDiffMetric.java b/src/main/java/org/apache/rocketmq/exporter/model/metrics/ConsumerTopicDiffMetric.java
new file mode 100644
index 0000000..84b15c1
--- /dev/null
+++ b/src/main/java/org/apache/rocketmq/exporter/model/metrics/ConsumerTopicDiffMetric.java
@@ -0,0 +1,61 @@
+package org.apache.rocketmq.exporter.model.metrics;
+
+public class ConsumerTopicDiffMetric {
+    public ConsumerTopicDiffMetric(String group, String topic, String countOfOnlineConsumers) {
+        this.group = group;
+        this.topic = topic;
+        this.countOfOnlineConsumers = countOfOnlineConsumers;
+    }
+
+    private String group;
+    private String topic;
+    private String countOfOnlineConsumers;
+
+    public String getGroup() {
+        return group;
+    }
+
+    public void setGroup(String group) {
+        this.group = group;
+    }
+
+    public String getTopic() {
+        return topic;
+    }
+
+    public void setTopic(String topic) {
+        this.topic = topic;
+    }
+
+    public String getCountOfOnlineConsumers() {
+        return countOfOnlineConsumers;
+    }
+
+    public void setCountOfOnlineConsumers(String countOfOnlineConsumers) {
+        this.countOfOnlineConsumers = countOfOnlineConsumers;
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+        if (!(obj instanceof ConsumerTopicDiffMetric)) {
+            return false;
+        }
+        ConsumerTopicDiffMetric other = (ConsumerTopicDiffMetric) obj;
+
+        return other.group.equals(group) &&
+                other.topic.equals(topic);
+    }
+
+    @Override
+    public int hashCode() {
+        int hash = 1;
+        hash = 37 * hash + group.hashCode();
+        hash = 37 * hash + topic.hashCode();
+        return hash;
+    }
+
+    @Override
+    public String toString() {
+        return "ConsumerGroup: " + group + " Topic: " + topic;
+    }
+}
diff --git a/src/main/java/org/apache/rocketmq/exporter/model/metrics/DLQTopicOffsetMetric.java b/src/main/java/org/apache/rocketmq/exporter/model/metrics/DLQTopicOffsetMetric.java
new file mode 100644
index 0000000..da530de
--- /dev/null
+++ b/src/main/java/org/apache/rocketmq/exporter/model/metrics/DLQTopicOffsetMetric.java
@@ -0,0 +1,72 @@
+package org.apache.rocketmq.exporter.model.metrics;
+
+public class DLQTopicOffsetMetric {
+    private String clusterName;
+    private String brokerNames;
+    private String group;
+    private long lastUpdateTimestamp;
+
+    public DLQTopicOffsetMetric(String clusterName, String brokerNames, String group, long lastUpdateTimestamp) {
+        this.clusterName = clusterName;
+        this.brokerNames = brokerNames;
+        this.group = group;
+        this.lastUpdateTimestamp = lastUpdateTimestamp;
+    }
+
+    public String getClusterName() {
+        return clusterName;
+    }
+
+    public void setClusterName(String clusterName) {
+        this.clusterName = clusterName;
+    }
+
+    public String getBrokerNames() {
+        return brokerNames;
+    }
+
+    public void setBrokerNames(String brokerNames) {
+        this.brokerNames = brokerNames;
+    }
+
+    public String getGroup() {
+        return group;
+    }
+
+    public void setGroup(String group) {
+        this.group = group;
+    }
+
+    public long getLastUpdateTimestamp() {
+        return lastUpdateTimestamp;
+    }
+
+    public void setLastUpdateTimestamp(long lastUpdateTimestamp) {
+        this.lastUpdateTimestamp = lastUpdateTimestamp;
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+        if (!(obj instanceof DLQTopicOffsetMetric)) {
+            return false;
+        }
+        DLQTopicOffsetMetric other = (DLQTopicOffsetMetric) obj;
+
+        return other.clusterName.equals(clusterName) &&
+                other.group.equals(group);
+
+    }
+
+    @Override
+    public int hashCode() {
+        int hash = 1;
+        hash = 37 * hash + clusterName.hashCode();
+        hash = 37 * hash + group.hashCode();
+        return hash;
+    }
+
+    @Override
+    public String toString() {
+        return "ClusterName: " + clusterName + " BrokerNames: " + brokerNames + " group: " + group;
+    }
+}
diff --git a/src/main/java/org/apache/rocketmq/exporter/model/metrics/ProducerMetric.java b/src/main/java/org/apache/rocketmq/exporter/model/metrics/ProducerMetric.java
index 72baa73..fb9c5a7 100644
--- a/src/main/java/org/apache/rocketmq/exporter/model/metrics/ProducerMetric.java
+++ b/src/main/java/org/apache/rocketmq/exporter/model/metrics/ProducerMetric.java
@@ -16,34 +16,50 @@
  */
 package org.apache.rocketmq.exporter.model.metrics;
 
+//每个topic最大位点
 public class ProducerMetric {
+    private String clusterName;
+    private String brokerNames;
+    private String topicName;
+    private long lastUpdateTimestamp;
 
-    private  String   clusterName;
-    private  String   brokerName;
-    private  String   topicName;
-
-    public void setClusterName(String cluster) {
-        clusterName = cluster;
-    }
-    public  String getClusterName() {
+    public String getClusterName() {
         return clusterName;
     }
-    void setBrokerName(String broker) {
-        brokerName = broker;
+
+    public void setClusterName(String clusterName) {
+        this.clusterName = clusterName;
     }
-    public String getBrokerName() {
-        return brokerName;
+
+    public String getBrokerNames() {
+        return brokerNames;
     }
-    public void setTopicName(String topic) {
-        topicName = topic;
+
+    public void setBrokerNames(String brokerNames) {
+        this.brokerNames = brokerNames;
     }
-    public String  getTopicName() {
+
+    public String getTopicName() {
         return topicName;
     }
-    public ProducerMetric(String cluster,String broker,String topic) {
-        clusterName = cluster;
-        brokerName  =   broker;
-        topicName   =   topic;
+
+    public void setTopicName(String topicName) {
+        this.topicName = topicName;
+    }
+
+    public long getLastUpdateTimestamp() {
+        return lastUpdateTimestamp;
+    }
+
+    public void setLastUpdateTimestamp(long lastUpdateTimestamp) {
+        this.lastUpdateTimestamp = lastUpdateTimestamp;
+    }
+
+    public ProducerMetric(String clusterName, String brokerNames, String topicName, long lastUpdateTimestamp) {
+        this.clusterName = clusterName;
+        this.brokerNames = brokerNames;
+        this.topicName = topicName;
+        this.lastUpdateTimestamp = lastUpdateTimestamp;
     }
 
     @Override
@@ -53,21 +69,20 @@ public class ProducerMetric {
         }
         ProducerMetric other = (ProducerMetric) obj;
 
-        return  other.clusterName.equals(clusterName) && other.brokerName.equals(brokerName)
-                && other.topicName.equals(topicName);
+        return other.clusterName.equals(clusterName) &&
+                other.topicName.equals(topicName);
     }
 
     @Override
     public int hashCode() {
         int hash = 1;
         hash = 37 * hash + clusterName.hashCode();
-        hash = 37 * hash + brokerName.hashCode();
         hash = 37 * hash + topicName.hashCode();
         return hash;
     }
 
     @Override
     public String toString() {
-        return "ClusterName: " + clusterName + " BrokerName: " + brokerName + " topicName: " + topicName;
+        return "ClusterName: " + clusterName + " BrokerNames: " + brokerNames + " topicName: " + topicName;
     }
 }
diff --git a/src/main/java/org/apache/rocketmq/exporter/model/metrics/TopicPutNumMetric.java b/src/main/java/org/apache/rocketmq/exporter/model/metrics/TopicPutNumMetric.java
new file mode 100644
index 0000000..37f4e86
--- /dev/null
+++ b/src/main/java/org/apache/rocketmq/exporter/model/metrics/TopicPutNumMetric.java
@@ -0,0 +1,83 @@
+package org.apache.rocketmq.exporter.model.metrics;
+
+public class TopicPutNumMetric {
+    private String clusterName;
+    private String brokerNames;
+    private String brokerIP;
+    private String brokerHost;
+    private String topicName;
+
+    public TopicPutNumMetric(String clusterName, String brokerNames, String brokerIP, String brokerHost, String topicName) {
+        this.clusterName = clusterName;
+        this.brokerNames = brokerNames;
+        this.brokerIP = brokerIP;
+        this.brokerHost = brokerHost;
+        this.topicName = topicName;
+    }
+
+    public String getClusterName() {
+        return clusterName;
+    }
+
+    public void setClusterName(String clusterName) {
+        this.clusterName = clusterName;
+    }
+
+    public String getBrokerNames() {
+        return brokerNames;
+    }
+
+    public void setBrokerNames(String brokerNames) {
+        this.brokerNames = brokerNames;
+    }
+
+    public String getTopicName() {
+        return topicName;
+    }
+
+    public void setTopicName(String topicName) {
+        this.topicName = topicName;
+    }
+
+    public String getBrokerIP() {
+        return brokerIP;
+    }
+
+    public void setBrokerIP(String brokerIP) {
+        this.brokerIP = brokerIP;
+    }
+
+    public String getBrokerHost() {
+        return brokerHost;
+    }
+
+    public void setBrokerHost(String brokerHost) {
+        this.brokerHost = brokerHost;
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+        if (!(obj instanceof TopicPutNumMetric)) {
+            return false;
+        }
+        TopicPutNumMetric other = (TopicPutNumMetric) obj;
+
+        return other.clusterName.equals(clusterName) &&
+                other.brokerIP.equals(brokerIP) &&
+                other.topicName.equals(topicName);
+    }
+
+    @Override
+    public int hashCode() {
+        int hash = 1;
+        hash = 37 * hash + clusterName.hashCode();
+        hash = 37 * hash + topicName.hashCode();
+        hash = 37 * hash + brokerIP.hashCode();
+        return hash;
+    }
+
+    @Override
+    public String toString() {
+        return "ClusterName: " + clusterName + " brokerIP: " + brokerIP + " topicName: " + topicName;
+    }
+}
diff --git a/src/main/java/org/apache/rocketmq/exporter/model/metrics/brokerruntime/BrokerRuntimeMetric.java b/src/main/java/org/apache/rocketmq/exporter/model/metrics/brokerruntime/BrokerRuntimeMetric.java
new file mode 100644
index 0000000..beb583e
--- /dev/null
+++ b/src/main/java/org/apache/rocketmq/exporter/model/metrics/brokerruntime/BrokerRuntimeMetric.java
@@ -0,0 +1,91 @@
+package org.apache.rocketmq.exporter.model.metrics.brokerruntime;
+
+public class BrokerRuntimeMetric {
+    private String clusterName;
+    private String brokerAddress;
+    private String brokerHost;
+    private String brokerDes;
+    private long bootTimestamp;
+    private int brokerVersion;
+
+    public BrokerRuntimeMetric(String clusterName, String brokerAddress, String brokerHost, String brokerDes, long bootTimestamp, int brokerVersion) {
+        this.clusterName = clusterName;
+        this.brokerAddress = brokerAddress;
+        this.brokerHost = brokerHost;
+        this.brokerDes = brokerDes;
+        this.bootTimestamp = bootTimestamp;
+        this.brokerVersion = brokerVersion;
+    }
+
+    public String getClusterName() {
+        return clusterName;
+    }
+
+    public void setClusterName(String clusterName) {
+        this.clusterName = clusterName;
+    }
+
+    public String getBrokerAddress() {
+        return brokerAddress;
+    }
+
+    public void setBrokerAddress(String brokerAddress) {
+        this.brokerAddress = brokerAddress;
+    }
+
+    public String getBrokerHost() {
+        return brokerHost;
+    }
+
+    public void setBrokerHost(String brokerHost) {
+        this.brokerHost = brokerHost;
+    }
+
+    public String getBrokerDes() {
+        return brokerDes;
+    }
+
+    public void setBrokerDes(String brokerDes) {
+        this.brokerDes = brokerDes;
+    }
+
+    public long getBootTimestamp() {
+        return bootTimestamp;
+    }
+
+    public void setBootTimestamp(long bootTimestamp) {
+        this.bootTimestamp = bootTimestamp;
+    }
+
+    public int getBrokerVersion() {
+        return brokerVersion;
+    }
+
+    public void setBrokerVersion(int brokerVersion) {
+        this.brokerVersion = brokerVersion;
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+        if (!(obj instanceof BrokerRuntimeMetric)) {
+            return false;
+        }
+        BrokerRuntimeMetric other = (BrokerRuntimeMetric) obj;
+
+        return other.clusterName.equals(clusterName) &&
+                other.brokerAddress.equals(brokerAddress);
+    }
+
+    @Override
+    public int hashCode() {
+        int hash = 1;
+        hash = 37 * hash + clusterName.hashCode();
+        hash = 37 * hash + brokerAddress.hashCode();
+        return hash;
+    }
+
+    @Override
+    public String toString() {
+        return "ClusterName: " + clusterName + " brokerAddress: " + brokerAddress + " brokerHost: " + brokerHost;
+    }
+}
diff --git a/src/main/java/org/apache/rocketmq/exporter/service/AbstractCommonService.java b/src/main/java/org/apache/rocketmq/exporter/service/AbstractCommonService.java
deleted file mode 100644
index 50f5b0a..0000000
--- a/src/main/java/org/apache/rocketmq/exporter/service/AbstractCommonService.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-package org.apache.rocketmq.exporter.service;
-
-import com.google.common.base.Throwables;
-import com.google.common.collect.Sets;
-import org.apache.commons.collections.CollectionUtils;
-import org.apache.rocketmq.tools.admin.MQAdminExt;
-
-import javax.annotation.Resource;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Set;
-
-public abstract class AbstractCommonService {
-    @Resource
-    protected MQAdminExt mqAdminExt;
-    protected final Set<String> changeToBrokerNameSet(HashMap<String, Set<String>> clusterAddrTable,
-        List<String> clusterNameList, List<String> brokerNameList) {
-        Set<String> finalBrokerNameList = Sets.newHashSet();
-        if (CollectionUtils.isNotEmpty(clusterNameList)) {
-            try {
-                for (String clusterName : clusterNameList) {
-                    finalBrokerNameList.addAll(clusterAddrTable.get(clusterName));
-                }
-            }
-            catch (Exception e) {
-                throw Throwables.propagate(e);
-            }
-        }
-        if (CollectionUtils.isNotEmpty(brokerNameList)) {
-            finalBrokerNameList.addAll(brokerNameList);
-        }
-        return finalBrokerNameList;
-    }
-}
diff --git a/src/main/java/org/apache/rocketmq/exporter/service/RMQMetricsService.java b/src/main/java/org/apache/rocketmq/exporter/service/RMQMetricsService.java
index c1f8802..5e2c0d4 100644
--- a/src/main/java/org/apache/rocketmq/exporter/service/RMQMetricsService.java
+++ b/src/main/java/org/apache/rocketmq/exporter/service/RMQMetricsService.java
@@ -18,12 +18,12 @@ package org.apache.rocketmq.exporter.service;
 
 import org.apache.rocketmq.exporter.collector.RMQMetricsCollector;
 
-
 import java.io.IOException;
 import java.io.StringWriter;
 
 
-public interface RMQMetricsService  {
+public interface RMQMetricsService {
     RMQMetricsCollector getCollector();
-    void Metrics(StringWriter writer) throws IOException;
+
+    void metrics(StringWriter writer) throws IOException;
 }
diff --git a/src/main/java/org/apache/rocketmq/exporter/service/client/MQAdminExtImpl.java b/src/main/java/org/apache/rocketmq/exporter/service/client/MQAdminExtImpl.java
index 17ec7d5..f24490c 100644
--- a/src/main/java/org/apache/rocketmq/exporter/service/client/MQAdminExtImpl.java
+++ b/src/main/java/org/apache/rocketmq/exporter/service/client/MQAdminExtImpl.java
@@ -18,10 +18,12 @@ package org.apache.rocketmq.exporter.service.client;
 
 import com.google.common.base.Throwables;
 import org.apache.rocketmq.client.QueryResult;
+import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
 import org.apache.rocketmq.client.consumer.PullResult;
 import org.apache.rocketmq.client.exception.MQBrokerException;
 import org.apache.rocketmq.client.exception.MQClientException;
 import org.apache.rocketmq.client.impl.MQAdminImpl;
+import org.apache.rocketmq.client.impl.factory.MQClientInstance;
 import org.apache.rocketmq.common.TopicConfig;
 import org.apache.rocketmq.common.admin.ConsumeStats;
 import org.apache.rocketmq.common.admin.RollbackStats;
@@ -55,11 +57,14 @@ import org.apache.rocketmq.remoting.exception.RemotingException;
 import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
 import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
 import org.apache.rocketmq.remoting.protocol.RemotingCommand;
+import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
 import org.apache.rocketmq.tools.admin.MQAdminExt;
 import org.apache.rocketmq.tools.admin.api.MessageTrack;
 import org.joor.Reflect;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Qualifier;
 import org.springframework.stereotype.Service;
 
 import java.io.UnsupportedEncodingException;
@@ -70,8 +75,21 @@ import java.util.Set;
 
 import static org.apache.rocketmq.remoting.protocol.RemotingSerializable.decode;
 
-@Service
+@Service("mqAdminExtImpl")
 public class MQAdminExtImpl implements MQAdminExt {
+    @Autowired
+    @Qualifier("defaultMQAdminExt")
+    private DefaultMQAdminExt defaultMQAdminExt;
+
+    @Autowired
+    private DefaultMQPullConsumer pullConsumer;
+
+    @Autowired
+    private RemotingClient remotingClient;
+
+    @Autowired
+    private MQClientInstance mqClientInstance;
+
     private Logger logger = LoggerFactory.getLogger(MQAdminExtImpl.class);
 
     public MQAdminExtImpl() {
@@ -79,37 +97,35 @@ public class MQAdminExtImpl implements MQAdminExt {
 
 
     public PullResult queryMsgByOffset(MessageQueue mq, long offset) throws Exception {
-        return MQAdminInstance.threadLocalMQPullConsumer().pull(mq, "*", offset, 1);
+        return pullConsumer.pull(mq, "*", offset, 1);
     }
 
     @Override
     public void updateBrokerConfig(String brokerAddr, Properties properties)
-        throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException,
-        UnsupportedEncodingException, InterruptedException, MQBrokerException {
-        MQAdminInstance.threadLocalMQAdminExt().updateBrokerConfig(brokerAddr, properties);
+            throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException,
+            UnsupportedEncodingException, InterruptedException, MQBrokerException {
+        defaultMQAdminExt.updateBrokerConfig(brokerAddr, properties);
     }
 
     @Override
     public void createAndUpdateTopicConfig(String addr, TopicConfig config)
-        throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
-        MQAdminInstance.threadLocalMQAdminExt().createAndUpdateTopicConfig(addr, config);
+            throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
+        defaultMQAdminExt.createAndUpdateTopicConfig(addr, config);
     }
 
     @Override
     public void createAndUpdateSubscriptionGroupConfig(String addr, SubscriptionGroupConfig config)
-        throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
-        MQAdminInstance.threadLocalMQAdminExt().createAndUpdateSubscriptionGroupConfig(addr, config);
+            throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
+        defaultMQAdminExt.createAndUpdateSubscriptionGroupConfig(addr, config);
     }
 
     @Override
     public SubscriptionGroupConfig examineSubscriptionGroupConfig(String addr, String group) {
-        RemotingClient remotingClient = MQAdminInstance.threadLocalRemotingClient();
         RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_ALL_SUBSCRIPTIONGROUP_CONFIG, null);
         RemotingCommand response = null;
         try {
             response = remotingClient.invokeSync(addr, request, 3000);
-        }
-        catch (Exception err) {
+        } catch (Exception err) {
             throw Throwables.propagate(err);
         }
         assert response != null;
@@ -125,13 +141,11 @@ public class MQAdminExtImpl implements MQAdminExt {
 
     @Override
     public TopicConfig examineTopicConfig(String addr, String topic) {
-        RemotingClient remotingClient = MQAdminInstance.threadLocalRemotingClient();
         RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_ALL_TOPIC_CONFIG, null);
         RemotingCommand response = null;
         try {
             response = remotingClient.invokeSync(addr, request, 3000);
-        }
-        catch (Exception err) {
+        } catch (Exception err) {
             throw Throwables.propagate(err);
         }
         switch (response.getCode()) {
@@ -146,238 +160,238 @@ public class MQAdminExtImpl implements MQAdminExt {
 
     @Override
     public TopicStatsTable examineTopicStats(String topic)
-        throws RemotingException, MQClientException, InterruptedException, MQBrokerException {
-        return MQAdminInstance.threadLocalMQAdminExt().examineTopicStats(topic);
+            throws RemotingException, MQClientException, InterruptedException, MQBrokerException {
+        return defaultMQAdminExt.examineTopicStats(topic);
     }
 
     @Override
     public TopicList fetchAllTopicList() throws RemotingException, MQClientException, InterruptedException {
-        TopicList topicList = MQAdminInstance.threadLocalMQAdminExt().fetchAllTopicList();
+        TopicList topicList = defaultMQAdminExt.fetchAllTopicList();
         logger.debug("op=look={}", JsonUtil.obj2String(topicList.getTopicList()));
         return topicList;
     }
 
     @Override
     public KVTable fetchBrokerRuntimeStats(String brokerAddr)
-        throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException,
-        InterruptedException, MQBrokerException {
-        return MQAdminInstance.threadLocalMQAdminExt().fetchBrokerRuntimeStats(brokerAddr);
+            throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException,
+            InterruptedException, MQBrokerException {
+        return defaultMQAdminExt.fetchBrokerRuntimeStats(brokerAddr);
     }
 
     @Override
     public ConsumeStats examineConsumeStats(String consumerGroup)
-        throws RemotingException, MQClientException, InterruptedException, MQBrokerException {
-        return MQAdminInstance.threadLocalMQAdminExt().examineConsumeStats(consumerGroup);
+            throws RemotingException, MQClientException, InterruptedException, MQBrokerException {
+        return defaultMQAdminExt.examineConsumeStats(consumerGroup);
     }
 
     @Override
     public ConsumeStats examineConsumeStats(String consumerGroup, String topic)
-        throws RemotingException, MQClientException, InterruptedException, MQBrokerException {
-        return MQAdminInstance.threadLocalMQAdminExt().examineConsumeStats(consumerGroup, topic);
+            throws RemotingException, MQClientException, InterruptedException, MQBrokerException {
+        return defaultMQAdminExt.examineConsumeStats(consumerGroup, topic);
     }
 
     @Override
     public ClusterInfo examineBrokerClusterInfo()
-        throws InterruptedException, MQBrokerException, RemotingTimeoutException, RemotingSendRequestException,
-        RemotingConnectException {
-        return MQAdminInstance.threadLocalMQAdminExt().examineBrokerClusterInfo();
+            throws InterruptedException, MQBrokerException, RemotingTimeoutException, RemotingSendRequestException,
+            RemotingConnectException {
+        return defaultMQAdminExt.examineBrokerClusterInfo();
     }
 
     @Override
     public TopicRouteData examineTopicRouteInfo(String topic)
-        throws RemotingException, MQClientException, InterruptedException {
-        return MQAdminInstance.threadLocalMQAdminExt().examineTopicRouteInfo(topic);
+            throws RemotingException, MQClientException, InterruptedException {
+        return defaultMQAdminExt.examineTopicRouteInfo(topic);
     }
 
     @Override
     public ConsumerConnection examineConsumerConnectionInfo(String consumerGroup)
-        throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException,
-        InterruptedException, MQBrokerException, RemotingException, MQClientException {
-        return MQAdminInstance.threadLocalMQAdminExt().examineConsumerConnectionInfo(consumerGroup);
+            throws
+            InterruptedException, MQBrokerException, RemotingException, MQClientException {
+        return defaultMQAdminExt.examineConsumerConnectionInfo(consumerGroup);
     }
 
     @Override
     public ProducerConnection examineProducerConnectionInfo(String producerGroup, String topic)
-        throws RemotingException, MQClientException, InterruptedException, MQBrokerException {
-        return MQAdminInstance.threadLocalMQAdminExt().examineProducerConnectionInfo(producerGroup, topic);
+            throws RemotingException, MQClientException, InterruptedException, MQBrokerException {
+        return defaultMQAdminExt.examineProducerConnectionInfo(producerGroup, topic);
     }
 
     @Override
     public List<String> getNameServerAddressList() {
-        return MQAdminInstance.threadLocalMQAdminExt().getNameServerAddressList();
+        return defaultMQAdminExt.getNameServerAddressList();
     }
 
     @Override
     public int wipeWritePermOfBroker(String namesrvAddr, String brokerName)
-        throws RemotingCommandException, RemotingConnectException, RemotingSendRequestException,
-        RemotingTimeoutException, InterruptedException, MQClientException {
-        return MQAdminInstance.threadLocalMQAdminExt().wipeWritePermOfBroker(namesrvAddr, brokerName);
+            throws RemotingCommandException, RemotingConnectException, RemotingSendRequestException,
+            RemotingTimeoutException, InterruptedException, MQClientException {
+        return defaultMQAdminExt.wipeWritePermOfBroker(namesrvAddr, brokerName);
     }
 
     @Override
     public void putKVConfig(String namespace, String key, String value) {
-        MQAdminInstance.threadLocalMQAdminExt().putKVConfig(namespace, key, value);
+        defaultMQAdminExt.putKVConfig(namespace, key, value);
     }
 
     @Override
     public String getKVConfig(String namespace, String key)
-        throws RemotingException, MQClientException, InterruptedException {
-        return MQAdminInstance.threadLocalMQAdminExt().getKVConfig(namespace, key);
+            throws RemotingException, MQClientException, InterruptedException {
+        return defaultMQAdminExt.getKVConfig(namespace, key);
     }
 
     @Override
     public KVTable getKVListByNamespace(String namespace)
-        throws RemotingException, MQClientException, InterruptedException {
-        return MQAdminInstance.threadLocalMQAdminExt().getKVListByNamespace(namespace);
+            throws RemotingException, MQClientException, InterruptedException {
+        return defaultMQAdminExt.getKVListByNamespace(namespace);
     }
 
     @Override
     public void deleteTopicInBroker(Set<String> addrs, String topic)
-        throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
+            throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
         logger.info("addrs={} topic={}", JsonUtil.obj2String(addrs), topic);
-        MQAdminInstance.threadLocalMQAdminExt().deleteTopicInBroker(addrs, topic);
+        defaultMQAdminExt.deleteTopicInBroker(addrs, topic);
     }
 
     @Override
     public void deleteTopicInNameServer(Set<String> addrs, String topic)
-        throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
-        MQAdminInstance.threadLocalMQAdminExt().deleteTopicInNameServer(addrs, topic);
+            throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
+        defaultMQAdminExt.deleteTopicInNameServer(addrs, topic);
     }
 
     @Override
     public void deleteSubscriptionGroup(String addr, String groupName)
-        throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
-        MQAdminInstance.threadLocalMQAdminExt().deleteSubscriptionGroup(addr, groupName);
+            throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
+        defaultMQAdminExt.deleteSubscriptionGroup(addr, groupName);
     }
 
     @Override
     public void createAndUpdateKvConfig(String namespace, String key, String value)
-        throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
-        MQAdminInstance.threadLocalMQAdminExt().createAndUpdateKvConfig(namespace, key, value);
+            throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
+        defaultMQAdminExt.createAndUpdateKvConfig(namespace, key, value);
     }
 
     @Override
     public void deleteKvConfig(String namespace, String key)
-        throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
-        MQAdminInstance.threadLocalMQAdminExt().deleteKvConfig(namespace, key);
+            throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
+        defaultMQAdminExt.deleteKvConfig(namespace, key);
     }
 
     @Override
     public List<RollbackStats> resetOffsetByTimestampOld(String consumerGroup, String topic, long timestamp,
-        boolean force) throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
-        return MQAdminInstance.threadLocalMQAdminExt().resetOffsetByTimestampOld(consumerGroup, topic, timestamp, force);
+                                                         boolean force) throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
+        return defaultMQAdminExt.resetOffsetByTimestampOld(consumerGroup, topic, timestamp, force);
     }
 
     @Override
     public Map<MessageQueue, Long> resetOffsetByTimestamp(String topic, String group, long timestamp,
-        boolean isForce) throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
-        return MQAdminInstance.threadLocalMQAdminExt().resetOffsetByTimestamp(topic, group, timestamp, isForce);
+                                                          boolean isForce) throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
+        return defaultMQAdminExt.resetOffsetByTimestamp(topic, group, timestamp, isForce);
     }
 
     @Override
     public void resetOffsetNew(String consumerGroup, String topic, long timestamp)
-        throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
-        MQAdminInstance.threadLocalMQAdminExt().resetOffsetNew(consumerGroup, topic, timestamp);
+            throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
+        defaultMQAdminExt.resetOffsetNew(consumerGroup, topic, timestamp);
     }
 
     @Override
     public Map<String, Map<MessageQueue, Long>> getConsumeStatus(String topic, String group,
-        String clientAddr) throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
-        return MQAdminInstance.threadLocalMQAdminExt().getConsumeStatus(topic, group, clientAddr);
+                                                                 String clientAddr) throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
+        return defaultMQAdminExt.getConsumeStatus(topic, group, clientAddr);
     }
 
     @Override
     public void createOrUpdateOrderConf(String key, String value, boolean isCluster)
-        throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
-        MQAdminInstance.threadLocalMQAdminExt().createOrUpdateOrderConf(key, value, isCluster);
+            throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
+        defaultMQAdminExt.createOrUpdateOrderConf(key, value, isCluster);
     }
 
     @Override
     public GroupList queryTopicConsumeByWho(String topic)
-        throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException,
-        InterruptedException, MQBrokerException, RemotingException, MQClientException {
-        return MQAdminInstance.threadLocalMQAdminExt().queryTopicConsumeByWho(topic);
+            throws
+            InterruptedException, MQBrokerException, RemotingException, MQClientException {
+        return defaultMQAdminExt.queryTopicConsumeByWho(topic);
     }
 
     @Override
     public boolean cleanExpiredConsumerQueue(String cluster)
-        throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, MQClientException,
-        InterruptedException {
-        return MQAdminInstance.threadLocalMQAdminExt().cleanExpiredConsumerQueue(cluster);
+            throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, MQClientException,
+            InterruptedException {
+        return defaultMQAdminExt.cleanExpiredConsumerQueue(cluster);
     }
 
     @Override
     public boolean cleanExpiredConsumerQueueByAddr(String addr)
-        throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, MQClientException,
-        InterruptedException {
-        return MQAdminInstance.threadLocalMQAdminExt().cleanExpiredConsumerQueueByAddr(addr);
+            throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, MQClientException,
+            InterruptedException {
+        return defaultMQAdminExt.cleanExpiredConsumerQueueByAddr(addr);
     }
 
     @Override
     public ConsumerRunningInfo getConsumerRunningInfo(String consumerGroup, String clientId, boolean jstack)
-        throws RemotingException, MQClientException, InterruptedException {
-        return MQAdminInstance.threadLocalMQAdminExt().getConsumerRunningInfo(consumerGroup, clientId, jstack);
+            throws RemotingException, MQClientException, InterruptedException {
+        return defaultMQAdminExt.getConsumerRunningInfo(consumerGroup, clientId, jstack);
     }
 
     @Override
     public ConsumeMessageDirectlyResult consumeMessageDirectly(String consumerGroup, String clientId,
-        String msgId) throws RemotingException, MQClientException, InterruptedException, MQBrokerException {
-        return MQAdminInstance.threadLocalMQAdminExt().consumeMessageDirectly(consumerGroup, clientId, msgId);
+                                                               String msgId) throws RemotingException, MQClientException, InterruptedException, MQBrokerException {
+        return defaultMQAdminExt.consumeMessageDirectly(consumerGroup, clientId, msgId);
     }
 
     @Override
     public List<MessageTrack> messageTrackDetail(MessageExt msg)
-        throws RemotingException, MQClientException, InterruptedException, MQBrokerException {
-        return MQAdminInstance.threadLocalMQAdminExt().messageTrackDetail(msg);
+            throws RemotingException, MQClientException, InterruptedException, MQBrokerException {
+        return defaultMQAdminExt.messageTrackDetail(msg);
     }
 
     @Override
     public void cloneGroupOffset(String srcGroup, String destGroup, String topic, boolean isOffline)
-        throws RemotingException, MQClientException, InterruptedException, MQBrokerException {
-        MQAdminInstance.threadLocalMQAdminExt().cloneGroupOffset(srcGroup, destGroup, topic, isOffline);
+            throws RemotingException, MQClientException, InterruptedException, MQBrokerException {
+        defaultMQAdminExt.cloneGroupOffset(srcGroup, destGroup, topic, isOffline);
     }
 
     @Override
     public void createTopic(String key, String newTopic, int queueNum) throws MQClientException {
-        MQAdminInstance.threadLocalMQAdminExt().createTopic(key, newTopic, queueNum);
+        defaultMQAdminExt.createTopic(key, newTopic, queueNum);
     }
 
     @Override
     public void createTopic(String key, String newTopic, int queueNum, int topicSysFlag)
-        throws MQClientException {
-        MQAdminInstance.threadLocalMQAdminExt().createTopic(key, newTopic, queueNum, topicSysFlag);
+            throws MQClientException {
+        defaultMQAdminExt.createTopic(key, newTopic, queueNum, topicSysFlag);
     }
 
     @Override
     public long searchOffset(MessageQueue mq, long timestamp) throws MQClientException {
-        return MQAdminInstance.threadLocalMQAdminExt().searchOffset(mq, timestamp);
+        return defaultMQAdminExt.searchOffset(mq, timestamp);
     }
 
     @Override
     public long maxOffset(MessageQueue mq) throws MQClientException {
-        return MQAdminInstance.threadLocalMQAdminExt().maxOffset(mq);
+        return defaultMQAdminExt.maxOffset(mq);
     }
 
     @Override
     public long minOffset(MessageQueue mq) throws MQClientException {
-        return MQAdminInstance.threadLocalMQAdminExt().minOffset(mq);
+        return defaultMQAdminExt.minOffset(mq);
     }
 
     @Override
     public long earliestMsgStoreTime(MessageQueue mq) throws MQClientException {
-        return MQAdminInstance.threadLocalMQAdminExt().earliestMsgStoreTime(mq);
+        return defaultMQAdminExt.earliestMsgStoreTime(mq);
     }
 
     @Override
     public MessageExt viewMessage(String msgId)
-        throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
-        return MQAdminInstance.threadLocalMQAdminExt().viewMessage(msgId);
+            throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
+        return defaultMQAdminExt.viewMessage(msgId);
     }
 
     @Override
     public QueryResult queryMessage(String topic, String key, int maxNum, long begin, long end)
-        throws MQClientException, InterruptedException {
-        return MQAdminInstance.threadLocalMQAdminExt().queryMessage(topic, key, maxNum, begin, end);
+            throws MQClientException, InterruptedException {
+        return defaultMQAdminExt.queryMessage(topic, key, maxNum, begin, end);
     }
 
     @Override
@@ -396,8 +410,8 @@ public class MQAdminExtImpl implements MQAdminExt {
 
     @Override
     public List<QueueTimeSpan> queryConsumeTimeSpan(String topic,
-        String group) throws InterruptedException, MQBrokerException, RemotingException, MQClientException {
-        return MQAdminInstance.threadLocalMQAdminExt().queryConsumeTimeSpan(topic, group);
+                                                    String group) throws InterruptedException, MQBrokerException, RemotingException, MQClientException {
+        return defaultMQAdminExt.queryConsumeTimeSpan(topic, group);
     }
 
     //MessageClientIDSetter.getNearlyTimeFromID has bug,so we subtract half a day
@@ -406,110 +420,111 @@ public class MQAdminExtImpl implements MQAdminExt {
     //https://github.com/apache/incubator-rocketmq/pull/69
     @Override
     public MessageExt viewMessage(String topic,
-        String msgId) throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
+                                  String msgId) throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
         logger.info("MessageClientIDSetter.getNearlyTimeFromID(msgId)={} msgId={}", MessageClientIDSetter.getNearlyTimeFromID(msgId), msgId);
         try {
             return viewMessage(msgId);
+        } catch (Exception e) {
         }
-        catch (Exception e) {
-        }
-        MQAdminImpl mqAdminImpl = MQAdminInstance.threadLocalMqClientInstance().getMQAdminImpl();
+        MQAdminImpl mqAdminImpl = mqClientInstance.getMQAdminImpl();
         QueryResult qr = Reflect.on(mqAdminImpl).call("queryMessage", topic, msgId, 32,
-            MessageClientIDSetter.getNearlyTimeFromID(msgId).getTime() - 1000 * 60 * 60 * 13L, Long.MAX_VALUE, true).get();
+                MessageClientIDSetter.getNearlyTimeFromID(msgId).getTime() - 1000 * 60 * 60 * 13L, Long.MAX_VALUE, true).get();
         if (qr != null && qr.getMessageList() != null && qr.getMessageList().size() > 0) {
             return qr.getMessageList().get(0);
-        }
-        else {
+        } else {
             return null;
         }
     }
 
     @Override
     public ConsumeMessageDirectlyResult consumeMessageDirectly(String consumerGroup, String clientId, String topic,
-        String msgId) throws RemotingException, MQClientException, InterruptedException, MQBrokerException {
-        return MQAdminInstance.threadLocalMQAdminExt().consumeMessageDirectly(consumerGroup, clientId, topic, msgId);
+                                                               String msgId) throws RemotingException, MQClientException, InterruptedException, MQBrokerException {
+        return defaultMQAdminExt.consumeMessageDirectly(consumerGroup, clientId, topic, msgId);
     }
 
     @Override
     public Properties getBrokerConfig(
-        String brokerAddr) throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, UnsupportedEncodingException, InterruptedException, MQBrokerException {
-        return MQAdminInstance.threadLocalMQAdminExt().getBrokerConfig(brokerAddr);
+            String brokerAddr) throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, UnsupportedEncodingException, InterruptedException, MQBrokerException {
+        return defaultMQAdminExt.getBrokerConfig(brokerAddr);
     }
 
     @Override
     public TopicList fetchTopicsByCLuster(
-        String clusterName) throws RemotingException, MQClientException, InterruptedException {
-        return MQAdminInstance.threadLocalMQAdminExt().fetchTopicsByCLuster(clusterName);
+            String clusterName) throws RemotingException, MQClientException, InterruptedException {
+        return defaultMQAdminExt.fetchTopicsByCLuster(clusterName);
     }
 
     @Override
     public boolean cleanUnusedTopic(
-        String cluster) throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, MQClientException, InterruptedException {
-        return MQAdminInstance.threadLocalMQAdminExt().cleanUnusedTopic(cluster);
+            String cluster) throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, MQClientException, InterruptedException {
+        return defaultMQAdminExt.cleanUnusedTopic(cluster);
     }
 
     @Override
     public boolean cleanUnusedTopicByAddr(
-        String addr) throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, MQClientException, InterruptedException {
-        return MQAdminInstance.threadLocalMQAdminExt().cleanUnusedTopicByAddr(addr);
+            String addr) throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, MQClientException, InterruptedException {
+        return defaultMQAdminExt.cleanUnusedTopicByAddr(addr);
     }
 
     @Override
     public BrokerStatsData viewBrokerStatsData(String brokerAddr, String statsName,
-        String statsKey) throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, MQClientException, InterruptedException {
-        return MQAdminInstance.threadLocalMQAdminExt().viewBrokerStatsData(brokerAddr, statsName, statsKey);
+                                               String statsKey) throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, MQClientException, InterruptedException {
+        return defaultMQAdminExt.viewBrokerStatsData(brokerAddr, statsName, statsKey);
     }
 
     @Override
     public Set<String> getClusterList(
-        String topic) throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, MQClientException, InterruptedException {
-        return MQAdminInstance.threadLocalMQAdminExt().getClusterList(topic);
+            String topic) throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, MQClientException, InterruptedException {
+        return defaultMQAdminExt.getClusterList(topic);
     }
 
     @Override
     public ConsumeStatsList fetchConsumeStatsInBroker(String brokerAddr, boolean isOrder,
-        long timeoutMillis) throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, MQClientException, InterruptedException {
-        return MQAdminInstance.threadLocalMQAdminExt().fetchConsumeStatsInBroker(brokerAddr, isOrder, timeoutMillis);
+                                                      long timeoutMillis) throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, MQClientException, InterruptedException {
+        return defaultMQAdminExt.fetchConsumeStatsInBroker(brokerAddr, isOrder, timeoutMillis);
     }
 
     @Override
     public Set<String> getTopicClusterList(
-        String topic) throws InterruptedException, MQBrokerException, MQClientException, RemotingException {
-        return MQAdminInstance.threadLocalMQAdminExt().getTopicClusterList(topic);
+            String topic) throws InterruptedException, MQBrokerException, MQClientException, RemotingException {
+        return defaultMQAdminExt.getTopicClusterList(topic);
     }
 
     @Override
     public SubscriptionGroupWrapper getAllSubscriptionGroup(String brokerAddr,
-        long timeoutMillis) throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException, MQBrokerException {
-        return MQAdminInstance.threadLocalMQAdminExt().getAllSubscriptionGroup(brokerAddr, timeoutMillis);
+                                                            long timeoutMillis) throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException, MQBrokerException {
+        return defaultMQAdminExt.getAllSubscriptionGroup(brokerAddr, timeoutMillis);
     }
 
     @Override
     public TopicConfigSerializeWrapper getAllTopicGroup(String brokerAddr,
-        long timeoutMillis) throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException, MQBrokerException {
-        return MQAdminInstance.threadLocalMQAdminExt().getAllTopicGroup(brokerAddr, timeoutMillis);
+                                                        long timeoutMillis) throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException, MQBrokerException {
+        return defaultMQAdminExt.getAllTopicGroup(brokerAddr, timeoutMillis);
     }
 
     @Override
     public void updateConsumeOffset(String brokerAddr, String consumeGroup, MessageQueue mq,
-        long offset) throws RemotingException, InterruptedException, MQBrokerException {
-        MQAdminInstance.threadLocalMQAdminExt().updateConsumeOffset(brokerAddr, consumeGroup, mq, offset);
+                                    long offset) throws RemotingException, InterruptedException, MQBrokerException {
+        defaultMQAdminExt.updateConsumeOffset(brokerAddr, consumeGroup, mq, offset);
     }
 
     // 4.0.0 added
-    @Override public void updateNameServerConfig(Properties properties,
-        List<String> list) throws InterruptedException, RemotingConnectException, UnsupportedEncodingException, RemotingSendRequestException, RemotingTimeoutException, MQClientException, MQBrokerException {
+    @Override
+    public void updateNameServerConfig(Properties properties,
+                                       List<String> list) throws InterruptedException, RemotingConnectException, UnsupportedEncodingException, RemotingSendRequestException, RemotingTimeoutException, MQClientException, MQBrokerException {
 
     }
 
-    @Override public Map<String, Properties> getNameServerConfig(
-        List<String> list) throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException, MQClientException, UnsupportedEncodingException {
+    @Override
+    public Map<String, Properties> getNameServerConfig(
+            List<String> list) throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException, MQClientException, UnsupportedEncodingException {
         return null;
     }
 
-    @Override public QueryConsumeQueueResponseBody queryConsumeQueue(String brokerAddr, String topic,
-        int queueId, long index, int count,
-        String consumerGroup) throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException, MQClientException {
+    @Override
+    public QueryConsumeQueueResponseBody queryConsumeQueue(String brokerAddr, String topic,
+                                                           int queueId, long index, int count,
+                                                           String consumerGroup) throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException, MQClientException {
         return null;
     }
 }
diff --git a/src/main/java/org/apache/rocketmq/exporter/service/client/MQAdminInstance.java b/src/main/java/org/apache/rocketmq/exporter/service/client/MQAdminInstance.java
index 6994c23..1b9b3df 100644
--- a/src/main/java/org/apache/rocketmq/exporter/service/client/MQAdminInstance.java
+++ b/src/main/java/org/apache/rocketmq/exporter/service/client/MQAdminInstance.java
@@ -16,104 +16,72 @@
  */
 package org.apache.rocketmq.exporter.service.client;
 
+import org.apache.commons.lang3.StringUtils;
 import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
 import org.apache.rocketmq.client.exception.MQClientException;
 import org.apache.rocketmq.client.impl.MQClientAPIImpl;
 import org.apache.rocketmq.client.impl.factory.MQClientInstance;
 import org.apache.rocketmq.common.MixAll;
+import org.apache.rocketmq.exporter.config.RMQConfigure;
 import org.apache.rocketmq.remoting.RemotingClient;
 import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
 import org.apache.rocketmq.tools.admin.DefaultMQAdminExtImpl;
-import org.apache.rocketmq.tools.admin.MQAdminExt;
 import org.joor.Reflect;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Qualifier;
+import org.springframework.context.annotation.Bean;
+import org.springframework.stereotype.Service;
 
 import static org.apache.rocketmq.common.MixAll.TOOLS_CONSUMER_GROUP;
 
-
+@Service
 public class MQAdminInstance {
-
-    private static final ThreadLocal<DefaultMQAdminExt> MQ_ADMIN_EXT_THREAD_LOCAL = new ThreadLocal<DefaultMQAdminExt>();
-
-    private static final ThreadLocal<DefaultMQPullConsumer> MQ_PULL_CONSUMER_THREAD_LOCAL = new ThreadLocal<DefaultMQPullConsumer>();
-
-    private static final ThreadLocal<Integer> INIT_COUNTER = new ThreadLocal<Integer>();
-
-    public static MQAdminExt threadLocalMQAdminExt() {
-        DefaultMQAdminExt defaultMQAdminExt = MQ_ADMIN_EXT_THREAD_LOCAL.get();
-        if (defaultMQAdminExt == null) {
-            throw new IllegalStateException("defaultMQAdminExt should be init before you get this");
+    private final static Logger log = LoggerFactory.getLogger(MQAdminInstance.class);
+    @Autowired
+    private RMQConfigure configure;
+
+    @Bean(destroyMethod = "shutdown", name = "defaultMQAdminExt")
+    private DefaultMQAdminExt buildDefaultMQAdminExt() {
+        DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt(5000L);
+        defaultMQAdminExt.setInstanceName("admin-" + System.currentTimeMillis());
+        try {
+            defaultMQAdminExt.start();
+        } catch (MQClientException ex) {
+            log.error(String.format("init default admin error, namesrv=%s", System.getProperty(MixAll.NAMESRV_ADDR_PROPERTY)), ex);
         }
         return defaultMQAdminExt;
     }
 
-
-    public static DefaultMQPullConsumer threadLocalMQPullConsumer() {
-        DefaultMQPullConsumer pullConsumer = MQ_PULL_CONSUMER_THREAD_LOCAL.get();
-        if (pullConsumer == null) {
-            throw new IllegalStateException("pullConsumer should be init before you get this");
+    @Bean(destroyMethod = "shutdown")
+    private DefaultMQPullConsumer buildPullConsumer() throws Exception {
+        String namesrvAddress = configure.getNamesrvAddr();
+        if (StringUtils.isBlank(namesrvAddress)) {
+            log.error("init default pull consumer error, namesrv is null");
+            throw new Exception("init default pull consumer error, namesrv is null", null);
         }
-        return pullConsumer;
-    }
-
-
-    public static RemotingClient threadLocalRemotingClient() {
-        MQClientInstance mqClientInstance = threadLocalMqClientInstance();
-        MQClientAPIImpl mQClientAPIImpl = Reflect.on(mqClientInstance).get("mQClientAPIImpl");
-        return Reflect.on(mQClientAPIImpl).get("remotingClient");
-    }
-
-    public static MQClientInstance threadLocalMqClientInstance() {
-        DefaultMQAdminExtImpl defaultMQAdminExtImpl = Reflect.on(MQAdminInstance.threadLocalMQAdminExt()).get("defaultMQAdminExtImpl");
-        return Reflect.on(defaultMQAdminExtImpl).get("mqClientInstance");
-    }
-
-    public static void initMQAdminInstance(long timeoutMillis) throws MQClientException {
-        Integer nowCount = INIT_COUNTER.get();
-        if (nowCount == null) {
-            DefaultMQAdminExt defaultMQAdminExt;
-            if (timeoutMillis > 0) {
-                defaultMQAdminExt = new DefaultMQAdminExt(timeoutMillis);
-            }
-            else {
-                defaultMQAdminExt = new DefaultMQAdminExt();
-            }
-            defaultMQAdminExt.setInstanceName("admin-" + Long.toString(System.currentTimeMillis()));
-            defaultMQAdminExt.start();
-            MQ_ADMIN_EXT_THREAD_LOCAL.set(defaultMQAdminExt);
-
-
-            DefaultMQPullConsumer   pullConsumer;
-            pullConsumer    =   new DefaultMQPullConsumer(TOOLS_CONSUMER_GROUP,null);
-            pullConsumer.setInstanceName("consumer-" + Long.toString(System.currentTimeMillis()));
-            pullConsumer.setNamesrvAddr(System.getProperty(MixAll.NAMESRV_ADDR_PROPERTY, System.getenv(MixAll.NAMESRV_ADDR_ENV)));
+        DefaultMQPullConsumer pullConsumer = new DefaultMQPullConsumer(TOOLS_CONSUMER_GROUP, null);
+        pullConsumer.setInstanceName("consumer-" + System.currentTimeMillis());
+        pullConsumer.setNamesrvAddr(namesrvAddress);
+        try {
             pullConsumer.start();
             pullConsumer.getDefaultMQPullConsumerImpl().getPullAPIWrapper().setConnectBrokerByUser(true);
-
-            MQ_PULL_CONSUMER_THREAD_LOCAL.set(pullConsumer);
-            INIT_COUNTER.set(1);
-        }
-        else {
-            INIT_COUNTER.set(nowCount + 1);
+        } catch (MQClientException ex) {
+            log.error(String.format("init default pull consumer error, namesrv=%s", System.getProperty(MixAll.NAMESRV_ADDR_PROPERTY)), ex);
         }
+        return pullConsumer;
+    }
 
+    @Bean(destroyMethod = "shutdown")
+    private MQClientInstance buildInstance(@Qualifier("defaultMQAdminExt") DefaultMQAdminExt defaultMQAdminExt) {
+        DefaultMQAdminExtImpl defaultMQAdminExtImpl = Reflect.on(defaultMQAdminExt).get("defaultMQAdminExtImpl");
+        return Reflect.on(defaultMQAdminExtImpl).get("mqClientInstance");
     }
 
-    public static void destroyMQAdminInstance() {
-        Integer nowCount = INIT_COUNTER.get() - 1;
-        if (nowCount > 0) {
-            INIT_COUNTER.set(nowCount);
-            return;
-        }
-        MQAdminExt mqAdminExt = MQ_ADMIN_EXT_THREAD_LOCAL.get();
-        if (mqAdminExt != null) {
-            DefaultMQPullConsumer consumer = MQ_PULL_CONSUMER_THREAD_LOCAL.get();
-            if (consumer != null) {
-                consumer.shutdown();
-                MQ_PULL_CONSUMER_THREAD_LOCAL.remove();
-            }
-            mqAdminExt.shutdown();
-            MQ_ADMIN_EXT_THREAD_LOCAL.remove();
-            INIT_COUNTER.remove();
-        }
+    @Bean
+    private RemotingClient client(MQClientInstance instance) {
+        MQClientAPIImpl mQClientAPIImpl = Reflect.on(instance).get("mQClientAPIImpl");
+        return Reflect.on(mQClientAPIImpl).get("remotingClient");
     }
 }
diff --git a/src/main/java/org/apache/rocketmq/exporter/service/impl/RMQMetricsServiceImpl.java b/src/main/java/org/apache/rocketmq/exporter/service/impl/RMQMetricsServiceImpl.java
index 5dd008b..6cf977f 100644
--- a/src/main/java/org/apache/rocketmq/exporter/service/impl/RMQMetricsServiceImpl.java
+++ b/src/main/java/org/apache/rocketmq/exporter/service/impl/RMQMetricsServiceImpl.java
@@ -19,25 +19,17 @@ package org.apache.rocketmq.exporter.service.impl;
 import io.prometheus.client.CollectorRegistry;
 import io.prometheus.client.exporter.common.TextFormat;
 import org.apache.rocketmq.exporter.collector.RMQMetricsCollector;
-import org.apache.rocketmq.exporter.service.AbstractCommonService;
 import org.apache.rocketmq.exporter.service.RMQMetricsService;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import org.springframework.stereotype.Service;
 
 import java.io.IOException;
 import java.io.StringWriter;
 
 @Service
-public class RMQMetricsServiceImpl extends AbstractCommonService implements RMQMetricsService {
-
-    private Logger logger = LoggerFactory.getLogger(RMQMetricsServiceImpl.class);
-
-    private  CollectorRegistry registry = new CollectorRegistry();
-
+public class RMQMetricsServiceImpl implements RMQMetricsService {
+    private CollectorRegistry registry = new CollectorRegistry();
     private final RMQMetricsCollector rmqMetricsCollector;
 
-
     public RMQMetricsCollector getCollector() {
         return rmqMetricsCollector;
     }
@@ -46,8 +38,8 @@ public class RMQMetricsServiceImpl extends AbstractCommonService implements RMQM
         rmqMetricsCollector = new RMQMetricsCollector();
         rmqMetricsCollector.register(registry);
     }
-    public void Metrics(StringWriter writer) throws IOException {
+
+    public void metrics(StringWriter writer) throws IOException {
         TextFormat.write004(writer, registry.metricFamilySamples());
-        logger.info(writer.toString());
     }
 }
diff --git a/src/main/java/org/apache/rocketmq/exporter/task/MetricsCollectTask.java b/src/main/java/org/apache/rocketmq/exporter/task/MetricsCollectTask.java
index c6e6a5e..f1777ec 100644
--- a/src/main/java/org/apache/rocketmq/exporter/task/MetricsCollectTask.java
+++ b/src/main/java/org/apache/rocketmq/exporter/task/MetricsCollectTask.java
@@ -16,290 +16,483 @@
  */
 package org.apache.rocketmq.exporter.task;
 
-import com.google.common.base.Throwables;
-import org.apache.rocketmq.client.consumer.PullResult;
-import org.apache.rocketmq.client.consumer.PullStatus;
+import com.alibaba.fastjson.JSON;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.rocketmq.client.exception.MQBrokerException;
+import org.apache.rocketmq.client.exception.MQClientException;
 import org.apache.rocketmq.common.MixAll;
 import org.apache.rocketmq.common.admin.ConsumeStats;
 import org.apache.rocketmq.common.admin.OffsetWrapper;
 import org.apache.rocketmq.common.admin.TopicOffset;
 import org.apache.rocketmq.common.admin.TopicStatsTable;
 import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.common.protocol.ResponseCode;
 import org.apache.rocketmq.common.protocol.body.BrokerStatsData;
 import org.apache.rocketmq.common.protocol.body.ClusterInfo;
+import org.apache.rocketmq.common.protocol.body.ConsumerConnection;
 import org.apache.rocketmq.common.protocol.body.GroupList;
+import org.apache.rocketmq.common.protocol.body.KVTable;
 import org.apache.rocketmq.common.protocol.body.TopicList;
 import org.apache.rocketmq.common.protocol.route.BrokerData;
 import org.apache.rocketmq.common.protocol.route.TopicRouteData;
-import org.apache.rocketmq.exporter.aspect.admin.annotation.MultiMQAdminCmdMethod;
 import org.apache.rocketmq.exporter.config.RMQConfigure;
+import org.apache.rocketmq.exporter.model.BrokerRuntimeStats;
 import org.apache.rocketmq.exporter.service.RMQMetricsService;
-import org.apache.rocketmq.exporter.service.client.MQAdminExtImpl;
 import org.apache.rocketmq.exporter.util.Utils;
+import org.apache.rocketmq.remoting.exception.RemotingConnectException;
+import org.apache.rocketmq.remoting.exception.RemotingException;
+import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
+import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
 import org.apache.rocketmq.store.stats.BrokerStatsManager;
 import org.apache.rocketmq.tools.admin.MQAdminExt;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Qualifier;
 import org.springframework.scheduling.annotation.Scheduled;
 import org.springframework.stereotype.Component;
 
+import javax.annotation.PostConstruct;
 import javax.annotation.Resource;
-import java.util.Date;
-import java.util.HashMap;
 import java.util.Map;
 import java.util.Set;
 
 @Component
 public class MetricsCollectTask {
-
     @Resource
+    @Qualifier("mqAdminExtImpl")
     private MQAdminExt mqAdminExt;
     @Resource
     private RMQConfigure rmqConfigure;
-
     @Resource
-    private RMQMetricsService  metricsService;
-
+    private RMQMetricsService metricsService;
     private final static Logger log = LoggerFactory.getLogger(MetricsCollectTask.class);
 
-    @Scheduled(cron = "15 0/1 * * * ?")
-    @MultiMQAdminCmdMethod(timeoutMillis = 5000)
-    public void collectOffset() {
+    @PostConstruct
+    public void init() throws InterruptedException, RemotingConnectException, RemotingTimeoutException, RemotingSendRequestException, MQBrokerException {
+        log.info("MetricsCollectTask init starting....");
+        long start = System.currentTimeMillis();
+        ClusterInfo clusterInfo = mqAdminExt.examineBrokerClusterInfo();
+        StringBuilder infoOut = new StringBuilder();
+        for (String clusterName : clusterInfo.getClusterAddrTable().keySet()) {
+            infoOut.append(String.format("cluster name= %s, broker name = %s%n", clusterName, clusterInfo.getClusterAddrTable().get(clusterName)));
+        }
+        for (String brokerName : clusterInfo.getBrokerAddrTable().keySet()) {
+            infoOut.append(String.format("broker name = %s,master broker address= %s%n", brokerName, clusterInfo.getBrokerAddrTable().get(brokerName).getBrokerAddrs().get(MixAll.MASTER_ID)));
+        }
+        log.info(infoOut.toString());
+        log.info(String.format("MetricsCollectTask init finished....cost:%d", System.currentTimeMillis() - start));
+    }
+
+    @Scheduled(cron = "${task.collectTopicOffset.cron}")
+    public void collectTopicOffset() {
         if (!rmqConfigure.isEnableCollect()) {
             return;
         }
-        Date date = new Date();
+        log.info("topic offset collection task starting....");
+        long start = System.currentTimeMillis();
+        TopicList topicList = null;
         try {
-            TopicList topicList = mqAdminExt.fetchAllTopicList();
-            Set<String> topicSet = topicList.getTopicList();
-            for (String topic : topicSet) {
-                if (topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX) || topic.startsWith(MixAll.DLQ_GROUP_TOPIC_PREFIX)) {
-                    continue;
+            topicList = mqAdminExt.fetchAllTopicList();
+        } catch (Exception ex) {
+            log.error(String.format("collectTopicOffset-exception comes getting topic list from namesrv, address is %s",
+                    JSON.toJSONString(mqAdminExt.getNameServerAddressList())));
+            return;
+        }
+        Set<String> topicSet = topicList != null ? topicList.getTopicList() : null;
+        if (topicSet == null || topicSet.isEmpty()) {
+            log.error(String.format("collectTopicOffset-the topic list is empty. the namesrv address is %s",
+                    JSON.toJSONString(mqAdminExt.getNameServerAddressList())));
+            return;
+        }
+        for (String topic : topicSet) {
+            TopicStatsTable topicStats = null;
+            try {
+                topicStats = mqAdminExt.examineTopicStats(topic);
+            } catch (Exception ex) {
+                log.error(String.format("collectTopicOffset-getting topic(%s) stats error. the namesrv address is %s",
+                        topic,
+                        JSON.toJSONString(mqAdminExt.getNameServerAddressList())));
+                continue;
+            }
+
+            Set<Map.Entry<MessageQueue, TopicOffset>> topicStatusEntries = topicStats.getOffsetTable().entrySet();
+
+            double totalMaxOffset = 0L;
+            long lastUpdateTimestamp = 0L;
+            StringBuilder sb = new StringBuilder();
+
+            for (Map.Entry<MessageQueue, TopicOffset> topicStatusEntry : topicStatusEntries) {
+                MessageQueue q = topicStatusEntry.getKey();
+                TopicOffset offset = topicStatusEntry.getValue();
+                totalMaxOffset += offset.getMaxOffset();
+                if (offset.getLastUpdateTimestamp() > lastUpdateTimestamp) {
+                    lastUpdateTimestamp = offset.getLastUpdateTimestamp();
+                }
+                sb.append(q.getBrokerName()).append(" ");
+            }
+            metricsService.getCollector().addTopicOffsetMetric("", sb.toString(), topic, lastUpdateTimestamp, totalMaxOffset);
+        }
+        log.info("topic offset collection task finished...." + (System.currentTimeMillis() - start));
+    }
+
+    @Scheduled(cron = "${task.collectConsumerOffset.cron}")
+    public void collectConsumerOffset() {
+        if (!rmqConfigure.isEnableCollect()) {
+            return;
+        }
+        log.info("consumer offset collection task starting....");
+        long start = System.currentTimeMillis();
+        TopicList topicList = null;
+        try {
+            topicList = mqAdminExt.fetchAllTopicList();
+        } catch (Exception ex) {
+            log.error(String.format("collectConsumerOffset-fetch topic list from namesrv error, the address is %s",
+                    JSON.toJSONString(mqAdminExt.getNameServerAddressList())), ex);
+            return;
+        }
+
+
+        Set<String> topicSet = topicList.getTopicList();
+        for (String topic : topicSet) {
+            GroupList groupList = null;
+
+            boolean isDLQTopic = topic.startsWith(MixAll.DLQ_GROUP_TOPIC_PREFIX);
+            if (isDLQTopic) {
+                continue;
+            }
+            try {
+                groupList = mqAdminExt.queryTopicConsumeByWho(topic);
+            } catch (Exception ex) {
+                log.warn(String.format("collectConsumerOffset-topic's consumer is empty, %s", topic));
+                continue;
+            }
+
+            if (groupList == null || groupList.getGroupList() == null || groupList.getGroupList().isEmpty()) {
+                log.warn(String.format("no any consumer for topic(%s), ignore this topic", topic));
+                continue;
+            }
+
+
+            for (String group : groupList.getGroupList()) {
+                ConsumeStats consumeStats = null;
+                ConsumerConnection onlineConsumers = null;
+                long diff = 0L, totalConsumerOffset = 0L, totalBrokerOffset = 0L;
+                int countOfOnlineConsumers = 0;
+
+                double consumeTPS = 0F;
+                try {
+                    onlineConsumers = mqAdminExt.examineConsumerConnectionInfo(group);
+                } catch (InterruptedException | RemotingException ex) {
+                    log.error(String.format("get topic's(%s) online consumers(%s) exception", topic, group), ex);
+                } catch (MQClientException ex) {
+                    handleTopicNotExistException(ex.getResponseCode(), ex, topic, group);
+                } catch (MQBrokerException ex) {
+                    handleTopicNotExistException(ex.getResponseCode(), ex, topic, group);
                 }
-                String clusterName = null;
-                ClusterInfo clusterInfo = mqAdminExt.examineBrokerClusterInfo();
-                Set<Map.Entry<String, BrokerData>> clusterEntries = clusterInfo.getBrokerAddrTable().entrySet();
-                for (Map.Entry<String, BrokerData> clusterEntry : clusterEntries) {
-                    clusterName  = clusterEntry.getValue().getCluster();
-                    break;
+                if (onlineConsumers == null || onlineConsumers.getConnectionSet() == null || onlineConsumers.getConnectionSet().isEmpty()) {
+                    log.warn(String.format("no any consumer online. topic=%s, consumer group=%s. ignore this", topic, group));
+                    countOfOnlineConsumers = 0;
+                } else {
+                    countOfOnlineConsumers = onlineConsumers.getConnectionSet().size();
                 }
-                if (clusterName != null) {
-                    HashMap<String,Long>    brokerOffsetMap = new HashMap<>();
-                    TopicStatsTable topicStatus = mqAdminExt.examineTopicStats(topic);
-                    Set<Map.Entry<MessageQueue, TopicOffset>> topicStatusEntries = topicStatus.getOffsetTable().entrySet();
-                    for (Map.Entry<MessageQueue, TopicOffset> topicStatusEntry : topicStatusEntries) {
-                        MessageQueue q      =   topicStatusEntry.getKey();
-                        TopicOffset offset  =   topicStatusEntry.getValue();
-                        if  (brokerOffsetMap.containsKey(q.getBrokerName())) {
-                            brokerOffsetMap.put(q.getBrokerName(),brokerOffsetMap.get(q.getBrokerName()) + offset.getMaxOffset());
-                        }
-                        else {
-                            brokerOffsetMap.put(q.getBrokerName(),offset.getMaxOffset());
-                        }
-                    }
-                    Set<Map.Entry<String, Long>> brokerOffsetEntries = brokerOffsetMap.entrySet();
-                    for (Map.Entry<String, Long> brokerOffsetEntry : brokerOffsetEntries) {
-                        metricsService.getCollector().AddTopicOffsetMetric(clusterName,brokerOffsetEntry.getKey(), topic, brokerOffsetEntry.getValue());
-                    }
+                try {
+                    consumeStats = mqAdminExt.examineConsumeStats(group, topic);
+                } catch (InterruptedException | RemotingException ex) {
+                    log.error(String.format("get topic's(%s) consumer-stats(%s) exception", topic, group), ex);
+                } catch (MQClientException ex) {
+                    handleTopicNotExistException(ex.getResponseCode(), ex, topic, group);
+                } catch (MQBrokerException ex) {
+                    handleTopicNotExistException(ex.getResponseCode(), ex, topic, group);
+                }
+                if (consumeStats == null || consumeStats.getOffsetTable() == null || consumeStats.getOffsetTable().isEmpty()) {
+                    log.warn(String.format("no any offset for consumer(%s), topic(%s), ignore this", group, topic));
+                    continue;
+                }
+                {
+                    diff = consumeStats.computeTotalDiff();
+                    consumeTPS = consumeStats.getConsumeTps();
+                    metricsService.getCollector().addGroupDiffMetric(String.valueOf(countOfOnlineConsumers), group, topic, diff);
+                    metricsService.getCollector().addGroupConsumeTPSMetric(topic, group, consumeTPS);
                 }
+                Set<Map.Entry<MessageQueue, OffsetWrapper>> consumeStatusEntries = consumeStats.getOffsetTable().entrySet();
+                for (Map.Entry<MessageQueue, OffsetWrapper> consumeStatusEntry : consumeStatusEntries) {
+                    MessageQueue q = consumeStatusEntry.getKey();
+                    OffsetWrapper offset = consumeStatusEntry.getValue();
 
-                HashMap<String,Long>    consumeOffsetMap = new HashMap<>();
-                GroupList groupList = mqAdminExt.queryTopicConsumeByWho(topic);
-                if (groupList != null && !groupList.getGroupList().isEmpty()) {
-                    for (String group : groupList.getGroupList()) {
-                        try {
-                            ConsumeStats consumeStatus = mqAdminExt.examineConsumeStats(group, topic);
-                            Set<Map.Entry<MessageQueue, OffsetWrapper>> consumeStatusEntries = consumeStatus.getOffsetTable().entrySet();
-                            for (Map.Entry<MessageQueue, OffsetWrapper> consumeStatusEntry : consumeStatusEntries) {
-                                MessageQueue q = consumeStatusEntry.getKey();
-                                OffsetWrapper offset = consumeStatusEntry.getValue();
-                                if (consumeOffsetMap.containsKey(q.getBrokerName())) {
-                                    consumeOffsetMap.put(q.getBrokerName(), consumeOffsetMap.get(q.getBrokerName()) + offset.getConsumerOffset());
-                                } else {
-                                    consumeOffsetMap.put(q.getBrokerName(), offset.getConsumerOffset());
-                                }
-                            }
-                        } catch (Exception e) {
-                            log.info("ignore this consumer", e.getMessage());
-                        }
-                        Set<Map.Entry<String, Long>> consumeOffsetEntries = consumeOffsetMap.entrySet();
-                        for (Map.Entry<String, Long> consumeOffsetEntry : consumeOffsetEntries) {
-                            metricsService.getCollector().AddGroupOffsetMetric(clusterName,consumeOffsetEntry.getKey(), topic, group, consumeOffsetEntry.getValue());
-                        }
-                        consumeOffsetMap.clear();
-                    }
+                    //topic + consumer group 生产offset
+                    totalBrokerOffset += totalBrokerOffset + offset.getBrokerOffset();
+                    //topic + consumer group 消费offset
+                    totalConsumerOffset += offset.getConsumerOffset();
                 }
+                metricsService.getCollector().addGroupBrokerTotalOffsetMetric(topic, group, totalBrokerOffset);
+                metricsService.getCollector().addGroupConsumerTotalOffsetMetric(topic, group, totalBrokerOffset);
             }
-        } catch (Exception e) {
-            log.info("error is " + e.getMessage());
         }
+        log.info("consumer offset collection task finished...." + (System.currentTimeMillis() - start));
     }
 
-    @Scheduled(cron = "15 0/1 * * * ?")
-    @MultiMQAdminCmdMethod(timeoutMillis = 5000)
-    public void collectTopic() {
+    @Scheduled(cron = "${task.collectBrokerStatsTopic.cron}")
+    public void collectBrokerStatsTopic() {
         if (!rmqConfigure.isEnableCollect()) {
             return;
         }
-        Date date = new Date();
+        log.info("broker topic stats collection task starting....");
+        long start = System.currentTimeMillis();
+        Set<String> topicSet = null;
         try {
             TopicList topicList = mqAdminExt.fetchAllTopicList();
-            Set<String> topicSet = topicList.getTopicList();
-            for (String topic : topicSet) {
-                if (topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX) || topic.startsWith(MixAll.DLQ_GROUP_TOPIC_PREFIX)) {
-                    continue;
+            topicSet = topicList.getTopicList();
+        } catch (Exception ex) {
+            log.error(String.format("collectBrokerStatsTopic-fetch topic list from namesrv error, the address is %s",
+                    JSON.toJSONString(mqAdminExt.getNameServerAddressList())), ex);
+            return;
+        }
+        if (topicSet == null || topicSet.isEmpty()) {
+            return;
+        }
+        ClusterInfo clusterInfo = null;
+        try {
+            clusterInfo = mqAdminExt.examineBrokerClusterInfo();
+        } catch (Exception ex) {
+            log.error(String.format("collectBrokerStatsTopic-fetch cluster info exception, the address is %s",
+                    JSON.toJSONString(mqAdminExt.getNameServerAddressList())), ex);
+            return;
+        }
+
+        for (String topic : topicSet) {
+            if (topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX) || topic.startsWith(MixAll.DLQ_GROUP_TOPIC_PREFIX)) {
+                continue;
+            }
+            TopicRouteData topicRouteData = null;
+
+            try {
+                topicRouteData = mqAdminExt.examineTopicRouteInfo(topic);
+            } catch (Exception ex) {
+                log.error(String.format("fetch topic route error. ignore %s", topic), ex);
+                continue;
+            }
+            for (BrokerData bd : topicRouteData.getBrokerDatas()) {
+                String masterAddr = bd.getBrokerAddrs().get(MixAll.MASTER_ID);
+                if (!StringUtils.isBlank(masterAddr)) {
+                    BrokerStatsData bsd = null;
+                    try {
+                        //topic发了多少条消息
+                        bsd = mqAdminExt.viewBrokerStatsData(masterAddr, BrokerStatsManager.TOPIC_PUT_NUMS, topic);
+                        String brokerIP = clusterInfo.getBrokerAddrTable().get(bd.getBrokerName()).getBrokerAddrs().get(MixAll.MASTER_ID);
+                        metricsService.getCollector().addTopicPutNumsMetric(
+                                bd.getCluster(),
+                                bd.getBrokerName(),
+                                brokerIP,
+                                "",
+                                topic,
+                                Utils.getFixedDouble(bsd.getStatsMinute().getTps())
+                        );
+                    } catch (MQClientException ex) {
+                        if (ex.getResponseCode() == ResponseCode.SYSTEM_ERROR) {
+                            log.error(String.format("TOPIC_PUT_NUMS-error, topic=%s, master broker=%s, %s", topic, masterAddr, ex.getErrorMessage()));
+                        } else {
+                            log.error(String.format("TOPIC_PUT_NUMS-error, topic=%s, master broker=%s", topic, masterAddr), ex);
+                        }
+                    } catch (RemotingTimeoutException | InterruptedException | RemotingSendRequestException | RemotingConnectException ex1) {
+                        log.error(String.format("TOPIC_PUT_NUMS-error, topic=%s, master broker=%s", topic, masterAddr), ex1);
+                    }
+                    try {
+                        //topic总共发了多少字节
+                        bsd = mqAdminExt.viewBrokerStatsData(masterAddr, BrokerStatsManager.TOPIC_PUT_SIZE, topic);
+                        String brokerIP = clusterInfo.getBrokerAddrTable().get(bd.getBrokerName()).getBrokerAddrs().get(MixAll.MASTER_ID);
+                        metricsService.getCollector().addTopicPutSizeMetric(
+                                bd.getCluster(),
+                                bd.getBrokerName(),
+                                brokerIP,
+                                "",
+                                topic,
+                                Utils.getFixedDouble(bsd.getStatsMinute().getTps())
+                        );
+                    } catch (MQClientException ex) {
+                        if (ex.getResponseCode() == ResponseCode.SYSTEM_ERROR) {
+                            log.error(String.format("TOPIC_PUT_SIZE-error, topic=%s, master broker=%s, %s", topic, masterAddr, ex.getErrorMessage()));
+                        } else {
+                            log.error(String.format("TOPIC_PUT_SIZE-error, topic=%s, master broker=%s", topic, masterAddr), ex);
+                        }
+                    } catch (InterruptedException | RemotingConnectException | RemotingTimeoutException | RemotingSendRequestException ex) {
+                        log.error(String.format("TOPIC_PUT_SIZE-error, topic=%s, master broker=%s", topic, masterAddr), ex);
+                    }
                 }
-                TopicRouteData topicRouteData = mqAdminExt.examineTopicRouteInfo(topic);
-                GroupList groupList = mqAdminExt.queryTopicConsumeByWho(topic);
+            }
 
+            GroupList groupList = null;
+            try {
+                groupList = mqAdminExt.queryTopicConsumeByWho(topic);
+            } catch (Exception ex) {
+                log.error(String.format("collectBrokerStatsTopic-fetch consumers for topic(%s) error, ignore this topic", topic), ex);
+                return;
+            }
+            if (groupList.getGroupList() == null || groupList.getGroupList().isEmpty()) {
+                log.warn(String.format("collectBrokerStatsTopic-topic's consumer is empty, %s", topic));
+                return;
+            }
+            for (String group : groupList.getGroupList())
                 for (BrokerData bd : topicRouteData.getBrokerDatas()) {
                     String masterAddr = bd.getBrokerAddrs().get(MixAll.MASTER_ID);
                     if (masterAddr != null) {
+                        String statsKey = String.format("%s@%s", topic, group);
+                        BrokerStatsData bsd = null;
                         try {
-                            BrokerStatsData bsd = null;
-                            try {
-                                bsd = mqAdminExt.viewBrokerStatsData(masterAddr, BrokerStatsManager.TOPIC_PUT_NUMS, topic);
-                                metricsService.getCollector().AddTopicPutNumsMetric(bd.getCluster(), bd.getBrokerName(), topic, Utils.getFixedDouble(bsd.getStatsMinute().getTps()));
-                            }
-                            catch (Exception e) {
-                                log.info("error is " + e.getMessage());
-                            }
-                            try {
-                                bsd = mqAdminExt.viewBrokerStatsData(masterAddr, BrokerStatsManager.TOPIC_PUT_SIZE, topic);
-                                metricsService.getCollector().AddTopicPutSizeMetric(bd.getCluster(), bd.getBrokerName(), topic, Utils.getFixedDouble(bsd.getStatsMinute().getTps()));
+                            //消费者消费了多少条消息
+                            bsd = mqAdminExt.viewBrokerStatsData(masterAddr, BrokerStatsManager.GROUP_GET_NUMS, statsKey);
+                            metricsService.getCollector().addGroupGetNumsMetric(
+                                    topic,
+                                    group,
+                                    Utils.getFixedDouble(bsd.getStatsMinute().getTps()));
+                        } catch (MQClientException ex) {
+                            if (ex.getResponseCode() == ResponseCode.SYSTEM_ERROR) {
+                                log.error(String.format("GROUP_GET_NUMS-error, topic=%s, group=%s,master broker=%s, %s", topic, group, masterAddr, ex.getErrorMessage()));
+                            } else {
+                                log.error(String.format("GROUP_GET_NUMS-error, topic=%s, group=%s,master broker=%s", topic, group, masterAddr), ex);
                             }
-                            catch (Exception e) {
-                                log.info("error is " + e.getMessage());
+                        } catch (InterruptedException | RemotingConnectException | RemotingTimeoutException | RemotingSendRequestException ex) {
+                            log.error(String.format("GROUP_GET_NUMS-error, topic=%s, group=%s,master broker=%s", topic, group, masterAddr), ex);
+                        }
+                        try {
+                            //消费者消费了多少字节
+                            bsd = mqAdminExt.viewBrokerStatsData(masterAddr, BrokerStatsManager.GROUP_GET_SIZE, statsKey);
+                            metricsService.getCollector().addGroupGetSizeMetric(
+                                    topic,
+                                    group,
+                                    Utils.getFixedDouble(bsd.getStatsMinute().getTps()));
+                        } catch (MQClientException ex) {
+                            if (ex.getResponseCode() == ResponseCode.SYSTEM_ERROR) {
+                                log.error(String.format("GROUP_GET_SIZE-error, topic=%s, group=%s, master broker=%s, %s", topic, group, masterAddr, ex.getErrorMessage()));
+                            } else {
+                                log.error(String.format("GROUP_GET_SIZE-error, topic=%s, group=%s, master broker=%s", topic, group, masterAddr), ex);
                             }
-                        } catch (Exception e) {
-                            log.info("error is " + e.getMessage());
+                        } catch (InterruptedException | RemotingConnectException | RemotingTimeoutException | RemotingSendRequestException ex) {
+                            log.error(String.format("GROUP_GET_SIZE-error, topic=%s, group=%s, master broker=%s", topic, group, masterAddr), ex);
                         }
-                    }
-                }
-                if (groupList != null && !groupList.getGroupList().isEmpty()) {
-                    for (String group : groupList.getGroupList()) {
-                        for (BrokerData bd : topicRouteData.getBrokerDatas()) {
-                            String masterAddr = bd.getBrokerAddrs().get(MixAll.MASTER_ID);
-                            if (masterAddr != null) {
-                                try {
-                                    String statsKey = String.format("%s@%s", topic, group);
-                                    BrokerStatsData bsd = null;
-                                    try {
-                                        bsd = mqAdminExt.viewBrokerStatsData(masterAddr, BrokerStatsManager.GROUP_GET_NUMS, statsKey);
-                                        metricsService.getCollector().AddGroupGetNumsMetric(bd.getCluster(), bd.getBrokerName(), topic, group, Utils.getFixedDouble(bsd.getStatsMinute().getTps()));
-                                    } catch (Exception e) {
-                                        log.info("error is " + e.getMessage());
-                                    }
-                                    try {
-                                        bsd = mqAdminExt.viewBrokerStatsData(masterAddr, BrokerStatsManager.GROUP_GET_SIZE, statsKey);
-                                        metricsService.getCollector().AddGroupGetSizeMetric(bd.getCluster(), bd.getBrokerName(), topic, group, Utils.getFixedDouble(bsd.getStatsMinute().getTps()));
-                                    } catch (Exception e) {
-                                        log.info("error is " + e.getMessage());
-                                    }
-                                    try {
-
-                                        bsd = mqAdminExt.viewBrokerStatsData(masterAddr, BrokerStatsManager.SNDBCK_PUT_NUMS, statsKey);
-                                        metricsService.getCollector().AddsendBackNumsMetric(bd.getCluster(), bd.getBrokerName(), topic, group, Utils.getFixedDouble(bsd.getStatsMinute().getTps()));
-                                    } catch (Exception e) {
-                                        log.info("error is " + e.getMessage());
-                                    }
-                                    try {
-                                        collectLatencyMetrcisInner(topic, group, masterAddr, bd);
-                                    } catch (Exception e) {
-                                        log.info("error is " + e.getMessage());
-                                    }
-                                } catch (Exception e) {
-                                    log.info("error is " + e.getMessage());
-                                }
+                        try {
+                            //消费者重新消费topic的次数
+                            bsd = mqAdminExt.viewBrokerStatsData(masterAddr, BrokerStatsManager.SNDBCK_PUT_NUMS, statsKey);
+                            metricsService.getCollector().addSendBackNumsMetric(
+                                    topic,
+                                    group,
+                                    Utils.getFixedDouble(bsd.getStatsMinute().getTps()));
+                        } catch (MQClientException ex) {
+                            if (ex.getResponseCode() == ResponseCode.SYSTEM_ERROR) {
+                                log.error(String.format("SNDBCK_PUT_NUMS-error, topic=%s, group=%s, master broker=%s, %s", topic, group, masterAddr, ex.getErrorMessage()));
+                            } else {
+                                log.error(String.format("SNDBCK_PUT_NUMS-error, topic=%s, group=%s, master broker=%s", topic, group, masterAddr), ex);
                             }
+                        } catch (InterruptedException | RemotingConnectException | RemotingTimeoutException | RemotingSendRequestException ex) {
+                            log.error(String.format("SNDBCK_PUT_NUMS-error, topic=%s, group=%s, master broker=%s", topic, group, masterAddr), ex);
                         }
                     }
                 }
-            }
-        }
-        catch (Exception err) {
-            throw Throwables.propagate(err);
         }
+        log.info("broker topic stats collection task finished...." + (System.currentTimeMillis() - start));
     }
 
-    @Scheduled(cron = "15 0/1 * * * ?")
-    @MultiMQAdminCmdMethod(timeoutMillis = 5000)
-    public void collectBroker() {
+    @Scheduled(cron = "${task.collectBrokerStats.cron}")
+    public void collectBrokerStats() {
         if (!rmqConfigure.isEnableCollect()) {
             return;
         }
+        log.info("broker stats collection task starting....");
+        long start = System.currentTimeMillis();
+        ClusterInfo clusterInfo = null;
         try {
-            Date date = new Date();
-            ClusterInfo clusterInfo = mqAdminExt.examineBrokerClusterInfo();
-            Set<Map.Entry<String, BrokerData>> clusterEntries = clusterInfo.getBrokerAddrTable().entrySet();
-            for (Map.Entry<String, BrokerData> clusterEntry : clusterEntries) {
-                String masterAddr = clusterEntry.getValue().getBrokerAddrs().get(MixAll.MASTER_ID);
-                if (masterAddr != null) {
-                    try {
-                        BrokerStatsData bsd = null;
-                        try {
-                            bsd = mqAdminExt.viewBrokerStatsData(masterAddr, BrokerStatsManager.BROKER_PUT_NUMS,clusterEntry.getValue().getCluster());
-                            metricsService.getCollector().AddBrokerPutNumsMetric(clusterEntry.getValue().getCluster(), clusterEntry.getValue().getBrokerName(), Utils.getFixedDouble(bsd.getStatsMinute().getTps()));
-                        }
-                        catch (Exception e) {
-                            log.info("error is " + e.getMessage());
-                        }
-                        try {
-                            bsd = mqAdminExt.viewBrokerStatsData(masterAddr, BrokerStatsManager.BROKER_GET_NUMS, clusterEntry.getValue().getCluster());
-                            metricsService.getCollector().AddBrokerGetNumsMetric(clusterEntry.getValue().getCluster(), clusterEntry.getValue().getBrokerName(), Utils.getFixedDouble(bsd.getStatsMinute().getTps()));
-                        }
-                        catch (Exception e) {
-                            log.info("error is " + e.getMessage());
-                        }
-                    } catch (Exception e) {
-                        log.info("error is " + e.getMessage());
-                    }
-                }
-            }
-        }
-        catch (Exception err) {
-            throw Throwables.propagate(err);
+            clusterInfo = mqAdminExt.examineBrokerClusterInfo();
+        } catch (Exception ex) {
+            log.error(String.format("collectBrokerStats-get cluster info from namesrv error. address is %s", JSON.toJSONString(mqAdminExt.getNameServerAddressList())), ex);
+            return;
         }
-    }
-    private void collectLatencyMetrcisInner(String topic,String group,String masterAddr, BrokerData bd) throws Exception {
-        long maxLagTime = 0;
-        String statsKey;
-        BrokerStatsData bsd = null;
-        ConsumeStats consumeStatus = mqAdminExt.examineConsumeStats(group, topic);
-        Set<Map.Entry<MessageQueue, OffsetWrapper>> consumeStatusEntries = consumeStatus.getOffsetTable().entrySet();
-        for (Map.Entry<MessageQueue, OffsetWrapper> consumeStatusEntry : consumeStatusEntries) {
-            MessageQueue q = consumeStatusEntry.getKey();
-            OffsetWrapper offset = consumeStatusEntry.getValue();
-            int queueId = q.getQueueId();
-            statsKey = String.format("%d@%s@%s", queueId, topic, group);
+
+        Set<Map.Entry<String, BrokerData>> clusterEntries = clusterInfo.getBrokerAddrTable().entrySet();
+        for (Map.Entry<String, BrokerData> clusterEntry : clusterEntries) {
+            String masterAddr = clusterEntry.getValue().getBrokerAddrs().get(MixAll.MASTER_ID);
+            if (StringUtils.isBlank(masterAddr)) {
+                continue;
+            }
+            BrokerStatsData bsd = null;
             try {
-                bsd = mqAdminExt.viewBrokerStatsData(masterAddr, BrokerStatsManager.GROUP_GET_LATENCY, statsKey);
-                metricsService.getCollector().AddGroupGetLatencyMetric(bd.getCluster(), bd.getBrokerName(), topic, group, String.format("%d", queueId), Utils.getFixedDouble(bsd.getStatsMinute().getTps()));
-            } catch (Exception e) {
-                log.info("error is " + e.getMessage());
+                bsd = mqAdminExt.viewBrokerStatsData(masterAddr, BrokerStatsManager.BROKER_PUT_NUMS, clusterEntry.getValue().getCluster());
+                String brokerIP = clusterEntry.getValue().getBrokerAddrs().get(MixAll.MASTER_ID);
+                metricsService.getCollector().addBrokerPutNumsMetric(
+                        clusterEntry.getValue().getCluster(),
+                        brokerIP,
+                        "",
+                        Utils.getFixedDouble(bsd.getStatsMinute().getTps()));
+            } catch (Exception ex) {
+                log.error(String.format("BROKER_PUT_NUMS-error, master broker=%s", masterAddr), ex);
             }
-            MQAdminExtImpl mqAdminImpl = (MQAdminExtImpl) mqAdminExt;
-            PullResult consumePullResult = mqAdminImpl.queryMsgByOffset(q, offset.getConsumerOffset());
-            long lagTime = 0;
-            if (consumePullResult != null && consumePullResult.getPullStatus() == PullStatus.FOUND) {
-                lagTime = System.currentTimeMillis() - consumePullResult.getMsgFoundList().get(0).getStoreTimestamp();
-                if (offset.getBrokerOffset() == offset.getConsumerOffset()) {
-                    lagTime = 0;
-                }
-            } else if (consumePullResult.getPullStatus() == PullStatus.NO_MATCHED_MSG) {
-                lagTime = 0;
-            } else if (consumePullResult.getPullStatus() == PullStatus.OFFSET_ILLEGAL) {
-                PullResult pullResult = mqAdminImpl.queryMsgByOffset(q, consumePullResult.getMinOffset());
-                if (pullResult != null && pullResult.getPullStatus() == PullStatus.FOUND) {
-                    lagTime = System.currentTimeMillis() - consumePullResult.getMsgFoundList().get(0).getStoreTimestamp();
+            try {
+                bsd = mqAdminExt.viewBrokerStatsData(masterAddr, BrokerStatsManager.BROKER_GET_NUMS, clusterEntry.getValue().getCluster());
+                String brokerIP = clusterEntry.getValue().getBrokerAddrs().get(MixAll.MASTER_ID);
+                metricsService.getCollector().addBrokerGetNumsMetric(
+                        clusterEntry.getValue().getCluster(),
+                        brokerIP,
+                        "",
+                        Utils.getFixedDouble(bsd.getStatsMinute().getTps()));
+            } catch (Exception ex) {
+                log.error(String.format("BROKER_GET_NUMS-error, master broker=%s", masterAddr), ex);
+            }
+        }
+        log.info("broker stats collection task finished...." + (System.currentTimeMillis() - start));
+    }
+
+    @Scheduled(cron = "${task.collectBrokerRuntimeStats.cron}")
+    public void collectBrokerRuntimeStats() {
+        if (!rmqConfigure.isEnableCollect()) {
+            return;
+        }
+        log.info("broker runtime stats collection task starting....");
+        long start = System.currentTimeMillis();
+        ClusterInfo clusterInfo = null;
+        try {
+            clusterInfo = mqAdminExt.examineBrokerClusterInfo();
+        } catch (Exception ex) {
+            log.error(String.format("collectBrokerRuntimeStats-get cluster info from namesrv error. address is %s", JSON.toJSONString(mqAdminExt.getNameServerAddressList())), ex);
+            return;
+        }
+
+        Set<Map.Entry<String, BrokerData>> clusterEntries = clusterInfo.getBrokerAddrTable().entrySet();
+        for (Map.Entry<String, BrokerData> clusterEntry : clusterEntries) {
+            String masterAddr = clusterEntry.getValue().getBrokerAddrs().get(MixAll.MASTER_ID);
+            String clusterName = clusterEntry.getValue().getCluster();
+
+            KVTable kvTable = null;
+            if (!StringUtils.isBlank(masterAddr)) {
+                try {
+                    kvTable = mqAdminExt.fetchBrokerRuntimeStats(masterAddr);
+                } catch (RemotingConnectException | RemotingSendRequestException | RemotingTimeoutException | InterruptedException ex) {
+                    log.error(String.format("collectBrokerRuntimeStats-get fetch broker runtime stats error, address=%s", masterAddr), ex);
+                } catch (MQBrokerException ex) {
+                    if (ex.getResponseCode() == ResponseCode.SYSTEM_ERROR) {
+                        log.error(String.format("collectBrokerRuntimeStats-get fetch broker runtime stats error, address=%s, error=%s", masterAddr, ex.getErrorMessage()));
+                    } else {
+                        log.error(String.format("collectBrokerRuntimeStats-get fetch broker runtime stats error, address=%s", masterAddr), ex);
+                    }
                 }
-            } else {
-                lagTime = 0;
             }
-            if (lagTime > maxLagTime) {
-                maxLagTime = lagTime;
+            if (kvTable == null || kvTable.getTable() == null || kvTable.getTable().isEmpty()) {
+                continue;
             }
+            try {
+                BrokerRuntimeStats brokerRuntimeStats = new BrokerRuntimeStats(kvTable);
+                metricsService.getCollector().addBrokerRuntimeStatsMetric(brokerRuntimeStats, clusterName, masterAddr, "");
+            } catch (Exception ex) {
+                log.error(String.format("collectBrokerRuntimeStats-parse or report broker runtime stats error, %s", JSON.toJSONString(kvTable)), ex);
+            }
+
+        }
+
+        log.info("broker runtime stats collection task finished...." + (System.currentTimeMillis() - start));
+    }
+
+    private void handleTopicNotExistException(int responseCode, Exception ex, String topic, String group) {
+        if (responseCode == ResponseCode.TOPIC_NOT_EXIST || responseCode == ResponseCode.CONSUMER_NOT_ONLINE) {
+            log.error(String.format("get topic's(%s) consumer-stats(%s) exception, detail: %s", topic, group, ex.getMessage()));
+        } else {
+            log.error(String.format("get topic's(%s) consumer-stats(%s) exception", topic, group), ex);
         }
-        metricsService.getCollector().AddGroupGetLatencyByStoreTimeMetric(bd.getCluster(), bd.getBrokerName(), topic, group, maxLagTime);
     }
 }
diff --git a/src/main/java/org/apache/rocketmq/exporter/util/JsonUtil.java b/src/main/java/org/apache/rocketmq/exporter/util/JsonUtil.java
index 29317f9..1cb147c 100644
--- a/src/main/java/org/apache/rocketmq/exporter/util/JsonUtil.java
+++ b/src/main/java/org/apache/rocketmq/exporter/util/JsonUtil.java
@@ -55,8 +55,7 @@ public class JsonUtil {
     public static void writeValue(Writer writer, Object obj) {
         try {
             objectMapper.writeValue(writer, obj);
-        }
-        catch (IOException e) {
+        } catch (IOException e) {
             Throwables.propagateIfPossible(e);
         }
     }
@@ -67,9 +66,8 @@ public class JsonUtil {
         }
 
         try {
-            return src instanceof String ? (String)src : objectMapper.writeValueAsString(src);
-        }
-        catch (Exception e) {
+            return src instanceof String ? (String) src : objectMapper.writeValueAsString(src);
+        } catch (Exception e) {
             logger.error("Parse Object to String error src=" + src, e);
             return null;
         }
@@ -81,9 +79,8 @@ public class JsonUtil {
         }
 
         try {
-            return src instanceof byte[] ? (byte[])src : objectMapper.writeValueAsBytes(src);
-        }
-        catch (Exception e) {
+            return src instanceof byte[] ? (byte[]) src : objectMapper.writeValueAsBytes(src);
+        } catch (Exception e) {
             logger.error("Parse Object to byte[] error", e);
             return null;
         }
@@ -95,9 +92,8 @@ public class JsonUtil {
         }
         str = escapesSpecialChar(str);
         try {
-            return clazz.equals(String.class) ? (T)str : objectMapper.readValue(str, clazz);
-        }
-        catch (Exception e) {
+            return clazz.equals(String.class) ? (T) str : objectMapper.readValue(str, clazz);
+        } catch (Exception e) {
             logger.error("Parse String to Object error\nString: {}\nClass<T>: {}\nError: {}", str, clazz.getName(), e);
             return null;
         }
@@ -108,9 +104,8 @@ public class JsonUtil {
             return null;
         }
         try {
-            return clazz.equals(byte[].class) ? (T)bytes : objectMapper.readValue(bytes, clazz);
-        }
-        catch (Exception e) {
+            return clazz.equals(byte[].class) ? (T) bytes : objectMapper.readValue(bytes, clazz);
+        } catch (Exception e) {
             logger.error("Parse byte[] to Object error\nbyte[]: {}\nClass<T>: {}\nError: {}", bytes, clazz.getName(), e);
             return null;
         }
@@ -122,11 +117,10 @@ public class JsonUtil {
         }
         str = escapesSpecialChar(str);
         try {
-            return (T)(typeReference.getType().equals(String.class) ? str : objectMapper.readValue(str, typeReference));
-        }
-        catch (Exception e) {
+            return (T) (typeReference.getType().equals(String.class) ? str : objectMapper.readValue(str, typeReference));
+        } catch (Exception e) {
             logger.error("Parse String to Object error\nString: {}\nTypeReference<T>: {}\nError: {}", str,
-                typeReference.getType(), e);
+                    typeReference.getType(), e);
             return null;
         }
     }
@@ -136,12 +130,11 @@ public class JsonUtil {
             return null;
         }
         try {
-            return (T)(typeReference.getType().equals(byte[].class) ? bytes : objectMapper.readValue(bytes,
-                typeReference));
-        }
-        catch (Exception e) {
+            return (T) (typeReference.getType().equals(byte[].class) ? bytes : objectMapper.readValue(bytes,
+                    typeReference));
+        } catch (Exception e) {
             logger.error("Parse byte[] to Object error\nbyte[]: {}\nTypeReference<T>: {}\nError: {}", bytes,
-                typeReference.getType(), e);
+                    typeReference.getType(), e);
             return null;
         }
     }
diff --git a/src/main/resources/application.properties b/src/main/resources/application.properties
deleted file mode 100644
index 083738e..0000000
--- a/src/main/resources/application.properties
+++ /dev/null
@@ -1,14 +0,0 @@
-server.port=5557
-
-spring.application.name=rocketmq-exporter
-spring.http.encoding.charset=UTF-8
-spring.http.encoding.enabled=true
-spring.http.encoding.force=true
-logging.config=classpath:logback.xml
-#if this value is empty,use env value rocketmq.config.namesrvAddr  NAMESRV_ADDR
-rocketmq.config.namesrvAddr=127.0.0.1:9876
-
-
-rocketmq.config.enableCollect=true
-rocketmq.config.webTelemetryPath=/metrics
-rocketmq.config.rocketmqVersion=4_3_2
\ No newline at end of file
diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml
new file mode 100644
index 0000000..8ff9e07
--- /dev/null
+++ b/src/main/resources/application.yml
@@ -0,0 +1,32 @@
+server:
+  port: 5557
+spring:
+  application:
+    name: rocketmq-exporter
+  http:
+    encoding:
+      charset: UTF-8
+      enabled: true
+      force: true
+logging:
+  config: classpath:logback.xml
+
+rocketmq:
+  config:
+    webTelemetryPath: /metrics
+    rocketmqVersion: 4_2_0
+    namesrvAddr: 127.0.0.1:9876 #
+    enableCollect: true
+
+task:
+  count: 5
+  collectTopicOffset:
+    cron: 15 0/1 * * * ?
+  collectConsumerOffset:
+    cron: 15 0/1 * * * ?
+  collectBrokerStatsTopic:
+    cron: 15 0/1 * * * ?
+  collectBrokerStats:
+    cron: 15 0/1 * * * ?
+  collectBrokerRuntimeStats:
+    cron: 15 0/1 * * * ?
diff --git a/src/main/resources/logback.xml b/src/main/resources/logback.xml
index b5291d3..8032516 100644
--- a/src/main/resources/logback.xml
+++ b/src/main/resources/logback.xml
@@ -1,33 +1,32 @@
 <?xml version="1.0" encoding="UTF-8"?>
 <configuration>
-	<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
-		<encoder charset="UTF-8">
-			<pattern>[%d{yyyy-MM-dd HH:mm:ss.SSS}] %5p %m%n</pattern>
-		</encoder>
-	</appender>
+    <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
+        <encoder charset="UTF-8">
+            <pattern>[%d{yyyy-MM-dd HH:mm:ss.SSS}] %5p %m%n</pattern>
+        </encoder>
+    </appender>
 
-	<appender name="FILE"
-		class="ch.qos.logback.core.rolling.RollingFileAppender">
-		<file>${user.home}/logs/exporterlogs/rocketmq-exporter.log</file>
-		<append>true</append>
-		<rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
-			<fileNamePattern>${user.home}/logs/exporterlogs/rocketmq-exporter-%d{yyyy-MM-dd}.%i.log
-			</fileNamePattern>
-			<timeBasedFileNamingAndTriggeringPolicy
-				class="ch.qos.logback.core.rolling.SizeAndTimeBasedFNATP">
-				<maxFileSize>104857600</maxFileSize>
-			</timeBasedFileNamingAndTriggeringPolicy>
-			<MaxHistory>10</MaxHistory>
-		</rollingPolicy>
-		<encoder>
-			<pattern>[%d{yyyy-MM-dd HH:mm:ss.SSS}] %5p %m%n</pattern>
-			<charset class="java.nio.charset.Charset">UTF-8</charset>
-		</encoder>
-	</appender>
+    <appender name="FILE"
+              class="ch.qos.logback.core.rolling.RollingFileAppender">
+        <file>./logs/exporterlogs/rocketmq-exporter.log</file>
+        <append>true</append>
+        <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
+            <fileNamePattern>./logs/exporterlogs/rocketmq-exporter-%d{yyyy-MM-dd}.%i.log
+            </fileNamePattern>
+            <timeBasedFileNamingAndTriggeringPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedFNATP">
+                <maxFileSize>104857600</maxFileSize>
+            </timeBasedFileNamingAndTriggeringPolicy>
+            <totalSizeCap>20gb</totalSizeCap>
+        </rollingPolicy>
+        <encoder>
+            <pattern>[%d{yyyy-MM-dd HH:mm:ss.SSS}] %5p %m%n</pattern>
+            <charset class="java.nio.charset.Charset">UTF-8</charset>
+        </encoder>
+    </appender>
 
-	<root level="INFO">
-		<appender-ref ref="STDOUT" />
-		<appender-ref ref="FILE" />
-	</root>
+    <root level="INFO">
+        <appender-ref ref="STDOUT"/>
+        <appender-ref ref="FILE"/>
+    </root>
 
-</configuration> 
\ No newline at end of file
+</configuration>