You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by he...@apache.org on 2021/12/02 02:42:15 UTC

[incubator-inlong] branch master updated: [INLONG-1852]The Master of TubeMQ supports monitoring indicators with JMX. (#1870)

This is an automated email from the ASF dual-hosted git repository.

healchow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 43363b9  [INLONG-1852]The Master of TubeMQ supports monitoring indicators with JMX. (#1870)
43363b9 is described below

commit 43363b95fc94762474b0cc02cf40973b747af99b
Author: gosonzhang <46...@qq.com>
AuthorDate: Thu Dec 2 10:42:07 2021 +0800

    [INLONG-1852]The Master of TubeMQ supports monitoring indicators with JMX. (#1870)
---
 inlong-tubemq/pom.xml                              |   5 +
 inlong-tubemq/tubemq-server/pom.xml                |   4 +
 .../inlong/tubemq/server/master/TMaster.java       |  72 +++++++++++---
 .../server/master/balance/DefaultLoadBalancer.java |  16 ++-
 .../tubemq/server/master/metrics/MasterMetric.java | 108 +++++++++++++++++++++
 .../nodemanage/nodebroker/BrokerAbnHolder.java     |  25 ++++-
 .../nodemanage/nodebroker/DefBrokerRunManager.java |  20 +++-
 .../nodemanage/nodeconsumer/ConsumeGroupInfo.java  |   8 +-
 .../nodeconsumer/ConsumerEventManager.java         |  33 +++++--
 .../nodeconsumer/ConsumerInfoHolder.java           |  14 ++-
 .../nodeproducer/ProducerInfoHolder.java           |   9 +-
 .../tubemq/server/master/MasterMetricsTest.java    |  44 +++++++++
 .../nodeconsumer/ConsumerEventManagerTest.java     |   6 +-
 13 files changed, 321 insertions(+), 43 deletions(-)

diff --git a/inlong-tubemq/pom.xml b/inlong-tubemq/pom.xml
index 6b66310..315ae70 100644
--- a/inlong-tubemq/pom.xml
+++ b/inlong-tubemq/pom.xml
@@ -187,6 +187,11 @@
                 <version>${project.version}</version>
             </dependency>
             <dependency>
+                <groupId>org.apache.inlong</groupId>
+                <artifactId>inlong-common</artifactId>
+                <version>${project.version}</version>
+            </dependency>
+            <dependency>
                 <groupId>org.slf4j</groupId>
                 <artifactId>slf4j-api</artifactId>
                 <version>1.7.30</version>
diff --git a/inlong-tubemq/tubemq-server/pom.xml b/inlong-tubemq/tubemq-server/pom.xml
index 1a463d9..dfa48cd 100644
--- a/inlong-tubemq/tubemq-server/pom.xml
+++ b/inlong-tubemq/tubemq-server/pom.xml
@@ -129,6 +129,10 @@
             <artifactId>tubemq-example</artifactId>
         </dependency>
         <dependency>
+            <groupId>org.apache.inlong</groupId>
+            <artifactId>inlong-common</artifactId>
+        </dependency>
+        <dependency>
             <groupId>org.apache.zookeeper</groupId>
             <artifactId>zookeeper</artifactId>
         </dependency>
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/TMaster.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/TMaster.java
index f323287..189518b 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/TMaster.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/TMaster.java
@@ -110,6 +110,7 @@ import org.apache.inlong.tubemq.server.master.metamanage.metastore.dao.entity.Br
 import org.apache.inlong.tubemq.server.master.metamanage.metastore.dao.entity.ClusterSettingEntity;
 import org.apache.inlong.tubemq.server.master.metamanage.metastore.dao.entity.GroupResCtrlEntity;
 import org.apache.inlong.tubemq.server.master.metamanage.metastore.dao.entity.TopicDeployEntity;
+import org.apache.inlong.tubemq.server.master.metrics.MasterMetric;
 import org.apache.inlong.tubemq.server.master.nodemanage.nodebroker.BrokerAbnHolder;
 import org.apache.inlong.tubemq.server.master.nodemanage.nodebroker.BrokerRunManager;
 import org.apache.inlong.tubemq.server.master.nodemanage.nodebroker.DefBrokerRunManager;
@@ -163,6 +164,7 @@ public class TMaster extends HasThread implements MasterService, Stoppable {
     private AtomicInteger curCltBalanceParal = new AtomicInteger(0);
     private Sleeper stopSleeper = new Sleeper(1000, this);
     private SimpleVisitTokenManager visitTokenManager;
+    private final MasterMetric masterMetrics;
 
     /**
      * constructor
@@ -177,6 +179,7 @@ public class TMaster extends HasThread implements MasterService, Stoppable {
         this.checkAndCreateBdbDataPath();
         this.masterAddInfo =
                 new NodeAddrInfo(masterConfig.getHostName(), masterConfig.getPort());
+        this.masterMetrics = MasterMetric.create();
         this.svrExecutor = Executors.newFixedThreadPool(this.masterConfig.getRebalanceParallel());
         this.cltExecutor = Executors.newFixedThreadPool(this.masterConfig.getRebalanceParallel());
         this.visitTokenManager = new SimpleVisitTokenManager(this.masterConfig);
@@ -185,17 +188,17 @@ public class TMaster extends HasThread implements MasterService, Stoppable {
         this.zkOffsetStorage = new ZkOffsetStorage(this.masterConfig.getZkConfig(),
                 false, TBaseConstants.META_VALUE_UNDEFINED);
         this.producerHolder = new ProducerInfoHolder();
-        this.consumerHolder = new ConsumerInfoHolder(this.masterConfig);
-        this.consumerEventManager = new ConsumerEventManager(consumerHolder);
+        this.consumerHolder = new ConsumerInfoHolder(this);
+        this.consumerEventManager = new ConsumerEventManager(consumerHolder, masterMetrics);
         this.topicPSInfoManager = new TopicPSInfoManager(this);
-        this.loadBalancer = new DefaultLoadBalancer();
+        this.loadBalancer = new DefaultLoadBalancer(masterMetrics);
         heartbeatManager.regConsumerCheckBusiness(masterConfig.getConsumerHeartbeatTimeoutMs(),
                 new TimeoutListener() {
                     @Override
                     public void onTimeout(String nodeId, TimeoutInfo nodeInfo) {
                         logger.info(new StringBuilder(512).append("[Consumer Timeout] ")
                                 .append(nodeId).toString());
-                        new ReleaseConsumer().run(nodeId);
+                        new ReleaseConsumer().run(nodeId, true);
                     }
                 });
         heartbeatManager.regProducerCheckBusiness(masterConfig.getProducerHeartbeatTimeoutMs(),
@@ -204,7 +207,7 @@ public class TMaster extends HasThread implements MasterService, Stoppable {
                     public void onTimeout(final String nodeId, TimeoutInfo nodeInfo) {
                         logger.info(new StringBuilder(512).append("[Producer Timeout] ")
                                 .append(nodeId).toString());
-                        new ReleaseProducer().run(nodeId);
+                        new ReleaseProducer().run(nodeId, true);
                     }
                 });
         this.defMetaDataManager = new MetaDataManager(this);
@@ -280,6 +283,10 @@ public class TMaster extends HasThread implements MasterService, Stoppable {
         return brokerRunManager;
     }
 
+    public MasterMetric getMasterMetrics() {
+        return masterMetrics;
+    }
+
     /**
      * Producer register request to master
      *
@@ -343,8 +350,10 @@ public class TMaster extends HasThread implements MasterService, Stoppable {
         }
         final String clientJdkVer = request.hasJdkVersion() ? request.getJdkVersion() : "";
         heartbeatManager.regProducerNode(producerId);
-        producerHolder.setProducerInfo(producerId,
-                new HashSet<>(transTopicSet), hostName, overtls);
+        if (producerHolder.setProducerInfo(producerId,
+                new HashSet<>(transTopicSet), hostName, overtls)) {
+            masterMetrics.producerCnt.incrementAndGet();
+        }
         Tuple2<Long, Map<Integer, String>> brokerStaticInfo =
                 brokerRunManager.getBrokerStaticInfo(overtls);
         builder.setBrokerCheckSum(brokerStaticInfo.getF0());
@@ -493,7 +502,7 @@ public class TMaster extends HasThread implements MasterService, Stoppable {
         }
         final String producerId = (String) paramCheckResult.checkData;
         checkNodeStatus(producerId, strBuffer);
-        new ReleaseProducer().run(producerId);
+        new ReleaseProducer().run(producerId, false);
         heartbeatManager.unRegProducerNode(producerId);
         logger.info(strBuffer.append("[Producer Closed] ")
                 .append(producerId).append(", isOverTLS=").append(overtls).toString());
@@ -913,7 +922,7 @@ public class TMaster extends HasThread implements MasterService, Stoppable {
         String nodeId = getConsumerKey(groupName, clientId);
         logger.info(strBuffer.append("[Consumer Closed]").append(nodeId)
                 .append(", isOverTLS=").append(overtls).toString());
-        new ReleaseConsumer().run(nodeId);
+        new ReleaseConsumer().run(nodeId, false);
         heartbeatManager.unRegConsumerNode(nodeId);
         builder.setSuccess(true);
         builder.setErrCode(TErrCodeConstants.SUCCESS);
@@ -1679,6 +1688,7 @@ public class TMaster extends HasThread implements MasterService, Stoppable {
             int startIndex = 0;
             int endIndex = 0;
             // set parallel balance signal
+            final long startBalanceTime = System.currentTimeMillis();
             curSvrBalanceParal.set(masterConfig.getRebalanceParallel());
             for (int i = 0; i < masterConfig.getRebalanceParallel(); i++) {
                 // get groups need to balance
@@ -1686,7 +1696,13 @@ public class TMaster extends HasThread implements MasterService, Stoppable {
                 endIndex = Math.min((i + 1) * unitNum, balanceTaskCnt);
                 final List<String> subGroups = groupsNeedToBalance.subList(startIndex, endIndex);
                 if (subGroups.isEmpty()) {
-                    curSvrBalanceParal.decrementAndGet();
+                    if (curSvrBalanceParal.decrementAndGet() == 0) {
+                        long durTime = System.currentTimeMillis() - startBalanceTime;
+                        masterMetrics.svrBalLatency.set(durTime);
+                        if (durTime > masterMetrics.svrBalLatencyMax.get()) {
+                            masterMetrics.svrBalLatencyMax.set(durTime);
+                        }
+                    }
                     continue;
                 }
                 // execute balance
@@ -1725,7 +1741,13 @@ public class TMaster extends HasThread implements MasterService, Stoppable {
                         } catch (Throwable e) {
                             logger.warn("[Svr-Balance processor] Error during process", e);
                         } finally {
-                            curSvrBalanceParal.decrementAndGet();
+                            if (curSvrBalanceParal.decrementAndGet() == 0) {
+                                long durTime = System.currentTimeMillis() - startBalanceTime;
+                                masterMetrics.svrBalLatency.set(durTime);
+                                if (durTime > masterMetrics.svrBalLatencyMax.get()) {
+                                    masterMetrics.svrBalLatencyMax.set(durTime);
+                                }
+                            }
                         }
                     }
                 });
@@ -2535,12 +2557,12 @@ public class TMaster extends HasThread implements MasterService, Stoppable {
     }
 
     private abstract static class AbstractReleaseRunner {
-        abstract void run(String arg);
+        abstract void run(String arg, boolean isTimeout);
     }
 
     private class ReleaseConsumer extends AbstractReleaseRunner {
         @Override
-        void run(String arg) {
+        void run(String arg, boolean isTimeout) {
             String[] nodeStrs = arg.split(TokenConstants.GROUP_SEP);
             String consumerId = nodeStrs[0];
             String group = nodeStrs[1];
@@ -2551,8 +2573,22 @@ public class TMaster extends HasThread implements MasterService, Stoppable {
                 ConsumerInfo info = consumerHolder.removeConsumer(group, consumerId);
                 currentSubInfo.remove(consumerId);
                 consumerEventManager.removeAll(consumerId);
-                if (info != null && consumerHolder.isConsumeGroupEmpty(group)) {
-                    topicPSInfoManager.rmvGroupSubTopicInfo(group, info.getTopicSet());
+                if (info != null) {
+                    if (consumerHolder.isConsumeGroupEmpty(group)) {
+                        topicPSInfoManager.rmvGroupSubTopicInfo(group, info.getTopicSet());
+                        // metric
+                        masterMetrics.consumeGroupCnt.decrementAndGet();
+                        if (info.getConsumeType() == ConsumeType.CONSUME_CLIENT_REB) {
+                            masterMetrics.cltBalConsumeGroupCnt.decrementAndGet();
+                        }
+                        if (isTimeout) {
+                            masterMetrics.consumeGroupTmoTotCnt.incrementAndGet();
+                        }
+                    }
+                    masterMetrics.consumerCnt.decrementAndGet();
+                    if (isTimeout) {
+                        masterMetrics.consumerTmoTotCnt.incrementAndGet();
+                    }
                 }
             } catch (IOException e) {
                 logger.warn("Failed to lock.", e);
@@ -2566,11 +2602,15 @@ public class TMaster extends HasThread implements MasterService, Stoppable {
 
     private class ReleaseProducer extends AbstractReleaseRunner {
         @Override
-        void run(String clientId) {
+        void run(String clientId, boolean isTimeout) {
             if (clientId != null) {
                 ProducerInfo info = producerHolder.removeProducer(clientId);
                 if (info != null) {
                     topicPSInfoManager.rmvProducerTopicPubInfo(clientId, info.getTopicSet());
+                    masterMetrics.producerCnt.decrementAndGet();
+                    if (isTimeout) {
+                        masterMetrics.producerTmoTotCnt.incrementAndGet();
+                    }
                 }
             }
         }
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/balance/DefaultLoadBalancer.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/balance/DefaultLoadBalancer.java
index 39e4830..501f45e 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/balance/DefaultLoadBalancer.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/balance/DefaultLoadBalancer.java
@@ -34,6 +34,7 @@ import org.apache.inlong.tubemq.corebase.cluster.Partition;
 import org.apache.inlong.tubemq.server.common.offsetstorage.OffsetStorage;
 import org.apache.inlong.tubemq.server.master.metamanage.MetaDataManager;
 import org.apache.inlong.tubemq.server.master.metamanage.metastore.dao.entity.GroupResCtrlEntity;
+import org.apache.inlong.tubemq.server.master.metrics.MasterMetric;
 import org.apache.inlong.tubemq.server.master.nodemanage.nodebroker.BrokerRunManager;
 import org.apache.inlong.tubemq.server.master.nodemanage.nodeconsumer.ConsumeGroupInfo;
 import org.apache.inlong.tubemq.server.master.nodemanage.nodeconsumer.ConsumerInfo;
@@ -47,9 +48,10 @@ import org.slf4j.LoggerFactory;
 public class DefaultLoadBalancer implements LoadBalancer {
     private static final Logger logger = LoggerFactory.getLogger(LoadBalancer.class);
     private static final Random RANDOM = new Random(System.currentTimeMillis());
+    private final MasterMetric masterMetrics;
 
-    public DefaultLoadBalancer() {
-
+    public DefaultLoadBalancer(MasterMetric masterMetrics) {
+        this.masterMetrics = masterMetrics;
     }
 
     /**
@@ -660,7 +662,15 @@ public class DefaultLoadBalancer implements LoadBalancer {
                     partitionMap.remove(entry.getKey());
                 }
             }
-            consumeGroupInfo.addAllocatedTimes();
+            if (consumeGroupInfo.addAllocatedTimes() > 0) {
+                long durTime = System.currentTimeMillis() - consumeGroupInfo.getCreateTime();
+                if (durTime < masterMetrics.svrBalResetDurMin.get()) {
+                    masterMetrics.svrBalResetDurMin.set(durTime);
+                }
+                if (durTime > masterMetrics.svrBalResetDurMax.get()) {
+                    masterMetrics.svrBalResetDurMax.set(durTime);
+                }
+            }
         }
         return finalSubInfoMap;
     }
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metrics/MasterMetric.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metrics/MasterMetric.java
new file mode 100644
index 0000000..20f93ef
--- /dev/null
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metrics/MasterMetric.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.tubemq.server.master.metrics;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.inlong.commons.config.metrics.CountMetric;
+import org.apache.inlong.commons.config.metrics.Dimension;
+import org.apache.inlong.commons.config.metrics.GaugeMetric;
+import org.apache.inlong.commons.config.metrics.MetricDomain;
+import org.apache.inlong.commons.config.metrics.MetricItem;
+import org.apache.inlong.commons.config.metrics.MetricRegister;
+
+@MetricDomain(name = "master_metrics")
+public class MasterMetric extends MetricItem {
+
+    private static final MasterMetric MASTER_METRICS = new MasterMetric();
+    private static final AtomicBoolean REGISTER_ONCE =
+            new AtomicBoolean(false);
+    private static final String METRIC_NAME = "master_metrics";
+
+    @Dimension
+    public String tagName;
+
+    @GaugeMetric
+    public AtomicLong consumeGroupCnt = new AtomicLong(0);
+
+    @GaugeMetric
+    public AtomicLong cltBalConsumeGroupCnt = new AtomicLong(0);
+
+    @CountMetric
+    public AtomicLong consumeGroupTmoTotCnt = new AtomicLong(0);
+
+    @GaugeMetric
+    public AtomicLong consumerCnt = new AtomicLong(0);
+
+    @CountMetric
+    public AtomicLong consumerTmoTotCnt = new AtomicLong(0);
+
+    @GaugeMetric
+    public AtomicLong producerCnt = new AtomicLong(0);
+
+    @CountMetric
+    public AtomicLong producerTmoTotCnt = new AtomicLong(0);
+
+    @GaugeMetric
+    public AtomicLong brokerConfigCnt = new AtomicLong(0);
+
+    @GaugeMetric
+    public AtomicLong brokerOnlineCnt = new AtomicLong(0);
+
+    @CountMetric
+    public AtomicLong brokerAbnTotCnt = new AtomicLong(0);
+
+    @GaugeMetric
+    public AtomicLong brokerAbnCurCnt = new AtomicLong(0);
+
+    @CountMetric
+    public AtomicLong brokerFbdTotCnt = new AtomicLong(0);
+
+    @GaugeMetric
+    public AtomicLong brokerFbdCurCnt = new AtomicLong(0);
+
+    @CountMetric
+    public AtomicLong brokerTmoTotCnt = new AtomicLong(0);
+
+    @GaugeMetric
+    public AtomicLong svrBalLatency = new AtomicLong(0);
+
+    @GaugeMetric
+    public AtomicLong svrBalLatencyMax = new AtomicLong(0);
+
+    @GaugeMetric
+    public AtomicLong svrBalResetDurMin = new AtomicLong(Long.MAX_VALUE);
+
+    @GaugeMetric
+    public AtomicLong svrBalResetDurMax = new AtomicLong(0);
+
+    @GaugeMetric
+    public AtomicLong svrBalConEventConsumerCnt = new AtomicLong(0);
+
+    @GaugeMetric
+    public AtomicLong svrBalDisConEventConsumerCnt = new AtomicLong(0);
+
+    public static MasterMetric create() {
+        if (REGISTER_ONCE.compareAndSet(false, true)) {
+            MASTER_METRICS.tagName = METRIC_NAME;
+            MetricRegister.register(MASTER_METRICS);
+        }
+        return MASTER_METRICS;
+    }
+}
+
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/nodemanage/nodebroker/BrokerAbnHolder.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/nodemanage/nodebroker/BrokerAbnHolder.java
index 666f840..2987225 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/nodemanage/nodebroker/BrokerAbnHolder.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/nodemanage/nodebroker/BrokerAbnHolder.java
@@ -33,6 +33,7 @@ import org.apache.inlong.tubemq.server.common.statusdef.ManageStatus;
 import org.apache.inlong.tubemq.server.master.metamanage.MetaDataManager;
 import org.apache.inlong.tubemq.server.master.metamanage.metastore.dao.entity.BaseEntity;
 import org.apache.inlong.tubemq.server.master.metamanage.metastore.dao.entity.BrokerConfEntity;
+import org.apache.inlong.tubemq.server.master.metrics.MasterMetric;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -50,11 +51,15 @@ public class BrokerAbnHolder {
     private final MetaDataManager metaDataManager;
     private final AtomicInteger brokerForbiddenCount =
             new AtomicInteger(0);
+    // master metrics
+    private final MasterMetric masterMetrics;
 
     public BrokerAbnHolder(final int maxAutoForbiddenCnt,
-                           final MetaDataManager metaDataManager) {
+                           final MetaDataManager metaDataManager,
+                           final MasterMetric masterMetrics) {
         this.maxAutoForbiddenCnt = maxAutoForbiddenCnt;
         this.metaDataManager = metaDataManager;
+        this.masterMetrics = masterMetrics;
     }
 
     /**
@@ -74,6 +79,7 @@ public class BrokerAbnHolder {
                 if (brokerForbiddenMap.get(brokerId) == null) {
                     brokerAbnInfo = brokerAbnormalMap.remove(brokerId);
                     if (brokerAbnInfo != null) {
+                        masterMetrics.brokerAbnCurCnt.decrementAndGet();
                         logger.warn(sBuffer.append("[Broker AutoForbidden] broker ")
                                 .append(brokerId).append(" return to normal!").toString());
                         sBuffer.delete(0, sBuffer.length());
@@ -98,6 +104,8 @@ public class BrokerAbnHolder {
         if (brokerAbnInfo == null) {
             if (brokerAbnormalMap.putIfAbsent(brokerId,
                 new BrokerAbnInfo(brokerId, reportReadStatus, reportWriteStatus)) == null) {
+                masterMetrics.brokerAbnTotCnt.incrementAndGet();
+                masterMetrics.brokerAbnCurCnt.incrementAndGet();
                 logger.warn(sBuffer.append("[Broker AutoForbidden] broker report abnormal, ")
                         .append(brokerId).append("'s reportReadStatus=")
                         .append(reportReadStatus).append(", reportWriteStatus=")
@@ -116,6 +124,8 @@ public class BrokerAbnHolder {
                 if (updateCurManageStatus(brokerId, reqMngStatus, sBuffer)) {
                     if (brokerForbiddenMap.putIfAbsent(brokerId, tmpBrokerFbdInfo) == null) {
                         brokerForbiddenCount.incrementAndGet();
+                        masterMetrics.brokerFbdTotCnt.incrementAndGet();
+                        masterMetrics.brokerFbdCurCnt.incrementAndGet();
                         logger.warn(sBuffer
                                 .append("[Broker AutoForbidden] master add missing forbidden broker, ")
                                 .append(brokerId).append("'s manage status to ")
@@ -133,6 +143,7 @@ public class BrokerAbnHolder {
                         brokerForbiddenCount.decrementAndGet();
                         return;
                     }
+                    masterMetrics.brokerFbdCurCnt.incrementAndGet();
                     logger.warn(sBuffer
                             .append("[Broker AutoForbidden] master auto forbidden broker, ")
                             .append(brokerId).append("'s manage status to ")
@@ -172,10 +183,14 @@ public class BrokerAbnHolder {
      * @param brokerId the deleted broker id
      */
     public void removeBroker(Integer brokerId) {
-        brokerAbnormalMap.remove(brokerId);
+        BrokerAbnInfo abnInfo = brokerAbnormalMap.remove(brokerId);
+        if (abnInfo != null) {
+            masterMetrics.brokerAbnCurCnt.decrementAndGet();
+        }
         BrokerFbdInfo brokerFbdInfo = brokerForbiddenMap.remove(brokerId);
         if (brokerFbdInfo != null) {
             this.brokerForbiddenCount.decrementAndGet();
+            masterMetrics.brokerFbdCurCnt.decrementAndGet();
         }
     }
 
@@ -258,8 +273,12 @@ public class BrokerAbnHolder {
             BrokerFbdInfo fbdInfo = this.brokerForbiddenMap.remove(brokerId);
             if (fbdInfo != null) {
                 brokerFbdInfos.add(fbdInfo);
-                this.brokerAbnormalMap.remove(brokerId);
+                BrokerAbnInfo abnInfo = this.brokerAbnormalMap.remove(brokerId);
+                if (abnInfo != null) {
+                    masterMetrics.brokerAbnCurCnt.decrementAndGet();
+                }
                 this.brokerForbiddenCount.decrementAndGet();
+                masterMetrics.brokerFbdCurCnt.decrementAndGet();
             }
         }
         if (!brokerFbdInfos.isEmpty()) {
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/nodemanage/nodebroker/DefBrokerRunManager.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/nodemanage/nodebroker/DefBrokerRunManager.java
index 5775029..e06fce4 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/nodemanage/nodebroker/DefBrokerRunManager.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/nodemanage/nodebroker/DefBrokerRunManager.java
@@ -45,6 +45,7 @@ import org.apache.inlong.tubemq.server.master.TMaster;
 import org.apache.inlong.tubemq.server.master.metamanage.MetaDataManager;
 import org.apache.inlong.tubemq.server.master.metamanage.keepalive.AliveObserver;
 import org.apache.inlong.tubemq.server.master.metamanage.metastore.dao.entity.BrokerConfEntity;
+import org.apache.inlong.tubemq.server.master.metrics.MasterMetric;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -75,6 +76,8 @@ public class DefBrokerRunManager implements BrokerRunManager, AliveObserver {
     private final BrokerAbnHolder brokerAbnHolder;
     // broker topic configure for consumer and producer
     private final BrokerPSInfoHolder brokerPubSubInfo = new BrokerPSInfoHolder();
+    // master metrics
+    private final MasterMetric masterMetrics;
 
     /**
      * Constructor by TMaster
@@ -82,12 +85,13 @@ public class DefBrokerRunManager implements BrokerRunManager, AliveObserver {
      * @param tMaster  the initial TMaster object
      */
     public DefBrokerRunManager(TMaster tMaster) {
+        this.masterMetrics = tMaster.getMasterMetrics();
         this.metaDataManager = tMaster.getDefMetaDataManager();
         this.heartbeatManager = tMaster.getHeartbeatManager();
         MasterConfig masterConfig = tMaster.getMasterConfig();
         this.brokerAbnHolder =
                 new BrokerAbnHolder(masterConfig.getMaxAutoForbiddenCnt(),
-                        this.metaDataManager);
+                        this.metaDataManager, this.masterMetrics);
         heartbeatManager.regBrokerCheckBusiness(masterConfig.getBrokerHeartbeatTimeoutMs(),
                 new TimeoutListener() {
                     @Override
@@ -145,9 +149,12 @@ public class DefBrokerRunManager implements BrokerRunManager, AliveObserver {
                 || brokerTLSReg == null
                 || !brokerReg.equals(entity.getSimpleBrokerInfo())
                 || !brokerTLSReg.equals(entity.getSimpleTLSBrokerInfo())) {
-            if (brokerReg != null
-                    && !brokerReg.equals(entity.getSimpleBrokerInfo())) {
-                this.brokersMap.put(entity.getBrokerId(), entity.getSimpleBrokerInfo());
+            if (brokerReg == null) {
+                masterMetrics.brokerConfigCnt.incrementAndGet();
+            } else {
+                if (!brokerReg.equals(entity.getSimpleBrokerInfo())) {
+                    this.brokersMap.put(entity.getBrokerId(), entity.getSimpleBrokerInfo());
+                }
             }
             if (brokerTLSReg != null
                     && !brokerTLSReg.equals(entity.getSimpleTLSBrokerInfo())) {
@@ -241,6 +248,7 @@ public class DefBrokerRunManager implements BrokerRunManager, AliveObserver {
                             brokerInfo.getBrokerId(), tmpRunStatusInfo);
             if (runStatusInfo == null) {
                 brokerTotalCount.incrementAndGet();
+                masterMetrics.brokerOnlineCnt.incrementAndGet();
                 runStatusInfo = tmpRunStatusInfo;
             }
         } else {
@@ -471,6 +479,10 @@ public class DefBrokerRunManager implements BrokerRunManager, AliveObserver {
         if (runStatusInfo == null) {
             return false;
         }
+        masterMetrics.brokerOnlineCnt.decrementAndGet();
+        if (isTimeout) {
+            masterMetrics.brokerTmoTotCnt.incrementAndGet();
+        }
         brokerTotalCount.decrementAndGet();
         brokerAbnHolder.removeBroker(brokerId);
         brokerPubSubInfo.rmvBrokerAllPushedInfo(brokerId);
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/nodemanage/nodeconsumer/ConsumeGroupInfo.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/nodemanage/nodeconsumer/ConsumeGroupInfo.java
index a13c8b2..f3d0ac3 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/nodemanage/nodeconsumer/ConsumeGroupInfo.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/nodemanage/nodeconsumer/ConsumeGroupInfo.java
@@ -131,7 +131,7 @@ public class ConsumeGroupInfo {
                         consumerInfoMap.get(inConsumer.getConsumerId());
                 if (curConsumerInfo != null) {
                     curConsumerInfo.updCurConsumerInfo(inConsumer);
-                    result.setCheckData("Ok!");
+                    result.setCheckData(false);
                     return true;
                 }
             }
@@ -139,7 +139,7 @@ public class ConsumeGroupInfo {
             if (consumeType == ConsumeType.CONSUME_BAND) {
                 bookPartitionInfo(inConsumer);
             }
-            result.setCheckData("Ok!");
+            result.setCheckData(true);
             return true;
         } finally {
             csmInfoRWLock.writeLock().unlock();
@@ -349,8 +349,8 @@ public class ConsumeGroupInfo {
         return allocatedTimes.get();
     }
 
-    public void addAllocatedTimes() {
-        this.allocatedTimes.incrementAndGet();
+    public int addAllocatedTimes() {
+        return this.allocatedTimes.incrementAndGet();
     }
 
     public boolean isGroupEmpty() {
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/nodemanage/nodeconsumer/ConsumerEventManager.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/nodemanage/nodeconsumer/ConsumerEventManager.java
index e3b0046..f4fa335 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/nodemanage/nodeconsumer/ConsumerEventManager.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/nodemanage/nodeconsumer/ConsumerEventManager.java
@@ -26,6 +26,7 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.inlong.tubemq.corebase.balance.ConsumerEvent;
+import org.apache.inlong.tubemq.server.master.metrics.MasterMetric;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -41,9 +42,12 @@ public class ConsumerEventManager {
             new ConcurrentHashMap<>();
 
     private final ConsumerInfoHolder consumerHolder;
+    private final MasterMetric masterMetrics;
 
-    public ConsumerEventManager(ConsumerInfoHolder consumerHolder) {
+    public ConsumerEventManager(ConsumerInfoHolder consumerHolder,
+                                MasterMetric masterMetrics) {
         this.consumerHolder = consumerHolder;
+        this.masterMetrics = masterMetrics;
     }
 
     public boolean addDisconnectEvent(String consumerId,
@@ -54,7 +58,9 @@ public class ConsumerEventManager {
             eventList = new LinkedList<>();
             LinkedList<ConsumerEvent> tmptList =
                     disconnectEventMap.putIfAbsent(consumerId, eventList);
-            if (tmptList != null) {
+            if (tmptList == null) {
+                masterMetrics.svrBalDisConEventConsumerCnt.incrementAndGet();
+            } else {
                 eventList = tmptList;
             }
         }
@@ -71,7 +77,9 @@ public class ConsumerEventManager {
             eventList = new LinkedList<>();
             LinkedList<ConsumerEvent> tmptList =
                     connectEventMap.putIfAbsent(consumerId, eventList);
-            if (tmptList != null) {
+            if (tmptList == null) {
+                masterMetrics.svrBalConEventConsumerCnt.incrementAndGet();
+            } else {
                 eventList = tmptList;
             }
         }
@@ -119,8 +127,9 @@ public class ConsumerEventManager {
     public ConsumerEvent removeFirst(String consumerId) {
         ConsumerEvent event = null;
         String group = consumerHolder.getGroupName(consumerId);
+        boolean selDisConnMap = hasDisconnectEvent(group);
         ConcurrentHashMap<String, LinkedList<ConsumerEvent>> currentEventMap =
-                hasDisconnectEvent(group) ? disconnectEventMap : connectEventMap;
+                selDisConnMap ? disconnectEventMap : connectEventMap;
         LinkedList<ConsumerEvent> eventList = currentEventMap.get(consumerId);
         if (eventList != null) {
             synchronized (eventList) {
@@ -128,6 +137,11 @@ public class ConsumerEventManager {
                     event = eventList.removeFirst();
                     if (eventList.isEmpty()) {
                         currentEventMap.remove(consumerId);
+                        if (selDisConnMap) {
+                            masterMetrics.svrBalDisConEventConsumerCnt.decrementAndGet();
+                        } else {
+                            masterMetrics.svrBalConEventConsumerCnt.decrementAndGet();
+                        }
                     }
                 }
             }
@@ -182,8 +196,15 @@ public class ConsumerEventManager {
     }
 
     public void removeAll(String consumerId) {
-        disconnectEventMap.remove(consumerId);
-        connectEventMap.remove(consumerId);
+        LinkedList<ConsumerEvent> eventInfos =
+                disconnectEventMap.remove(consumerId);
+        if (eventInfos != null) {
+            masterMetrics.svrBalDisConEventConsumerCnt.decrementAndGet();
+        }
+        eventInfos = connectEventMap.remove(consumerId);
+        if (eventInfos != null) {
+            masterMetrics.svrBalConEventConsumerCnt.decrementAndGet();
+        }
     }
 
     /**
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/nodemanage/nodeconsumer/ConsumerInfoHolder.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/nodemanage/nodeconsumer/ConsumerInfoHolder.java
index fea5dcb..122f668 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/nodemanage/nodeconsumer/ConsumerInfoHolder.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/nodemanage/nodeconsumer/ConsumerInfoHolder.java
@@ -29,6 +29,8 @@ import org.apache.inlong.tubemq.corebase.utils.Tuple2;
 import org.apache.inlong.tubemq.server.common.paramcheck.ParamCheckResult;
 import org.apache.inlong.tubemq.server.common.utils.RowLock;
 import org.apache.inlong.tubemq.server.master.MasterConfig;
+import org.apache.inlong.tubemq.server.master.TMaster;
+import org.apache.inlong.tubemq.server.master.metrics.MasterMetric;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -37,6 +39,7 @@ public class ConsumerInfoHolder {
     private static final Logger logger =
             LoggerFactory.getLogger(ConsumerInfoHolder.class);
     private final MasterConfig masterConfig;     // master configure
+    private final MasterMetric masterMetrics;
     private final RowLock groupRowLock;    //lock
     private final ConcurrentHashMap<String/* group */, ConsumeGroupInfo> groupInfoMap =
             new ConcurrentHashMap<>();
@@ -47,8 +50,9 @@ public class ConsumerInfoHolder {
     private final ConcurrentHashSet<String/* group */> clientBalanceGroupSet =
             new ConcurrentHashSet<>();
 
-    public ConsumerInfoHolder(MasterConfig masterConfig) {
-        this.masterConfig = masterConfig;
+    public ConsumerInfoHolder(TMaster tMasterr) {
+        this.masterMetrics = tMasterr.getMasterMetrics();
+        this.masterConfig = tMasterr.getMasterConfig();
         this.groupRowLock = new RowLock("Group-RowLock",
                 this.masterConfig.getRowLockWaitDurMs());
     }
@@ -351,14 +355,20 @@ public class ConsumerInfoHolder {
                 consumeGroupInfo = groupInfoMap.putIfAbsent(group, tmpGroupInfo);
                 if (consumeGroupInfo == null) {
                     consumeGroupInfo = tmpGroupInfo;
+                    masterMetrics.consumeGroupCnt.incrementAndGet();
                     if (tmpGroupInfo.isClientBalance()) {
                         clientBalanceGroupSet.add(group);
+                        masterMetrics.cltBalConsumeGroupCnt.incrementAndGet();
                     } else {
                         serverBalanceGroupSet.add(group);
                     }
                 }
             }
             if (consumeGroupInfo.addConsumer(consumer, sBuffer, result)) {
+                Boolean isNewAdd = (Boolean) result.checkData;
+                if (isNewAdd) {
+                    masterMetrics.consumerCnt.incrementAndGet();
+                }
                 if (!isNotAllocated) {
                     consumeGroupInfo.settAllocated();
                 }
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/nodemanage/nodeproducer/ProducerInfoHolder.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/nodemanage/nodeproducer/ProducerInfoHolder.java
index 3b01843..6acd0d0 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/nodemanage/nodeproducer/ProducerInfoHolder.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/nodemanage/nodeproducer/ProducerInfoHolder.java
@@ -30,11 +30,12 @@ public class ProducerInfoHolder {
         return producerInfoMap.get(producerId);
     }
 
-    public void setProducerInfo(String producerId,
-                                Set<String> topicSet,
-                                String host, boolean overTLS) {
-        producerInfoMap.put(producerId,
+    public boolean setProducerInfo(String producerId,
+                                        Set<String> topicSet,
+                                        String host, boolean overTLS) {
+        ProducerInfo oldObj = producerInfoMap.put(producerId,
                 new ProducerInfo(producerId, topicSet, host, overTLS));
+        return (oldObj == null);
     }
 
     public void updateProducerInfo(String producerId,
diff --git a/inlong-tubemq/tubemq-server/src/test/java/org/apache/inlong/tubemq/server/master/MasterMetricsTest.java b/inlong-tubemq/tubemq-server/src/test/java/org/apache/inlong/tubemq/server/master/MasterMetricsTest.java
new file mode 100644
index 0000000..193bebe
--- /dev/null
+++ b/inlong-tubemq/tubemq-server/src/test/java/org/apache/inlong/tubemq/server/master/MasterMetricsTest.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.tubemq.server.master;
+
+import org.apache.inlong.commons.config.metrics.MetricValue;
+import org.apache.inlong.tubemq.server.master.metrics.MasterMetric;
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+
+public class MasterMetricsTest {
+    private static final Logger logger = LoggerFactory.getLogger(MasterMetricsTest.class);
+
+    @Test
+    public void testAgentMetrics() {
+        try {
+            MasterMetric taskMetrics = MasterMetric.create();
+            taskMetrics.svrBalLatency.incrementAndGet();
+            Map<String, MetricValue> result = taskMetrics.snapshot();
+            Assert.assertEquals(1, taskMetrics.svrBalLatency.get());
+
+        } catch (Exception ex) {
+            logger.error("error happens" + ex);
+        }
+    }
+}
diff --git a/inlong-tubemq/tubemq-server/src/test/java/org/apache/inlong/tubemq/server/master/nodemanage/nodeconsumer/ConsumerEventManagerTest.java b/inlong-tubemq/tubemq-server/src/test/java/org/apache/inlong/tubemq/server/master/nodemanage/nodeconsumer/ConsumerEventManagerTest.java
index 24d3777..8d93ed0 100644
--- a/inlong-tubemq/tubemq-server/src/test/java/org/apache/inlong/tubemq/server/master/nodemanage/nodeconsumer/ConsumerEventManagerTest.java
+++ b/inlong-tubemq/tubemq-server/src/test/java/org/apache/inlong/tubemq/server/master/nodemanage/nodeconsumer/ConsumerEventManagerTest.java
@@ -19,6 +19,7 @@ package org.apache.inlong.tubemq.server.master.nodemanage.nodeconsumer;
 
 import static org.mockito.Mockito.mock;
 import org.apache.inlong.tubemq.corebase.balance.ConsumerEvent;
+import org.apache.inlong.tubemq.server.master.metrics.MasterMetric;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
@@ -27,11 +28,14 @@ import org.junit.Test;
 public class ConsumerEventManagerTest {
     private ConsumerEventManager consumerEventManager;
     private ConsumerInfoHolder consumerInfoHolder;
+    private MasterMetric masterMetrics;
 
     @Before
     public void setUp() throws Exception {
         consumerInfoHolder = mock(ConsumerInfoHolder.class);
-        consumerEventManager = new ConsumerEventManager(consumerInfoHolder);
+        masterMetrics = MasterMetric.create();
+        consumerEventManager =
+                new ConsumerEventManager(consumerInfoHolder, masterMetrics);
     }
 
     @After