You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by go...@apache.org on 2021/12/06 05:07:13 UTC

[incubator-inlong] branch master updated: [INLONG-1908]Adjust the metric realization of TubeMQ (#1909)

This is an automated email from the ASF dual-hosted git repository.

gosonzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 1a905c6  [INLONG-1908]Adjust the metric realization of TubeMQ (#1909)
1a905c6 is described below

commit 1a905c6a1682351fadef81fc082f3a036efd37c0
Author: gosonzhang <46...@qq.com>
AuthorDate: Mon Dec 6 13:07:07 2021 +0800

    [INLONG-1908]Adjust the metric realization of TubeMQ (#1909)
---
 .../tubemq/corebase/metric/AbsMetricItem.java      |   8 +-
 .../tubemq/corebase/metric/CountMetricItem.java    |   2 +-
 .../corebase/metric/GaugeNormMetricItem.java       |   2 +-
 .../inlong/tubemq/corebase/metric/MetricValue.java |   3 -
 .../tubemq/corebase/metric/MetricValueType.java    |  14 +-
 .../tubemq/corebase/metric/MetricValues.java       |   2 -
 .../tubemq/corebase/metric/MetricItemTest.java     |  93 +++++
 .../tubemq/server/broker/BrokerServiceServer.java  |   9 +-
 .../inlong/tubemq/server/broker/TubeBroker.java    |   5 +-
 .../server/broker/metrics/BrokerMetricMXBean.java  |  11 +-
 .../server/broker/metrics/BrokerMetrics.java       |  82 +++-
 .../server/broker/metrics/BrokerMetricsHolder.java |  50 ++-
 .../server/broker/msgstore/disk/FileSegment.java   |   8 +-
 .../server/broker/msgstore/disk/MsgFileStore.java  |   6 +-
 .../server/broker/msgstore/mem/MsgMemStore.java    |   6 +-
 .../server/broker/offset/DefaultOffsetManager.java |   6 +-
 .../common/offsetstorage/ZkOffsetStorage.java      |   8 +-
 .../inlong/tubemq/server/master/TMaster.java       |  54 +--
 .../server/master/balance/DefaultLoadBalancer.java |  16 +-
 .../tubemq/server/master/metrics/MasterMetric.java | 108 -----
 .../metrics/MasterMetricMXBean.java}               |  15 +-
 .../server/master/metrics/MasterMetrics.java       | 274 ++++++++++++
 .../server/master/metrics/MasterMetricsHolder.java | 164 ++++++++
 .../nodemanage/nodebroker/BrokerAbnHolder.java     |  26 +-
 .../nodemanage/nodebroker/DefBrokerRunManager.java |  17 +-
 .../nodeconsumer/ConsumerEventManager.java         |  19 +-
 .../nodeconsumer/ConsumerInfoHolder.java           |  37 +-
 .../nodeproducer/ProducerInfoHolder.java           |  18 +-
 .../tubemq/server/broker/BrokerMetricsTest.java    | 207 +++++++--
 .../tubemq/server/master/MasterMetricsTest.java    | 463 ++++++++++++++++++++-
 .../nodeconsumer/ConsumerEventManagerTest.java     |   6 +-
 31 files changed, 1408 insertions(+), 331 deletions(-)

diff --git a/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corebase/metric/AbsMetricItem.java b/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corebase/metric/AbsMetricItem.java
index 3bf0b43..7a1e21c 100644
--- a/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corebase/metric/AbsMetricItem.java
+++ b/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corebase/metric/AbsMetricItem.java
@@ -27,10 +27,6 @@ public abstract class AbsMetricItem {
     protected final String name;
     protected final AtomicLong value = new AtomicLong(0);
 
-    public AbsMetricItem(MetricType metricType, String name) {
-        this(metricType, MetricValueType.NORMAL, name, 0);
-    }
-
     public AbsMetricItem(MetricType metricType, MetricValueType valueType,
                          String name, long initialValue) {
         this.metricType = metricType;
@@ -67,6 +63,10 @@ public abstract class AbsMetricItem {
         return value.incrementAndGet();
     }
 
+    public boolean compareAndSet(long expect, long update) {
+        return value.compareAndSet(expect, update);
+    }
+
     public long decrementAndGet() {
         return value.decrementAndGet();
     }
diff --git a/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corebase/metric/CountMetricItem.java b/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corebase/metric/CountMetricItem.java
index 67f2c62..194dec8 100644
--- a/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corebase/metric/CountMetricItem.java
+++ b/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corebase/metric/CountMetricItem.java
@@ -20,7 +20,7 @@ package org.apache.inlong.tubemq.corebase.metric;
 public class CountMetricItem extends AbsMetricItem {
 
     public CountMetricItem(String name) {
-        super(MetricType.COUNTER, name);
+        super(MetricType.COUNTER, MetricValueType.MAX, name, 0);
     }
 
     @Override
diff --git a/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corebase/metric/GaugeNormMetricItem.java b/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corebase/metric/GaugeNormMetricItem.java
index 2005bab..a221047 100644
--- a/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corebase/metric/GaugeNormMetricItem.java
+++ b/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corebase/metric/GaugeNormMetricItem.java
@@ -20,7 +20,7 @@ package org.apache.inlong.tubemq.corebase.metric;
 public class GaugeNormMetricItem extends AbsMetricItem {
 
     public GaugeNormMetricItem(String name) {
-        super(MetricType.GAUGE, MetricValueType.MIN, name, 0);
+        super(MetricType.GAUGE, MetricValueType.NORMAL, name, 0);
     }
 
     @Override
diff --git a/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corebase/metric/MetricValue.java b/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corebase/metric/MetricValue.java
index 4ff4aa4..18e0477 100644
--- a/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corebase/metric/MetricValue.java
+++ b/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corebase/metric/MetricValue.java
@@ -17,14 +17,11 @@
 
 package org.apache.inlong.tubemq.corebase.metric;
 
-import java.beans.ConstructorProperties;
-
 public class MetricValue {
     private final String type;
     private final String name;
     private final long value;
 
-    @ConstructorProperties({"type", "name", "value"})
     public MetricValue(String type, String name, long value) {
         this.name = name;
         this.type = type;
diff --git a/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corebase/metric/MetricValueType.java b/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corebase/metric/MetricValueType.java
index 64d972c..ded31b7 100644
--- a/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corebase/metric/MetricValueType.java
+++ b/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corebase/metric/MetricValueType.java
@@ -18,13 +18,14 @@
 package org.apache.inlong.tubemq.corebase.metric;
 
 public enum MetricValueType {
-    NORMAL(0, "Normal"),
-    MIN(1, "Min"),
-    MAX(2, "Max");
+    NORMAL(0, "Normal", "Current value"),
+    MIN(1, "Min", "Historical minimum value"),
+    MAX(2, "Max", "Historical maximum value");
 
-    MetricValueType(int id, String name) {
+    MetricValueType(int id, String name, String desc) {
         this.id = id;
         this.name = name;
+        this.desc = desc;
     }
 
     public int getId() {
@@ -35,6 +36,10 @@ public enum MetricValueType {
         return name;
     }
 
+    public String getDesc() {
+        return desc;
+    }
+
     public static MetricValueType valueOf(int value) {
         for (MetricValueType valueType : MetricValueType.values()) {
             if (valueType.getId() == value) {
@@ -46,4 +51,5 @@ public enum MetricValueType {
 
     private final int id;
     private final String name;
+    private final String desc;
 }
diff --git a/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corebase/metric/MetricValues.java b/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corebase/metric/MetricValues.java
index 34c771b..1ab380d 100644
--- a/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corebase/metric/MetricValues.java
+++ b/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corebase/metric/MetricValues.java
@@ -17,14 +17,12 @@
 
 package org.apache.inlong.tubemq.corebase.metric;
 
-import java.beans.ConstructorProperties;
 import java.util.Map;
 
 public class MetricValues {
     private final String lastResetTime;
     private final Map<String, Long> metricValues;
 
-    @ConstructorProperties({"lastResetTime", "metricValues"})
     public MetricValues(String lastResetTime, Map<String, Long> metricValues) {
         this.lastResetTime = lastResetTime;
         this.metricValues = metricValues;
diff --git a/inlong-tubemq/tubemq-core/src/test/java/org/apache/inlong/tubemq/corebase/metric/MetricItemTest.java b/inlong-tubemq/tubemq-core/src/test/java/org/apache/inlong/tubemq/corebase/metric/MetricItemTest.java
new file mode 100644
index 0000000..e941b55
--- /dev/null
+++ b/inlong-tubemq/tubemq-core/src/test/java/org/apache/inlong/tubemq/corebase/metric/MetricItemTest.java
@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.tubemq.corebase.metric;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class MetricItemTest {
+    private static final Logger logger =
+            LoggerFactory.getLogger(MetricItemTest.class);
+
+    @Test
+    public void testMetricItem() {
+        try {
+            final CountMetricItem countMetricItem =
+                    new CountMetricItem("CountMetricItem");
+            final GaugeNormMetricItem gaugeNormMetricItem =
+                    new GaugeNormMetricItem("GaugeNormMetricItem");
+            final GaugeMaxMetricItem gaugeMaxMetricItem =
+                    new GaugeMaxMetricItem("GaugeMaxMetricItem");
+            final GaugeMinMetricItem gaugeMinMetricItem =
+                    new GaugeMinMetricItem("GaugeMinMetricItem");
+
+            countMetricItem.incrementAndGet();
+            countMetricItem.incrementAndGet();
+            countMetricItem.incrementAndGet();
+            countMetricItem.decrementAndGet();
+
+            gaugeNormMetricItem.update(1000);
+            gaugeNormMetricItem.update(2000);
+            gaugeNormMetricItem.update(500);
+
+            gaugeMaxMetricItem.update(1000);
+            gaugeMaxMetricItem.update(5000);
+            gaugeMaxMetricItem.update(3000);
+
+            gaugeMinMetricItem.update(1000);
+            gaugeMinMetricItem.update(1);
+            gaugeMinMetricItem.update(10000);
+
+            Assert.assertEquals(2, countMetricItem.getValue());
+            Assert.assertEquals(500, gaugeNormMetricItem.getValue());
+            Assert.assertEquals(5000, gaugeMaxMetricItem.getValue());
+            Assert.assertEquals(1, gaugeMinMetricItem.getValue());
+
+            countMetricItem.getAndSet();
+            gaugeNormMetricItem.getAndSet();
+            gaugeMaxMetricItem.getAndSet();
+            gaugeMinMetricItem.getAndSet();
+
+            Assert.assertEquals(0, countMetricItem.getValue());
+            Assert.assertEquals(500, gaugeNormMetricItem.getValue());
+            Assert.assertEquals(0, gaugeMaxMetricItem.getValue());
+            Assert.assertEquals(Long.MAX_VALUE, gaugeMinMetricItem.getValue());
+
+            Assert.assertEquals(MetricType.COUNTER.getId(),
+                    countMetricItem.getMetricType().getId());
+            Assert.assertEquals(MetricValueType.MAX.getId(),
+                    countMetricItem.getMetricValueType().getId());
+            Assert.assertEquals(MetricType.GAUGE.getId(),
+                    gaugeNormMetricItem.getMetricType().getId());
+            Assert.assertEquals(MetricValueType.NORMAL.getId(),
+                    gaugeNormMetricItem.getMetricValueType().getId());
+            Assert.assertEquals(MetricType.GAUGE.getId(),
+                    gaugeMaxMetricItem.getMetricType().getId());
+            Assert.assertEquals(MetricValueType.MAX.getId(),
+                    gaugeMaxMetricItem.getMetricValueType().getId());
+            Assert.assertEquals(MetricType.GAUGE.getId(),
+                    gaugeMinMetricItem.getMetricType().getId());
+            Assert.assertEquals(MetricValueType.MIN.getId(),
+                    gaugeMinMetricItem.getMetricValueType().getId());
+        } catch (Exception ex) {
+            logger.error("error happens" + ex);
+        }
+    }
+}
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/BrokerServiceServer.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/BrokerServiceServer.java
index 6fe76f4..f6cfa46 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/BrokerServiceServer.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/BrokerServiceServer.java
@@ -845,7 +845,7 @@ public class BrokerServiceServer implements BrokerReadService, BrokerWriteServic
             consumerNodeInfo = new ConsumerNodeInfo(storeManager, reqQryPriorityId,
                     clientId, filterCondSet, reqSessionKey, reqSessionTime, true, partStr);
             if (consumerRegisterMap.put(partStr, consumerNodeInfo) == null) {
-                BrokerMetricsHolder.METRICS.consumerOnlineCnt.incrementAndGet();
+                BrokerMetricsHolder.incConsumerCnt();
             }
             heartbeatManager.regConsumerNode(getHeartbeatNodeId(clientId, partStr), clientId, partStr);
             MessageStore dataStore = null;
@@ -891,7 +891,7 @@ public class BrokerServiceServer implements BrokerReadService, BrokerWriteServic
                     heartbeatManager.getConsumerRegMap().get(getHeartbeatNodeId(consumerId, partStr));
             if (timeoutInfo == null || System.currentTimeMillis() >= timeoutInfo.getTimeoutTime()) {
                 if (consumerRegisterMap.remove(partStr) != null) {
-                    BrokerMetricsHolder.METRICS.consumerOnlineCnt.decrementAndGet();
+                    BrokerMetricsHolder.decConsumerCnt(true);
                 }
                 strBuffer.append("[Duplicated Register] Remove Invalid Consumer Register ")
                         .append(consumerId).append(TokenConstants.SEGMENT_SEP).append(partStr);
@@ -957,7 +957,7 @@ public class BrokerServiceServer implements BrokerReadService, BrokerWriteServic
                     .append(request.getPartitionId()).append(" updatedOffset:").append(updatedOffset).toString());
             strBuffer.delete(0, strBuffer.length());
             if (consumerRegisterMap.remove(partStr) != null) {
-                BrokerMetricsHolder.METRICS.consumerOnlineCnt.decrementAndGet();
+                BrokerMetricsHolder.decConsumerCnt(false);
             }
             heartbeatManager.unRegConsumerNode(
                     getHeartbeatNodeId(clientId, partStr));
@@ -1245,8 +1245,7 @@ public class BrokerServiceServer implements BrokerReadService, BrokerWriteServic
                     }
                     if (consumerNodeInfo.getConsumerId().equalsIgnoreCase(nodeInfo.getSecondKey())) {
                         if (consumerRegisterMap.remove(nodeInfo.getThirdKey()) != null) {
-                            BrokerMetricsHolder.METRICS.consumerOnlineCnt.decrementAndGet();
-                            BrokerMetricsHolder.METRICS.consumerTmoTotCnt.decrementAndGet();
+                            BrokerMetricsHolder.decConsumerCnt(true);
                         }
                         String[] groupTopicPart =
                                 consumerNodeInfo.getPartStr().split(TokenConstants.ATTR_SEP);
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/TubeBroker.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/TubeBroker.java
index 474ddfb..6773264 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/TubeBroker.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/TubeBroker.java
@@ -218,8 +218,7 @@ public class TubeBroker implements Stoppable {
                             if (!response.getSuccess()) {
                                 isKeepAlive.set(false);
                                 if (response.getErrCode() == TErrCodeConstants.HB_NO_NODE) {
-                                    BrokerMetricsHolder.METRICS
-                                            .masterNoNodeCnt.incrementAndGet();
+                                    BrokerMetricsHolder.incMasterNoNodeCnt();
                                     register2Master();
                                     heartbeatErrors.set(0);
                                     logger.info("Re-register to master successfully!");
@@ -234,7 +233,7 @@ public class TubeBroker implements Stoppable {
                             isKeepAlive.set(false);
                             heartbeatErrors.incrementAndGet();
                             samplePrintCtrl.printExceptionCaught(t);
-                            BrokerMetricsHolder.METRICS.hbExceptionCnt.incrementAndGet();
+                            BrokerMetricsHolder.incHBExceptionCnt();
                         }
                     }
                 }
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/metrics/BrokerMetricMXBean.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/metrics/BrokerMetricMXBean.java
index f8f0c38..3058556 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/metrics/BrokerMetricMXBean.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/metrics/BrokerMetricMXBean.java
@@ -20,14 +20,17 @@ package org.apache.inlong.tubemq.server.broker.metrics;
 import org.apache.inlong.tubemq.corebase.metric.MetricValues;
 
 /**
- * BrokerMonitorMXBean
- * Provide access interface of a metric item with JMX.<br>
- * Decouple between metric item and monitor system, in particular scene, <br>
- * inlong can depend on user-defined monitor system.
+ * BrokerMetricMXBean
+ * Broker's metric data access interface, including:
+ * the getMetric() that  directly obtains data
+ * the getAndReSetMetrics() that can clear the values of
+ *     the counter, maximum and minimum extremum Gauge data
  */
 public interface BrokerMetricMXBean {
 
+    // get current metric data by viewing mode
     MetricValues getMetrics();
 
+    // get current metric data and reset the Counter, maximum/minimum Gauge metric
     MetricValues getAndReSetMetrics();
 }
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/metrics/BrokerMetrics.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/metrics/BrokerMetrics.java
index f7588c8..aa573c9 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/metrics/BrokerMetrics.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/metrics/BrokerMetrics.java
@@ -33,31 +33,33 @@ public class BrokerMetrics implements BrokerMetricMXBean {
 
     private final AtomicLong lastResetTime =
             new AtomicLong(System.currentTimeMillis());
-    public final AbsMetricItem syncDataDurMin =
-            new GaugeMinMetricItem("fSync_latency_min");
-    public final AbsMetricItem syncDataDurMax =
-            new GaugeMaxMetricItem("fSync_latency_max");
-    public final AbsMetricItem syncZkDurMin =
-            new GaugeMinMetricItem("zkSync_latency_min");
-    public final AbsMetricItem syncZkDurMax =
-            new GaugeMaxMetricItem("zkSync_latency_max");
-    public final AbsMetricItem zkExceptionCnt =
+    // Delay statistics for syncing data to files
+    protected final AbsMetricItem syncDataDurMin =
+            new GaugeMinMetricItem("fSync_duration_min");
+    protected final AbsMetricItem syncDataDurMax =
+            new GaugeMaxMetricItem("fSync_duration_max");
+    // Delay statistics for syncing data to Zookeeper
+    protected final AbsMetricItem syncZkDurMin =
+            new GaugeMinMetricItem("zkSync_duration_min");
+    protected final AbsMetricItem syncZkDurMax =
+            new GaugeMaxMetricItem("zkSync_duration_max");
+    // Zookeeper Exception statistics
+    protected final AbsMetricItem zkExceptionCnt =
             new CountMetricItem("zk_exception_cnt");
-    public final AbsMetricItem masterNoNodeCnt =
+    // Broker 2 Master status statistics
+    protected final AbsMetricItem masterNoNodeCnt =
             new CountMetricItem("online_timeout_cnt");
-    public final AbsMetricItem hbExceptionCnt =
+    protected final AbsMetricItem hbExceptionCnt =
             new CountMetricItem("hb_master_exception_cnt");
-    public final AbsMetricItem ioExceptionCnt =
+    // Disk IO Exception statistics
+    protected final AbsMetricItem ioExceptionCnt =
             new CountMetricItem("io_exception_cnt");
-    public final AbsMetricItem consumerOnlineCnt =
+    // Consumer client statistics
+    protected final AbsMetricItem consumerOnlineCnt =
             new GaugeNormMetricItem("consumer_online_cnt");
-    public final AbsMetricItem consumerTmoTotCnt =
+    protected final AbsMetricItem consumerTmoTotCnt =
             new CountMetricItem("consumer_timeout_cnt");
 
-    public BrokerMetrics() {
-        this.lastResetTime.set(System.currentTimeMillis());
-    }
-
     @Override
     public MetricValues getMetrics() {
         Map<String, Long> metricValues = new HashMap<>();
@@ -92,5 +94,49 @@ public class BrokerMetrics implements BrokerMetricMXBean {
         return new MetricValues(
                 WebParameterUtils.date2yyyyMMddHHmmss(new Date(befTime)), metricValues);
     }
+
+    public long getLastResetTime() {
+        return lastResetTime.get();
+    }
+
+    public AbsMetricItem getSyncDataDurMin() {
+        return syncDataDurMin;
+    }
+
+    public AbsMetricItem getSyncDataDurMax() {
+        return syncDataDurMax;
+    }
+
+    public AbsMetricItem getSyncZkDurMin() {
+        return syncZkDurMin;
+    }
+
+    public AbsMetricItem getSyncZkDurMax() {
+        return syncZkDurMax;
+    }
+
+    public AbsMetricItem getZkExceptionCnt() {
+        return zkExceptionCnt;
+    }
+
+    public AbsMetricItem getMasterNoNodeCnt() {
+        return masterNoNodeCnt;
+    }
+
+    public AbsMetricItem getHbExceptionCnt() {
+        return hbExceptionCnt;
+    }
+
+    public AbsMetricItem getIoExceptionCnt() {
+        return ioExceptionCnt;
+    }
+
+    public AbsMetricItem getConsumerOnlineCnt() {
+        return consumerOnlineCnt;
+    }
+
+    public AbsMetricItem getConsumerTmoTotCnt() {
+        return consumerTmoTotCnt;
+    }
 }
 
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/metrics/BrokerMetricsHolder.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/metrics/BrokerMetricsHolder.java
index 845f14d..b64c678 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/metrics/BrokerMetricsHolder.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/metrics/BrokerMetricsHolder.java
@@ -27,9 +27,10 @@ import org.slf4j.LoggerFactory;
 public class BrokerMetricsHolder {
     private static final Logger logger =
             LoggerFactory.getLogger(BrokerMetricsHolder.class);
-
+    // Registration status indicator
     private static final AtomicBoolean registered = new AtomicBoolean(false);
-    public static final BrokerMetrics METRICS = new BrokerMetrics();
+    // broker metrics information
+    private static final BrokerMetrics statsInfo = new BrokerMetrics();
 
     public static void registerMXBean() {
         if (!registered.compareAndSet(false, true)) {
@@ -38,11 +39,52 @@ public class BrokerMetricsHolder {
         try {
             MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
             ObjectName mxBeanName =
-                    new ObjectName("org.apache.inlong.tubemq.server.broker:type=brokerMetrics");
-            mbs.registerMBean(METRICS, mxBeanName);
+                    new ObjectName("org.apache.inlong.tubemq.server.broker:type=BrokerMetrics");
+            mbs.registerMBean(statsInfo, mxBeanName);
         } catch (Exception ex) {
             logger.error("Register BrokerMXBean error: ", ex);
         }
     }
+
+    public static void incConsumerCnt() {
+        statsInfo.consumerOnlineCnt.incrementAndGet();
+    }
+
+    public static void decConsumerCnt(boolean isTimeout) {
+        statsInfo.consumerOnlineCnt.decrementAndGet();
+        if (isTimeout) {
+            statsInfo.consumerTmoTotCnt.incrementAndGet();
+        }
+    }
+
+    public static void incMasterNoNodeCnt() {
+        statsInfo.masterNoNodeCnt.incrementAndGet();
+    }
+
+    public static void incHBExceptionCnt() {
+        statsInfo.hbExceptionCnt.incrementAndGet();
+    }
+
+    public static void incIOExceptionCnt() {
+        statsInfo.ioExceptionCnt.incrementAndGet();
+    }
+
+    public static void incZKExceptionCnt() {
+        statsInfo.zkExceptionCnt.incrementAndGet();
+    }
+
+    public static void updSyncDataDurations(long dltTime) {
+        statsInfo.syncDataDurMin.update(dltTime);
+        statsInfo.syncDataDurMax.update(dltTime);
+    }
+
+    public static void updSyncZKDurations(long dltTime) {
+        statsInfo.syncZkDurMin.update(dltTime);
+        statsInfo.syncZkDurMax.update(dltTime);
+    }
+
+    public static BrokerMetrics getStatsInfo() {
+        return statsInfo;
+    }
 }
 
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/msgstore/disk/FileSegment.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/msgstore/disk/FileSegment.java
index a6202cf..42580bd 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/msgstore/disk/FileSegment.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/msgstore/disk/FileSegment.java
@@ -115,7 +115,7 @@ public class FileSegment implements Segment {
             } catch (final Exception e) {
                 if (e instanceof IOException) {
                     ServiceStatusHolder.addReadIOErrCnt();
-                    BrokerMetricsHolder.METRICS.ioExceptionCnt.incrementAndGet();
+                    BrokerMetricsHolder.incIOExceptionCnt();
                 }
                 if (this.segmentType == SegmentType.DATA) {
                     logger.error("[File Store] Set DATA Segment cachedSize error", e);
@@ -140,7 +140,7 @@ public class FileSegment implements Segment {
             } catch (Throwable ee) {
                 if (ee instanceof IOException) {
                     ServiceStatusHolder.addReadIOErrCnt();
-                    BrokerMetricsHolder.METRICS.ioExceptionCnt.incrementAndGet();
+                    BrokerMetricsHolder.incIOExceptionCnt();
                 }
                 logger.error(new StringBuilder(512).append("[File Store] Close ")
                         .append(this.file.getAbsoluteFile().toString())
@@ -163,7 +163,7 @@ public class FileSegment implements Segment {
         } catch (Throwable e1) {
             if (e1 instanceof IOException) {
                 ServiceStatusHolder.addReadIOErrCnt();
-                BrokerMetricsHolder.METRICS.ioExceptionCnt.incrementAndGet();
+                BrokerMetricsHolder.incIOExceptionCnt();
             }
             logger.error("[File Store] failure to close channel ", e1);
         }
@@ -175,7 +175,7 @@ public class FileSegment implements Segment {
         } catch (Throwable ee) {
             if (ee instanceof IOException) {
                 ServiceStatusHolder.addReadIOErrCnt();
-                BrokerMetricsHolder.METRICS.ioExceptionCnt.incrementAndGet();
+                BrokerMetricsHolder.incIOExceptionCnt();
             }
             logger.error("[File Store] failure to delete file ", ee);
         }
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/msgstore/disk/MsgFileStore.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/msgstore/disk/MsgFileStore.java
index 2b116dd..33495b4 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/msgstore/disk/MsgFileStore.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/msgstore/disk/MsgFileStore.java
@@ -197,7 +197,7 @@ public class MsgFileStore implements Closeable {
             // print abnormal information
             if (inIndexOffset != indexOffset || inDataOffset != dataOffset) {
                 ServiceStatusHolder.addWriteIOErrCnt();
-                BrokerMetricsHolder.METRICS.ioExceptionCnt.incrementAndGet();
+                BrokerMetricsHolder.incIOExceptionCnt();
                 logger.error(sb.append("[File Store]: appendMsg data Error, storekey=")
                     .append(this.storeKey).append(",msgCnt=").append(msgCnt)
                     .append(",indexSize=").append(indexSize)
@@ -211,7 +211,7 @@ public class MsgFileStore implements Closeable {
         } catch (Throwable e) {
             if (!closed.get()) {
                 ServiceStatusHolder.addWriteIOErrCnt();
-                BrokerMetricsHolder.METRICS.ioExceptionCnt.incrementAndGet();
+                BrokerMetricsHolder.incIOExceptionCnt();
             }
             samplePrintCtrl.printExceptionCaught(e);
         } finally {
@@ -327,7 +327,7 @@ public class MsgFileStore implements Closeable {
             } catch (Throwable e2) {
                 if (e2 instanceof IOException) {
                     ServiceStatusHolder.addReadIOErrCnt();
-                    BrokerMetricsHolder.METRICS.ioExceptionCnt.incrementAndGet();
+                    BrokerMetricsHolder.incIOExceptionCnt();
                 }
                 samplePrintCtrl.printExceptionCaught(e2,
                     messageStore.getStoreKey(), String.valueOf(partitionId));
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/msgstore/mem/MsgMemStore.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/msgstore/mem/MsgMemStore.java
index f37dfa2..e662112 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/msgstore/mem/MsgMemStore.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/msgstore/mem/MsgMemStore.java
@@ -272,12 +272,10 @@ public class MsgMemStore implements Closeable {
         final ByteBuffer tmpDataReadBuf = this.cacheDataSegment.asReadOnlyBuffer();
         tmpIndexBuffer.flip();
         tmpDataReadBuf.flip();
-        long tmpValue = System.currentTimeMillis();
+        long startTime = System.currentTimeMillis();
         msgFileStore.batchAppendMsg(strBuffer, curMessageCount.get(),
             cacheIndexOffset.get(), tmpIndexBuffer, cacheDataOffset.get(), tmpDataReadBuf);
-        long dltTime = System.currentTimeMillis() - tmpValue;
-        BrokerMetricsHolder.METRICS.syncDataDurMin.update(dltTime);
-        BrokerMetricsHolder.METRICS.syncDataDurMax.update(dltTime);
+        BrokerMetricsHolder.updSyncDataDurations(System.currentTimeMillis() - startTime);
         return true;
     }
 
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/offset/DefaultOffsetManager.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/offset/DefaultOffsetManager.java
index a4f1b7a..1e74bb7 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/offset/DefaultOffsetManager.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/offset/DefaultOffsetManager.java
@@ -614,7 +614,7 @@ public class DefaultOffsetManager extends AbstractDaemonService implements Offse
     }
 
     private void commitCfmOffsets(boolean retryable) {
-        long tmpValue = System.currentTimeMillis();
+        long startTime = System.currentTimeMillis();
         for (Map.Entry<String, ConcurrentHashMap<String, OffsetStorageInfo>> entry : cfmOffsetMap.entrySet()) {
             if (TStringUtils.isBlank(entry.getKey())
                     || entry.getValue() == null || entry.getValue().isEmpty()) {
@@ -622,9 +622,7 @@ public class DefaultOffsetManager extends AbstractDaemonService implements Offse
             }
             zkOffsetStorage.commitOffset(entry.getKey(), entry.getValue().values(), retryable);
         }
-        long dltTime = System.currentTimeMillis() - tmpValue;
-        BrokerMetricsHolder.METRICS.syncZkDurMin.update(dltTime);
-        BrokerMetricsHolder.METRICS.syncZkDurMax.update(dltTime);
+        BrokerMetricsHolder.updSyncZKDurations(System.currentTimeMillis() - startTime);
     }
 
     /***
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/common/offsetstorage/ZkOffsetStorage.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/common/offsetstorage/ZkOffsetStorage.java
index 30eb1d1..7d23ba1 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/common/offsetstorage/ZkOffsetStorage.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/common/offsetstorage/ZkOffsetStorage.java
@@ -79,7 +79,7 @@ public class ZkOffsetStorage implements OffsetStorage {
         try {
             this.zkw = new ZooKeeperWatcher(zkConfig);
         } catch (Throwable e) {
-            BrokerMetricsHolder.METRICS.zkExceptionCnt.incrementAndGet();
+            BrokerMetricsHolder.incZKExceptionCnt();
             logger.error(new StringBuilder(256)
                     .append("[ZkOffsetStorage] Failed to connect ZooKeeper server (")
                     .append(this.zkConfig.getZkServerAddr()).append(") !").toString(), e);
@@ -143,7 +143,7 @@ public class ZkOffsetStorage implements OffsetStorage {
         try {
             offsetZkInfo = ZKUtil.readDataMaybeNull(this.zkw, znode);
         } catch (KeeperException e) {
-            BrokerMetricsHolder.METRICS.zkExceptionCnt.incrementAndGet();
+            BrokerMetricsHolder.incZKExceptionCnt();
             logger.error("KeeperException during load offsets from ZooKeeper", e);
             return null;
         }
@@ -183,7 +183,7 @@ public class ZkOffsetStorage implements OffsetStorage {
             try {
                 ZKUtil.updatePersistentPath(this.zkw, offsetPath, offsetData);
             } catch (final Throwable t) {
-                BrokerMetricsHolder.METRICS.zkExceptionCnt.incrementAndGet();
+                BrokerMetricsHolder.incZKExceptionCnt();
                 logger.error("Exception during commit offsets to ZooKeeper", t);
                 throw new OffsetStoreException(t);
             }
@@ -224,7 +224,7 @@ public class ZkOffsetStorage implements OffsetStorage {
                     offsetMap.put(partitionId, Long.parseLong(offsetInfoStrs[1]));
                 }
             } catch (Throwable e) {
-                BrokerMetricsHolder.METRICS.zkExceptionCnt.incrementAndGet();
+                BrokerMetricsHolder.incZKExceptionCnt();
                 offsetMap.put(partitionId, null);
             }
         }
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/TMaster.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/TMaster.java
index 189518b..8a08f2b 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/TMaster.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/TMaster.java
@@ -110,7 +110,7 @@ import org.apache.inlong.tubemq.server.master.metamanage.metastore.dao.entity.Br
 import org.apache.inlong.tubemq.server.master.metamanage.metastore.dao.entity.ClusterSettingEntity;
 import org.apache.inlong.tubemq.server.master.metamanage.metastore.dao.entity.GroupResCtrlEntity;
 import org.apache.inlong.tubemq.server.master.metamanage.metastore.dao.entity.TopicDeployEntity;
-import org.apache.inlong.tubemq.server.master.metrics.MasterMetric;
+import org.apache.inlong.tubemq.server.master.metrics.MasterMetricsHolder;
 import org.apache.inlong.tubemq.server.master.nodemanage.nodebroker.BrokerAbnHolder;
 import org.apache.inlong.tubemq.server.master.nodemanage.nodebroker.BrokerRunManager;
 import org.apache.inlong.tubemq.server.master.nodemanage.nodebroker.DefBrokerRunManager;
@@ -164,7 +164,6 @@ public class TMaster extends HasThread implements MasterService, Stoppable {
     private AtomicInteger curCltBalanceParal = new AtomicInteger(0);
     private Sleeper stopSleeper = new Sleeper(1000, this);
     private SimpleVisitTokenManager visitTokenManager;
-    private final MasterMetric masterMetrics;
 
     /**
      * constructor
@@ -179,7 +178,8 @@ public class TMaster extends HasThread implements MasterService, Stoppable {
         this.checkAndCreateBdbDataPath();
         this.masterAddInfo =
                 new NodeAddrInfo(masterConfig.getHostName(), masterConfig.getPort());
-        this.masterMetrics = MasterMetric.create();
+        // register metric bean
+        MasterMetricsHolder.registerMXBean();
         this.svrExecutor = Executors.newFixedThreadPool(this.masterConfig.getRebalanceParallel());
         this.cltExecutor = Executors.newFixedThreadPool(this.masterConfig.getRebalanceParallel());
         this.visitTokenManager = new SimpleVisitTokenManager(this.masterConfig);
@@ -189,9 +189,9 @@ public class TMaster extends HasThread implements MasterService, Stoppable {
                 false, TBaseConstants.META_VALUE_UNDEFINED);
         this.producerHolder = new ProducerInfoHolder();
         this.consumerHolder = new ConsumerInfoHolder(this);
-        this.consumerEventManager = new ConsumerEventManager(consumerHolder, masterMetrics);
+        this.consumerEventManager = new ConsumerEventManager(consumerHolder);
         this.topicPSInfoManager = new TopicPSInfoManager(this);
-        this.loadBalancer = new DefaultLoadBalancer(masterMetrics);
+        this.loadBalancer = new DefaultLoadBalancer();
         heartbeatManager.regConsumerCheckBusiness(masterConfig.getConsumerHeartbeatTimeoutMs(),
                 new TimeoutListener() {
                     @Override
@@ -283,10 +283,6 @@ public class TMaster extends HasThread implements MasterService, Stoppable {
         return brokerRunManager;
     }
 
-    public MasterMetric getMasterMetrics() {
-        return masterMetrics;
-    }
-
     /**
      * Producer register request to master
      *
@@ -350,10 +346,8 @@ public class TMaster extends HasThread implements MasterService, Stoppable {
         }
         final String clientJdkVer = request.hasJdkVersion() ? request.getJdkVersion() : "";
         heartbeatManager.regProducerNode(producerId);
-        if (producerHolder.setProducerInfo(producerId,
-                new HashSet<>(transTopicSet), hostName, overtls)) {
-            masterMetrics.producerCnt.incrementAndGet();
-        }
+        producerHolder.setProducerInfo(producerId,
+                new HashSet<>(transTopicSet), hostName, overtls);
         Tuple2<Long, Map<Integer, String>> brokerStaticInfo =
                 brokerRunManager.getBrokerStaticInfo(overtls);
         builder.setBrokerCheckSum(brokerStaticInfo.getF0());
@@ -1697,11 +1691,8 @@ public class TMaster extends HasThread implements MasterService, Stoppable {
                 final List<String> subGroups = groupsNeedToBalance.subList(startIndex, endIndex);
                 if (subGroups.isEmpty()) {
                     if (curSvrBalanceParal.decrementAndGet() == 0) {
-                        long durTime = System.currentTimeMillis() - startBalanceTime;
-                        masterMetrics.svrBalLatency.set(durTime);
-                        if (durTime > masterMetrics.svrBalLatencyMax.get()) {
-                            masterMetrics.svrBalLatencyMax.set(durTime);
-                        }
+                        MasterMetricsHolder.updSvrBalanceDurations(
+                                System.currentTimeMillis() - startBalanceTime);
                     }
                     continue;
                 }
@@ -1742,11 +1733,8 @@ public class TMaster extends HasThread implements MasterService, Stoppable {
                             logger.warn("[Svr-Balance processor] Error during process", e);
                         } finally {
                             if (curSvrBalanceParal.decrementAndGet() == 0) {
-                                long durTime = System.currentTimeMillis() - startBalanceTime;
-                                masterMetrics.svrBalLatency.set(durTime);
-                                if (durTime > masterMetrics.svrBalLatencyMax.get()) {
-                                    masterMetrics.svrBalLatencyMax.set(durTime);
-                                }
+                                MasterMetricsHolder.updSvrBalanceDurations(
+                                        System.currentTimeMillis() - startBalanceTime);
                             }
                         }
                     }
@@ -2570,24 +2558,12 @@ public class TMaster extends HasThread implements MasterService, Stoppable {
             try {
                 lid = masterRowLock.getLock(null,
                         StringUtils.getBytesUtf8(consumerId), true);
-                ConsumerInfo info = consumerHolder.removeConsumer(group, consumerId);
+                ConsumerInfo info = consumerHolder.removeConsumer(group, consumerId, isTimeout);
                 currentSubInfo.remove(consumerId);
                 consumerEventManager.removeAll(consumerId);
                 if (info != null) {
                     if (consumerHolder.isConsumeGroupEmpty(group)) {
                         topicPSInfoManager.rmvGroupSubTopicInfo(group, info.getTopicSet());
-                        // metric
-                        masterMetrics.consumeGroupCnt.decrementAndGet();
-                        if (info.getConsumeType() == ConsumeType.CONSUME_CLIENT_REB) {
-                            masterMetrics.cltBalConsumeGroupCnt.decrementAndGet();
-                        }
-                        if (isTimeout) {
-                            masterMetrics.consumeGroupTmoTotCnt.incrementAndGet();
-                        }
-                    }
-                    masterMetrics.consumerCnt.decrementAndGet();
-                    if (isTimeout) {
-                        masterMetrics.consumerTmoTotCnt.incrementAndGet();
                     }
                 }
             } catch (IOException e) {
@@ -2604,13 +2580,9 @@ public class TMaster extends HasThread implements MasterService, Stoppable {
         @Override
         void run(String clientId, boolean isTimeout) {
             if (clientId != null) {
-                ProducerInfo info = producerHolder.removeProducer(clientId);
+                ProducerInfo info = producerHolder.removeProducer(clientId, isTimeout);
                 if (info != null) {
                     topicPSInfoManager.rmvProducerTopicPubInfo(clientId, info.getTopicSet());
-                    masterMetrics.producerCnt.decrementAndGet();
-                    if (isTimeout) {
-                        masterMetrics.producerTmoTotCnt.incrementAndGet();
-                    }
                 }
             }
         }
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/balance/DefaultLoadBalancer.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/balance/DefaultLoadBalancer.java
index 501f45e..93568ed 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/balance/DefaultLoadBalancer.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/balance/DefaultLoadBalancer.java
@@ -34,7 +34,7 @@ import org.apache.inlong.tubemq.corebase.cluster.Partition;
 import org.apache.inlong.tubemq.server.common.offsetstorage.OffsetStorage;
 import org.apache.inlong.tubemq.server.master.metamanage.MetaDataManager;
 import org.apache.inlong.tubemq.server.master.metamanage.metastore.dao.entity.GroupResCtrlEntity;
-import org.apache.inlong.tubemq.server.master.metrics.MasterMetric;
+import org.apache.inlong.tubemq.server.master.metrics.MasterMetricsHolder;
 import org.apache.inlong.tubemq.server.master.nodemanage.nodebroker.BrokerRunManager;
 import org.apache.inlong.tubemq.server.master.nodemanage.nodeconsumer.ConsumeGroupInfo;
 import org.apache.inlong.tubemq.server.master.nodemanage.nodeconsumer.ConsumerInfo;
@@ -48,10 +48,9 @@ import org.slf4j.LoggerFactory;
 public class DefaultLoadBalancer implements LoadBalancer {
     private static final Logger logger = LoggerFactory.getLogger(LoadBalancer.class);
     private static final Random RANDOM = new Random(System.currentTimeMillis());
-    private final MasterMetric masterMetrics;
 
-    public DefaultLoadBalancer(MasterMetric masterMetrics) {
-        this.masterMetrics = masterMetrics;
+    public DefaultLoadBalancer() {
+        // initial information
     }
 
     /**
@@ -663,13 +662,8 @@ public class DefaultLoadBalancer implements LoadBalancer {
                 }
             }
             if (consumeGroupInfo.addAllocatedTimes() > 0) {
-                long durTime = System.currentTimeMillis() - consumeGroupInfo.getCreateTime();
-                if (durTime < masterMetrics.svrBalResetDurMin.get()) {
-                    masterMetrics.svrBalResetDurMin.set(durTime);
-                }
-                if (durTime > masterMetrics.svrBalResetDurMax.get()) {
-                    masterMetrics.svrBalResetDurMax.set(durTime);
-                }
+                MasterMetricsHolder.updSvrBalResetDurations(
+                        System.currentTimeMillis() - consumeGroupInfo.getCreateTime());
             }
         }
         return finalSubInfoMap;
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metrics/MasterMetric.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metrics/MasterMetric.java
deleted file mode 100644
index 20f93ef..0000000
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metrics/MasterMetric.java
+++ /dev/null
@@ -1,108 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.tubemq.server.master.metrics;
-
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicLong;
-import org.apache.inlong.commons.config.metrics.CountMetric;
-import org.apache.inlong.commons.config.metrics.Dimension;
-import org.apache.inlong.commons.config.metrics.GaugeMetric;
-import org.apache.inlong.commons.config.metrics.MetricDomain;
-import org.apache.inlong.commons.config.metrics.MetricItem;
-import org.apache.inlong.commons.config.metrics.MetricRegister;
-
-@MetricDomain(name = "master_metrics")
-public class MasterMetric extends MetricItem {
-
-    private static final MasterMetric MASTER_METRICS = new MasterMetric();
-    private static final AtomicBoolean REGISTER_ONCE =
-            new AtomicBoolean(false);
-    private static final String METRIC_NAME = "master_metrics";
-
-    @Dimension
-    public String tagName;
-
-    @GaugeMetric
-    public AtomicLong consumeGroupCnt = new AtomicLong(0);
-
-    @GaugeMetric
-    public AtomicLong cltBalConsumeGroupCnt = new AtomicLong(0);
-
-    @CountMetric
-    public AtomicLong consumeGroupTmoTotCnt = new AtomicLong(0);
-
-    @GaugeMetric
-    public AtomicLong consumerCnt = new AtomicLong(0);
-
-    @CountMetric
-    public AtomicLong consumerTmoTotCnt = new AtomicLong(0);
-
-    @GaugeMetric
-    public AtomicLong producerCnt = new AtomicLong(0);
-
-    @CountMetric
-    public AtomicLong producerTmoTotCnt = new AtomicLong(0);
-
-    @GaugeMetric
-    public AtomicLong brokerConfigCnt = new AtomicLong(0);
-
-    @GaugeMetric
-    public AtomicLong brokerOnlineCnt = new AtomicLong(0);
-
-    @CountMetric
-    public AtomicLong brokerAbnTotCnt = new AtomicLong(0);
-
-    @GaugeMetric
-    public AtomicLong brokerAbnCurCnt = new AtomicLong(0);
-
-    @CountMetric
-    public AtomicLong brokerFbdTotCnt = new AtomicLong(0);
-
-    @GaugeMetric
-    public AtomicLong brokerFbdCurCnt = new AtomicLong(0);
-
-    @CountMetric
-    public AtomicLong brokerTmoTotCnt = new AtomicLong(0);
-
-    @GaugeMetric
-    public AtomicLong svrBalLatency = new AtomicLong(0);
-
-    @GaugeMetric
-    public AtomicLong svrBalLatencyMax = new AtomicLong(0);
-
-    @GaugeMetric
-    public AtomicLong svrBalResetDurMin = new AtomicLong(Long.MAX_VALUE);
-
-    @GaugeMetric
-    public AtomicLong svrBalResetDurMax = new AtomicLong(0);
-
-    @GaugeMetric
-    public AtomicLong svrBalConEventConsumerCnt = new AtomicLong(0);
-
-    @GaugeMetric
-    public AtomicLong svrBalDisConEventConsumerCnt = new AtomicLong(0);
-
-    public static MasterMetric create() {
-        if (REGISTER_ONCE.compareAndSet(false, true)) {
-            MASTER_METRICS.tagName = METRIC_NAME;
-            MetricRegister.register(MASTER_METRICS);
-        }
-        return MASTER_METRICS;
-    }
-}
-
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/metrics/BrokerMetricMXBean.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metrics/MasterMetricMXBean.java
similarity index 67%
copy from inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/metrics/BrokerMetricMXBean.java
copy to inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metrics/MasterMetricMXBean.java
index f8f0c38..fa98f2d 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/metrics/BrokerMetricMXBean.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metrics/MasterMetricMXBean.java
@@ -15,19 +15,22 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.tubemq.server.broker.metrics;
+package org.apache.inlong.tubemq.server.master.metrics;
 
 import org.apache.inlong.tubemq.corebase.metric.MetricValues;
 
 /**
- * BrokerMonitorMXBean
- * Provide access interface of a metric item with JMX.<br>
- * Decouple between metric item and monitor system, in particular scene, <br>
- * inlong can depend on user-defined monitor system.
+ * MasterMetricMXBean
+ * Master's metric data access interface, including:
+ * the getMetric() that  directly obtains data
+ * the getAndReSetMetrics() that can clear the values of
+ *     the counter, maximum and minimum extremum Gauge data
  */
-public interface BrokerMetricMXBean {
+public interface MasterMetricMXBean {
 
+    // get current metric data by viewing mode
     MetricValues getMetrics();
 
+    // get current metric data and reset the Counter, maximum/minimum Gauge metric
     MetricValues getAndReSetMetrics();
 }
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metrics/MasterMetrics.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metrics/MasterMetrics.java
new file mode 100644
index 0000000..0c0f2d2
--- /dev/null
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metrics/MasterMetrics.java
@@ -0,0 +1,274 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.tubemq.server.master.metrics;
+
+import java.util.Date;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.inlong.tubemq.corebase.metric.AbsMetricItem;
+import org.apache.inlong.tubemq.corebase.metric.CountMetricItem;
+import org.apache.inlong.tubemq.corebase.metric.GaugeMaxMetricItem;
+import org.apache.inlong.tubemq.corebase.metric.GaugeMinMetricItem;
+import org.apache.inlong.tubemq.corebase.metric.GaugeNormMetricItem;
+import org.apache.inlong.tubemq.corebase.metric.MetricValues;
+import org.apache.inlong.tubemq.server.common.utils.WebParameterUtils;
+
+public class MasterMetrics implements MasterMetricMXBean {
+
+    // statistics time since last reset
+    private final AtomicLong lastResetTime =
+            new AtomicLong(System.currentTimeMillis());
+    // consume group statistics
+    protected final AbsMetricItem consumeGroupCnt =
+            new GaugeNormMetricItem("consume_group_cnt");
+    protected final AbsMetricItem consumeGroupTmoTotCnt =
+            new CountMetricItem("consume_group_timeout_cnt");
+    protected final AbsMetricItem cltBalConsumeGroupCnt =
+            new GaugeNormMetricItem("client_balance_group_cnt");
+    protected final AbsMetricItem cltBalGroupTmototCnt =
+            new CountMetricItem("clt_balance_timeout_cnt");
+    // consumer client statistics
+    protected final AbsMetricItem consumerOnlineCnt =
+            new GaugeNormMetricItem("consumer_online_cnt");
+    protected final AbsMetricItem consumerTmoTotCnt =
+            new CountMetricItem("consumer_timeout_cnt");
+    // producer client statistics
+    protected final AbsMetricItem producerOnlineCnt =
+            new GaugeNormMetricItem("producer_online_cnt");
+    protected final AbsMetricItem producerTmoTotCnt =
+            new CountMetricItem("producer_timeout_cnt");
+    // broker node statistics
+    protected final AbsMetricItem brokerConfigCnt =
+            new GaugeNormMetricItem("broker_configure_cnt");
+    protected final AbsMetricItem brokerOnlineCnt =
+            new GaugeNormMetricItem("broker_online_cnt");
+    protected final AbsMetricItem brokerTmoTotCnt =
+            new CountMetricItem("broker_timeout_cnt");
+    protected final AbsMetricItem brokerAbnCurCnt =
+            new GaugeNormMetricItem("broker_abn_current_cnt");
+    protected final AbsMetricItem brokerAbnTotCnt =
+            new CountMetricItem("broker_abn_total_cnt");
+    protected final AbsMetricItem brokerFbdCurCnt =
+            new GaugeNormMetricItem("broker_fbd_current_cnt");
+    protected final AbsMetricItem brokerFbdTotCnt =
+            new CountMetricItem("broker_fbd_total_cnt");
+    // server balance statistics
+    protected final AbsMetricItem svrBalDuration =
+            new GaugeNormMetricItem("svrbalance_duration");
+    protected final AbsMetricItem svrBalDurationMin =
+            new GaugeMinMetricItem("svrbalance_duration_min");
+    protected final AbsMetricItem svrBalDurationMax =
+            new GaugeMaxMetricItem("svrbalance_duration_max");
+    protected final AbsMetricItem svrBalResetDurMin =
+            new GaugeMinMetricItem("svrbal_reset_duration_min");
+    protected final AbsMetricItem svrBalResetDurMax =
+            new GaugeMaxMetricItem("svrbal_reset_duration_max");
+    protected final AbsMetricItem svrBalConEventConsumerCnt =
+            new GaugeNormMetricItem("svrbal_con_consumer_cnt");
+    protected final AbsMetricItem svrBalDisConEventConsumerCnt =
+            new GaugeNormMetricItem("svrbal_discon_consumer_cnt");
+
+    @Override
+    public MetricValues getMetrics() {
+        Map<String, Long> metricValues = new HashMap<>();
+        metricValues.put(consumeGroupCnt.getName(), consumeGroupCnt.getValue());
+        metricValues.put(consumeGroupTmoTotCnt.getName(), consumeGroupTmoTotCnt.getValue());
+        metricValues.put(cltBalConsumeGroupCnt.getName(), cltBalConsumeGroupCnt.getValue());
+        metricValues.put(cltBalGroupTmototCnt.getName(), cltBalGroupTmototCnt.getValue());
+        metricValues.put(consumerOnlineCnt.getName(), consumerOnlineCnt.getValue());
+        metricValues.put(consumerTmoTotCnt.getName(), consumerTmoTotCnt.getValue());
+        metricValues.put(producerOnlineCnt.getName(), producerOnlineCnt.getValue());
+        metricValues.put(producerTmoTotCnt.getName(), producerTmoTotCnt.getValue());
+        metricValues.put(brokerConfigCnt.getName(), brokerConfigCnt.getValue());
+        metricValues.put(brokerOnlineCnt.getName(), brokerOnlineCnt.getValue());
+        metricValues.put(brokerTmoTotCnt.getName(), brokerTmoTotCnt.getValue());
+        metricValues.put(brokerAbnCurCnt.getName(), brokerAbnCurCnt.getValue());
+        metricValues.put(brokerAbnTotCnt.getName(), brokerAbnTotCnt.getValue());
+        metricValues.put(brokerFbdCurCnt.getName(), brokerFbdCurCnt.getValue());
+        metricValues.put(brokerFbdTotCnt.getName(), brokerFbdTotCnt.getValue());
+        metricValues.put(svrBalDuration.getName(), svrBalDuration.getValue());
+        metricValues.put(svrBalDurationMin.getName(), svrBalDurationMin.getValue());
+        metricValues.put(svrBalDurationMax.getName(), svrBalDurationMax.getValue());
+        metricValues.put(svrBalResetDurMin.getName(), svrBalResetDurMin.getValue());
+        metricValues.put(svrBalResetDurMax.getName(), svrBalResetDurMax.getValue());
+        metricValues.put(svrBalConEventConsumerCnt.getName(),
+                svrBalConEventConsumerCnt.getValue());
+        metricValues.put(svrBalDisConEventConsumerCnt.getName(),
+                svrBalDisConEventConsumerCnt.getValue());
+        return new MetricValues(WebParameterUtils.date2yyyyMMddHHmmss(
+                new Date(lastResetTime.get())), metricValues);
+    }
+
+    @Override
+    public MetricValues getAndReSetMetrics() {
+        Map<String, Long> metricValues = new HashMap<>();
+        metricValues.put(consumeGroupCnt.getName(), consumeGroupCnt.getAndSet());
+        metricValues.put(consumeGroupTmoTotCnt.getName(), consumeGroupTmoTotCnt.getAndSet());
+        metricValues.put(cltBalConsumeGroupCnt.getName(), cltBalConsumeGroupCnt.getAndSet());
+        metricValues.put(cltBalGroupTmototCnt.getName(), cltBalGroupTmototCnt.getAndSet());
+        metricValues.put(consumerOnlineCnt.getName(), consumerOnlineCnt.getAndSet());
+        metricValues.put(consumerTmoTotCnt.getName(), consumerTmoTotCnt.getAndSet());
+        metricValues.put(producerOnlineCnt.getName(), producerOnlineCnt.getAndSet());
+        metricValues.put(producerTmoTotCnt.getName(), producerTmoTotCnt.getAndSet());
+        metricValues.put(brokerConfigCnt.getName(), brokerConfigCnt.getAndSet());
+        metricValues.put(brokerOnlineCnt.getName(), brokerOnlineCnt.getAndSet());
+        metricValues.put(brokerTmoTotCnt.getName(), brokerTmoTotCnt.getAndSet());
+        metricValues.put(brokerAbnCurCnt.getName(), brokerAbnCurCnt.getAndSet());
+        metricValues.put(brokerAbnTotCnt.getName(), brokerAbnTotCnt.getAndSet());
+        metricValues.put(brokerFbdCurCnt.getName(), brokerFbdCurCnt.getAndSet());
+        metricValues.put(brokerFbdTotCnt.getName(), brokerFbdTotCnt.getAndSet());
+        metricValues.put(svrBalDuration.getName(), svrBalDuration.getAndSet());
+        metricValues.put(svrBalDurationMin.getName(), svrBalDurationMin.getAndSet());
+        metricValues.put(svrBalDurationMax.getName(), svrBalDurationMax.getAndSet());
+        metricValues.put(svrBalResetDurMin.getName(), svrBalResetDurMin.getAndSet());
+        metricValues.put(svrBalResetDurMax.getName(), svrBalResetDurMax.getAndSet());
+        metricValues.put(svrBalConEventConsumerCnt.getName(),
+                svrBalConEventConsumerCnt.getAndSet());
+        metricValues.put(svrBalDisConEventConsumerCnt.getName(),
+                svrBalDisConEventConsumerCnt.getAndSet());
+        alignBrokerFbdMetrics();
+        alignBrokerAbnMetrics();
+        long befTime = lastResetTime.getAndSet(System.currentTimeMillis());
+        return new MetricValues(WebParameterUtils.date2yyyyMMddHHmmss(
+                new Date(befTime)), metricValues);
+    }
+
+    public long getLastResetTime() {
+        return lastResetTime.get();
+    }
+
+    public AbsMetricItem getConsumeGroupCnt() {
+        return consumeGroupCnt;
+    }
+
+    public AbsMetricItem getConsumeGroupTmoTotCnt() {
+        return consumeGroupTmoTotCnt;
+    }
+
+    public AbsMetricItem getCltBalConsumeGroupCnt() {
+        return cltBalConsumeGroupCnt;
+    }
+
+    public AbsMetricItem getCltBalGroupTmototCnt() {
+        return cltBalGroupTmototCnt;
+    }
+
+    public AbsMetricItem getConsumerOnlineCnt() {
+        return consumerOnlineCnt;
+    }
+
+    public AbsMetricItem getConsumerTmoTotCnt() {
+        return consumerTmoTotCnt;
+    }
+
+    public AbsMetricItem getProducerOnlineCnt() {
+        return producerOnlineCnt;
+    }
+
+    public AbsMetricItem getProducerTmoTotCnt() {
+        return producerTmoTotCnt;
+    }
+
+    public AbsMetricItem getBrokerConfigCnt() {
+        return brokerConfigCnt;
+    }
+
+    public AbsMetricItem getBrokerOnlineCnt() {
+        return brokerOnlineCnt;
+    }
+
+    public AbsMetricItem getBrokerTmoTotCnt() {
+        return brokerTmoTotCnt;
+    }
+
+    public AbsMetricItem getBrokerAbnCurCnt() {
+        return brokerAbnCurCnt;
+    }
+
+    public AbsMetricItem getBrokerAbnTotCnt() {
+        return brokerAbnTotCnt;
+    }
+
+    public AbsMetricItem getBrokerFbdCurCnt() {
+        return brokerFbdCurCnt;
+    }
+
+    public AbsMetricItem getBrokerFbdTotCnt() {
+        return brokerFbdTotCnt;
+    }
+
+    public AbsMetricItem getSvrBalDuration() {
+        return svrBalDuration;
+    }
+
+    public AbsMetricItem getSvrBalDurationMin() {
+        return svrBalDurationMin;
+    }
+
+    public AbsMetricItem getSvrBalDurationMax() {
+        return svrBalDurationMax;
+    }
+
+    public AbsMetricItem getSvrBalResetDurMin() {
+        return svrBalResetDurMin;
+    }
+
+    public AbsMetricItem getSvrBalResetDurMax() {
+        return svrBalResetDurMax;
+    }
+
+    public AbsMetricItem getSvrBalConEventConsumerCnt() {
+        return svrBalConEventConsumerCnt;
+    }
+
+    public AbsMetricItem getSvrBalDisConEventConsumerCnt() {
+        return svrBalDisConEventConsumerCnt;
+    }
+
+    private void alignBrokerFbdMetrics() {
+        // Notice: the minimum value of the brokerFbdTotCnt metric value is
+        //         the current value of brokerFbdCurCnt, so the metric value
+        //         needs to be aligned after reset
+        long curCnt = brokerFbdCurCnt.getValue();
+        long totalCnt = brokerFbdTotCnt.getValue();
+        while (curCnt > totalCnt) {
+            if (brokerFbdTotCnt.compareAndSet(totalCnt, curCnt)) {
+                break;
+            }
+            curCnt = brokerFbdCurCnt.getValue();
+            totalCnt = brokerFbdTotCnt.getValue();
+        }
+    }
+
+    private void alignBrokerAbnMetrics() {
+        // Notice: the minimum value of the brokerAbnTotCnt metric value is
+        //         the current value of brokerAbnCurCnt, so the metric value
+        //         needs to be aligned after reset
+        long curCnt = brokerAbnCurCnt.getValue();
+        long totalCnt = brokerAbnTotCnt.getValue();
+        while (curCnt > totalCnt) {
+            if (brokerAbnTotCnt.compareAndSet(totalCnt, curCnt)) {
+                break;
+            }
+            curCnt = brokerAbnCurCnt.getValue();
+            totalCnt = brokerAbnTotCnt.getValue();
+        }
+    }
+}
+
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metrics/MasterMetricsHolder.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metrics/MasterMetricsHolder.java
new file mode 100644
index 0000000..416420d
--- /dev/null
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metrics/MasterMetricsHolder.java
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.tubemq.server.master.metrics;
+
+import java.lang.management.ManagementFactory;
+import java.util.concurrent.atomic.AtomicBoolean;
+import javax.management.MBeanServer;
+import javax.management.ObjectName;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class MasterMetricsHolder {
+    private static final Logger logger =
+            LoggerFactory.getLogger(MasterMetricsHolder.class);
+    // Registration status indicator
+    private static final AtomicBoolean registered =
+            new AtomicBoolean(false);
+    // master metrics information
+    private static final MasterMetrics statsInfo = new MasterMetrics();
+
+    public static void registerMXBean() {
+        if (!registered.compareAndSet(false, true)) {
+            return;
+        }
+        try {
+            MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
+            ObjectName mxBeanName =
+                    new ObjectName("org.apache.inlong.tubemq.server.master:type=MasterMetrics");
+            mbs.registerMBean(statsInfo, mxBeanName);
+        } catch (Exception ex) {
+            logger.error("Register MasterMXBean error: ", ex);
+        }
+    }
+
+    public static void incConsumerCnt(boolean isGroupEmpty, boolean isCltBal) {
+        statsInfo.consumerOnlineCnt.incrementAndGet();
+        if (isGroupEmpty) {
+            statsInfo.consumeGroupCnt.incrementAndGet();
+            if (isCltBal) {
+                statsInfo.cltBalConsumeGroupCnt.incrementAndGet();
+            }
+        }
+    }
+
+    public static void decConsumerCnt(boolean isTimeout,
+                                      boolean isGroupEmpty,
+                                      boolean isCltBal) {
+        statsInfo.consumerOnlineCnt.decrementAndGet();
+        if (isTimeout) {
+            statsInfo.consumerTmoTotCnt.incrementAndGet();
+        }
+        if (isGroupEmpty) {
+            decConsumeGroupCnt(isTimeout, isCltBal);
+        }
+    }
+
+    public static void decConsumeGroupCnt(boolean isTimeout, boolean isCltBal) {
+        statsInfo.consumeGroupCnt.decrementAndGet();
+        if (isTimeout) {
+            statsInfo.consumeGroupTmoTotCnt.incrementAndGet();
+        }
+        if (isCltBal) {
+            statsInfo.cltBalConsumeGroupCnt.decrementAndGet();
+            if (isTimeout) {
+                statsInfo.cltBalGroupTmototCnt.incrementAndGet();
+            }
+        }
+    }
+
+    public static void incProducerCnt() {
+        statsInfo.producerOnlineCnt.incrementAndGet();
+    }
+
+    public static void decProducerCnt(boolean isTimeout) {
+        statsInfo.producerOnlineCnt.decrementAndGet();
+        if (isTimeout) {
+            statsInfo.producerTmoTotCnt.incrementAndGet();
+        }
+    }
+
+    public static void incSvrBalDisConConsumerCnt() {
+        statsInfo.svrBalDisConEventConsumerCnt.incrementAndGet();
+    }
+
+    public static void decSvrBalDisConConsumerCnt() {
+        statsInfo.svrBalDisConEventConsumerCnt.decrementAndGet();
+    }
+
+    public static void incSvrBalConEventConsumerCnt() {
+        statsInfo.svrBalConEventConsumerCnt.incrementAndGet();
+    }
+
+    public static void decSvrBalConEventConsumerCnt() {
+        statsInfo.svrBalConEventConsumerCnt.decrementAndGet();
+    }
+
+    public static void incBrokerConfigCnt() {
+        statsInfo.brokerConfigCnt.incrementAndGet();
+    }
+
+    public static void decBrokerConfigCnt() {
+        statsInfo.brokerConfigCnt.decrementAndGet();
+    }
+
+    public static void incBrokerOnlineCnt() {
+        statsInfo.brokerOnlineCnt.incrementAndGet();
+    }
+
+    public static void decBrokerOnlineCnt(boolean isTimeout) {
+        statsInfo.brokerOnlineCnt.decrementAndGet();
+        if (isTimeout) {
+            statsInfo.brokerTmoTotCnt.incrementAndGet();
+        }
+    }
+
+    public static void incBrokerAbnormalCnt() {
+        statsInfo.brokerAbnCurCnt.incrementAndGet();
+        statsInfo.brokerAbnTotCnt.incrementAndGet();
+    }
+
+    public static void decBrokerAbnormalCnt() {
+        statsInfo.brokerAbnCurCnt.decrementAndGet();
+    }
+
+    public static void incBrokerForbiddenCnt() {
+        statsInfo.brokerFbdCurCnt.incrementAndGet();
+        statsInfo.brokerFbdTotCnt.incrementAndGet();
+    }
+
+    public static void decBrokerForbiddenCnt() {
+        statsInfo.brokerFbdCurCnt.decrementAndGet();
+    }
+
+    public static void updSvrBalanceDurations(long dltTime) {
+        statsInfo.svrBalDuration.update(dltTime);
+        statsInfo.svrBalDurationMin.update(dltTime);
+        statsInfo.svrBalDurationMax.update(dltTime);
+    }
+
+    public static void updSvrBalResetDurations(long dltTime) {
+        statsInfo.svrBalResetDurMin.update(dltTime);
+        statsInfo.svrBalResetDurMax.update(dltTime);
+    }
+
+    public static MasterMetrics getStatsInfo() {
+        return statsInfo;
+    }
+}
+
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/nodemanage/nodebroker/BrokerAbnHolder.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/nodemanage/nodebroker/BrokerAbnHolder.java
index 2987225..e22ad84 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/nodemanage/nodebroker/BrokerAbnHolder.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/nodemanage/nodebroker/BrokerAbnHolder.java
@@ -33,7 +33,7 @@ import org.apache.inlong.tubemq.server.common.statusdef.ManageStatus;
 import org.apache.inlong.tubemq.server.master.metamanage.MetaDataManager;
 import org.apache.inlong.tubemq.server.master.metamanage.metastore.dao.entity.BaseEntity;
 import org.apache.inlong.tubemq.server.master.metamanage.metastore.dao.entity.BrokerConfEntity;
-import org.apache.inlong.tubemq.server.master.metrics.MasterMetric;
+import org.apache.inlong.tubemq.server.master.metrics.MasterMetricsHolder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -51,15 +51,11 @@ public class BrokerAbnHolder {
     private final MetaDataManager metaDataManager;
     private final AtomicInteger brokerForbiddenCount =
             new AtomicInteger(0);
-    // master metrics
-    private final MasterMetric masterMetrics;
 
     public BrokerAbnHolder(final int maxAutoForbiddenCnt,
-                           final MetaDataManager metaDataManager,
-                           final MasterMetric masterMetrics) {
+                           final MetaDataManager metaDataManager) {
         this.maxAutoForbiddenCnt = maxAutoForbiddenCnt;
         this.metaDataManager = metaDataManager;
-        this.masterMetrics = masterMetrics;
     }
 
     /**
@@ -79,7 +75,7 @@ public class BrokerAbnHolder {
                 if (brokerForbiddenMap.get(brokerId) == null) {
                     brokerAbnInfo = brokerAbnormalMap.remove(brokerId);
                     if (brokerAbnInfo != null) {
-                        masterMetrics.brokerAbnCurCnt.decrementAndGet();
+                        MasterMetricsHolder.decBrokerAbnormalCnt();
                         logger.warn(sBuffer.append("[Broker AutoForbidden] broker ")
                                 .append(brokerId).append(" return to normal!").toString());
                         sBuffer.delete(0, sBuffer.length());
@@ -104,8 +100,7 @@ public class BrokerAbnHolder {
         if (brokerAbnInfo == null) {
             if (brokerAbnormalMap.putIfAbsent(brokerId,
                 new BrokerAbnInfo(brokerId, reportReadStatus, reportWriteStatus)) == null) {
-                masterMetrics.brokerAbnTotCnt.incrementAndGet();
-                masterMetrics.brokerAbnCurCnt.incrementAndGet();
+                MasterMetricsHolder.incBrokerAbnormalCnt();
                 logger.warn(sBuffer.append("[Broker AutoForbidden] broker report abnormal, ")
                         .append(brokerId).append("'s reportReadStatus=")
                         .append(reportReadStatus).append(", reportWriteStatus=")
@@ -124,8 +119,7 @@ public class BrokerAbnHolder {
                 if (updateCurManageStatus(brokerId, reqMngStatus, sBuffer)) {
                     if (brokerForbiddenMap.putIfAbsent(brokerId, tmpBrokerFbdInfo) == null) {
                         brokerForbiddenCount.incrementAndGet();
-                        masterMetrics.brokerFbdTotCnt.incrementAndGet();
-                        masterMetrics.brokerFbdCurCnt.incrementAndGet();
+                        MasterMetricsHolder.incBrokerForbiddenCnt();
                         logger.warn(sBuffer
                                 .append("[Broker AutoForbidden] master add missing forbidden broker, ")
                                 .append(brokerId).append("'s manage status to ")
@@ -143,7 +137,7 @@ public class BrokerAbnHolder {
                         brokerForbiddenCount.decrementAndGet();
                         return;
                     }
-                    masterMetrics.brokerFbdCurCnt.incrementAndGet();
+                    MasterMetricsHolder.incBrokerForbiddenCnt();
                     logger.warn(sBuffer
                             .append("[Broker AutoForbidden] master auto forbidden broker, ")
                             .append(brokerId).append("'s manage status to ")
@@ -185,12 +179,12 @@ public class BrokerAbnHolder {
     public void removeBroker(Integer brokerId) {
         BrokerAbnInfo abnInfo = brokerAbnormalMap.remove(brokerId);
         if (abnInfo != null) {
-            masterMetrics.brokerAbnCurCnt.decrementAndGet();
+            MasterMetricsHolder.decBrokerAbnormalCnt();
         }
         BrokerFbdInfo brokerFbdInfo = brokerForbiddenMap.remove(brokerId);
         if (brokerFbdInfo != null) {
             this.brokerForbiddenCount.decrementAndGet();
-            masterMetrics.brokerFbdCurCnt.decrementAndGet();
+            MasterMetricsHolder.decBrokerForbiddenCnt();
         }
     }
 
@@ -275,10 +269,10 @@ public class BrokerAbnHolder {
                 brokerFbdInfos.add(fbdInfo);
                 BrokerAbnInfo abnInfo = this.brokerAbnormalMap.remove(brokerId);
                 if (abnInfo != null) {
-                    masterMetrics.brokerAbnCurCnt.decrementAndGet();
+                    MasterMetricsHolder.decBrokerAbnormalCnt();
                 }
                 this.brokerForbiddenCount.decrementAndGet();
-                masterMetrics.brokerFbdCurCnt.decrementAndGet();
+                MasterMetricsHolder.decBrokerForbiddenCnt();
             }
         }
         if (!brokerFbdInfos.isEmpty()) {
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/nodemanage/nodebroker/DefBrokerRunManager.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/nodemanage/nodebroker/DefBrokerRunManager.java
index e06fce4..e636531 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/nodemanage/nodebroker/DefBrokerRunManager.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/nodemanage/nodebroker/DefBrokerRunManager.java
@@ -45,7 +45,7 @@ import org.apache.inlong.tubemq.server.master.TMaster;
 import org.apache.inlong.tubemq.server.master.metamanage.MetaDataManager;
 import org.apache.inlong.tubemq.server.master.metamanage.keepalive.AliveObserver;
 import org.apache.inlong.tubemq.server.master.metamanage.metastore.dao.entity.BrokerConfEntity;
-import org.apache.inlong.tubemq.server.master.metrics.MasterMetric;
+import org.apache.inlong.tubemq.server.master.metrics.MasterMetricsHolder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -76,8 +76,6 @@ public class DefBrokerRunManager implements BrokerRunManager, AliveObserver {
     private final BrokerAbnHolder brokerAbnHolder;
     // broker topic configure for consumer and producer
     private final BrokerPSInfoHolder brokerPubSubInfo = new BrokerPSInfoHolder();
-    // master metrics
-    private final MasterMetric masterMetrics;
 
     /**
      * Constructor by TMaster
@@ -85,13 +83,11 @@ public class DefBrokerRunManager implements BrokerRunManager, AliveObserver {
      * @param tMaster  the initial TMaster object
      */
     public DefBrokerRunManager(TMaster tMaster) {
-        this.masterMetrics = tMaster.getMasterMetrics();
         this.metaDataManager = tMaster.getDefMetaDataManager();
         this.heartbeatManager = tMaster.getHeartbeatManager();
         MasterConfig masterConfig = tMaster.getMasterConfig();
         this.brokerAbnHolder =
-                new BrokerAbnHolder(masterConfig.getMaxAutoForbiddenCnt(),
-                        this.metaDataManager, this.masterMetrics);
+                new BrokerAbnHolder(masterConfig.getMaxAutoForbiddenCnt(), this.metaDataManager);
         heartbeatManager.regBrokerCheckBusiness(masterConfig.getBrokerHeartbeatTimeoutMs(),
                 new TimeoutListener() {
                     @Override
@@ -150,7 +146,7 @@ public class DefBrokerRunManager implements BrokerRunManager, AliveObserver {
                 || !brokerReg.equals(entity.getSimpleBrokerInfo())
                 || !brokerTLSReg.equals(entity.getSimpleTLSBrokerInfo())) {
             if (brokerReg == null) {
-                masterMetrics.brokerConfigCnt.incrementAndGet();
+                MasterMetricsHolder.incBrokerConfigCnt();
             } else {
                 if (!brokerReg.equals(entity.getSimpleBrokerInfo())) {
                     this.brokersMap.put(entity.getBrokerId(), entity.getSimpleBrokerInfo());
@@ -248,7 +244,7 @@ public class DefBrokerRunManager implements BrokerRunManager, AliveObserver {
                             brokerInfo.getBrokerId(), tmpRunStatusInfo);
             if (runStatusInfo == null) {
                 brokerTotalCount.incrementAndGet();
-                masterMetrics.brokerOnlineCnt.incrementAndGet();
+                MasterMetricsHolder.incBrokerOnlineCnt();
                 runStatusInfo = tmpRunStatusInfo;
             }
         } else {
@@ -479,10 +475,7 @@ public class DefBrokerRunManager implements BrokerRunManager, AliveObserver {
         if (runStatusInfo == null) {
             return false;
         }
-        masterMetrics.brokerOnlineCnt.decrementAndGet();
-        if (isTimeout) {
-            masterMetrics.brokerTmoTotCnt.incrementAndGet();
-        }
+        MasterMetricsHolder.decBrokerOnlineCnt(isTimeout);
         brokerTotalCount.decrementAndGet();
         brokerAbnHolder.removeBroker(brokerId);
         brokerPubSubInfo.rmvBrokerAllPushedInfo(brokerId);
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/nodemanage/nodeconsumer/ConsumerEventManager.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/nodemanage/nodeconsumer/ConsumerEventManager.java
index f4fa335..1966480 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/nodemanage/nodeconsumer/ConsumerEventManager.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/nodemanage/nodeconsumer/ConsumerEventManager.java
@@ -26,7 +26,7 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.inlong.tubemq.corebase.balance.ConsumerEvent;
-import org.apache.inlong.tubemq.server.master.metrics.MasterMetric;
+import org.apache.inlong.tubemq.server.master.metrics.MasterMetricsHolder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -42,12 +42,9 @@ public class ConsumerEventManager {
             new ConcurrentHashMap<>();
 
     private final ConsumerInfoHolder consumerHolder;
-    private final MasterMetric masterMetrics;
 
-    public ConsumerEventManager(ConsumerInfoHolder consumerHolder,
-                                MasterMetric masterMetrics) {
+    public ConsumerEventManager(ConsumerInfoHolder consumerHolder) {
         this.consumerHolder = consumerHolder;
-        this.masterMetrics = masterMetrics;
     }
 
     public boolean addDisconnectEvent(String consumerId,
@@ -59,7 +56,7 @@ public class ConsumerEventManager {
             LinkedList<ConsumerEvent> tmptList =
                     disconnectEventMap.putIfAbsent(consumerId, eventList);
             if (tmptList == null) {
-                masterMetrics.svrBalDisConEventConsumerCnt.incrementAndGet();
+                MasterMetricsHolder.incSvrBalDisConConsumerCnt();
             } else {
                 eventList = tmptList;
             }
@@ -78,7 +75,7 @@ public class ConsumerEventManager {
             LinkedList<ConsumerEvent> tmptList =
                     connectEventMap.putIfAbsent(consumerId, eventList);
             if (tmptList == null) {
-                masterMetrics.svrBalConEventConsumerCnt.incrementAndGet();
+                MasterMetricsHolder.incSvrBalConEventConsumerCnt();
             } else {
                 eventList = tmptList;
             }
@@ -138,9 +135,9 @@ public class ConsumerEventManager {
                     if (eventList.isEmpty()) {
                         currentEventMap.remove(consumerId);
                         if (selDisConnMap) {
-                            masterMetrics.svrBalDisConEventConsumerCnt.decrementAndGet();
+                            MasterMetricsHolder.decSvrBalDisConConsumerCnt();
                         } else {
-                            masterMetrics.svrBalConEventConsumerCnt.decrementAndGet();
+                            MasterMetricsHolder.decSvrBalConEventConsumerCnt();
                         }
                     }
                 }
@@ -199,11 +196,11 @@ public class ConsumerEventManager {
         LinkedList<ConsumerEvent> eventInfos =
                 disconnectEventMap.remove(consumerId);
         if (eventInfos != null) {
-            masterMetrics.svrBalDisConEventConsumerCnt.decrementAndGet();
+            MasterMetricsHolder.decSvrBalDisConConsumerCnt();
         }
         eventInfos = connectEventMap.remove(consumerId);
         if (eventInfos != null) {
-            masterMetrics.svrBalConEventConsumerCnt.decrementAndGet();
+            MasterMetricsHolder.decSvrBalConEventConsumerCnt();
         }
     }
 
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/nodemanage/nodeconsumer/ConsumerInfoHolder.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/nodemanage/nodeconsumer/ConsumerInfoHolder.java
index 122f668..0fce762 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/nodemanage/nodeconsumer/ConsumerInfoHolder.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/nodemanage/nodeconsumer/ConsumerInfoHolder.java
@@ -30,7 +30,7 @@ import org.apache.inlong.tubemq.server.common.paramcheck.ParamCheckResult;
 import org.apache.inlong.tubemq.server.common.utils.RowLock;
 import org.apache.inlong.tubemq.server.master.MasterConfig;
 import org.apache.inlong.tubemq.server.master.TMaster;
-import org.apache.inlong.tubemq.server.master.metrics.MasterMetric;
+import org.apache.inlong.tubemq.server.master.metrics.MasterMetricsHolder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -39,7 +39,6 @@ public class ConsumerInfoHolder {
     private static final Logger logger =
             LoggerFactory.getLogger(ConsumerInfoHolder.class);
     private final MasterConfig masterConfig;     // master configure
-    private final MasterMetric masterMetrics;
     private final RowLock groupRowLock;    //lock
     private final ConcurrentHashMap<String/* group */, ConsumeGroupInfo> groupInfoMap =
             new ConcurrentHashMap<>();
@@ -51,7 +50,6 @@ public class ConsumerInfoHolder {
             new ConcurrentHashSet<>();
 
     public ConsumerInfoHolder(TMaster tMasterr) {
-        this.masterMetrics = tMasterr.getMasterMetrics();
         this.masterConfig = tMasterr.getMasterConfig();
         this.groupRowLock = new RowLock("Group-RowLock",
                 this.masterConfig.getRowLockWaitDurMs());
@@ -355,19 +353,19 @@ public class ConsumerInfoHolder {
                 consumeGroupInfo = groupInfoMap.putIfAbsent(group, tmpGroupInfo);
                 if (consumeGroupInfo == null) {
                     consumeGroupInfo = tmpGroupInfo;
-                    masterMetrics.consumeGroupCnt.incrementAndGet();
                     if (tmpGroupInfo.isClientBalance()) {
                         clientBalanceGroupSet.add(group);
-                        masterMetrics.cltBalConsumeGroupCnt.incrementAndGet();
                     } else {
                         serverBalanceGroupSet.add(group);
                     }
+                    MasterMetricsHolder.incConsumerCnt(true,
+                            consumeGroupInfo.isClientBalance());
                 }
             }
             if (consumeGroupInfo.addConsumer(consumer, sBuffer, result)) {
-                Boolean isNewAdd = (Boolean) result.checkData;
-                if (isNewAdd) {
-                    masterMetrics.consumerCnt.incrementAndGet();
+                if ((Boolean) result.checkData) {
+                    MasterMetricsHolder.incConsumerCnt(false,
+                            consumeGroupInfo.isClientBalance());
                 }
                 if (!isNotAllocated) {
                     consumeGroupInfo.settAllocated();
@@ -391,12 +389,15 @@ public class ConsumerInfoHolder {
      *
      * @param group group name of consumer
      * @param consumerId consumer id
+     * @param isTimeout if timeout
      * @return ConsumerInfo
      */
-    public ConsumerInfo removeConsumer(String group, String consumerId) {
+    public ConsumerInfo removeConsumer(String group, String consumerId, boolean isTimeout) {
         if (group == null || consumerId == null) {
             return null;
         }
+        boolean isCltBal = false;
+        boolean rmvGroup = false;
         ConsumerInfo consumer = null;
         Integer lid = null;
         try {
@@ -406,11 +407,23 @@ public class ConsumerInfoHolder {
             if (consumeGroupInfo != null) {
                 consumer = consumeGroupInfo.removeConsumer(consumerId);
                 if (consumeGroupInfo.isGroupEmpty()) {
-                    groupInfoMap.remove(group);
+                    rmvGroup = (groupInfoMap.remove(group) != null);
                     if (consumeGroupInfo.isClientBalance()) {
-                        clientBalanceGroupSet.add(group);
+                        isCltBal = true;
+                        clientBalanceGroupSet.remove(group);
                     } else {
-                        serverBalanceGroupSet.add(group);
+                        serverBalanceGroupSet.remove(group);
+                    }
+                    if (rmvGroup) {
+                        if (consumer == null) {
+                            MasterMetricsHolder.decConsumeGroupCnt(isTimeout, isCltBal);
+                        } else {
+                            MasterMetricsHolder.decConsumerCnt(isTimeout, true, isCltBal);
+                        }
+                    }
+                } else {
+                    if (consumer != null) {
+                        MasterMetricsHolder.decConsumerCnt(isTimeout, false, false);
                     }
                 }
             }
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/nodemanage/nodeproducer/ProducerInfoHolder.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/nodemanage/nodeproducer/ProducerInfoHolder.java
index 6acd0d0..22a578c 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/nodemanage/nodeproducer/ProducerInfoHolder.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/nodemanage/nodeproducer/ProducerInfoHolder.java
@@ -20,6 +20,7 @@ package org.apache.inlong.tubemq.server.master.nodemanage.nodeproducer;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import org.apache.inlong.tubemq.corebase.cluster.ProducerInfo;
+import org.apache.inlong.tubemq.server.master.metrics.MasterMetricsHolder;
 
 public class ProducerInfoHolder {
 
@@ -30,12 +31,13 @@ public class ProducerInfoHolder {
         return producerInfoMap.get(producerId);
     }
 
-    public boolean setProducerInfo(String producerId,
+    public void setProducerInfo(String producerId,
                                         Set<String> topicSet,
                                         String host, boolean overTLS) {
-        ProducerInfo oldObj = producerInfoMap.put(producerId,
-                new ProducerInfo(producerId, topicSet, host, overTLS));
-        return (oldObj == null);
+        if (producerInfoMap.put(producerId,
+                new ProducerInfo(producerId, topicSet, host, overTLS)) == null) {
+            MasterMetricsHolder.incProducerCnt();
+        }
     }
 
     public void updateProducerInfo(String producerId,
@@ -51,8 +53,12 @@ public class ProducerInfoHolder {
         }
     }
 
-    public ProducerInfo removeProducer(String producerId) {
-        return producerInfoMap.remove(producerId);
+    public ProducerInfo removeProducer(String producerId, boolean isTimeout) {
+        ProducerInfo info = producerInfoMap.remove(producerId);
+        if (info != null) {
+            MasterMetricsHolder.decProducerCnt(isTimeout);
+        }
+        return info;
     }
 
     public void clear() {
diff --git a/inlong-tubemq/tubemq-server/src/test/java/org/apache/inlong/tubemq/server/broker/BrokerMetricsTest.java b/inlong-tubemq/tubemq-server/src/test/java/org/apache/inlong/tubemq/server/broker/BrokerMetricsTest.java
index be7640c..5f84d59 100644
--- a/inlong-tubemq/tubemq-server/src/test/java/org/apache/inlong/tubemq/server/broker/BrokerMetricsTest.java
+++ b/inlong-tubemq/tubemq-server/src/test/java/org/apache/inlong/tubemq/server/broker/BrokerMetricsTest.java
@@ -19,6 +19,7 @@ package org.apache.inlong.tubemq.server.broker;
 
 import org.apache.inlong.tubemq.corebase.metric.MetricValues;
 import org.apache.inlong.tubemq.server.broker.metrics.BrokerMetrics;
+import org.apache.inlong.tubemq.server.broker.metrics.BrokerMetricsHolder;
 import org.junit.Assert;
 import org.junit.Test;
 import org.slf4j.Logger;
@@ -29,45 +30,199 @@ public class BrokerMetricsTest {
             LoggerFactory.getLogger(BrokerMetricsTest.class);
 
     @Test
-    public void testAgentMetrics() {
+    public void testBrokerMetrics() {
         try {
             BrokerMetrics metrics = new BrokerMetrics();
-            metrics.zkExceptionCnt.incrementAndGet();
-            metrics.consumerTmoTotCnt.incrementAndGet();
-            metrics.syncDataDurMax.update(10000);
-            metrics.syncDataDurMin.update(2000);
-            metrics.syncDataDurMax.update(20000);
-            metrics.syncDataDurMin.update(1000);
-            metrics.syncDataDurMin.update(3000);
-            metrics.syncDataDurMax.update(30000);
+            // test case 1, set data
+            metrics.getIoExceptionCnt().incrementAndGet();
+            metrics.getZkExceptionCnt().incrementAndGet();
+            metrics.getConsumerOnlineCnt().incrementAndGet();
+            metrics.getConsumerOnlineCnt().incrementAndGet();
+            metrics.getConsumerTmoTotCnt().incrementAndGet();
+            metrics.getHbExceptionCnt().incrementAndGet();
+            metrics.getMasterNoNodeCnt().incrementAndGet();
+
+            metrics.getSyncDataDurMax().update(20000);
+            metrics.getSyncDataDurMax().update(10000);
+            metrics.getSyncDataDurMax().update(30000);
+            metrics.getSyncDataDurMin().update(2000);
+            metrics.getSyncDataDurMin().update(1000);
+            metrics.getSyncDataDurMin().update(3000);
+
+            metrics.getSyncZkDurMax().update(20000);
+            metrics.getSyncZkDurMax().update(1000);
+            metrics.getSyncZkDurMax().update(30000);
+
+            metrics.getSyncZkDurMin().update(2000);
+            metrics.getSyncZkDurMin().update(100);
+            metrics.getSyncZkDurMin().update(3000);
+            // get metric and compare data
             MetricValues result1 = metrics.getMetrics();
-            Assert.assertEquals(Long.valueOf(1000),
-                    result1.getMetricValues().get(metrics.syncDataDurMin.getName()));
-            Assert.assertEquals(Long.valueOf(30000),
-                    result1.getMetricValues().get(metrics.syncDataDurMax.getName()));
             Assert.assertEquals(Long.valueOf(1),
-                    result1.getMetricValues().get(metrics.zkExceptionCnt.getName()));
+                    result1.getMetricValues().get(metrics.getIoExceptionCnt().getName()));
+            Assert.assertEquals(Long.valueOf(1),
+                    result1.getMetricValues().get(metrics.getZkExceptionCnt().getName()));
+            Assert.assertEquals(Long.valueOf(2),
+                    result1.getMetricValues().get(metrics.getConsumerOnlineCnt().getName()));
+            Assert.assertEquals(Long.valueOf(1),
+                    result1.getMetricValues().get(metrics.getConsumerTmoTotCnt().getName()));
             Assert.assertEquals(Long.valueOf(1),
-                    result1.getMetricValues().get(metrics.consumerTmoTotCnt.getName()));
-            // get and reset value
+                    result1.getMetricValues().get(metrics.getHbExceptionCnt().getName()));
+            Assert.assertEquals(Long.valueOf(1),
+                    result1.getMetricValues().get(metrics.getMasterNoNodeCnt().getName()));
+            Assert.assertEquals(Long.valueOf(30000),
+                    result1.getMetricValues().get(metrics.getSyncDataDurMax().getName()));
+            Assert.assertEquals(Long.valueOf(1000),
+                    result1.getMetricValues().get(metrics.getSyncDataDurMin().getName()));
+            Assert.assertEquals(Long.valueOf(30000),
+                    result1.getMetricValues().get(metrics.getSyncZkDurMax().getName()));
+            Assert.assertEquals(Long.valueOf(100),
+                    result1.getMetricValues().get(metrics.getSyncZkDurMin().getName()));
+            // get and reset value 2
             final MetricValues result2 = metrics.getAndReSetMetrics();
-            metrics.zkExceptionCnt.incrementAndGet();
-            metrics.zkExceptionCnt.getAndSet();
-            metrics.consumerTmoTotCnt.incrementAndGet();
-            metrics.consumerTmoTotCnt.update(10);
-            metrics.syncDataDurMax.update(20000);
-            metrics.syncDataDurMin.update(2000);
+            // update metric data to 3
+            metrics.getIoExceptionCnt().incrementAndGet();
+            metrics.getZkExceptionCnt().incrementAndGet();
+            metrics.getConsumerOnlineCnt().incrementAndGet();
+            metrics.getConsumerOnlineCnt().decrementAndGet();
+            metrics.getConsumerTmoTotCnt().incrementAndGet();
+            metrics.getHbExceptionCnt().incrementAndGet();
+            metrics.getMasterNoNodeCnt().incrementAndGet();
+
+            metrics.getSyncDataDurMax().update(10);
+            metrics.getSyncDataDurMax().update(10000);
+            metrics.getSyncDataDurMax().update(20000);
+            metrics.getSyncDataDurMin().update(10);
+            metrics.getSyncDataDurMin().update(1000);
+            metrics.getSyncDataDurMin().update(5000);
+
+            metrics.getSyncZkDurMax().update(10);
+            metrics.getSyncZkDurMax().update(1000);
+            metrics.getSyncZkDurMax().update(2000);
+
+            metrics.getSyncZkDurMin().update(3000);
+            metrics.getSyncZkDurMin().update(10);
+            metrics.getSyncZkDurMin().update(6000);
+
             MetricValues result3 = metrics.getMetrics();
             Assert.assertEquals(result1.getLastResetTime(),
                     result2.getLastResetTime());
+            Assert.assertEquals(Long.valueOf(1),
+                    result3.getMetricValues().get(metrics.getIoExceptionCnt().getName()));
+            Assert.assertEquals(Long.valueOf(1),
+                    result3.getMetricValues().get(metrics.getZkExceptionCnt().getName()));
+            Assert.assertEquals(Long.valueOf(2),
+                    result3.getMetricValues().get(metrics.getConsumerOnlineCnt().getName()));
+            Assert.assertEquals(Long.valueOf(1),
+                    result3.getMetricValues().get(metrics.getConsumerTmoTotCnt().getName()));
+            Assert.assertEquals(Long.valueOf(1),
+                    result3.getMetricValues().get(metrics.getHbExceptionCnt().getName()));
+            Assert.assertEquals(Long.valueOf(1),
+                    result3.getMetricValues().get(metrics.getMasterNoNodeCnt().getName()));
+            Assert.assertEquals(Long.valueOf(20000),
+                    result3.getMetricValues().get(metrics.getSyncDataDurMax().getName()));
+            Assert.assertEquals(Long.valueOf(10),
+                    result3.getMetricValues().get(metrics.getSyncDataDurMin().getName()));
             Assert.assertEquals(Long.valueOf(2000),
-                    result3.getMetricValues().get(metrics.syncDataDurMin.getName()));
+                    result3.getMetricValues().get(metrics.getSyncZkDurMax().getName()));
+            Assert.assertEquals(Long.valueOf(10),
+                    result3.getMetricValues().get(metrics.getSyncZkDurMin().getName()));
+        } catch (Exception ex) {
+            logger.error("error happens" + ex);
+        }
+    }
+
+    @Test
+    public void testBrokerMetricsHolder() {
+        try {
+            // case 1, set data
+            BrokerMetricsHolder.incConsumerCnt();
+            BrokerMetricsHolder.decConsumerCnt(false);
+            BrokerMetricsHolder.incConsumerCnt();
+            BrokerMetricsHolder.decConsumerCnt(true);
+            BrokerMetricsHolder.incConsumerCnt();
+
+            BrokerMetricsHolder.incZKExceptionCnt();
+            BrokerMetricsHolder.incZKExceptionCnt();
+
+            BrokerMetricsHolder.incMasterNoNodeCnt();
+            BrokerMetricsHolder.incHBExceptionCnt();
+            BrokerMetricsHolder.incIOExceptionCnt();
+            BrokerMetricsHolder.incZKExceptionCnt();
+
+            BrokerMetricsHolder.updSyncDataDurations(10000);
+            BrokerMetricsHolder.updSyncDataDurations(2000);
+            BrokerMetricsHolder.updSyncDataDurations(20000);
+
+            BrokerMetricsHolder.updSyncZKDurations(1000);
+            BrokerMetricsHolder.updSyncZKDurations(30);
+            BrokerMetricsHolder.updSyncZKDurations(30000);
+            // get data and check
+            BrokerMetrics metrics = BrokerMetricsHolder.getStatsInfo();
+            MetricValues result1 = metrics.getMetrics();
+            Assert.assertEquals(Long.valueOf(1),
+                    result1.getMetricValues().get(metrics.getConsumerOnlineCnt().getName()));
+            Assert.assertEquals(Long.valueOf(1),
+                    result1.getMetricValues().get(metrics.getConsumerTmoTotCnt().getName()));
+            Assert.assertEquals(Long.valueOf(3),
+                    result1.getMetricValues().get(metrics.getZkExceptionCnt().getName()));
+            Assert.assertEquals(Long.valueOf(1),
+                    result1.getMetricValues().get(metrics.getMasterNoNodeCnt().getName()));
+            Assert.assertEquals(Long.valueOf(1),
+                    result1.getMetricValues().get(metrics.getHbExceptionCnt().getName()));
+            Assert.assertEquals(Long.valueOf(1),
+                    result1.getMetricValues().get(metrics.getIoExceptionCnt().getName()));
             Assert.assertEquals(Long.valueOf(20000),
-                    result3.getMetricValues().get(metrics.syncDataDurMax.getName()));
+                    result1.getMetricValues().get(metrics.getSyncDataDurMax().getName()));
+            Assert.assertEquals(Long.valueOf(2000),
+                    result1.getMetricValues().get(metrics.getSyncDataDurMin().getName()));
+            Assert.assertEquals(Long.valueOf(30000),
+                    result1.getMetricValues().get(metrics.getSyncZkDurMax().getName()));
+            Assert.assertEquals(Long.valueOf(30),
+                    result1.getMetricValues().get(metrics.getSyncZkDurMin().getName()));
+
+            // get and reset value 2
+            final MetricValues result2 = metrics.getAndReSetMetrics();
+            BrokerMetricsHolder.incConsumerCnt();
+            BrokerMetricsHolder.incConsumerCnt();
+            BrokerMetricsHolder.incConsumerCnt();
+            BrokerMetricsHolder.decConsumerCnt(false);
+            BrokerMetricsHolder.decConsumerCnt(true);
+
+            BrokerMetricsHolder.incZKExceptionCnt();
+            BrokerMetricsHolder.incZKExceptionCnt();
+
+            BrokerMetricsHolder.updSyncDataDurations(1);
+            BrokerMetricsHolder.updSyncDataDurations(5000);
+            BrokerMetricsHolder.updSyncDataDurations(30000);
+
+            BrokerMetricsHolder.updSyncZKDurations(100);
+            BrokerMetricsHolder.updSyncZKDurations(10);
+            BrokerMetricsHolder.updSyncZKDurations(5000);
+            // get and check 3
+            MetricValues result3 = metrics.getMetrics();
+            Assert.assertEquals(result1.getLastResetTime(),
+                    result2.getLastResetTime());
+            Assert.assertEquals(Long.valueOf(2),
+                    result3.getMetricValues().get(metrics.getConsumerOnlineCnt().getName()));
+            Assert.assertEquals(Long.valueOf(1),
+                    result3.getMetricValues().get(metrics.getConsumerTmoTotCnt().getName()));
+            Assert.assertEquals(Long.valueOf(2),
+                    result3.getMetricValues().get(metrics.getZkExceptionCnt().getName()));
+            Assert.assertEquals(Long.valueOf(0),
+                    result3.getMetricValues().get(metrics.getMasterNoNodeCnt().getName()));
             Assert.assertEquals(Long.valueOf(0),
-                    result3.getMetricValues().get(metrics.zkExceptionCnt.getName()));
+                    result3.getMetricValues().get(metrics.getHbExceptionCnt().getName()));
+            Assert.assertEquals(Long.valueOf(0),
+                    result3.getMetricValues().get(metrics.getIoExceptionCnt().getName()));
+            Assert.assertEquals(Long.valueOf(30000),
+                    result3.getMetricValues().get(metrics.getSyncDataDurMax().getName()));
+            Assert.assertEquals(Long.valueOf(1),
+                    result3.getMetricValues().get(metrics.getSyncDataDurMin().getName()));
+            Assert.assertEquals(Long.valueOf(5000),
+                    result3.getMetricValues().get(metrics.getSyncZkDurMax().getName()));
             Assert.assertEquals(Long.valueOf(10),
-                    result3.getMetricValues().get(metrics.consumerTmoTotCnt.getName()));
+                    result3.getMetricValues().get(metrics.getSyncZkDurMin().getName()));
         } catch (Exception ex) {
             logger.error("error happens" + ex);
         }
diff --git a/inlong-tubemq/tubemq-server/src/test/java/org/apache/inlong/tubemq/server/master/MasterMetricsTest.java b/inlong-tubemq/tubemq-server/src/test/java/org/apache/inlong/tubemq/server/master/MasterMetricsTest.java
index 193bebe..a318683 100644
--- a/inlong-tubemq/tubemq-server/src/test/java/org/apache/inlong/tubemq/server/master/MasterMetricsTest.java
+++ b/inlong-tubemq/tubemq-server/src/test/java/org/apache/inlong/tubemq/server/master/MasterMetricsTest.java
@@ -17,26 +17,471 @@
 
 package org.apache.inlong.tubemq.server.master;
 
-import org.apache.inlong.commons.config.metrics.MetricValue;
-import org.apache.inlong.tubemq.server.master.metrics.MasterMetric;
+import org.apache.inlong.tubemq.corebase.metric.MetricValues;
+import org.apache.inlong.tubemq.server.master.metrics.MasterMetrics;
+import org.apache.inlong.tubemq.server.master.metrics.MasterMetricsHolder;
 import org.junit.Assert;
 import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.Map;
-
 public class MasterMetricsTest {
     private static final Logger logger = LoggerFactory.getLogger(MasterMetricsTest.class);
 
     @Test
-    public void testAgentMetrics() {
+    public void testMasterMetrics() {
+        try {
+            MasterMetrics metrics = new MasterMetrics();
+            // test case 1, set data
+            metrics.getConsumerOnlineCnt().incrementAndGet();
+            metrics.getConsumerOnlineCnt().incrementAndGet();
+            metrics.getConsumerTmoTotCnt().incrementAndGet();
+            metrics.getConsumerTmoTotCnt().incrementAndGet();
+            metrics.getConsumeGroupCnt().incrementAndGet();
+            metrics.getConsumeGroupCnt().incrementAndGet();
+            metrics.getConsumeGroupTmoTotCnt().incrementAndGet();
+            metrics.getCltBalConsumeGroupCnt().incrementAndGet();
+            metrics.getCltBalGroupTmototCnt().incrementAndGet();
+
+            metrics.getProducerOnlineCnt().incrementAndGet();
+            metrics.getProducerOnlineCnt().incrementAndGet();
+            metrics.getProducerTmoTotCnt().incrementAndGet();
+            metrics.getProducerTmoTotCnt().incrementAndGet();
+
+            metrics.getBrokerConfigCnt().incrementAndGet();
+            metrics.getBrokerConfigCnt().incrementAndGet();
+            metrics.getBrokerOnlineCnt().incrementAndGet();
+            metrics.getBrokerOnlineCnt().incrementAndGet();
+            metrics.getBrokerTmoTotCnt().incrementAndGet();
+
+            metrics.getBrokerAbnCurCnt().incrementAndGet();
+            metrics.getBrokerAbnCurCnt().incrementAndGet();
+            metrics.getBrokerAbnTotCnt().incrementAndGet();
+            metrics.getBrokerAbnTotCnt().incrementAndGet();
+            metrics.getBrokerFbdCurCnt().incrementAndGet();
+            metrics.getBrokerFbdCurCnt().incrementAndGet();
+            metrics.getBrokerFbdTotCnt().incrementAndGet();
+            metrics.getBrokerFbdTotCnt().incrementAndGet();
+            metrics.getBrokerFbdTotCnt().incrementAndGet();
+
+            metrics.getSvrBalDuration().update(100);
+            metrics.getSvrBalDuration().update(500);
+            metrics.getSvrBalDuration().update(300);
+
+            metrics.getSvrBalDurationMin().update(700);
+            metrics.getSvrBalDurationMin().update(200);
+            metrics.getSvrBalDurationMin().update(300);
+
+            metrics.getSvrBalDurationMax().update(700);
+            metrics.getSvrBalDurationMax().update(1000);
+            metrics.getSvrBalDurationMax().update(300);
+
+            metrics.getSvrBalResetDurMin().update(700);
+            metrics.getSvrBalResetDurMin().update(200);
+            metrics.getSvrBalResetDurMin().update(300);
+
+            metrics.getSvrBalResetDurMax().update(700);
+            metrics.getSvrBalResetDurMax().update(1000);
+            metrics.getSvrBalResetDurMax().update(300);
+
+            metrics.getSvrBalConEventConsumerCnt().incrementAndGet();
+            metrics.getSvrBalConEventConsumerCnt().incrementAndGet();
+            metrics.getSvrBalConEventConsumerCnt().incrementAndGet();
+
+            metrics.getSvrBalDisConEventConsumerCnt().incrementAndGet();
+            metrics.getSvrBalDisConEventConsumerCnt().incrementAndGet();
+            metrics.getSvrBalDisConEventConsumerCnt().incrementAndGet();
+            // get metric and compare data
+            MetricValues result1 = metrics.getMetrics();
+            Assert.assertEquals(Long.valueOf(2),
+                    result1.getMetricValues().get(metrics.getConsumerOnlineCnt().getName()));
+            Assert.assertEquals(Long.valueOf(2),
+                    result1.getMetricValues().get(metrics.getConsumerTmoTotCnt().getName()));
+            Assert.assertEquals(Long.valueOf(2),
+                    result1.getMetricValues().get(metrics.getConsumeGroupCnt().getName()));
+            Assert.assertEquals(Long.valueOf(1),
+                    result1.getMetricValues().get(metrics.getCltBalConsumeGroupCnt().getName()));
+            Assert.assertEquals(Long.valueOf(1),
+                    result1.getMetricValues().get(metrics.getCltBalGroupTmototCnt().getName()));
+
+            Assert.assertEquals(Long.valueOf(2),
+                    result1.getMetricValues().get(metrics.getProducerOnlineCnt().getName()));
+            Assert.assertEquals(Long.valueOf(2),
+                    result1.getMetricValues().get(metrics.getProducerTmoTotCnt().getName()));
+            Assert.assertEquals(Long.valueOf(2),
+                    result1.getMetricValues().get(metrics.getBrokerConfigCnt().getName()));
+            Assert.assertEquals(Long.valueOf(2),
+                    result1.getMetricValues().get(metrics.getBrokerOnlineCnt().getName()));
+            Assert.assertEquals(Long.valueOf(1),
+                    result1.getMetricValues().get(metrics.getBrokerTmoTotCnt().getName()));
+
+            Assert.assertEquals(Long.valueOf(2),
+                    result1.getMetricValues().get(metrics.getBrokerAbnCurCnt().getName()));
+            Assert.assertEquals(Long.valueOf(2),
+                    result1.getMetricValues().get(metrics.getBrokerAbnTotCnt().getName()));
+            Assert.assertEquals(Long.valueOf(2),
+                    result1.getMetricValues().get(metrics.getBrokerFbdCurCnt().getName()));
+            Assert.assertEquals(Long.valueOf(3),
+                    result1.getMetricValues().get(metrics.getBrokerFbdTotCnt().getName()));
+
+            Assert.assertEquals(Long.valueOf(300),
+                    result1.getMetricValues().get(metrics.getSvrBalDuration().getName()));
+            Assert.assertEquals(Long.valueOf(200),
+                    result1.getMetricValues().get(metrics.getSvrBalDurationMin().getName()));
+            Assert.assertEquals(Long.valueOf(1000),
+                    result1.getMetricValues().get(metrics.getSvrBalDurationMax().getName()));
+            Assert.assertEquals(Long.valueOf(200),
+                    result1.getMetricValues().get(metrics.getSvrBalResetDurMin().getName()));
+            Assert.assertEquals(Long.valueOf(1000),
+                    result1.getMetricValues().get(metrics.getSvrBalResetDurMax().getName()));
+
+            Assert.assertEquals(Long.valueOf(3),
+                    result1.getMetricValues().get(
+                            metrics.getSvrBalConEventConsumerCnt().getName()));
+
+            Assert.assertEquals(Long.valueOf(3),
+                    result1.getMetricValues().get(
+                            metrics.getSvrBalDisConEventConsumerCnt().getName()));
+
+            // get and reset value 2
+            final MetricValues result2 = metrics.getAndReSetMetrics();
+            // update metric data to 3
+            metrics.getConsumerOnlineCnt().incrementAndGet();
+            metrics.getConsumerOnlineCnt().decrementAndGet();
+            metrics.getConsumeGroupCnt().incrementAndGet();
+            metrics.getConsumeGroupTmoTotCnt().incrementAndGet();
+            metrics.getCltBalConsumeGroupCnt().incrementAndGet();
+            metrics.getCltBalGroupTmototCnt().incrementAndGet();
+
+            metrics.getProducerOnlineCnt().incrementAndGet();
+            metrics.getProducerOnlineCnt().incrementAndGet();
+
+            metrics.getBrokerConfigCnt().incrementAndGet();
+            metrics.getBrokerConfigCnt().incrementAndGet();
+            metrics.getBrokerOnlineCnt().decrementAndGet();
+            metrics.getBrokerOnlineCnt().decrementAndGet();
+            metrics.getBrokerTmoTotCnt().incrementAndGet();
+
+            metrics.getBrokerAbnCurCnt().incrementAndGet();
+            metrics.getBrokerAbnCurCnt().incrementAndGet();
+            metrics.getBrokerFbdCurCnt().incrementAndGet();
+            metrics.getBrokerFbdCurCnt().incrementAndGet();
+            metrics.getBrokerFbdTotCnt().incrementAndGet();
+
+            metrics.getSvrBalDuration().update(100);
+            metrics.getSvrBalDuration().update(700);
+            metrics.getSvrBalDuration().update(20);
+
+            metrics.getSvrBalDurationMin().update(1000);
+            metrics.getSvrBalDurationMin().update(50);
+            metrics.getSvrBalDurationMin().update(3000);
+
+            metrics.getSvrBalDurationMax().update(700);
+            metrics.getSvrBalDurationMax().update(800);
+            metrics.getSvrBalDurationMax().update(300);
+
+            metrics.getSvrBalResetDurMin().update(700);
+            metrics.getSvrBalResetDurMin().update(10);
+            metrics.getSvrBalResetDurMin().update(300);
+
+            metrics.getSvrBalResetDurMax().update(700);
+            metrics.getSvrBalResetDurMax().update(2000);
+            metrics.getSvrBalResetDurMax().update(300);
+
+            metrics.getSvrBalConEventConsumerCnt().incrementAndGet();
+            metrics.getSvrBalConEventConsumerCnt().incrementAndGet();
+            metrics.getSvrBalConEventConsumerCnt().incrementAndGet();
+
+            metrics.getSvrBalDisConEventConsumerCnt().incrementAndGet();
+            metrics.getSvrBalDisConEventConsumerCnt().incrementAndGet();
+
+            // get metric and compare data
+            MetricValues result3 = metrics.getMetrics();
+            Assert.assertEquals(result1.getLastResetTime(),
+                    result2.getLastResetTime());
+            Assert.assertEquals(Long.valueOf(2),
+                    result3.getMetricValues().get(metrics.getConsumerOnlineCnt().getName()));
+            Assert.assertEquals(Long.valueOf(0),
+                    result3.getMetricValues().get(metrics.getConsumerTmoTotCnt().getName()));
+            Assert.assertEquals(Long.valueOf(3),
+                    result3.getMetricValues().get(metrics.getConsumeGroupCnt().getName()));
+            Assert.assertEquals(Long.valueOf(2),
+                    result3.getMetricValues().get(metrics.getCltBalConsumeGroupCnt().getName()));
+            Assert.assertEquals(Long.valueOf(1),
+                    result3.getMetricValues().get(metrics.getCltBalGroupTmototCnt().getName()));
+
+            Assert.assertEquals(Long.valueOf(4),
+                    result3.getMetricValues().get(metrics.getProducerOnlineCnt().getName()));
+            Assert.assertEquals(Long.valueOf(0),
+                    result3.getMetricValues().get(metrics.getProducerTmoTotCnt().getName()));
+            Assert.assertEquals(Long.valueOf(4),
+                    result3.getMetricValues().get(metrics.getBrokerConfigCnt().getName()));
+            Assert.assertEquals(Long.valueOf(0),
+                    result3.getMetricValues().get(metrics.getBrokerOnlineCnt().getName()));
+            Assert.assertEquals(Long.valueOf(1),
+                    result3.getMetricValues().get(metrics.getBrokerTmoTotCnt().getName()));
+
+            Assert.assertEquals(Long.valueOf(4),
+                    result3.getMetricValues().get(metrics.getBrokerAbnCurCnt().getName()));
+            Assert.assertEquals(Long.valueOf(2),
+                    result3.getMetricValues().get(metrics.getBrokerAbnTotCnt().getName()));
+            Assert.assertEquals(Long.valueOf(4),
+                    result3.getMetricValues().get(metrics.getBrokerFbdCurCnt().getName()));
+            Assert.assertEquals(Long.valueOf(3),
+                    result3.getMetricValues().get(metrics.getBrokerFbdTotCnt().getName()));
+
+            Assert.assertEquals(Long.valueOf(20),
+                    result3.getMetricValues().get(metrics.getSvrBalDuration().getName()));
+            Assert.assertEquals(Long.valueOf(50),
+                    result3.getMetricValues().get(metrics.getSvrBalDurationMin().getName()));
+            Assert.assertEquals(Long.valueOf(800),
+                    result3.getMetricValues().get(metrics.getSvrBalDurationMax().getName()));
+            Assert.assertEquals(Long.valueOf(10),
+                    result3.getMetricValues().get(metrics.getSvrBalResetDurMin().getName()));
+            Assert.assertEquals(Long.valueOf(2000),
+                    result3.getMetricValues().get(metrics.getSvrBalResetDurMax().getName()));
+
+            Assert.assertEquals(Long.valueOf(6),
+                    result3.getMetricValues().get(
+                            metrics.getSvrBalConEventConsumerCnt().getName()));
+            Assert.assertEquals(Long.valueOf(5),
+                    result3.getMetricValues().get(
+                            metrics.getSvrBalDisConEventConsumerCnt().getName()));
+
+        } catch (Exception ex) {
+            logger.error("error happens" + ex);
+        }
+    }
+
+    @Test
+    public void testMasterMetricsHolder() {
         try {
-            MasterMetric taskMetrics = MasterMetric.create();
-            taskMetrics.svrBalLatency.incrementAndGet();
-            Map<String, MetricValue> result = taskMetrics.snapshot();
-            Assert.assertEquals(1, taskMetrics.svrBalLatency.get());
+            // test case 1, set data
+            // add 12 consumer, 8 group, 4 client balance
+            MasterMetricsHolder.incConsumerCnt(false, false);
+            MasterMetricsHolder.incConsumerCnt(false, true);
+            MasterMetricsHolder.incConsumerCnt(false, false);
+            MasterMetricsHolder.incConsumerCnt(false, true);
+            MasterMetricsHolder.incConsumerCnt(true, false);
+            MasterMetricsHolder.incConsumerCnt(true, false);
+            MasterMetricsHolder.incConsumerCnt(true, true);
+            MasterMetricsHolder.incConsumerCnt(true, true);
+            MasterMetricsHolder.incConsumerCnt(true, false);
+            MasterMetricsHolder.incConsumerCnt(true, false);
+            MasterMetricsHolder.incConsumerCnt(true, true);
+            MasterMetricsHolder.incConsumerCnt(true, true);
+            // dec 8 consumer, add 4 timeout consumer,
+            // dec 4 group, add 2 timeout group, dec 2 client balance group
+            MasterMetricsHolder.decConsumerCnt(false, false, false);
+            MasterMetricsHolder.decConsumerCnt(false, false, true);
+            MasterMetricsHolder.decConsumerCnt(false, true, false);
+            MasterMetricsHolder.decConsumerCnt(false, true, true);
+            MasterMetricsHolder.decConsumerCnt(true, false, false);
+            MasterMetricsHolder.decConsumerCnt(true, false, true);
+            MasterMetricsHolder.decConsumerCnt(true, true, false);
+            MasterMetricsHolder.decConsumerCnt(true, true, true);
+            // dec 4 group, add 2 timeout group, dec 2 client balance group
+            MasterMetricsHolder.decConsumeGroupCnt(false, false);
+            MasterMetricsHolder.decConsumeGroupCnt(false, true);
+            MasterMetricsHolder.decConsumeGroupCnt(true, false);
+            MasterMetricsHolder.decConsumeGroupCnt(true, true);
+            // add 3 producer
+            // dec 3 producer, 2 timeout producer
+            MasterMetricsHolder.incProducerCnt();
+            MasterMetricsHolder.incProducerCnt();
+            MasterMetricsHolder.incProducerCnt();
+            MasterMetricsHolder.decProducerCnt(false);
+            MasterMetricsHolder.decProducerCnt(true);
+            MasterMetricsHolder.decProducerCnt(true);
+            // add 3 disconcnt
+            // dec 2 disconcnt
+            MasterMetricsHolder.incSvrBalDisConConsumerCnt();
+            MasterMetricsHolder.incSvrBalDisConConsumerCnt();
+            MasterMetricsHolder.incSvrBalDisConConsumerCnt();
+            MasterMetricsHolder.decSvrBalDisConConsumerCnt();
+            MasterMetricsHolder.decSvrBalDisConConsumerCnt();
+            // add 3 concnt
+            // dec 2 concnt
+            MasterMetricsHolder.incSvrBalConEventConsumerCnt();
+            MasterMetricsHolder.incSvrBalConEventConsumerCnt();
+            MasterMetricsHolder.incSvrBalConEventConsumerCnt();
+            MasterMetricsHolder.decSvrBalConEventConsumerCnt();
+            MasterMetricsHolder.decSvrBalConEventConsumerCnt();
+            // add 3 broker configure count
+            // dec 2 broker configure count
+            MasterMetricsHolder.incBrokerConfigCnt();
+            MasterMetricsHolder.incBrokerConfigCnt();
+            MasterMetricsHolder.incBrokerConfigCnt();
+            MasterMetricsHolder.decBrokerConfigCnt();
+            MasterMetricsHolder.decBrokerConfigCnt();
+            // add 3 broker online count
+            // dec 2 broker online count, 1 timeout count
+            MasterMetricsHolder.incBrokerOnlineCnt();
+            MasterMetricsHolder.incBrokerOnlineCnt();
+            MasterMetricsHolder.incBrokerOnlineCnt();
+            MasterMetricsHolder.decBrokerOnlineCnt(false);
+            MasterMetricsHolder.decBrokerOnlineCnt(true);
+            // add 3 broker abnormal count, 3 total abnormal count
+            // dec 1 broker abnormal count
+            MasterMetricsHolder.incBrokerAbnormalCnt();
+            MasterMetricsHolder.decBrokerAbnormalCnt();
+            MasterMetricsHolder.incBrokerAbnormalCnt();
+            MasterMetricsHolder.incBrokerAbnormalCnt();
+            // add 4 broker forbidden count, 4 total forbidden count
+            // dec 1 broker forbidden count
+            MasterMetricsHolder.incBrokerForbiddenCnt();
+            MasterMetricsHolder.decBrokerForbiddenCnt();
+            MasterMetricsHolder.incBrokerForbiddenCnt();
+            MasterMetricsHolder.incBrokerForbiddenCnt();
+            MasterMetricsHolder.incBrokerForbiddenCnt();
+            // max: 1000, min 100
+            MasterMetricsHolder.updSvrBalanceDurations(300);
+            MasterMetricsHolder.updSvrBalanceDurations(500);
+            MasterMetricsHolder.updSvrBalanceDurations(100);
+            MasterMetricsHolder.updSvrBalanceDurations(1000);
+            // max: 5000, min 500
+            MasterMetricsHolder.updSvrBalResetDurations(3000);
+            MasterMetricsHolder.updSvrBalResetDurations(500);
+            MasterMetricsHolder.updSvrBalResetDurations(1000);
+            MasterMetricsHolder.updSvrBalResetDurations(5000);
+            // get metric and compare data
+            MasterMetrics metrics = MasterMetricsHolder.getStatsInfo();
+            MetricValues result1 = metrics.getMetrics();
+            Assert.assertEquals(Long.valueOf(4),
+                    result1.getMetricValues().get(metrics.getConsumerOnlineCnt().getName()));
+            Assert.assertEquals(Long.valueOf(4),
+                    result1.getMetricValues().get(metrics.getConsumerTmoTotCnt().getName()));
+            Assert.assertEquals(Long.valueOf(0),
+                    result1.getMetricValues().get(metrics.getConsumeGroupCnt().getName()));
+            Assert.assertEquals(Long.valueOf(0),
+                    result1.getMetricValues().get(metrics.getCltBalConsumeGroupCnt().getName()));
+            Assert.assertEquals(Long.valueOf(2),
+                    result1.getMetricValues().get(metrics.getCltBalGroupTmototCnt().getName()));
+
+            Assert.assertEquals(Long.valueOf(0),
+                    result1.getMetricValues().get(metrics.getProducerOnlineCnt().getName()));
+            Assert.assertEquals(Long.valueOf(2),
+                    result1.getMetricValues().get(metrics.getProducerTmoTotCnt().getName()));
+            Assert.assertEquals(Long.valueOf(1),
+                    result1.getMetricValues().get(metrics.getBrokerConfigCnt().getName()));
+            Assert.assertEquals(Long.valueOf(1),
+                    result1.getMetricValues().get(metrics.getBrokerOnlineCnt().getName()));
+            Assert.assertEquals(Long.valueOf(1),
+                    result1.getMetricValues().get(metrics.getBrokerTmoTotCnt().getName()));
+
+            Assert.assertEquals(Long.valueOf(2),
+                    result1.getMetricValues().get(metrics.getBrokerAbnCurCnt().getName()));
+            Assert.assertEquals(Long.valueOf(3),
+                    result1.getMetricValues().get(metrics.getBrokerAbnTotCnt().getName()));
+            Assert.assertEquals(Long.valueOf(3),
+                    result1.getMetricValues().get(metrics.getBrokerFbdCurCnt().getName()));
+            Assert.assertEquals(Long.valueOf(4),
+                    result1.getMetricValues().get(metrics.getBrokerFbdTotCnt().getName()));
+
+            Assert.assertEquals(Long.valueOf(1000),
+                    result1.getMetricValues().get(metrics.getSvrBalDuration().getName()));
+            Assert.assertEquals(Long.valueOf(100),
+                    result1.getMetricValues().get(metrics.getSvrBalDurationMin().getName()));
+            Assert.assertEquals(Long.valueOf(1000),
+                    result1.getMetricValues().get(metrics.getSvrBalDurationMax().getName()));
+            Assert.assertEquals(Long.valueOf(500),
+                    result1.getMetricValues().get(metrics.getSvrBalResetDurMin().getName()));
+            Assert.assertEquals(Long.valueOf(5000),
+                    result1.getMetricValues().get(metrics.getSvrBalResetDurMax().getName()));
+
+            Assert.assertEquals(Long.valueOf(1),
+                    result1.getMetricValues().get(
+                            metrics.getSvrBalConEventConsumerCnt().getName()));
+            Assert.assertEquals(Long.valueOf(1),
+                    result1.getMetricValues().get(
+                            metrics.getSvrBalDisConEventConsumerCnt().getName()));
+
+            // get and reset value 2
+            final MetricValues result2 = metrics.getAndReSetMetrics();
+            // update metric data to 3
+            // test case 3, set data
+            // add 3 consumer, 2 group, 1 client balance
+            MasterMetricsHolder.incConsumerCnt(false, false);
+            MasterMetricsHolder.incConsumerCnt(true, false);
+            MasterMetricsHolder.incConsumerCnt(true, true);
+            // dec 2 consumer, add 1 timeout consumer,
+            // dec 1 group, add 1 timeout group
+            MasterMetricsHolder.decConsumerCnt(true, true, true);
+            MasterMetricsHolder.decConsumerCnt(false, false, true);
+            // dec 1 group, add 1 timeout group
+            MasterMetricsHolder.decConsumeGroupCnt(true, false);
+            // add 2 producer
+            // dec 1 producer
+            MasterMetricsHolder.incProducerCnt();
+            MasterMetricsHolder.incProducerCnt();
+            MasterMetricsHolder.decProducerCnt(false);
+            // add 1 abnormal ,dec 1 abnormal
+            MasterMetricsHolder.incBrokerAbnormalCnt();
+            MasterMetricsHolder.decBrokerAbnormalCnt();
+
+            // max: 1000, min 100
+            MasterMetricsHolder.updSvrBalanceDurations(5000);
+            MasterMetricsHolder.updSvrBalanceDurations(500);
+            MasterMetricsHolder.updSvrBalanceDurations(100);
+            MasterMetricsHolder.updSvrBalanceDurations(8000);
+            // max: 5000, min 500
+            MasterMetricsHolder.updSvrBalResetDurations(2000);
+            MasterMetricsHolder.updSvrBalResetDurations(100);
+            MasterMetricsHolder.updSvrBalResetDurations(1000);
+            MasterMetricsHolder.updSvrBalResetDurations(4000);
+
+            // get metric and compare data
+            MetricValues result3 = metrics.getMetrics();
+            Assert.assertEquals(result1.getLastResetTime(),
+                    result2.getLastResetTime());
+            Assert.assertEquals(Long.valueOf(5),
+                    result3.getMetricValues().get(metrics.getConsumerOnlineCnt().getName()));
+            Assert.assertEquals(Long.valueOf(1),
+                    result3.getMetricValues().get(metrics.getConsumerTmoTotCnt().getName()));
+            Assert.assertEquals(Long.valueOf(0),
+                    result3.getMetricValues().get(metrics.getConsumeGroupCnt().getName()));
+            Assert.assertEquals(Long.valueOf(0),
+                    result3.getMetricValues().get(metrics.getCltBalConsumeGroupCnt().getName()));
+            Assert.assertEquals(Long.valueOf(1),
+                    result3.getMetricValues().get(metrics.getCltBalGroupTmototCnt().getName()));
+
+            Assert.assertEquals(Long.valueOf(1),
+                    result3.getMetricValues().get(metrics.getProducerOnlineCnt().getName()));
+            Assert.assertEquals(Long.valueOf(0),
+                    result3.getMetricValues().get(metrics.getProducerTmoTotCnt().getName()));
+            Assert.assertEquals(Long.valueOf(1),
+                    result3.getMetricValues().get(metrics.getBrokerConfigCnt().getName()));
+            Assert.assertEquals(Long.valueOf(1),
+                    result3.getMetricValues().get(metrics.getBrokerOnlineCnt().getName()));
+            Assert.assertEquals(Long.valueOf(0),
+                    result3.getMetricValues().get(metrics.getBrokerTmoTotCnt().getName()));
+
+            Assert.assertEquals(Long.valueOf(2),
+                    result3.getMetricValues().get(metrics.getBrokerAbnCurCnt().getName()));
+            Assert.assertEquals(Long.valueOf(3),
+                    result3.getMetricValues().get(metrics.getBrokerAbnTotCnt().getName()));
+            Assert.assertEquals(Long.valueOf(3),
+                    result3.getMetricValues().get(metrics.getBrokerFbdCurCnt().getName()));
+            Assert.assertEquals(Long.valueOf(3),
+                    result3.getMetricValues().get(metrics.getBrokerFbdTotCnt().getName()));
+
+            Assert.assertEquals(Long.valueOf(8000),
+                    result3.getMetricValues().get(metrics.getSvrBalDuration().getName()));
+            Assert.assertEquals(Long.valueOf(100),
+                    result3.getMetricValues().get(metrics.getSvrBalDurationMin().getName()));
+            Assert.assertEquals(Long.valueOf(8000),
+                    result3.getMetricValues().get(metrics.getSvrBalDurationMax().getName()));
+            Assert.assertEquals(Long.valueOf(100),
+                    result3.getMetricValues().get(metrics.getSvrBalResetDurMin().getName()));
+            Assert.assertEquals(Long.valueOf(4000),
+                    result3.getMetricValues().get(metrics.getSvrBalResetDurMax().getName()));
 
+            Assert.assertEquals(Long.valueOf(1),
+                    result3.getMetricValues().get(
+                            metrics.getSvrBalConEventConsumerCnt().getName()));
+            Assert.assertEquals(Long.valueOf(1),
+                    result3.getMetricValues().get(
+                            metrics.getSvrBalDisConEventConsumerCnt().getName()));
         } catch (Exception ex) {
             logger.error("error happens" + ex);
         }
diff --git a/inlong-tubemq/tubemq-server/src/test/java/org/apache/inlong/tubemq/server/master/nodemanage/nodeconsumer/ConsumerEventManagerTest.java b/inlong-tubemq/tubemq-server/src/test/java/org/apache/inlong/tubemq/server/master/nodemanage/nodeconsumer/ConsumerEventManagerTest.java
index 8d93ed0..24d3777 100644
--- a/inlong-tubemq/tubemq-server/src/test/java/org/apache/inlong/tubemq/server/master/nodemanage/nodeconsumer/ConsumerEventManagerTest.java
+++ b/inlong-tubemq/tubemq-server/src/test/java/org/apache/inlong/tubemq/server/master/nodemanage/nodeconsumer/ConsumerEventManagerTest.java
@@ -19,7 +19,6 @@ package org.apache.inlong.tubemq.server.master.nodemanage.nodeconsumer;
 
 import static org.mockito.Mockito.mock;
 import org.apache.inlong.tubemq.corebase.balance.ConsumerEvent;
-import org.apache.inlong.tubemq.server.master.metrics.MasterMetric;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
@@ -28,14 +27,11 @@ import org.junit.Test;
 public class ConsumerEventManagerTest {
     private ConsumerEventManager consumerEventManager;
     private ConsumerInfoHolder consumerInfoHolder;
-    private MasterMetric masterMetrics;
 
     @Before
     public void setUp() throws Exception {
         consumerInfoHolder = mock(ConsumerInfoHolder.class);
-        masterMetrics = MasterMetric.create();
-        consumerEventManager =
-                new ConsumerEventManager(consumerInfoHolder, masterMetrics);
+        consumerEventManager = new ConsumerEventManager(consumerInfoHolder);
     }
 
     @After