You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by go...@apache.org on 2022/02/19 12:40:55 UTC

[incubator-inlong] branch master updated: [INLONG-2518][TubeMQ] Adjust the client's metric statistics logic (#2601)

This is an automated email from the ASF dual-hosted git repository.

gosonzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new c53b805  [INLONG-2518][TubeMQ] Adjust the client's metric statistics logic (#2601)
c53b805 is described below

commit c53b80542c999b3bd72886f3015a363e0f88adf5
Author: gosonzhang <46...@qq.com>
AuthorDate: Sat Feb 19 20:40:50 2022 +0800

    [INLONG-2518][TubeMQ] Adjust the client's metric statistics logic (#2601)
---
 .../inlong/tubemq/client/common/ClientMetrics.java | 308 --------------
 .../tubemq/client/common/ClientStatsInfo.java      | 455 +++++++++++++++++++++
 .../tubemq/client/common/StatsOutputLevel.java     |  55 +++
 .../tubemq/client/common/TClientConstants.java     |  16 +-
 .../tubemq/client/config/TubeClientConfig.java     |  54 +--
 .../client/consumer/BaseMessageConsumer.java       |  78 ++--
 .../consumer/SimpleClientBalanceConsumer.java      |  49 +--
 .../client/consumer/SimplePullMessageConsumer.java |   2 +-
 .../client/consumer/SimplePushMessageConsumer.java |   2 +-
 .../tubemq/client/producer/MessageProducer.java    |   3 -
 .../tubemq/client/producer/ProducerManager.java    |  42 +-
 .../client/producer/SimpleMessageProducer.java     |   9 +-
 .../inlong/tubemq/corebase/TBaseConstants.java     |   1 +
 .../tubemq/corebase/metric/TrafficStatsUnit.java   |  56 +++
 .../inlong/tubemq/corebase/utils/TStringUtils.java |  24 ++
 .../inlong/tubemq/corebase/utils/ThreadUtils.java  |   4 +
 .../server/broker/msgstore/MessageStore.java       |   1 -
 .../server/broker/msgstore/disk/MsgFileStore.java  |   6 +-
 .../server/broker/msgstore/mem/MsgMemStore.java    |   4 +-
 .../server/broker/stats/BrokerSrvStatsHolder.java  |   4 +-
 .../server/broker/stats/MsgStoreStatsHolder.java   |   4 +-
 .../server/broker/stats/TrafficStatsService.java   |  50 +--
 .../tubemq/server/common/TServerConstants.java     |   3 +-
 .../server/common/webbase/WebCallStatsHolder.java  |   4 +-
 .../server/master/stats/MasterSrvStatsHolder.java  |   4 +-
 25 files changed, 751 insertions(+), 487 deletions(-)

diff --git a/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/common/ClientMetrics.java b/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/common/ClientMetrics.java
deleted file mode 100644
index 2b21aca..0000000
--- a/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/common/ClientMetrics.java
+++ /dev/null
@@ -1,308 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.tubemq.client.common;
-
-import java.time.Clock;
-import java.util.concurrent.atomic.AtomicLong;
-import org.apache.inlong.tubemq.client.config.TubeClientConfig;
-import org.apache.inlong.tubemq.corebase.TErrCodeConstants;
-import org.apache.inlong.tubemq.corebase.metric.AbsMetricItem;
-import org.apache.inlong.tubemq.corebase.metric.CountMetricItem;
-import org.apache.inlong.tubemq.corebase.metric.TimeDltMetricItem;
-import org.apache.inlong.tubemq.corebase.utils.DateTimeConvertUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class ClientMetrics {
-    private static final Logger logger =
-            LoggerFactory.getLogger(ClientMetrics.class);
-    private final boolean isProducer;
-    private final String clientId;
-    private final String logPrefix;
-    private final TubeClientConfig clientConfig;
-    private final AtomicLong lastMetricPrintTime =
-            new AtomicLong(0);
-    private final AtomicLong lastMetricResetTime =
-            new AtomicLong(Clock.systemDefaultZone().millis());
-    private final AtomicLong lastResetTime =
-            new AtomicLong(Clock.systemDefaultZone().millis());
-    // client 2 Master status statistics
-    protected final AbsMetricItem regMasterCnt =
-            new CountMetricItem("master_reg_cnt");
-    protected final AbsMetricItem regMasterFailureCnt =
-            new CountMetricItem("master_reg_failure_cnt");
-    protected final AbsMetricItem masterNoNodeCnt =
-            new CountMetricItem("master_timeout_cnt");
-    protected final AbsMetricItem masterHBExceptionCnt =
-            new CountMetricItem("hb_master_exception_cnt");
-    // client 2 Broker status statistics
-    protected final AbsMetricItem regBrokerCnt =
-            new CountMetricItem("broker_reg_cnt");
-    protected final AbsMetricItem regBrokerFailureCnt =
-            new CountMetricItem("broker_reg_failure_cnt");
-    protected final AbsMetricItem brokerNoNodeCnt =
-            new CountMetricItem("broker_timeout_cnt");
-    protected final AbsMetricItem brokerHBExceptionCnt =
-            new CountMetricItem("hb_broker_exception_cnt");
-    // send or get message statistic
-    private final TimeDltMetricItem rpcCallMetric =
-            new TimeDltMetricItem("rpc_call", true);
-    // statistic count
-    private final AbsMetricItem totalMsgCount =
-            new CountMetricItem("msg_cnt");
-    protected final AbsMetricItem totalMsgSize =
-            new CountMetricItem("msg_size");
-    // error code map
-    private final AbsMetricItem errCodeBadRequest =
-            new CountMetricItem("err_400");
-    private final AbsMetricItem errCodeNotFound =
-            new CountMetricItem("err_404");
-    private final AbsMetricItem errCodeOccupied =
-            new CountMetricItem("err_410");
-    private final AbsMetricItem errCodeNoNode =
-            new CountMetricItem("err_411");
-    private final AbsMetricItem errCodeOverFlow =
-            new CountMetricItem("err_419");
-    private final AbsMetricItem errCodeInTopic =
-            new CountMetricItem("err_425");
-    private final AbsMetricItem errCodeInFilters =
-            new CountMetricItem("err_426");
-    private final AbsMetricItem errCodeInSourceCnt =
-            new CountMetricItem("err_429");
-    private final AbsMetricItem errCodeSrvUnavailable =
-            new CountMetricItem("err_503");
-    private final AbsMetricItem errCodeOther =
-            new CountMetricItem("err_other");
-    // consumer statistic
-    private final TimeDltMetricItem procLatencyMetric =
-            new TimeDltMetricItem("process_data", true);
-
-    public ClientMetrics(boolean isProducer, String clientId,
-                         TubeClientConfig clientConfig) {
-        this.isProducer = isProducer;
-        this.clientId = clientId;
-        this.clientConfig = clientConfig;
-        StringBuilder strBuff = new StringBuilder(512);
-        if (isProducer) {
-            strBuff.append("[Producer");
-        } else {
-            strBuff.append("[Consumer");
-        }
-        this.logPrefix = strBuff.append(" Metrics]: ")
-                .append("Client=").append(clientId).toString();
-    }
-
-    public void bookReg2Master(boolean isFailure) {
-        this.regMasterCnt.incrementAndGet();
-        if (isFailure) {
-            this.regMasterFailureCnt.incrementAndGet();
-        }
-    }
-
-    public void bookHB2MasterTimeout() {
-        this.masterNoNodeCnt.incrementAndGet();
-    }
-
-    public void bookHB2MasterException() {
-        this.masterHBExceptionCnt.incrementAndGet();
-    }
-
-    public void bookReg2Broker(boolean isFailure) {
-        this.regBrokerCnt.incrementAndGet();
-        if (isFailure) {
-            this.regBrokerFailureCnt.incrementAndGet();
-        }
-    }
-
-    public void bookHB2BrokerTimeout() {
-        this.brokerNoNodeCnt.incrementAndGet();
-    }
-
-    public void bookHB2BrokerException() {
-        this.brokerHBExceptionCnt.incrementAndGet();
-    }
-
-    public void bookConfirmDuration(long dltTime) {
-        this.procLatencyMetric.updProcTimeDlt(dltTime);
-    }
-
-    public void bookSuccSendMsg(long dltTime, int msgSize) {
-        rpcCallMetric.updProcTimeDlt(dltTime);
-        totalMsgCount.incrementAndGet();
-        totalMsgSize.addAndGet(msgSize);
-    }
-
-    public void bookSuccGetMsg(long dltTime, int msgCnt, int msgSize) {
-        rpcCallMetric.updProcTimeDlt(dltTime);
-        totalMsgCount.addAndGet(msgCnt);
-        totalMsgSize.addAndGet(msgSize);
-    }
-
-    public void bookFailRpcCall(long dltTime, int errCode) {
-        rpcCallMetric.updProcTimeDlt(dltTime);
-        switch (errCode) {
-            case TErrCodeConstants.BAD_REQUEST: {
-                errCodeBadRequest.incrementAndGet();
-                break;
-            }
-
-            case TErrCodeConstants.NOT_FOUND: {
-                errCodeNotFound.incrementAndGet();
-                break;
-            }
-
-            case TErrCodeConstants.PARTITION_OCCUPIED: {
-                errCodeOccupied.incrementAndGet();
-                break;
-            }
-
-            case TErrCodeConstants.HB_NO_NODE: {
-                errCodeNoNode.incrementAndGet();
-                break;
-            }
-
-            case TErrCodeConstants.SERVER_RECEIVE_OVERFLOW: {
-                errCodeOverFlow.incrementAndGet();
-                break;
-            }
-
-            case TErrCodeConstants.CLIENT_INCONSISTENT_TOPICSET: {
-                errCodeInTopic.incrementAndGet();
-                break;
-            }
-
-            case TErrCodeConstants.CLIENT_INCONSISTENT_FILTERSET: {
-                errCodeInFilters.incrementAndGet();
-                break;
-            }
-
-            case TErrCodeConstants.CLIENT_INCONSISTENT_SOURCECOUNT: {
-                errCodeInSourceCnt.incrementAndGet();
-                break;
-            }
-
-            case TErrCodeConstants.SERVICE_UNAVAILABLE: {
-                errCodeSrvUnavailable.incrementAndGet();
-                break;
-            }
-
-            default: {
-                errCodeOther.incrementAndGet();
-                break;
-            }
-        }
-    }
-
-    /**
-     * print Metric information to log file
-     *
-     * @param forcePrint    whether force print metric information
-     * @param strBuff       string buffer
-     */
-    public void printMetricInfo(boolean forcePrint, StringBuilder strBuff) {
-        if (!clientConfig.isEnableMetricPrint()) {
-            return;
-        }
-        boolean needReset = false;
-        long lstPrintTime = lastMetricPrintTime.get();
-        long curChkTime = Clock.systemDefaultZone().millis();
-        if (forcePrint || (curChkTime - lstPrintTime
-                > clientConfig.getMetricInfoPrintPeriodMs())) {
-            if (lastMetricPrintTime.compareAndSet(lstPrintTime, curChkTime)) {
-                long lstResetTime = lastMetricResetTime.get();
-                if ((curChkTime - lstResetTime
-                        > clientConfig.getMetricForcedResetPeriodMs())
-                        && lastMetricResetTime.compareAndSet(lstResetTime, curChkTime)) {
-                    needReset = true;
-                }
-                if (needReset) {
-                    strBuff.append(this.logPrefix).append(", reset value=");
-                } else {
-                    strBuff.append(this.logPrefix).append(", value=");
-                }
-                getStrMetrics(strBuff, needReset);
-                logger.info(strBuff.toString());
-                strBuff.delete(0, strBuff.length());
-            }
-        }
-    }
-
-    /**
-     * Get current data encapsulated by Json
-     *
-     * @param strBuff      string buffer
-     * @param resetValue   whether to reset the current data
-     */
-    public void getStrMetrics(StringBuilder strBuff, boolean resetValue) {
-        strBuff.append("{\"").append(regMasterCnt.getName()).append("\":")
-                .append(regMasterCnt.getValue(resetValue)).append(",\"")
-                .append(regMasterFailureCnt.getName()).append("\":")
-                .append(regMasterFailureCnt.getValue(resetValue)).append(",\"")
-                .append(masterNoNodeCnt.getName()).append("\":")
-                .append(masterNoNodeCnt.getValue(resetValue)).append(",\"")
-                .append(masterHBExceptionCnt.getName()).append("\":")
-                .append(masterHBExceptionCnt.getValue(resetValue)).append(",\"")
-                .append(totalMsgCount.getName()).append("\":")
-                .append(totalMsgCount.getValue(resetValue)).append(",\"")
-                .append(totalMsgSize.getName()).append("\":")
-                .append(totalMsgSize.getValue(resetValue)).append(",");
-        rpcCallMetric.getStrMetrics(strBuff, resetValue);
-        strBuff.append(",\"err_rsp\":{\"")
-                .append(errCodeBadRequest.getName()).append("\":")
-                .append(errCodeBadRequest.getValue(resetValue)).append(",\"")
-                .append(errCodeNotFound.getName()).append("\":")
-                .append(errCodeNotFound.getValue(resetValue)).append(",\"")
-                .append(errCodeOccupied.getName()).append("\":")
-                .append(errCodeOccupied.getValue(resetValue)).append(",\"")
-                .append(errCodeNoNode.getName()).append("\":")
-                .append(errCodeNoNode.getValue(resetValue)).append(",\"")
-                .append(errCodeOverFlow.getName()).append("\":")
-                .append(errCodeOverFlow.getValue(resetValue)).append(",\"")
-                .append(errCodeInTopic.getName()).append("\":")
-                .append(errCodeInTopic.getValue(resetValue)).append(",\"")
-                .append(errCodeInFilters.getName()).append("\":")
-                .append(errCodeInFilters.getValue(resetValue)).append(",\"")
-                .append(errCodeInSourceCnt.getName()).append("\":")
-                .append(errCodeInSourceCnt.getValue(resetValue)).append(",\"")
-                .append(errCodeSrvUnavailable.getName()).append("\":")
-                .append(errCodeSrvUnavailable.getValue(resetValue)).append(",\"")
-                .append(errCodeOther.getName()).append("\":")
-                .append(errCodeOther.getValue(resetValue)).append("}");
-        if (!isProducer) {
-            strBuff.append(",\"").append(regBrokerCnt.getName()).append("\":")
-                    .append(regBrokerCnt.getValue(resetValue)).append(",\"")
-                    .append(regBrokerFailureCnt.getName()).append("\":")
-                    .append(regBrokerFailureCnt.getValue(resetValue)).append(",\"")
-                    .append(brokerNoNodeCnt.getName()).append("\":")
-                    .append(brokerNoNodeCnt.getValue(resetValue)).append(",\"")
-                    .append(brokerHBExceptionCnt.getName()).append("\":")
-                    .append(brokerHBExceptionCnt.getValue(resetValue)).append(",");
-            procLatencyMetric.getStrMetrics(strBuff, resetValue);
-        }
-        if (resetValue) {
-            long befTime = lastResetTime.getAndSet(Clock.systemDefaultZone().millis());
-            strBuff.append(",\"last_reset_time\":\"")
-                    .append(DateTimeConvertUtils.ms2yyyyMMddHHmmss(befTime)).append("\"}");
-        } else {
-            strBuff.append(",\"last_reset_time\":\"")
-                    .append(DateTimeConvertUtils.ms2yyyyMMddHHmmss(lastResetTime.get()))
-                    .append("\"}");
-        }
-    }
-}
-
diff --git a/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/common/ClientStatsInfo.java b/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/common/ClientStatsInfo.java
new file mode 100644
index 0000000..186da25
--- /dev/null
+++ b/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/common/ClientStatsInfo.java
@@ -0,0 +1,455 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.tubemq.client.common;
+
+import java.time.Clock;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.inlong.tubemq.corebase.TBaseConstants;
+import org.apache.inlong.tubemq.corebase.TErrCodeConstants;
+import org.apache.inlong.tubemq.corebase.metric.TrafficStatsUnit;
+import org.apache.inlong.tubemq.corebase.metric.impl.ESTHistogram;
+import org.apache.inlong.tubemq.corebase.metric.impl.LongStatsCounter;
+import org.apache.inlong.tubemq.corebase.metric.impl.SinceTime;
+import org.apache.inlong.tubemq.corebase.utils.DateTimeConvertUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ClientStatsInfo {
+    private static final Logger logger =
+            LoggerFactory.getLogger(ClientStatsInfo.class);
+    private final boolean isProducer;
+    private final String logPrefix;
+    // switchable statistic items
+    private final ClientStatsItemSet[] switchableSets = new ClientStatsItemSet[2];
+    // current writable index
+    private final AtomicInteger writableIndex = new AtomicInteger(0);
+    // statistics self-print period, ms
+    private final long statsPrintPeriodMs;
+    // statistics force reset period, ms
+    private final long statsForcedResetPeriodMs;
+    // whether the statistic is closed
+    private volatile boolean isClosed = false;
+    // whether to self-print statistics
+    private volatile boolean isSelfPrint = true;
+    // last self-print time
+    private final AtomicLong lstSelfPrintTime = new AtomicLong(0);
+    // last snapshot time
+    private final AtomicLong lstSnapshotTime = new AtomicLong(0);
+
+    public ClientStatsInfo(boolean isProducer, String clientId,
+                           boolean isSelfPrint, long statsPrintPeriodMs,
+                           long statsForcedResetPeriodMs) {
+        this.isProducer = isProducer;
+        this.isSelfPrint = isSelfPrint;
+        this.statsPrintPeriodMs = statsPrintPeriodMs;
+        this.statsForcedResetPeriodMs = statsForcedResetPeriodMs;
+        StringBuilder strBuff = new StringBuilder(512);
+        if (isProducer) {
+            strBuff.append("[Producer");
+        } else {
+            strBuff.append("[Consumer");
+        }
+        this.logPrefix = strBuff.append(" Stats]: ")
+                .append("Client=").append(clientId).toString();
+        this.switchableSets[0] = new ClientStatsItemSet();
+        this.switchableSets[1] = new ClientStatsItemSet();
+    }
+
+    public synchronized void setStatsStatus(boolean enableStats) {
+        this.isClosed = !enableStats;
+    }
+
+    public boolean isStatsClosed() {
+        return this.isClosed;
+    }
+
+    public boolean isStatsSelfPrint() {
+        return this.isSelfPrint;
+    }
+
+    public void bookReg2Master(boolean isFailure) {
+        if (isClosed) {
+            return;
+        }
+        switchableSets[getIndex()].regMasterCnt.incValue();
+        if (isFailure) {
+            switchableSets[getIndex()].regMasterFailCnt.incValue();
+        }
+    }
+
+    public void bookHB2MasterTimeout() {
+        if (isClosed) {
+            return;
+        }
+        switchableSets[getIndex()].regMasterTimoutCnt.incValue();
+    }
+
+    public void bookHB2MasterException() {
+        if (isClosed) {
+            return;
+        }
+        switchableSets[getIndex()].hbMasterExcCnt.incValue();
+    }
+
+    public void bookReg2Broker(boolean isFailure) {
+        if (isClosed) {
+            return;
+        }
+        switchableSets[getIndex()].regBrokerCnt.incValue();
+        if (isFailure) {
+            switchableSets[getIndex()].regBrokerFailCnt.incValue();
+        }
+    }
+
+    public void bookHB2BrokerTimeout() {
+        if (isClosed) {
+            return;
+        }
+        switchableSets[getIndex()].regBrokerTimoutCnt.incValue();
+    }
+
+    public void bookHB2BrokerException() {
+        if (isClosed) {
+            return;
+        }
+        switchableSets[getIndex()].hbBrokerExcCnt.incValue();
+    }
+
+    public void bookConfirmDuration(long dltTime) {
+        if (isClosed) {
+            return;
+        }
+        switchableSets[getIndex()].csmLatencyStats.update(dltTime);
+    }
+
+    public void bookSuccSendMsg(long dltTime, String topicName, int msgSize) {
+        if (isClosed) {
+            return;
+        }
+        sendOrRecvMsg(topicName, dltTime, 1, msgSize);
+        bookFailRpcCall(TErrCodeConstants.SUCCESS);
+    }
+
+    public void bookSuccGetMsg(long dltTime, String topicName, int msgCnt, int msgSize) {
+        if (isClosed) {
+            return;
+        }
+        sendOrRecvMsg(topicName, dltTime, msgCnt, msgSize);
+        bookFailRpcCall(TErrCodeConstants.SUCCESS);
+    }
+
+    public void bookFailRpcCall(int errCode) {
+        if (isClosed) {
+            return;
+        }
+        // accumulate msg count by errcode
+        ClientStatsItemSet curItemSet = switchableSets[getIndex()];
+        LongStatsCounter curItemCounter = curItemSet.errRspStatsMap.get(errCode);
+        if (curItemCounter == null) {
+            LongStatsCounter tmpCounter = new LongStatsCounter(
+                    "err_" + errCode, "");
+            curItemCounter = curItemSet.errRspStatsMap.putIfAbsent(errCode, tmpCounter);
+            if (curItemCounter == null) {
+                curItemCounter = tmpCounter;
+            }
+        }
+        curItemCounter.incValue();
+    }
+
+    /**
+     * Self print statistics information to log file
+     *
+     * @param forcePrint    whether force print statistics information
+     * @param strBuff       string buffer
+     */
+    public void selfPrintStatsInfo(boolean forcePrint, StringBuilder strBuff) {
+        if (this.isClosed || !this.isSelfPrint) {
+            return;
+        }
+        long lstPrintTime = lstSelfPrintTime.get();
+        long curChkTime = Clock.systemDefaultZone().millis();
+        if (forcePrint || (curChkTime - lstPrintTime > this.statsPrintPeriodMs)) {
+            if (lstSelfPrintTime.compareAndSet(lstPrintTime, curChkTime)) {
+                if (switchWritingStatsUnit(false)) {
+                    strBuff.append(this.logPrefix).append(", reset value=");
+                    getStatsInfo(switchableSets[getIndex(writableIndex.get() - 1)],
+                            strBuff, StatsOutputLevel.FULL, false);
+                } else {
+                    strBuff.append(this.logPrefix).append(", value=");
+                    getStatsInfo(switchableSets[getIndex()],
+                            strBuff, StatsOutputLevel.FULL, false);
+                }
+                logger.info(strBuff.toString());
+                strBuff.delete(0, strBuff.length());
+            }
+        }
+    }
+
+    // private functions
+    private boolean switchWritingStatsUnit(boolean needReset) {
+        long lstResetTime = lstSnapshotTime.get();
+        long checkDltTime = System.currentTimeMillis() - lstResetTime;
+        if (((needReset && (checkDltTime > TBaseConstants.CFG_STATS_MIN_SNAPSHOT_PERIOD_MS))
+                || (checkDltTime > this.statsForcedResetPeriodMs))
+                && lstSnapshotTime.compareAndSet(lstResetTime, System.currentTimeMillis())) {
+            switchableSets[getIndex(writableIndex.incrementAndGet())].resetSinceTime();
+            return true;
+        }
+        return false;
+    }
+
+    /**
+     * Get current data encapsulated by Json format
+     *
+     * @param strBuff      string buffer
+     * @param outputLevel  the output level of statistics
+     * @param resetValue   whether to reset the current data
+     */
+    private void getStatsInfo(ClientStatsItemSet statsSet, StringBuilder strBuff,
+                              StatsOutputLevel outputLevel, boolean resetValue) {
+        int totalCnt = 0;
+        strBuff.append("{\"").append(statsSet.resetTime.getFullName())
+                .append("\":\"").append(statsSet.resetTime.getStrSinceTime())
+                .append("\",\"probe_time\":\"")
+                .append(DateTimeConvertUtils.ms2yyyyMMddHHmmss(System.currentTimeMillis()))
+                .append("\"");
+        if (resetValue) {
+            strBuff.append(",\"").append(statsSet.totalTrafficStats.msgCnt.getFullName())
+                    .append("\":").append(statsSet.totalTrafficStats.msgCnt.getAndResetValue())
+                    .append(",\"").append(statsSet.totalTrafficStats.msgSize.getFullName())
+                    .append("\":").append(statsSet.totalTrafficStats.msgSize.getAndResetValue())
+                    .append(",\"traffic_details\":{");
+            for (Map.Entry<String, TrafficStatsUnit> entry
+                    : statsSet.topicTrafficMap.entrySet()) {
+                if (entry == null) {
+                    continue;
+                }
+                if (totalCnt++ > 0) {
+                    strBuff.append(",");
+                }
+                strBuff.append("\"").append(entry.getKey()).append("\":{\"")
+                        .append(entry.getValue().msgCnt.getShortName()).append("\":")
+                         .append(entry.getValue().msgCnt.getAndResetValue()).append(",\"")
+                        .append(entry.getValue().msgSize.getShortName()).append("\":")
+                        .append(entry.getValue().msgCnt.getAndResetValue()).append("}");
+            }
+            strBuff.append("}");
+            if (outputLevel != StatsOutputLevel.SIMPLEST) {
+                strBuff.append(",");
+                statsSet.msgCallDltStats.snapShort(strBuff, false);
+                if (!isProducer) {
+                    strBuff.append(",");
+                    statsSet.csmLatencyStats.snapShort(strBuff, false);
+                }
+                strBuff.append(",\"rsp_details\":{");
+                totalCnt = 0;
+                for (LongStatsCounter statsCounter : statsSet.errRspStatsMap.values()) {
+                    if (statsCounter == null) {
+                        continue;
+                    }
+                    if (totalCnt++ > 0) {
+                        strBuff.append(",");
+                    }
+                    strBuff.append("\"").append(statsCounter.getFullName()).append("\":")
+                            .append(statsCounter.getAndResetValue());
+                }
+                strBuff.append("}");
+            }
+            if (outputLevel == StatsOutputLevel.FULL) {
+                strBuff.append(",\"").append(statsSet.regMasterCnt.getFullName())
+                        .append("\":").append(statsSet.regMasterCnt.getAndResetValue())
+                        .append(",\"").append(statsSet.regMasterFailCnt.getFullName())
+                        .append("\":").append(statsSet.regMasterFailCnt.getAndResetValue())
+                        .append(",\"").append(statsSet.regMasterTimoutCnt.getFullName())
+                        .append("\":").append(statsSet.regMasterTimoutCnt.getAndResetValue())
+                        .append(",\"").append(statsSet.hbMasterExcCnt.getFullName())
+                        .append("\":").append(statsSet.hbMasterExcCnt.getAndResetValue())
+                        .append(",\"").append(statsSet.regBrokerCnt.getFullName())
+                        .append("\":").append(statsSet.regBrokerCnt.getAndResetValue())
+                        .append(",\"").append(statsSet.regBrokerFailCnt.getFullName())
+                        .append("\":").append(statsSet.regBrokerFailCnt.getAndResetValue())
+                        .append(",\"").append(statsSet.regBrokerTimoutCnt.getFullName())
+                        .append("\":").append(statsSet.regBrokerTimoutCnt.getAndResetValue())
+                        .append(",\"").append(statsSet.hbBrokerExcCnt.getFullName())
+                        .append("\":").append(statsSet.hbBrokerExcCnt.getAndResetValue());
+            }
+            strBuff.append("}");
+        } else {
+            strBuff.append(",\"").append(statsSet.totalTrafficStats.msgCnt.getFullName())
+                    .append("\":").append(statsSet.totalTrafficStats.msgCnt.getValue())
+                    .append(",\"").append(statsSet.totalTrafficStats.msgSize.getFullName())
+                    .append("\":").append(statsSet.totalTrafficStats.msgSize.getValue())
+                    .append(",\"traffic_details\":{");
+            for (Map.Entry<String, TrafficStatsUnit> entry
+                    : statsSet.topicTrafficMap.entrySet()) {
+                if (entry == null) {
+                    continue;
+                }
+                if (totalCnt++ > 0) {
+                    strBuff.append(",");
+                }
+                strBuff.append("\"").append(entry.getKey()).append("\":{\"")
+                        .append(entry.getValue().msgCnt.getShortName()).append("\":")
+                        .append(entry.getValue().msgCnt.getValue()).append(",\"")
+                        .append(entry.getValue().msgSize.getShortName()).append("\":")
+                        .append(entry.getValue().msgCnt.getValue()).append("}");
+            }
+            strBuff.append("}");
+            if (outputLevel != StatsOutputLevel.SIMPLEST) {
+                strBuff.append(",");
+                statsSet.msgCallDltStats.getValue(strBuff, false);
+                if (!isProducer) {
+                    strBuff.append(",");
+                    statsSet.csmLatencyStats.getValue(strBuff, false);
+                }
+                strBuff.append(",\"rsp_details\":{");
+                totalCnt = 0;
+                for (LongStatsCounter statsCounter : statsSet.errRspStatsMap.values()) {
+                    if (statsCounter == null) {
+                        continue;
+                    }
+                    if (totalCnt++ > 0) {
+                        strBuff.append(",");
+                    }
+                    strBuff.append("\"").append(statsCounter.getFullName()).append("\":")
+                            .append(statsCounter.getValue());
+                }
+                strBuff.append("}");
+            }
+            if (outputLevel == StatsOutputLevel.FULL) {
+                strBuff.append(",\"").append(statsSet.regMasterCnt.getFullName())
+                        .append("\":").append(statsSet.regMasterCnt.getValue())
+                        .append(",\"").append(statsSet.regMasterFailCnt.getFullName())
+                        .append("\":").append(statsSet.regMasterFailCnt.getValue())
+                        .append(",\"").append(statsSet.regMasterTimoutCnt.getFullName())
+                        .append("\":").append(statsSet.regMasterTimoutCnt.getValue())
+                        .append(",\"").append(statsSet.hbMasterExcCnt.getFullName())
+                        .append("\":").append(statsSet.hbMasterExcCnt.getValue())
+                        .append(",\"").append(statsSet.regBrokerCnt.getFullName())
+                        .append("\":").append(statsSet.regBrokerCnt.getValue())
+                        .append(",\"").append(statsSet.regBrokerFailCnt.getFullName())
+                        .append("\":").append(statsSet.regBrokerFailCnt.getValue())
+                        .append(",\"").append(statsSet.regBrokerTimoutCnt.getFullName())
+                        .append("\":").append(statsSet.regBrokerTimoutCnt.getValue())
+                        .append(",\"").append(statsSet.hbBrokerExcCnt.getFullName())
+                        .append("\":").append(statsSet.hbBrokerExcCnt.getValue());
+            }
+            strBuff.append("}");
+        }
+    }
+
+    /**
+     * Accumulate sent or received message information
+     *
+     * @param topic     the topic name
+     * @param dltTime   the latency
+     * @param msgCnt    the message count
+     * @param msgSize   the message size
+     */
+    private void sendOrRecvMsg(String topic, long dltTime,
+                               int msgCnt, int msgSize) {
+        ClientStatsItemSet curItemSet = switchableSets[getIndex()];
+        curItemSet.msgCallDltStats.update(dltTime);
+        curItemSet.totalTrafficStats.addMsgCntAndSize(msgCnt, msgSize);
+        // accumulate traffic information by topic
+        TrafficStatsUnit curStatsUnit = curItemSet.topicTrafficMap.get(topic);
+        if (curStatsUnit == null) {
+            TrafficStatsUnit tmpUnit =
+                    new TrafficStatsUnit("msg_cnt", "msg_size", topic);
+            curStatsUnit = curItemSet.topicTrafficMap.putIfAbsent(topic, tmpUnit);
+            if (curStatsUnit == null) {
+                curStatsUnit = tmpUnit;
+            }
+        }
+        curStatsUnit.addMsgCntAndSize(msgCnt, msgSize);
+    }
+
+    /**
+     * Get current writable block index.
+     *
+     * @return the writable block index
+     */
+    private int getIndex() {
+        return getIndex(writableIndex.get());
+    }
+
+    /**
+     * Gets the metric block index based on the specified value.
+     *
+     * @param origIndex    the specified value
+     * @return the metric block index
+     */
+    private int getIndex(int origIndex) {
+        return Math.abs(origIndex % 2);
+    }
+
+    /**
+     * ClientStatsItemSet, Client related statistics set
+     *
+     */
+    private static class ClientStatsItemSet {
+        // The reset time of statistics set
+        protected final SinceTime resetTime =
+                new SinceTime("reset_time", null);
+        // received or sent message traffic statistic
+        protected final TrafficStatsUnit totalTrafficStats =
+                new TrafficStatsUnit("msg_cnt", "msg_size", "total");
+        // topic-based traffic statistics
+        protected final ConcurrentHashMap<String, TrafficStatsUnit> topicTrafficMap =
+                new ConcurrentHashMap<>();
+        // time consumption statistics for sending or receiving messages
+        protected final ESTHistogram msgCallDltStats =
+                new ESTHistogram("msg_call_dlt", "");
+        // statistics on consumption transaction time
+        protected final ESTHistogram csmLatencyStats =
+                new ESTHistogram("csm_latency_dlt", "");
+        // error response distribution statistics
+        protected final ConcurrentHashMap<Integer, LongStatsCounter> errRspStatsMap =
+                new ConcurrentHashMap<>();
+        // client 2 Master status statistics
+        protected final LongStatsCounter regMasterCnt =
+                new LongStatsCounter("reg_master_cnt", null);
+        protected final LongStatsCounter regMasterFailCnt =
+                new LongStatsCounter("reg_master_fail", null);
+        protected final LongStatsCounter regMasterTimoutCnt =
+                new LongStatsCounter("reg_master_timeout", null);
+        protected final LongStatsCounter hbMasterExcCnt =
+                new LongStatsCounter("hb_master_exception", null);
+        // client 2 Broker status statistics
+        protected final LongStatsCounter regBrokerCnt =
+                new LongStatsCounter("reg_broker_cnt", null);
+        protected final LongStatsCounter regBrokerFailCnt =
+                new LongStatsCounter("reg_broker_fail", null);
+        protected final LongStatsCounter regBrokerTimoutCnt =
+                new LongStatsCounter("reg_broker_timeout", null);
+        protected final LongStatsCounter hbBrokerExcCnt =
+                new LongStatsCounter("hb_broker_exception", null);
+
+        public ClientStatsItemSet() {
+            resetSinceTime();
+        }
+
+        public void resetSinceTime() {
+            this.resetTime.reset();
+        }
+    }
+}
+
diff --git a/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/common/StatsOutputLevel.java b/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/common/StatsOutputLevel.java
new file mode 100644
index 0000000..e45f7f4
--- /dev/null
+++ b/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/common/StatsOutputLevel.java
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.tubemq.client.common;
+
+public enum StatsOutputLevel {
+    SIMPLEST(0, "simplest", "Simplest statistics output"),
+    MEDIUM(1, "medium", "Medium statistics output"),
+    FULL(2, "full", "Full statistics output");
+
+    StatsOutputLevel(int id, String name, String desc) {
+        this.id = id;
+        this.name = name;
+        this.desc = desc;
+    }
+
+    public int getId() {
+        return id;
+    }
+
+    public String getName() {
+        return name;
+    }
+
+    public String getDesc() {
+        return desc;
+    }
+
+    public static StatsOutputLevel valueOf(int value) {
+        for (StatsOutputLevel outputLevel : StatsOutputLevel.values()) {
+            if (outputLevel.getId() == value) {
+                return outputLevel;
+            }
+        }
+        return SIMPLEST;
+    }
+
+    private final int id;
+    private final String name;
+    private final String desc;
+}
diff --git a/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/common/TClientConstants.java b/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/common/TClientConstants.java
index 7b5b8f1..0269af2 100644
--- a/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/common/TClientConstants.java
+++ b/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/common/TClientConstants.java
@@ -40,13 +40,13 @@ public class TClientConstants {
     public static final long CFG_DEFAULT_META_QUERY_WAIT_PERIOD_MS = 10000L;
     public static final long CFG_MIN_META_QUERY_WAIT_PERIOD_MS = 5000L;
 
-    // client metric information print period
-    public static final long METRIC_PRINT_DEFAULT_PERIOD_MS = 3 * 1000 * 60;
-    public static final long METRIC_PRINT_MIN_PERIOD_MS = 3 * 1000 * 60;
-    public static final long METRIC_PRINT_MAX_PERIOD_MS = 60 * 1000 * 60;
+    // client statistics information print period
+    public static final long STATS_SELF_PRINT_DEFAULT_PERIOD_MS = 3 * 1000 * 60;
+    public static final long STATS_SELF_PRINT_MIN_PERIOD_MS = 1000 * 60;
+    public static final long STATS_SELF_PRINT_MAX_PERIOD_MS = 60 * 1000 * 60;
 
-    // client metric information print period
-    public static final long METRIC_RESET_DEFAULT_PERIOD_MS = 30 * 60 * 1000;
-    public static final long METRIC_RESET_MIN_PERIOD_MS = 30 * 60 * 1000;
-    public static final long METRIC_RESET_MAX_PERIOD_MS = 24 * 3600 * 1000;
+    // client statistics information print period
+    public static final long STATS_AUTO_RESET_DEFAULT_PERIOD_MS = 30 * 60 * 1000;
+    public static final long STATS_AUTO_RESET_MIN_PERIOD_MS = 60 * 1000;
+    public static final long STATS_AUTO_RESET_MAX_PERIOD_MS = 24 * 3600 * 1000;
 }
diff --git a/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/config/TubeClientConfig.java b/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/config/TubeClientConfig.java
index aaef618..f06a4c0 100644
--- a/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/config/TubeClientConfig.java
+++ b/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/config/TubeClientConfig.java
@@ -56,11 +56,11 @@ public class TubeClientConfig {
     // Link statistic check duration in ms.
     private long linkStatsDurationMs = RpcConstants.CFG_LQ_STATS_DURATION_MS;
     // Enable metric information print
-    private boolean enableMetricPrint = true;
+    private boolean enableStatsSelfPrint = true;
     // Metric print period in ms.
-    private long metricInfoPrintPeriodMs = TClientConstants.METRIC_PRINT_DEFAULT_PERIOD_MS;
+    private long statsSelfPrintPeriodMs = TClientConstants.STATS_SELF_PRINT_DEFAULT_PERIOD_MS;
     // Metric reset value period in ms.
-    private long metricForcedResetPeriodMs = TClientConstants.METRIC_RESET_DEFAULT_PERIOD_MS;
+    private long statsForcedResetPeriodMs = TClientConstants.STATS_AUTO_RESET_DEFAULT_PERIOD_MS;
 
     // The following 5 configuration parameters are used in broker exception process.
     //
@@ -469,34 +469,34 @@ public class TubeClientConfig {
         return usrPassWord;
     }
 
-    public boolean isEnableMetricPrint() {
-        return enableMetricPrint;
+    public boolean enableStatsSelfPrint() {
+        return enableStatsSelfPrint;
     }
 
-    public void setEnableMetricPrint(boolean enableMetricPrint) {
-        this.enableMetricPrint = enableMetricPrint;
+    public void setStatsSelfPrint(boolean enableStatsSelfPrint) {
+        this.enableStatsSelfPrint = enableStatsSelfPrint;
     }
 
-    public long getMetricInfoPrintPeriodMs() {
-        return metricInfoPrintPeriodMs;
+    public long getStatsSelfPrintPeriodMs() {
+        return statsSelfPrintPeriodMs;
     }
 
-    public void setMetricInfoPrintPeriodMs(long metricInfoPrintPeriodMs) {
-        this.metricInfoPrintPeriodMs =
-                MixedUtils.mid(metricInfoPrintPeriodMs,
-                        TClientConstants.METRIC_PRINT_MIN_PERIOD_MS,
-                        TClientConstants.METRIC_PRINT_MAX_PERIOD_MS);
+    public void setStatsSelfPrintPeriodMs(long statsSelfPrintPeriodMs) {
+        this.statsSelfPrintPeriodMs =
+                MixedUtils.mid(statsSelfPrintPeriodMs,
+                        TClientConstants.STATS_SELF_PRINT_MIN_PERIOD_MS,
+                        TClientConstants.STATS_SELF_PRINT_MAX_PERIOD_MS);
     }
 
-    public long getMetricForcedResetPeriodMs() {
-        return metricForcedResetPeriodMs;
+    public long getStatsForcedResetPeriodMs() {
+        return statsForcedResetPeriodMs;
     }
 
-    public void setMetricForcedResetPeriodMs(long metricForcedResetPeriodMs) {
-        this.metricForcedResetPeriodMs =
-                MixedUtils.mid(metricForcedResetPeriodMs,
-                        TClientConstants.METRIC_RESET_MIN_PERIOD_MS,
-                        TClientConstants.METRIC_RESET_MAX_PERIOD_MS);
+    public void setStatsForcedResetPeriodMs(long statsForcedResetPeriodMs) {
+        this.statsForcedResetPeriodMs =
+                MixedUtils.mid(statsForcedResetPeriodMs,
+                        TClientConstants.STATS_AUTO_RESET_MIN_PERIOD_MS,
+                        TClientConstants.STATS_AUTO_RESET_MAX_PERIOD_MS);
     }
 
     @Override
@@ -591,13 +591,13 @@ public class TubeClientConfig {
         if (!this.tlsConfig.equals(that.tlsConfig)) {
             return false;
         }
-        if (this.enableMetricPrint != that.enableMetricPrint) {
+        if (this.enableStatsSelfPrint != that.enableStatsSelfPrint) {
             return false;
         }
-        if (this.metricInfoPrintPeriodMs != that.metricInfoPrintPeriodMs) {
+        if (this.statsSelfPrintPeriodMs != that.statsSelfPrintPeriodMs) {
             return false;
         }
-        if (this.metricForcedResetPeriodMs != that.metricForcedResetPeriodMs) {
+        if (this.statsForcedResetPeriodMs != that.statsForcedResetPeriodMs) {
             return false;
         }
         return masterInfo.equals(that.masterInfo);
@@ -648,9 +648,9 @@ public class TubeClientConfig {
             .append(",\"sessionMaxAllowedDelayedMsgCount\":").append(this.sessionMaxAllowedDelayedMsgCount)
             .append(",\"unAvailableFbdDurationMs\":").append(this.unAvailableFbdDurationMs)
             .append(",\"enableUserAuthentic\":").append(this.enableUserAuthentic)
-            .append(",\"enableMetricPrint\":").append(this.enableMetricPrint)
-            .append(",\"metricInfoPrintPeriodMs\":").append(this.metricInfoPrintPeriodMs)
-            .append(",\"metricResetPeriodMs\":").append(this.metricForcedResetPeriodMs)
+            .append(",\"enableStatsSelfPrint\":").append(this.enableStatsSelfPrint)
+            .append(",\"statsSelfPrintPeriodMs\":").append(this.statsSelfPrintPeriodMs)
+            .append(",\"statsForcedResetPeriodMs\":").append(this.statsForcedResetPeriodMs)
             .append(",\"usrName\":\"").append(this.usrName)
             .append("\",\"usrPassWord\":\"").append(this.usrPassWord)
             .append("\",\"localAddress\":\"").append(localAddress)
diff --git a/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/consumer/BaseMessageConsumer.java b/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/consumer/BaseMessageConsumer.java
index 744bfd9..5571507 100644
--- a/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/consumer/BaseMessageConsumer.java
+++ b/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/consumer/BaseMessageConsumer.java
@@ -38,7 +38,7 @@ import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 
-import org.apache.inlong.tubemq.client.common.ClientMetrics;
+import org.apache.inlong.tubemq.client.common.ClientStatsInfo;
 import org.apache.inlong.tubemq.client.common.TubeClientVersion;
 import org.apache.inlong.tubemq.client.config.ConsumerConfig;
 import org.apache.inlong.tubemq.client.exception.TubeClientException;
@@ -96,15 +96,16 @@ public class BaseMessageConsumer implements MessageConsumer {
     private final ConsumerSamplePrint samplePrintCtrl =
             new ConsumerSamplePrint();
     private final RpcConfig rpcConfig = new RpcConfig();
-    private AtomicLong visitToken = new AtomicLong(TBaseConstants.META_VALUE_UNDEFINED);
-    private AtomicReference<String> authAuthorizedTokenRef =
+    private final AtomicLong visitToken =
+            new AtomicLong(TBaseConstants.META_VALUE_UNDEFINED);
+    private final AtomicReference<String> authAuthorizedTokenRef =
             new AtomicReference<>("");
-    private ClientAuthenticateHandler authenticateHandler =
+    private final ClientAuthenticateHandler authenticateHandler =
             new SimpleClientAuthenticateHandler();
     private Thread heartBeatThread2Broker;
-    private AtomicBoolean isShutdown = new AtomicBoolean(false);
-    private AtomicBoolean isRebalanceStopped = new AtomicBoolean(false);
-    private AtomicBoolean isFirst = new AtomicBoolean(true);
+    private final AtomicBoolean isShutdown = new AtomicBoolean(false);
+    private final AtomicBoolean isRebalanceStopped = new AtomicBoolean(false);
+    private final AtomicBoolean isFirst = new AtomicBoolean(true);
     private int heartbeatRetryTimes = 0;
     // Status:
     // -1: Unsubscribed
@@ -116,7 +117,7 @@ public class BaseMessageConsumer implements MessageConsumer {
     private int rebalanceRetryTimes = 0;
     private long lastHeartbeatTime2Master = 0;
     private long lastHeartbeatTime2Broker = 0;
-    protected final ClientMetrics clientMetrics;
+    protected final ClientStatsInfo clientStatsInfo;
 
     /**
      * Construct a BaseMessageConsumer object.
@@ -143,8 +144,11 @@ public class BaseMessageConsumer implements MessageConsumer {
         } catch (Exception e) {
             throw new TubeClientException("Get consumer id failed!", e);
         }
-        this.clientMetrics =
-                new ClientMetrics(false, this.consumerId, this.consumerConfig);
+        this.clientStatsInfo =
+                new ClientStatsInfo(false, this.consumerId,
+                        this.consumerConfig.enableStatsSelfPrint(),
+                        this.consumerConfig.getStatsSelfPrintPeriodMs(),
+                        this.consumerConfig.getStatsForcedResetPeriodMs());
         this.rmtDataCache =
                 new RmtDataCache(this.consumerConfig, null);
         this.rpcServiceFactory =
@@ -454,7 +458,7 @@ public class BaseMessageConsumer implements MessageConsumer {
                 //
             }
         }
-        clientMetrics.printMetricInfo(true, strBuffer);
+        clientStatsInfo.selfPrintStatsInfo(true, strBuffer);
         logger.info(strBuffer
                 .append("[SHUTDOWN_CONSUMER] Partitions unregistered,  consumer :")
                 .append(this.consumerId).toString());
@@ -828,7 +832,7 @@ public class BaseMessageConsumer implements MessageConsumer {
                                 .consumerRegisterC2B(createBrokerRegisterRequest(partition),
                                     AddressUtils.getLocalAddress(), consumerConfig.isTlsEnable());
                         if (responseB2C != null && responseB2C.getSuccess()) {
-                            clientMetrics.bookReg2Broker(false);
+                            clientStatsInfo.bookReg2Broker(false);
                             long currOffset =
                                 responseB2C.hasCurrOffset() ? responseB2C.getCurrOffset()
                                         : TBaseConstants.META_VALUE_UNDEFINED;
@@ -842,7 +846,7 @@ public class BaseMessageConsumer implements MessageConsumer {
                                 .append(partition.toString()).toString());
                             strBuffer.delete(0, strBuffer.length());
                         } else {
-                            clientMetrics.bookReg2Broker(true);
+                            clientStatsInfo.bookReg2Broker(true);
                             if (responseB2C == null) {
                                 logger.warn(strBuffer.append("register2broker error! ")
                                     .append(retryTimesRegister2Broker).append(" register ")
@@ -1208,9 +1212,7 @@ public class BaseMessageConsumer implements MessageConsumer {
                                     partition, taskContext.isLastConsumed()),
                                     AddressUtils.getLocalAddress(), consumerConfig.isTlsEnable());
         } catch (Throwable ee) {
-            clientMetrics.bookFailRpcCall(
-                    System.currentTimeMillis() - startTime,
-                    TErrCodeConstants.UNSPECIFIED_ABNORMAL);
+            clientStatsInfo.bookFailRpcCall(TErrCodeConstants.UNSPECIFIED_ABNORMAL);
             // Process the exception
             rmtDataCache.errReqRelease(partitionKey, taskContext.getUsedToken(), false);
             taskContext.setFailProcessResult(400, strBuffer
@@ -1221,7 +1223,7 @@ public class BaseMessageConsumer implements MessageConsumer {
         }
         long dltTime = System.currentTimeMillis() - startTime;
         if (msgRspB2C == null) {
-            clientMetrics.bookFailRpcCall(dltTime, TErrCodeConstants.INTERNAL_SERVER_ERROR);
+            clientStatsInfo.bookFailRpcCall(TErrCodeConstants.INTERNAL_SERVER_ERROR);
             rmtDataCache.errReqRelease(partitionKey, taskContext.getUsedToken(), false);
             taskContext.setFailProcessResult(500, "Get message null");
             return taskContext;
@@ -1278,7 +1280,7 @@ public class BaseMessageConsumer implements MessageConsumer {
                         strBuffer.append(partitionKey).append(TokenConstants.ATTR_SEP)
                             .append(taskContext.getUsedToken()).toString(), messageList, maxOffset);
                     strBuffer.delete(0, strBuffer.length());
-                    clientMetrics.bookSuccGetMsg(dltTime, msgCount, msgSize);
+                    clientStatsInfo.bookSuccGetMsg(dltTime, topic, msgCount, msgSize);
                     break;
                 }
                 case TErrCodeConstants.HB_NO_NODE:
@@ -1339,13 +1341,11 @@ public class BaseMessageConsumer implements MessageConsumer {
                 }
             }
             if (msgRspB2C.getErrCode() != TErrCodeConstants.SUCCESS) {
-                clientMetrics.bookFailRpcCall(dltTime, msgRspB2C.getErrCode());
+                clientStatsInfo.bookFailRpcCall(msgRspB2C.getErrCode());
             }
             return taskContext;
         } catch (Throwable ee) {
-            clientMetrics.bookFailRpcCall(
-                    System.currentTimeMillis() - startTime,
-                    TErrCodeConstants.INTERNAL_SERVER_ERROR);
+            clientStatsInfo.bookFailRpcCall(TErrCodeConstants.INTERNAL_SERVER_ERROR);
             logger.error("Process response code error", ee);
             rmtDataCache.succRspRelease(partitionKey, topic,
                     taskContext.getUsedToken(), false, isFilterConsume(topic),
@@ -1422,7 +1422,7 @@ public class BaseMessageConsumer implements MessageConsumer {
                 rmtDataCache.resumeTimeoutConsumePartitions(isPullConsume,
                         consumerConfig.getPullProtectConfirmTimeoutMs());
                 // print metric information
-                clientMetrics.printMetricInfo(false, strBuffer);
+                clientStatsInfo.selfPrintStatsInfo(false, strBuffer);
                 // Fetch the rebalance result, construct message adn return it.
                 ConsumerEvent event = rebalanceResults.poll();
                 List<SubscribeInfo> subInfoList = null;
@@ -1442,7 +1442,7 @@ public class BaseMessageConsumer implements MessageConsumer {
                                 AddressUtils.getLocalAddress(), consumerConfig.isTlsEnable());
                 // Process unsuccessful response
                 if (response == null) {
-                    clientMetrics.bookHB2MasterTimeout();
+                    clientStatsInfo.bookHB2MasterTimeout();
                     logger.error(strBuffer.append("[Heartbeat Failed] ")
                             .append("return result is null!").toString());
                     heartbeatRetryTimes++;
@@ -1451,7 +1451,7 @@ public class BaseMessageConsumer implements MessageConsumer {
                 if (!response.getSuccess()) {
                     // If master replies that cannot find current consumer node, re-register
                     if (response.getErrCode() == TErrCodeConstants.HB_NO_NODE) {
-                        clientMetrics.bookHB2MasterTimeout();
+                        clientStatsInfo.bookHB2MasterTimeout();
                         try {
                             ClientMaster.RegisterResponseM2C regResponse =
                                 masterService.consumerRegisterC2M(createMasterRegisterRequest(),
@@ -1493,7 +1493,7 @@ public class BaseMessageConsumer implements MessageConsumer {
                         }
                         return;
                     }
-                    clientMetrics.bookHB2MasterException();
+                    clientStatsInfo.bookHB2MasterException();
                     logger.error(strBuffer.append("[Heartbeat Failed] ")
                             .append(response.getErrMsg()).toString());
                     if (response.getErrCode() == TErrCodeConstants.CERTIFICATE_FAILURE) {
@@ -1599,8 +1599,11 @@ public class BaseMessageConsumer implements MessageConsumer {
                                                 createBrokerHeartBeatRequest(brokerInfo.getBrokerId(), partStrSet),
                                                 AddressUtils.getLocalAddress(), consumerConfig.isTlsEnable());
                                 // When response is success
-                                if (heartBeatResponseV2 != null
-                                    && heartBeatResponseV2.getSuccess()) {
+                                if (heartBeatResponseV2 == null) {
+                                    clientStatsInfo.bookHB2BrokerTimeout();
+                                    continue;
+                                }
+                                if (heartBeatResponseV2.getSuccess()) {
                                     // If the peer require authentication, set a flag.
                                     // The following request will attach the auth information.
                                     rmtDataCache.bookBrokerRequireAuthInfo(
@@ -1644,19 +1647,18 @@ public class BaseMessageConsumer implements MessageConsumer {
                                             }
                                         }
                                     }
-                                }
-                                if (heartBeatResponseV2 != null) {
-                                    clientMetrics.bookHB2BrokerException();
+                                } else {
+                                    clientStatsInfo.bookHB2BrokerException();
                                     if (heartBeatResponseV2.getErrCode()
-                                        == TErrCodeConstants.CERTIFICATE_FAILURE) {
+                                            == TErrCodeConstants.CERTIFICATE_FAILURE) {
                                         for (Partition partition : partitions) {
                                             removePartition(partition);
                                         }
                                         logger.warn(strBuffer
-                                            .append("[heart2broker error] certificate failure, ")
-                                            .append(brokerInfo.getBrokerStrInfo())
-                                            .append("'s partitions area released, ")
-                                            .append(heartBeatResponseV2.getErrMsg()).toString());
+                                                .append("[heart2broker error] certificate failure, ")
+                                                .append(brokerInfo.getBrokerStrInfo())
+                                                .append("'s partitions area released, ")
+                                                .append(heartBeatResponseV2.getErrMsg()).toString());
                                         strBuffer.delete(0, strBuffer.length());
                                     }
                                 }
@@ -1665,7 +1667,7 @@ public class BaseMessageConsumer implements MessageConsumer {
                             // If there's error in the heartbeat, collect the log and print out.
                             // Release the log string buffer.
                             if (!isShutdown()) {
-                                clientMetrics.bookHB2BrokerException();
+                                clientStatsInfo.bookHB2BrokerException();
                                 samplePrintCtrl.printExceptionCaught(ee);
                                 if (!partStrSet.isEmpty()) {
                                     strBuffer.delete(0, strBuffer.length());
@@ -1685,7 +1687,7 @@ public class BaseMessageConsumer implements MessageConsumer {
                     lastHeartbeatTime2Broker = System.currentTimeMillis();
                     Thread.sleep(consumerConfig.getHeartbeatPeriodMs());
                 } catch (Throwable e) {
-                    clientMetrics.bookHB2BrokerException();
+                    clientStatsInfo.bookHB2BrokerException();
                     lastHeartbeatTime2Broker = System.currentTimeMillis();
                     if (!isShutdown()) {
                         logger.error("heartbeat thread error 3 : ", e);
diff --git a/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/consumer/SimpleClientBalanceConsumer.java b/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/consumer/SimpleClientBalanceConsumer.java
index 12a10f9..8ac51fa 100644
--- a/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/consumer/SimpleClientBalanceConsumer.java
+++ b/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/consumer/SimpleClientBalanceConsumer.java
@@ -34,7 +34,7 @@ import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 
-import org.apache.inlong.tubemq.client.common.ClientMetrics;
+import org.apache.inlong.tubemq.client.common.ClientStatsInfo;
 import org.apache.inlong.tubemq.client.common.ConfirmResult;
 import org.apache.inlong.tubemq.client.common.ConsumeResult;
 import org.apache.inlong.tubemq.client.common.QueryMetaResult;
@@ -108,7 +108,7 @@ public class SimpleClientBalanceConsumer implements ClientBalanceConsumer {
     private long lastHeartbeatTime2Broker = 0;
     private final ConcurrentHashMap<String, Long> partRegFreqCtrlMap =
             new ConcurrentHashMap<>();
-    protected final ClientMetrics clientMetrics;
+    protected final ClientStatsInfo clientStatsInfo;
 
     public SimpleClientBalanceConsumer(final InnerSessionFactory messageSessionFactory,
                                        final ConsumerConfig consumerConfig) throws TubeClientException {
@@ -127,8 +127,11 @@ public class SimpleClientBalanceConsumer implements ClientBalanceConsumer {
         }
         this.clientRmtDataCache =
                 new RmtDataCache(this.consumerConfig, null);
-        this.clientMetrics =
-                new ClientMetrics(false, this.consumerId, this.consumerConfig);
+        this.clientStatsInfo =
+                new ClientStatsInfo(false, this.consumerId,
+                        this.consumerConfig.enableStatsSelfPrint(),
+                        this.consumerConfig.getStatsSelfPrintPeriodMs(),
+                        this.consumerConfig.getStatsForcedResetPeriodMs());
         this.rpcServiceFactory =
                 this.sessionFactory.getRpcServiceFactory();
         this.rpcConfig.put(RpcConstants.CONNECT_TIMEOUT, 3000);
@@ -312,7 +315,7 @@ public class SimpleClientBalanceConsumer implements ClientBalanceConsumer {
             }
         }
         // print metric information
-        clientMetrics.printMetricInfo(true, strBuffer);
+        clientStatsInfo.selfPrintStatsInfo(true, strBuffer);
         logger.info(strBuffer
                 .append("[SHUTDOWN_CONSUMER] Partitions unregistered,  consumer :")
                 .append(this.consumerId).toString());
@@ -694,9 +697,7 @@ public class SimpleClientBalanceConsumer implements ClientBalanceConsumer {
                                     AddressUtils.getLocalAddress(), consumerConfig.isTlsEnable());
         } catch (Throwable ee) {
             // Process the exception
-            clientMetrics.bookFailRpcCall(
-                    System.currentTimeMillis() - startTime,
-                    TErrCodeConstants.BAD_REQUEST);
+            clientStatsInfo.bookFailRpcCall(TErrCodeConstants.BAD_REQUEST);
             clientRmtDataCache.errReqRelease(partitionKey, taskContext.getUsedToken(), false);
             taskContext.setFailProcessResult(400, sBuffer
                     .append("Get message error, reason is ")
@@ -706,7 +707,7 @@ public class SimpleClientBalanceConsumer implements ClientBalanceConsumer {
         }
         long dltTime = System.currentTimeMillis() - startTime;
         if (msgRspB2C == null) {
-            clientMetrics.bookFailRpcCall(dltTime, TErrCodeConstants.INTERNAL_SERVER_ERROR);
+            clientStatsInfo.bookFailRpcCall(TErrCodeConstants.INTERNAL_SERVER_ERROR);
             clientRmtDataCache.errReqRelease(partitionKey, taskContext.getUsedToken(), false);
             taskContext.setFailProcessResult(500, "Get message null");
             return taskContext;
@@ -763,7 +764,7 @@ public class SimpleClientBalanceConsumer implements ClientBalanceConsumer {
                             sBuffer.append(partitionKey).append(TokenConstants.ATTR_SEP)
                                     .append(taskContext.getUsedToken()).toString(), messageList, maxOffset);
                     sBuffer.delete(0, sBuffer.length());
-                    clientMetrics.bookSuccGetMsg(dltTime, msgCount, msgSize);
+                    clientStatsInfo.bookSuccGetMsg(dltTime, topic, msgCount, msgSize);
                     break;
                 }
                 case TErrCodeConstants.HB_NO_NODE:
@@ -824,11 +825,11 @@ public class SimpleClientBalanceConsumer implements ClientBalanceConsumer {
                 }
             }
             if (msgRspB2C.getErrCode() != TErrCodeConstants.SUCCESS) {
-                clientMetrics.bookFailRpcCall(dltTime, msgRspB2C.getErrCode());
+                clientStatsInfo.bookFailRpcCall(msgRspB2C.getErrCode());
             }
             return taskContext;
         } catch (Throwable ee) {
-            clientMetrics.bookFailRpcCall(dltTime, TErrCodeConstants.INTERNAL_SERVER_ERROR);
+            clientStatsInfo.bookFailRpcCall(TErrCodeConstants.INTERNAL_SERVER_ERROR);
             logger.error("Process response code error", ee);
             clientRmtDataCache.succRspRelease(partitionKey, topic,
                     taskContext.getUsedToken(), false, isFilterConsume(topic),
@@ -968,14 +969,14 @@ public class SimpleClientBalanceConsumer implements ClientBalanceConsumer {
                 clientRmtDataCache.resumeTimeoutConsumePartitions(false,
                         consumerConfig.getPullProtectConfirmTimeoutMs());
                 // print metric information
-                clientMetrics.printMetricInfo(false, strBuffer);
+                clientStatsInfo.selfPrintStatsInfo(false, strBuffer);
                 // Send heartbeat request to master
                 ClientMaster.HeartResponseM2CV2 response =
                         masterService.consumerHeartbeatC2MV2(createMasterHeartBeatRequest(),
                                 AddressUtils.getLocalAddress(), consumerConfig.isTlsEnable());
                 // Process unsuccessful response
                 if (response == null) {
-                    clientMetrics.bookHB2MasterTimeout();
+                    clientStatsInfo.bookHB2MasterTimeout();
                     logger.warn(strBuffer.append("[Heartbeat Failed] ")
                             .append("return result is null!").toString());
                     strBuffer.delete(0, strBuffer.length());
@@ -985,7 +986,7 @@ public class SimpleClientBalanceConsumer implements ClientBalanceConsumer {
                 if (response.getErrCode() != TErrCodeConstants.SUCCESS) {
                     // If master replies that cannot find current consumer node, re-register
                     if (response.getErrCode() == TErrCodeConstants.HB_NO_NODE) {
-                        clientMetrics.bookHB2MasterTimeout();
+                        clientStatsInfo.bookHB2MasterTimeout();
                         if (tryRegister2Master(result, strBuffer)) {
                             logger.info(strBuffer.append("[Re-register] ")
                                     .append(consumerId).toString());
@@ -995,7 +996,7 @@ public class SimpleClientBalanceConsumer implements ClientBalanceConsumer {
                         }
                         return;
                     }
-                    clientMetrics.bookHB2MasterException();
+                    clientStatsInfo.bookHB2MasterException();
                     logger.error(strBuffer.append("[Heartbeat Failed] ")
                             .append(response.getErrMsg()).toString());
                     if (response.getErrCode() == TErrCodeConstants.CERTIFICATE_FAILURE) {
@@ -1065,14 +1066,14 @@ public class SimpleClientBalanceConsumer implements ClientBalanceConsumer {
                             createBrokerRegisterRequest(partition, boostrapOffset),
                             AddressUtils.getLocalAddress(), consumerConfig.isTlsEnable());
             if (response == null) {
-                clientMetrics.bookReg2Broker(true);
+                clientStatsInfo.bookReg2Broker(true);
                 result.setFailResult(TErrCodeConstants.CONNECT_RETURN_NULL,
                         sBuffer.append(" register ").append(partition.toString())
                                 .append(" return null!").toString());
                 return result.isSuccess();
             }
             if (response.getSuccess()) {
-                clientMetrics.bookReg2Broker(false);
+                clientStatsInfo.bookReg2Broker(false);
                 long currOffset = response.hasCurrOffset()
                         ? response.getCurrOffset() : TBaseConstants.META_VALUE_UNDEFINED;
                 long maxOffset = response.hasMaxOffset()
@@ -1086,7 +1087,7 @@ public class SimpleClientBalanceConsumer implements ClientBalanceConsumer {
                 result.setSuccResult();
                 return result.isSuccess();
             } else {
-                clientMetrics.bookReg2Broker(true);
+                clientStatsInfo.bookReg2Broker(true);
                 if (response.getErrCode() == TErrCodeConstants.PARTITION_OCCUPIED
                         || response.getErrCode() == TErrCodeConstants.CERTIFICATE_FAILURE) {
                     clientRmtDataCache.removePartition(partition);
@@ -1125,7 +1126,7 @@ public class SimpleClientBalanceConsumer implements ClientBalanceConsumer {
                     masterService.consumerRegisterC2MV2(createMasterRegisterRequest(),
                             AddressUtils.getLocalAddress(), consumerConfig.isTlsEnable());
             if (response == null) {
-                clientMetrics.bookReg2Master(true);
+                clientStatsInfo.bookReg2Master(true);
                 result.setFailResult(TErrCodeConstants.CONNECT_RETURN_NULL,
                         sBuffer.append("Register Failed: ").append(consumerId)
                                 .append(" register to master return null!").toString());
@@ -1133,7 +1134,7 @@ public class SimpleClientBalanceConsumer implements ClientBalanceConsumer {
                 return result.isSuccess();
             }
             if (response.getErrCode() != TErrCodeConstants.SUCCESS) {
-                clientMetrics.bookReg2Master(true);
+                clientStatsInfo.bookReg2Master(true);
                 // If the consumer group is forbidden, output the log
                 if (response.getErrCode()
                         == TErrCodeConstants.CONSUME_GROUP_FORBIDDEN) {
@@ -1149,7 +1150,7 @@ public class SimpleClientBalanceConsumer implements ClientBalanceConsumer {
                 sBuffer.delete(0, sBuffer.length());
                 return result.isSuccess();
             }
-            clientMetrics.bookReg2Master(false);
+            clientStatsInfo.bookReg2Master(false);
             // Process the successful response
             clientRmtDataCache.updateReg2MasterTime();
             clientRmtDataCache.updateBrokerInfoList(response.getBrokerConfigId(),
@@ -1287,7 +1288,7 @@ public class SimpleClientBalanceConsumer implements ClientBalanceConsumer {
                                         createBrokerHeartBeatRequest(brokerInfo.getBrokerId(), partStrSet),
                                         AddressUtils.getLocalAddress(), consumerConfig.isTlsEnable());
                         if (response == null) {
-                            clientMetrics.bookHB2BrokerTimeout();
+                            clientStatsInfo.bookHB2BrokerTimeout();
                             continue;
                         }
                         if (response.getSuccess()) {
@@ -1333,7 +1334,7 @@ public class SimpleClientBalanceConsumer implements ClientBalanceConsumer {
                                 }
                             }
                         } else {
-                            clientMetrics.bookHB2BrokerException();
+                            clientStatsInfo.bookHB2BrokerException();
                             if (response.getErrCode() == TErrCodeConstants.CERTIFICATE_FAILURE) {
                                 for (Partition partition : partitions) {
                                     clientRmtDataCache.removePartition(partition);
diff --git a/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/consumer/SimplePullMessageConsumer.java b/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/consumer/SimplePullMessageConsumer.java
index bb3b33e..b430781 100644
--- a/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/consumer/SimplePullMessageConsumer.java
+++ b/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/consumer/SimplePullMessageConsumer.java
@@ -205,7 +205,7 @@ public class SimplePullMessageConsumer implements PullMessageConsumer {
                     .append("Not found the partition by confirmContext:")
                     .append(confirmContext).toString());
         }
-        baseConsumer.clientMetrics.bookConfirmDuration(
+        baseConsumer.clientStatsInfo.bookConfirmDuration(
                 System.currentTimeMillis() - timeStamp);
         if (this.baseConsumer.consumerConfig.isPullConfirmInLocal()) {
             baseConsumer.rmtDataCache.succRspRelease(keyId, topicName,
diff --git a/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/consumer/SimplePushMessageConsumer.java b/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/consumer/SimplePushMessageConsumer.java
index 2fa49fd..45f5b86 100644
--- a/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/consumer/SimplePushMessageConsumer.java
+++ b/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/consumer/SimplePushMessageConsumer.java
@@ -249,7 +249,7 @@ public class SimplePushMessageConsumer implements PushMessageConsumer {
         } else {
             this.receiveMessages(request, topicProcessor);
         }
-        baseConsumer.clientMetrics.bookConfirmDuration(
+        baseConsumer.clientStatsInfo.bookConfirmDuration(
                 System.currentTimeMillis() - request.getUsedToken());
         return true;
     }
diff --git a/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/producer/MessageProducer.java b/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/producer/MessageProducer.java
index fd519a1..34e0576 100644
--- a/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/producer/MessageProducer.java
+++ b/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/producer/MessageProducer.java
@@ -37,7 +37,4 @@ public interface MessageProducer extends Shutdownable {
 
     void sendMessage(Message message, MessageSentCallback cb)
             throws TubeClientException, InterruptedException;
-
-    @Override
-    void shutdown() throws Throwable;
 }
diff --git a/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/producer/ProducerManager.java b/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/producer/ProducerManager.java
index 3a1fff3..4ad2a29 100644
--- a/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/producer/ProducerManager.java
+++ b/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/producer/ProducerManager.java
@@ -33,7 +33,7 @@ import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 
-import org.apache.inlong.tubemq.client.common.ClientMetrics;
+import org.apache.inlong.tubemq.client.common.ClientStatsInfo;
 import org.apache.inlong.tubemq.client.common.TubeClientVersion;
 import org.apache.inlong.tubemq.client.config.TubeClientConfig;
 import org.apache.inlong.tubemq.client.exception.TubeClientException;
@@ -94,14 +94,15 @@ public class ProducerManager {
     private long lastEmptyBrokerPrintTime = 0;
     private long lastEmptyTopicPrintTime = 0;
     private int heartbeatRetryTimes = 0;
-    private AtomicBoolean isStartHeart = new AtomicBoolean(false);
-    private AtomicInteger heartBeatStatus = new AtomicInteger(-1);
+    private final AtomicBoolean isStartHeart = new AtomicBoolean(false);
+    private final AtomicInteger heartBeatStatus = new AtomicInteger(-1);
     private volatile long lastHeartbeatTime = System.currentTimeMillis();
-    private AtomicInteger nodeStatus = new AtomicInteger(-1);
+    private final AtomicInteger nodeStatus = new AtomicInteger(-1);
     private Map<String, Map<Integer, List<Partition>>> topicPartitionMap =
             new ConcurrentHashMap<>();
-    private AtomicBoolean nextWithAuthInfo2M = new AtomicBoolean(false);
-    private final ClientMetrics clientMetrics;
+    private final AtomicBoolean nextWithAuthInfo2M =
+            new AtomicBoolean(false);
+    private final ClientStatsInfo clientStatsInfo;
 
     public ProducerManager(final InnerSessionFactory sessionFactory,
                            final TubeClientConfig tubeClientConfig) throws TubeClientException {
@@ -132,9 +133,12 @@ public class ProducerManager {
         rpcConfig.put(RpcConstants.WORKER_THREAD_NAME, "tube_netty_worker-");
         rpcConfig.put(RpcConstants.CALLBACK_WORKER_COUNT,
                 tubeClientConfig.getRpcRspCallBackThreadCnt());
-        // initial client metrics
-        this.clientMetrics =
-                new ClientMetrics(true, this.producerId, this.tubeClientConfig);
+        // initial client statistics configure
+        this.clientStatsInfo =
+                new ClientStatsInfo(true, this.producerId,
+                        this.tubeClientConfig.enableStatsSelfPrint(),
+                        this.tubeClientConfig.getStatsSelfPrintPeriodMs(),
+                        this.tubeClientConfig.getStatsForcedResetPeriodMs());
         heartBeatStatus.set(0);
         this.masterService =
                 this.rpcServiceFactory.getFailoverService(MasterService.class,
@@ -292,7 +296,7 @@ public class ProducerManager {
             }
             return;
         }
-        clientMetrics.printMetricInfo(true, strBuff);
+        clientStatsInfo.selfPrintStatsInfo(true, strBuff);
         if (this.nodeStatus.compareAndSet(0, 1)) {
             this.heartbeatService.shutdownNow();
             this.topicPartitionMap.clear();
@@ -307,8 +311,8 @@ public class ProducerManager {
      *
      * @return client metrics
      */
-    public ClientMetrics getClientMetrics() {
-        return clientMetrics;
+    public ClientStatsInfo getClientMetrics() {
+        return clientStatsInfo;
     }
 
     /**
@@ -420,18 +424,18 @@ public class ProducerManager {
                         this.masterService.producerRegisterP2M(createRegisterRequest(),
                                 AddressUtils.getLocalAddress(), tubeClientConfig.isTlsEnable());
                 if (response == null) {
-                    clientMetrics.bookReg2Master(true);
+                    clientStatsInfo.bookReg2Master(true);
                 } else {
                     if (response.getSuccess()) {
                         if (response.getBrokerCheckSum() != this.brokerInfoCheckSum) {
                             updateBrokerInfoList(true, response.getBrokerInfosList(),
                                     response.getBrokerCheckSum(), sBuilder);
                         }
-                        clientMetrics.bookReg2Master(false);
+                        clientStatsInfo.bookReg2Master(false);
                         processRegSyncInfo(response);
                         return;
                     } else {
-                        clientMetrics.bookReg2Master(true);
+                        clientStatsInfo.bookReg2Master(true);
                     }
                 }
                 if (remainingRetry <= 0) {
@@ -665,7 +669,7 @@ public class ProducerManager {
                 ThreadUtils.sleep(100);
             }
             // print metrics information
-            clientMetrics.printMetricInfo(false, sBuilder);
+            clientStatsInfo.selfPrintStatsInfo(false, sBuilder);
             // check whether public topics
             if (publishTopics.isEmpty()) {
                 return;
@@ -677,14 +681,14 @@ public class ProducerManager {
                 if (response == null || !response.getSuccess()) {
                     heartbeatRetryTimes++;
                     if (response == null) {
-                        clientMetrics.bookHB2MasterException();
+                        clientStatsInfo.bookHB2MasterException();
                         logger.error("[Heartbeat Failed] receive null HeartResponseM2P response!");
                     } else {
                         logger.error(sBuilder.append("[Heartbeat Failed] ")
                                 .append(response.getErrMsg()).toString());
                         sBuilder.delete(0, sBuilder.length());
                         if (response.getErrCode() == TErrCodeConstants.HB_NO_NODE) {
-                            clientMetrics.bookHB2MasterTimeout();
+                            clientStatsInfo.bookHB2MasterTimeout();
                             try {
                                 register2Master();
                             } catch (Throwable ee) {
@@ -694,7 +698,7 @@ public class ProducerManager {
                                 sBuilder.delete(0, sBuilder.length());
                             }
                         } else {
-                            clientMetrics.bookHB2MasterException();
+                            clientStatsInfo.bookHB2MasterException();
                             if (response.getErrCode() == TErrCodeConstants.CERTIFICATE_FAILURE) {
                                 adjustHeartBeatPeriod("certificate failure", sBuilder);
                             }
diff --git a/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/producer/SimpleMessageProducer.java b/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/producer/SimpleMessageProducer.java
index 4e35b5d..c4699fd 100644
--- a/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/producer/SimpleMessageProducer.java
+++ b/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/producer/SimpleMessageProducer.java
@@ -63,7 +63,7 @@ public class SimpleMessageProducer implements MessageProducer {
     private final PartitionRouter partitionRouter;
     private final DefaultBrokerRcvQltyStats brokerRcvQltyStats;
     private final RpcConfig rpcConfig = new RpcConfig();
-    private AtomicBoolean isShutDown = new AtomicBoolean(false);
+    private final AtomicBoolean isShutDown = new AtomicBoolean(false);
 
     public SimpleMessageProducer(final InnerSessionFactory sessionFactory,
                                  TubeClientConfig tubeClientConfig) throws TubeClientException {
@@ -223,7 +223,6 @@ public class SimpleMessageProducer implements MessageProducer {
                 rpcServiceFactory.addRmtAddrErrCount(partition.getBroker().getBrokerAddr());
             }
             producerManager.getClientMetrics().bookFailRpcCall(
-                    System.currentTimeMillis() - startTime,
                     TErrCodeConstants.UNSPECIFIED_ABNORMAL);
             partition.increRetries(1);
             this.brokerRcvQltyStats.addReceiveStatistic(brokerId, false);
@@ -269,7 +268,6 @@ public class SimpleMessageProducer implements MessageProducer {
                         @Override
                         public void handleError(Throwable error) {
                             producerManager.getClientMetrics().bookFailRpcCall(
-                                    System.currentTimeMillis() - startTime,
                                     TErrCodeConstants.UNSPECIFIED_ABNORMAL);
                             partition.increRetries(1);
                             brokerRcvQltyStats.addReceiveStatistic(brokerId, false);
@@ -365,7 +363,8 @@ public class SimpleMessageProducer implements MessageProducer {
                                                  final ClientBroker.SendMessageResponseB2P response) {
         final String resultStr = response.getErrMsg();
         if (response.getErrCode() == TErrCodeConstants.SUCCESS) {
-            producerManager.getClientMetrics().bookSuccSendMsg(dltTime, message.getData().length);
+            producerManager.getClientMetrics().bookSuccSendMsg(dltTime,
+                    message.getTopic(), message.getData().length);
             if (response.hasMessageId()) {
                 return new MessageSentResult(true,
                         response.getErrCode(), "Ok!",
@@ -376,7 +375,7 @@ public class SimpleMessageProducer implements MessageProducer {
                         message, Long.parseLong(resultStr), partition);
             }
         } else {
-            producerManager.getClientMetrics().bookFailRpcCall(dltTime, response.getErrCode());
+            producerManager.getClientMetrics().bookFailRpcCall(response.getErrCode());
             return new MessageSentResult(false, response.getErrCode(), resultStr,
                     message, TBaseConstants.META_VALUE_UNDEFINED, partition);
         }
diff --git a/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corebase/TBaseConstants.java b/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corebase/TBaseConstants.java
index 92070f3..2835a2e 100644
--- a/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corebase/TBaseConstants.java
+++ b/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corebase/TBaseConstants.java
@@ -82,5 +82,6 @@ public class TBaseConstants {
 
     public static final long CFG_DEF_META_FORCE_UPDATE_PERIOD = 3 * 60 * 1000;
     public static final long CFG_MIN_META_FORCE_UPDATE_PERIOD = 1 * 60 * 1000;
+    public static final long CFG_STATS_MIN_SNAPSHOT_PERIOD_MS = 2000;
 
 }
diff --git a/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corebase/metric/TrafficStatsUnit.java b/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corebase/metric/TrafficStatsUnit.java
new file mode 100644
index 0000000..996592a
--- /dev/null
+++ b/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corebase/metric/TrafficStatsUnit.java
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.tubemq.corebase.metric;
+
+import org.apache.inlong.tubemq.corebase.metric.impl.LongStatsCounter;
+
+/**
+ * TrafficStatsUnit, Metric Statistics item Unit
+ *
+ * Currently includes the total number of messages and bytes
+ * according to the statistics dimension, which can be expanded later as needed
+ */
+public class TrafficStatsUnit {
+    // the message count
+    public LongStatsCounter msgCnt;
+    // the message size
+    public LongStatsCounter msgSize;
+
+    /**
+     * Accumulate the count of messages and message bytes.
+     *
+     * @param msgCntName    the specified count statistics item name
+     * @param msgSizeName   the specified size statistics item name
+     * @param prefix        the prefix of statistics items
+     */
+    public TrafficStatsUnit(String msgCntName, String msgSizeName, String prefix) {
+        this.msgCnt = new LongStatsCounter(msgCntName, prefix);
+        this.msgSize = new LongStatsCounter(msgSizeName, prefix);
+    }
+
+    /**
+     * Accumulate the count of messages and message bytes.
+     *
+     * @param msgCount  the specified message count
+     * @param msgSize   the specified message size
+     */
+    public void addMsgCntAndSize(long msgCount, long msgSize) {
+        this.msgCnt.addValue(msgCount);
+        this.msgSize.addValue(msgSize);
+    }
+}
diff --git a/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corebase/utils/TStringUtils.java b/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corebase/utils/TStringUtils.java
index 4b9f793..7305bf6 100644
--- a/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corebase/utils/TStringUtils.java
+++ b/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corebase/utils/TStringUtils.java
@@ -134,6 +134,15 @@ public class TStringUtils {
         return new String(tgtStr, 0, curWritePos);
     }
 
+    /**
+     * Get the authorization signature based on the provided values
+     * base64.encode(hmacSha1(password, username, timestamp, random number))
+     *
+     * @param usrName       the user name
+     * @param usrPassWord   the password of username
+     * @param timestamp     the time stamp
+     * @param randomValue   the random value
+     */
     public static String getAuthSignature(final String usrName,
                                           final String usrPassWord,
                                           long timestamp, int randomValue) {
@@ -155,6 +164,14 @@ public class TStringUtils {
         return signature;
     }
 
+    /**
+     * Build attribute information
+     *
+     * @param srcAttrs   the current attribute
+     * @param attrKey    the attribute key
+     * @param attrVal    the attribute value
+     * @return           the new attribute information
+     */
     public static String setAttrValToAttributes(String srcAttrs,
                                                 String attrKey, String attrVal) {
         StringBuilder sbuf = new StringBuilder(512);
@@ -184,6 +201,13 @@ public class TStringUtils {
         return sbuf.toString();
     }
 
+    /**
+     * Get attribute value by key from attribute information
+     *
+     * @param srcAttrs   the current attribute
+     * @param attrKey    the attribute key
+     * @return           the attribute value
+     */
     public static String getAttrValFrmAttributes(String srcAttrs, String attrKey) {
         if (!isBlank(srcAttrs)) {
             String[] strAttrs = srcAttrs.split(TokenConstants.SEGMENT_SEP);
diff --git a/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corebase/utils/ThreadUtils.java b/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corebase/utils/ThreadUtils.java
index 2abccd5..ebaf7d9 100644
--- a/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corebase/utils/ThreadUtils.java
+++ b/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corebase/utils/ThreadUtils.java
@@ -99,6 +99,8 @@ public class ThreadUtils {
     }
 
     /**
+     * Check and wait the specified thread exit
+     *
      * @param t Waits on the passed thread to die dumping a threaddump every minute while its up.
      */
     public static void threadDumpingIsAlive(final Thread t) throws InterruptedException {
@@ -115,6 +117,8 @@ public class ThreadUtils {
     }
 
     /**
+     * Sleep current thread
+     *
      * @param millis How long to sleep for in milliseconds.
      */
     public static void sleep(long millis) {
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/msgstore/MessageStore.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/msgstore/MessageStore.java
index 2b8aba3..a419b02 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/msgstore/MessageStore.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/msgstore/MessageStore.java
@@ -806,7 +806,6 @@ public class MessageStore implements Closeable {
             if (tmpStore.getMaxAllowedMsgCount() == writeCacheMaxCnt
                     && tmpStore.getMaxDataCacheSize() == writeCacheMaxSize) {
                 msgMemStore = tmpStore;
-                msgMemStore.clear();
             } else {
                 isRealloc = true;
                 tmpStore.close();
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/msgstore/disk/MsgFileStore.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/msgstore/disk/MsgFileStore.java
index 77ab572..7c6c20a 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/msgstore/disk/MsgFileStore.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/msgstore/disk/MsgFileStore.java
@@ -188,8 +188,10 @@ public class MsgFileStore implements Closeable {
             long currTime = System.currentTimeMillis();
             isMsgDataFlushed = (messageStore.getUnflushDataHold() > 0)
                     && (curUnflushSize.get() >= messageStore.getUnflushDataHold());
-            if ((isMsgCntFlushed = this.curUnflushed.addAndGet(msgCnt) >= messageStore.getUnflushThreshold())
-                || (isMsgTimeFlushed = currTime - this.lastFlushTime.get() >= messageStore.getUnflushInterval())
+            if ((isMsgCntFlushed =
+                    (this.curUnflushed.addAndGet(msgCnt) >= messageStore.getUnflushThreshold()))
+                || (isMsgTimeFlushed =
+                    (currTime - this.lastFlushTime.get() >= messageStore.getUnflushInterval()))
                 || isMsgDataFlushed || isDataSegFlushed || isIndexSegFlushed) {
                 isForceMetadata = (isDataSegFlushed || isIndexSegFlushed
                     || (currTime - this.lastMetaFlushTime.get() > MAX_META_REFRESH_DUR));
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/msgstore/mem/MsgMemStore.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/msgstore/mem/MsgMemStore.java
index 81a0f9b..678c72c 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/msgstore/mem/MsgMemStore.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/msgstore/mem/MsgMemStore.java
@@ -139,9 +139,7 @@ public class MsgMemStore implements Closeable {
             dataOffset = this.writeDataStartPos + this.cacheDataOffset.get();
             indexEntry.putLong(DataStoreUtils.INDEX_POS_DATAOFFSET, dataOffset);
             dataEntry.putLong(DataStoreUtils.STORE_HEADER_POS_QUEUE_LOGICOFF, indexOffset);
-            // this.cacheDataSegment.position(this.cacheDataOffset.get());
             this.cacheDataSegment.put(dataEntry.array());
-            // this.cachedIndexSegment.position(this.cacheIndexOffset.get());
             this.cachedIndexSegment.put(indexEntry.array());
             this.cacheDataOffset.getAndAdd(dataEntryLength);
             indexSizePos = cacheIndexOffset.getAndAdd(DataStoreUtils.STORE_INDEX_HEAD_LEN);
@@ -392,6 +390,8 @@ public class MsgMemStore implements Closeable {
         this.curMessageCount.set(0);
         this.queuesMap.clear();
         this.keysMap.clear();
+        this.cacheDataSegment.rewind();
+        this.cachedIndexSegment.rewind();
         this.leftAppendTime.set(System.currentTimeMillis());
         this.rightAppendTime.set(System.currentTimeMillis());
     }
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/stats/BrokerSrvStatsHolder.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/stats/BrokerSrvStatsHolder.java
index 11dd0ec..07817aa 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/stats/BrokerSrvStatsHolder.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/stats/BrokerSrvStatsHolder.java
@@ -20,11 +20,11 @@ package org.apache.inlong.tubemq.server.broker.stats;
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
+import org.apache.inlong.tubemq.corebase.TBaseConstants;
 import org.apache.inlong.tubemq.corebase.metric.impl.ESTHistogram;
 import org.apache.inlong.tubemq.corebase.metric.impl.LongOnlineCounter;
 import org.apache.inlong.tubemq.corebase.metric.impl.LongStatsCounter;
 import org.apache.inlong.tubemq.corebase.metric.impl.SinceTime;
-import org.apache.inlong.tubemq.server.common.TServerConstants;
 
 /**
  * BrokerSrvStatsHolder, statistic Broker metrics information for RPC services
@@ -142,7 +142,7 @@ public class BrokerSrvStatsHolder {
         long curSnapshotTime = lstSnapshotTime.get();
         // Avoid frequent snapshots
         if ((System.currentTimeMillis() - curSnapshotTime)
-                >= TServerConstants.MIN_SNAPSHOT_PERIOD_MS) {
+                >= TBaseConstants.CFG_STATS_MIN_SNAPSHOT_PERIOD_MS) {
             if (lstSnapshotTime.compareAndSet(curSnapshotTime, System.currentTimeMillis())) {
                 switchableSets[getIndex(writableIndex.incrementAndGet())].resetSinceTime();
                 return true;
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/stats/MsgStoreStatsHolder.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/stats/MsgStoreStatsHolder.java
index ff88df7..c2d3ad3 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/stats/MsgStoreStatsHolder.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/stats/MsgStoreStatsHolder.java
@@ -20,6 +20,8 @@ package org.apache.inlong.tubemq.server.broker.stats;
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.inlong.tubemq.corebase.TBaseConstants;
 import org.apache.inlong.tubemq.corebase.metric.impl.ESTHistogram;
 import org.apache.inlong.tubemq.corebase.metric.impl.LongStatsCounter;
 import org.apache.inlong.tubemq.corebase.metric.impl.SimpleHistogram;
@@ -352,7 +354,7 @@ public class MsgStoreStatsHolder {
         long curSwitchTime = lstSnapshotTime.get();
         // Avoid frequent snapshots
         if ((System.currentTimeMillis() - curSwitchTime)
-                >= TServerConstants.MIN_SNAPSHOT_PERIOD_MS) {
+                >= TBaseConstants.CFG_STATS_MIN_SNAPSHOT_PERIOD_MS) {
             if (lstSnapshotTime.compareAndSet(curSwitchTime, System.currentTimeMillis())) {
                 msgStoreStatsSets[getIndex(writableIndex.get() - 1)].clear();
                 msgStoreStatsSets[getIndex(writableIndex.getAndIncrement())]
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/stats/TrafficStatsService.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/stats/TrafficStatsService.java
index 0aa8ea2..f27147c 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/stats/TrafficStatsService.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/stats/TrafficStatsService.java
@@ -22,8 +22,8 @@ import java.util.Map.Entry;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.inlong.tubemq.corebase.daemon.AbstractDaemonService;
+import org.apache.inlong.tubemq.corebase.metric.TrafficStatsUnit;
 import org.apache.inlong.tubemq.corebase.metric.impl.LongOnlineCounter;
-import org.apache.inlong.tubemq.corebase.metric.impl.LongStatsCounter;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -96,18 +96,18 @@ public class TrafficStatsService extends AbstractDaemonService implements Traffi
 
     @Override
     public void add(Map<String, TrafficInfo> trafficInfos) {
-        TrafficStatsSet tmpStatsSet;
-        TrafficStatsSet trafficStatsSet;
+        TrafficStatsUnit tmpStatsSet;
+        TrafficStatsUnit trafficStatsSet;
         // Increment write reference count
         switchableUnits[getIndex()].refCnt.incValue();
         try {
             // Accumulate statistics information
-            ConcurrentHashMap<String, TrafficStatsSet> tmpStatsSetMap =
+            ConcurrentHashMap<String, TrafficStatsUnit> tmpStatsSetMap =
                     switchableUnits[getIndex()].statsUnitMap;
             for (Entry<String, TrafficInfo> entry : trafficInfos.entrySet()) {
                 trafficStatsSet = tmpStatsSetMap.get(entry.getKey());
                 if (trafficStatsSet == null) {
-                    tmpStatsSet = new TrafficStatsSet();
+                    tmpStatsSet = new TrafficStatsUnit("msg_cnt", "msg_size", null);
                     trafficStatsSet = tmpStatsSetMap.putIfAbsent(entry.getKey(), tmpStatsSet);
                     if (trafficStatsSet == null) {
                         trafficStatsSet = tmpStatsSet;
@@ -128,11 +128,11 @@ public class TrafficStatsService extends AbstractDaemonService implements Traffi
         switchableUnits[getIndex()].refCnt.incValue();
         try {
             // Accumulate statistics information
-            ConcurrentHashMap<String, TrafficStatsSet> tmpStatsSetMap =
+            ConcurrentHashMap<String, TrafficStatsUnit> tmpStatsSetMap =
                     switchableUnits[getIndex()].statsUnitMap;
-            TrafficStatsSet trafficStatsSet = tmpStatsSetMap.get(statsKey);
+            TrafficStatsUnit trafficStatsSet = tmpStatsSetMap.get(statsKey);
             if (trafficStatsSet == null) {
-                TrafficStatsSet tmpStatsSet = new TrafficStatsSet();
+                TrafficStatsUnit tmpStatsSet = new TrafficStatsUnit("msg_cnt", "msg_size", null);
                 trafficStatsSet = tmpStatsSetMap.putIfAbsent(statsKey, tmpStatsSet);
                 if (trafficStatsSet == null) {
                     trafficStatsSet = tmpStatsSet;
@@ -169,8 +169,8 @@ public class TrafficStatsService extends AbstractDaemonService implements Traffi
             }
         } while (selectedUnit.refCnt.getValue() > 0);
         // Output data to file
-        Map<String, TrafficStatsSet> statsMap = selectedUnit.statsUnitMap;
-        for (Entry<String, TrafficStatsSet> entry : statsMap.entrySet()) {
+        Map<String, TrafficStatsUnit> statsMap = selectedUnit.statsUnitMap;
+        for (Entry<String, TrafficStatsUnit> entry : statsMap.entrySet()) {
             logger.info("{}#{}#{}#{}", statsCat, entry.getKey(),
                     entry.getValue().msgCnt.getAndResetValue(),
                     entry.getValue().msgSize.getAndResetValue());
@@ -198,34 +198,6 @@ public class TrafficStatsService extends AbstractDaemonService implements Traffi
     }
 
     /**
-     * StatsItemSet, Metric Statistics item set
-     *
-     * Currently includes the total number of messages and bytes
-     * according to the statistics dimension, which can be expanded later as needed
-     */
-    private static class TrafficStatsSet {
-        protected LongStatsCounter msgCnt =
-                new LongStatsCounter("msg_count", null);
-        protected LongStatsCounter msgSize =
-                new LongStatsCounter("msg_size", null);
-
-        public TrafficStatsSet() {
-            //
-        }
-
-        /**
-         * Accumulate the count of messages and message bytes.
-         *
-         * @param msgCount  the specified message count
-         * @param msgSize   the specified message size
-         */
-        public void addMsgCntAndSize(long msgCount, long msgSize) {
-            this.msgCnt.addValue(msgCount);
-            this.msgSize.addValue(msgSize);
-        }
-    }
-
-    /**
      * WritableUnit,
      *
      * This class is mainly defined to facilitate reading and writing of
@@ -237,7 +209,7 @@ public class TrafficStatsService extends AbstractDaemonService implements Traffi
         public LongOnlineCounter refCnt =
                 new LongOnlineCounter("ref_count", null);
         // statistic unit map
-        protected ConcurrentHashMap<String, TrafficStatsSet> statsUnitMap =
+        protected ConcurrentHashMap<String, TrafficStatsUnit> statsUnitMap =
                 new ConcurrentHashMap<>(512);
     }
 }
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/common/TServerConstants.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/common/TServerConstants.java
index a336ae8..007a7fb 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/common/TServerConstants.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/common/TServerConstants.java
@@ -99,7 +99,6 @@ public final class TServerConstants {
     public static final long CFG_OFFSET_RESET_MID_ALARM_CHECK =
             DataStoreUtils.STORE_INDEX_HEAD_LEN * 1000000L;
 
-    // Minimum snapshot period
-    public static final long MIN_SNAPSHOT_PERIOD_MS = 2000L;
+    // max statistics token type length
     public static final int META_MAX_STATSTYPE_LENGTH = 256;
 }
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/common/webbase/WebCallStatsHolder.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/common/webbase/WebCallStatsHolder.java
index f47beb7..9360f40 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/common/webbase/WebCallStatsHolder.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/common/webbase/WebCallStatsHolder.java
@@ -21,10 +21,10 @@ import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
+import org.apache.inlong.tubemq.corebase.TBaseConstants;
 import org.apache.inlong.tubemq.corebase.metric.impl.ESTHistogram;
 import org.apache.inlong.tubemq.corebase.metric.impl.SimpleHistogram;
 import org.apache.inlong.tubemq.corebase.metric.impl.SinceTime;
-import org.apache.inlong.tubemq.server.common.TServerConstants;
 
 /**
  * WebCallStatsHolder, statistic for web api calls
@@ -118,7 +118,7 @@ public class WebCallStatsHolder {
         long curSnapshotTime = lstSnapshotTime.get();
         // Avoid frequent snapshots
         if ((System.currentTimeMillis() - curSnapshotTime)
-                >= TServerConstants.MIN_SNAPSHOT_PERIOD_MS) {
+                >= TBaseConstants.CFG_STATS_MIN_SNAPSHOT_PERIOD_MS) {
             if (lstSnapshotTime.compareAndSet(curSnapshotTime, System.currentTimeMillis())) {
                 switchableSets[getIndex(writableIndex.incrementAndGet())].resetSinceTime();
                 return true;
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/stats/MasterSrvStatsHolder.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/stats/MasterSrvStatsHolder.java
index 082640c..a60483e 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/stats/MasterSrvStatsHolder.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/stats/MasterSrvStatsHolder.java
@@ -20,11 +20,11 @@ package org.apache.inlong.tubemq.server.master.stats;
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
+import org.apache.inlong.tubemq.corebase.TBaseConstants;
 import org.apache.inlong.tubemq.corebase.metric.impl.ESTHistogram;
 import org.apache.inlong.tubemq.corebase.metric.impl.LongOnlineCounter;
 import org.apache.inlong.tubemq.corebase.metric.impl.LongStatsCounter;
 import org.apache.inlong.tubemq.corebase.metric.impl.SinceTime;
-import org.apache.inlong.tubemq.server.common.TServerConstants;
 
 /**
  * MasterSrvStatsHolder, statistics Master's RPC service metric information
@@ -217,7 +217,7 @@ public class MasterSrvStatsHolder {
         long curSnapshotTime = lstSnapshotTime.get();
         // Avoid frequent snapshots
         if ((System.currentTimeMillis() - curSnapshotTime)
-                >= TServerConstants.MIN_SNAPSHOT_PERIOD_MS) {
+                >= TBaseConstants.CFG_STATS_MIN_SNAPSHOT_PERIOD_MS) {
             if (lstSnapshotTime.compareAndSet(curSnapshotTime, System.currentTimeMillis())) {
                 switchableSets[getIndex(writableIndex.incrementAndGet())].resetSinceTime();
                 return true;