You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by go...@apache.org on 2022/02/25 14:44:45 UTC

[incubator-inlong] branch master updated: [INLONG-2728][TubeMQ] Optimize the content of statistical metrics (#2729)

This is an automated email from the ASF dual-hosted git repository.

gosonzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new fdf46c6  [INLONG-2728][TubeMQ] Optimize the content of statistical metrics (#2729)
fdf46c6 is described below

commit fdf46c67c0db289b41a1fef54dac5f1b6dc834a8
Author: gosonzhang <46...@qq.com>
AuthorDate: Fri Feb 25 22:44:39 2022 +0800

    [INLONG-2728][TubeMQ] Optimize the content of statistical metrics (#2729)
---
 .../tubemq/client/common/ClientStatsInfo.java      | 579 +++++++++++++--------
 .../inlong/tubemq/client/common/StatsConfig.java   | 161 ++++++
 .../{StatsOutputLevel.java => StatsLevel.java}     |  15 +-
 .../tubemq/client/common/TClientConstants.java     |  12 +-
 .../tubemq/client/config/TubeClientConfig.java     | 111 ++--
 .../client/consumer/BaseMessageConsumer.java       |  13 +-
 .../tubemq/client/consumer/FetchContext.java       |   4 +
 .../consumer/SimpleClientBalanceConsumer.java      |  20 +-
 .../client/consumer/SimplePullMessageConsumer.java |  10 +-
 .../client/consumer/SimplePushMessageConsumer.java |   2 +-
 .../tubemq/client/producer/ProducerManager.java    |   8 +-
 .../client/producer/SimpleMessageProducer.java     |   2 +-
 .../tubemq/client/consumer/StatsConfigTest.java    |  51 ++
 .../tubemq/corebase/metric/TrafficStatsUnit.java   |  23 +
 .../server/broker/stats/BrokerSrvStatsHolder.java  |   4 +-
 .../server/broker/stats/MsgStoreStatsHolder.java   |  12 +-
 .../server/broker/stats/TrafficStatsService.java   |  20 +-
 .../server/common/webbase/WebCallStatsHolder.java  |  70 ++-
 .../server/master/stats/MasterSrvStatsHolder.java  |   4 +-
 19 files changed, 755 insertions(+), 366 deletions(-)

diff --git a/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/common/ClientStatsInfo.java b/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/common/ClientStatsInfo.java
index 186da25..548b3e2 100644
--- a/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/common/ClientStatsInfo.java
+++ b/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/common/ClientStatsInfo.java
@@ -37,30 +37,20 @@ public class ClientStatsInfo {
             LoggerFactory.getLogger(ClientStatsInfo.class);
     private final boolean isProducer;
     private final String logPrefix;
+    // statistics configuration
+    private final StatsConfig statsConfig = new StatsConfig();
     // switchable statistic items
     private final ClientStatsItemSet[] switchableSets = new ClientStatsItemSet[2];
     // current writable index
     private final AtomicInteger writableIndex = new AtomicInteger(0);
-    // statistics self-print period, ms
-    private final long statsPrintPeriodMs;
-    // statistics force reset period, ms
-    private final long statsForcedResetPeriodMs;
-    // whether the statistic is closed
-    private volatile boolean isClosed = false;
-    // whether to self-print statistics
-    private volatile boolean isSelfPrint = true;
     // last self-print time
     private final AtomicLong lstSelfPrintTime = new AtomicLong(0);
     // last snapshot time
     private final AtomicLong lstSnapshotTime = new AtomicLong(0);
 
-    public ClientStatsInfo(boolean isProducer, String clientId,
-                           boolean isSelfPrint, long statsPrintPeriodMs,
-                           long statsForcedResetPeriodMs) {
+    public ClientStatsInfo(boolean isProducer, String clientId, StatsConfig statsConfig) {
         this.isProducer = isProducer;
-        this.isSelfPrint = isSelfPrint;
-        this.statsPrintPeriodMs = statsPrintPeriodMs;
-        this.statsForcedResetPeriodMs = statsForcedResetPeriodMs;
+        this.statsConfig.updateStatsConfig(statsConfig);
         StringBuilder strBuff = new StringBuilder(512);
         if (isProducer) {
             strBuff.append("[Producer");
@@ -73,20 +63,12 @@ public class ClientStatsInfo {
         this.switchableSets[1] = new ClientStatsItemSet();
     }
 
-    public synchronized void setStatsStatus(boolean enableStats) {
-        this.isClosed = !enableStats;
-    }
-
-    public boolean isStatsClosed() {
-        return this.isClosed;
-    }
-
-    public boolean isStatsSelfPrint() {
-        return this.isSelfPrint;
+    public void updStatsControlInfo(StatsLevel statsLevel, boolean enableSelfPrint) {
+        this.statsConfig.updateStatsControl(statsLevel, enableSelfPrint);
     }
 
     public void bookReg2Master(boolean isFailure) {
-        if (isClosed) {
+        if (this.statsConfig.getStatsLevel() == StatsLevel.ZERO) {
             return;
         }
         switchableSets[getIndex()].regMasterCnt.incValue();
@@ -96,21 +78,21 @@ public class ClientStatsInfo {
     }
 
     public void bookHB2MasterTimeout() {
-        if (isClosed) {
+        if (this.statsConfig.getStatsLevel() == StatsLevel.ZERO) {
             return;
         }
         switchableSets[getIndex()].regMasterTimoutCnt.incValue();
     }
 
     public void bookHB2MasterException() {
-        if (isClosed) {
+        if (this.statsConfig.getStatsLevel() == StatsLevel.ZERO) {
             return;
         }
         switchableSets[getIndex()].hbMasterExcCnt.incValue();
     }
 
     public void bookReg2Broker(boolean isFailure) {
-        if (isClosed) {
+        if (this.statsConfig.getStatsLevel() == StatsLevel.ZERO) {
             return;
         }
         switchableSets[getIndex()].regBrokerCnt.incValue();
@@ -120,82 +102,99 @@ public class ClientStatsInfo {
     }
 
     public void bookHB2BrokerTimeout() {
-        if (isClosed) {
+        if (this.statsConfig.getStatsLevel() == StatsLevel.ZERO) {
             return;
         }
         switchableSets[getIndex()].regBrokerTimoutCnt.incValue();
     }
 
     public void bookHB2BrokerException() {
-        if (isClosed) {
+        if (this.statsConfig.getStatsLevel() == StatsLevel.ZERO) {
             return;
         }
         switchableSets[getIndex()].hbBrokerExcCnt.incValue();
     }
 
-    public void bookConfirmDuration(long dltTime) {
-        if (isClosed) {
+    public void bookReturnDuration(String partitionKey, long dltTime) {
+        if (this.statsConfig.getStatsLevel() == StatsLevel.ZERO) {
+            return;
+        }
+        ClientStatsItemSet curItemSet = switchableSets[getIndex()];
+        curItemSet.csmLatencyStats.update(dltTime);
+        if (this.statsConfig.getStatsLevel() == StatsLevel.FULL) {
+            curItemSet.addCsmLatencyByPartKey(partitionKey, dltTime);
+        }
+    }
+
+    public void bookConfirmDuration(String partitionKey, long dltTime) {
+        if (this.statsConfig.getStatsLevel() == StatsLevel.ZERO) {
             return;
         }
-        switchableSets[getIndex()].csmLatencyStats.update(dltTime);
+        ClientStatsItemSet curItemSet = switchableSets[getIndex()];
+        curItemSet.confirmDltStats.update(dltTime);
+        if (this.statsConfig.getStatsLevel() == StatsLevel.FULL) {
+            curItemSet.addConfirmDltByPartKey(partitionKey, dltTime);
+        }
     }
 
-    public void bookSuccSendMsg(long dltTime, String topicName, int msgSize) {
-        if (isClosed) {
+    public void bookFailRpcCall(int errCode) {
+        if (this.statsConfig.getStatsLevel() == StatsLevel.ZERO) {
             return;
         }
-        sendOrRecvMsg(topicName, dltTime, 1, msgSize);
-        bookFailRpcCall(TErrCodeConstants.SUCCESS);
+        switchableSets[getIndex()].bookFailRpcCall(errCode);
     }
 
-    public void bookSuccGetMsg(long dltTime, String topicName, int msgCnt, int msgSize) {
-        if (isClosed) {
+    public void bookSuccSendMsg(long dltTime, String topicName,
+                                String partitionKey, int msgSize) {
+        if (this.statsConfig.getStatsLevel() == StatsLevel.ZERO) {
             return;
         }
-        sendOrRecvMsg(topicName, dltTime, msgCnt, msgSize);
-        bookFailRpcCall(TErrCodeConstants.SUCCESS);
+        ClientStatsItemSet curItemSet = switchableSets[getIndex()];
+        curItemSet.sendOrRecvMsg(topicName, dltTime, 1, msgSize);
+        curItemSet.bookFailRpcCall(TErrCodeConstants.SUCCESS);
+        if (this.statsConfig.getStatsLevel() == StatsLevel.FULL) {
+            curItemSet.addTrafficInfoByPartKey(partitionKey, dltTime, 1, msgSize);
+        }
     }
 
-    public void bookFailRpcCall(int errCode) {
-        if (isClosed) {
+    public void bookSuccGetMsg(long dltTime, String topicName,
+                               String partitionKey, int msgCnt, int msgSize) {
+        if (this.statsConfig.getStatsLevel() == StatsLevel.ZERO) {
             return;
         }
-        // accumulate msg count by errcode
         ClientStatsItemSet curItemSet = switchableSets[getIndex()];
-        LongStatsCounter curItemCounter = curItemSet.errRspStatsMap.get(errCode);
-        if (curItemCounter == null) {
-            LongStatsCounter tmpCounter = new LongStatsCounter(
-                    "err_" + errCode, "");
-            curItemCounter = curItemSet.errRspStatsMap.putIfAbsent(errCode, tmpCounter);
-            if (curItemCounter == null) {
-                curItemCounter = tmpCounter;
-            }
+        curItemSet.sendOrRecvMsg(topicName, dltTime, msgCnt, msgSize);
+        curItemSet.bookFailRpcCall(TErrCodeConstants.SUCCESS);
+        if (this.statsConfig.getStatsLevel() == StatsLevel.FULL) {
+            curItemSet.addTrafficInfoByPartKey(partitionKey, dltTime, msgCnt, msgSize);
         }
-        curItemCounter.incValue();
     }
 
     /**
      * Self print statistics information to log file
      *
      * @param forcePrint    whether force print statistics information
+     * @param needReset     whether to reset the statistics set
      * @param strBuff       string buffer
      */
-    public void selfPrintStatsInfo(boolean forcePrint, StringBuilder strBuff) {
-        if (this.isClosed || !this.isSelfPrint) {
+    public void selfPrintStatsInfo(boolean forcePrint,
+                                   boolean needReset,
+                                   StringBuilder strBuff) {
+        if ((this.statsConfig.getStatsLevel() == StatsLevel.ZERO)
+                || !this.statsConfig.isEnableSelfPrint()) {
             return;
         }
         long lstPrintTime = lstSelfPrintTime.get();
         long curChkTime = Clock.systemDefaultZone().millis();
-        if (forcePrint || (curChkTime - lstPrintTime > this.statsPrintPeriodMs)) {
+        if (forcePrint || (curChkTime - lstPrintTime > this.statsConfig.getSelfPrintPeriodMs())) {
             if (lstSelfPrintTime.compareAndSet(lstPrintTime, curChkTime)) {
-                if (switchWritingStatsUnit(false)) {
-                    strBuff.append(this.logPrefix).append(", reset value=");
+                if (switchWritingStatsUnit(needReset)) {
+                    strBuff.append(this.logPrefix).append(", reset value= ");
                     getStatsInfo(switchableSets[getIndex(writableIndex.get() - 1)],
-                            strBuff, StatsOutputLevel.FULL, false);
+                            true, strBuff);
                 } else {
-                    strBuff.append(this.logPrefix).append(", value=");
-                    getStatsInfo(switchableSets[getIndex()],
-                            strBuff, StatsOutputLevel.FULL, false);
+                    strBuff.append(this.logPrefix).append(", value= ");
+                    getStatsInfo(switchableSets[getIndex()], false, strBuff);
                 }
                 logger.info(strBuff.toString());
                 strBuff.delete(0, strBuff.length());
@@ -208,9 +207,9 @@ public class ClientStatsInfo {
         long lstResetTime = lstSnapshotTime.get();
         long checkDltTime = System.currentTimeMillis() - lstResetTime;
         if (((needReset && (checkDltTime > TBaseConstants.CFG_STATS_MIN_SNAPSHOT_PERIOD_MS))
-                || (checkDltTime > this.statsForcedResetPeriodMs))
+                || (checkDltTime > this.statsConfig.getForcedResetPeriodMs()))
                 && lstSnapshotTime.compareAndSet(lstResetTime, System.currentTimeMillis())) {
-            switchableSets[getIndex(writableIndex.incrementAndGet())].resetSinceTime();
+            switchableSets[getIndex(writableIndex.incrementAndGet())].resetStartTime();
             return true;
         }
         return false;
@@ -219,167 +218,49 @@ public class ClientStatsInfo {
     /**
      * Get current data encapsulated by Json format
      *
-     * @param strBuff      string buffer
-     * @param outputLevel  the output level of statistics
+     * @param statsSet     the statistics to be printed
      * @param resetValue   whether to reset the current data
+     * @param strBuff      string buffer
      */
-    private void getStatsInfo(ClientStatsItemSet statsSet, StringBuilder strBuff,
-                              StatsOutputLevel outputLevel, boolean resetValue) {
-        int totalCnt = 0;
+    private void getStatsInfo(ClientStatsItemSet statsSet,
+                              boolean resetValue, StringBuilder strBuff) {
         strBuff.append("{\"").append(statsSet.resetTime.getFullName())
                 .append("\":\"").append(statsSet.resetTime.getStrSinceTime())
                 .append("\",\"probe_time\":\"")
                 .append(DateTimeConvertUtils.ms2yyyyMMddHHmmss(System.currentTimeMillis()))
-                .append("\"");
-        if (resetValue) {
-            strBuff.append(",\"").append(statsSet.totalTrafficStats.msgCnt.getFullName())
-                    .append("\":").append(statsSet.totalTrafficStats.msgCnt.getAndResetValue())
-                    .append(",\"").append(statsSet.totalTrafficStats.msgSize.getFullName())
-                    .append("\":").append(statsSet.totalTrafficStats.msgSize.getAndResetValue())
-                    .append(",\"traffic_details\":{");
-            for (Map.Entry<String, TrafficStatsUnit> entry
-                    : statsSet.topicTrafficMap.entrySet()) {
-                if (entry == null) {
-                    continue;
-                }
-                if (totalCnt++ > 0) {
-                    strBuff.append(",");
-                }
-                strBuff.append("\"").append(entry.getKey()).append("\":{\"")
-                        .append(entry.getValue().msgCnt.getShortName()).append("\":")
-                         .append(entry.getValue().msgCnt.getAndResetValue()).append(",\"")
-                        .append(entry.getValue().msgSize.getShortName()).append("\":")
-                        .append(entry.getValue().msgCnt.getAndResetValue()).append("}");
-            }
-            strBuff.append("}");
-            if (outputLevel != StatsOutputLevel.SIMPLEST) {
-                strBuff.append(",");
+                .append("\",");
+        statsSet.totalTrafficStats.getValue(strBuff, resetValue);
+        strBuff.append(",");
+        statsSet.getTopicDetailInfo(strBuff, resetValue);
+        if (this.statsConfig.getStatsLevel() != StatsLevel.SIMPLEST) {
+            strBuff.append(",");
+            if (resetValue) {
                 statsSet.msgCallDltStats.snapShort(strBuff, false);
                 if (!isProducer) {
                     strBuff.append(",");
                     statsSet.csmLatencyStats.snapShort(strBuff, false);
-                }
-                strBuff.append(",\"rsp_details\":{");
-                totalCnt = 0;
-                for (LongStatsCounter statsCounter : statsSet.errRspStatsMap.values()) {
-                    if (statsCounter == null) {
-                        continue;
-                    }
-                    if (totalCnt++ > 0) {
-                        strBuff.append(",");
-                    }
-                    strBuff.append("\"").append(statsCounter.getFullName()).append("\":")
-                            .append(statsCounter.getAndResetValue());
-                }
-                strBuff.append("}");
-            }
-            if (outputLevel == StatsOutputLevel.FULL) {
-                strBuff.append(",\"").append(statsSet.regMasterCnt.getFullName())
-                        .append("\":").append(statsSet.regMasterCnt.getAndResetValue())
-                        .append(",\"").append(statsSet.regMasterFailCnt.getFullName())
-                        .append("\":").append(statsSet.regMasterFailCnt.getAndResetValue())
-                        .append(",\"").append(statsSet.regMasterTimoutCnt.getFullName())
-                        .append("\":").append(statsSet.regMasterTimoutCnt.getAndResetValue())
-                        .append(",\"").append(statsSet.hbMasterExcCnt.getFullName())
-                        .append("\":").append(statsSet.hbMasterExcCnt.getAndResetValue())
-                        .append(",\"").append(statsSet.regBrokerCnt.getFullName())
-                        .append("\":").append(statsSet.regBrokerCnt.getAndResetValue())
-                        .append(",\"").append(statsSet.regBrokerFailCnt.getFullName())
-                        .append("\":").append(statsSet.regBrokerFailCnt.getAndResetValue())
-                        .append(",\"").append(statsSet.regBrokerTimoutCnt.getFullName())
-                        .append("\":").append(statsSet.regBrokerTimoutCnt.getAndResetValue())
-                        .append(",\"").append(statsSet.hbBrokerExcCnt.getFullName())
-                        .append("\":").append(statsSet.hbBrokerExcCnt.getAndResetValue());
-            }
-            strBuff.append("}");
-        } else {
-            strBuff.append(",\"").append(statsSet.totalTrafficStats.msgCnt.getFullName())
-                    .append("\":").append(statsSet.totalTrafficStats.msgCnt.getValue())
-                    .append(",\"").append(statsSet.totalTrafficStats.msgSize.getFullName())
-                    .append("\":").append(statsSet.totalTrafficStats.msgSize.getValue())
-                    .append(",\"traffic_details\":{");
-            for (Map.Entry<String, TrafficStatsUnit> entry
-                    : statsSet.topicTrafficMap.entrySet()) {
-                if (entry == null) {
-                    continue;
-                }
-                if (totalCnt++ > 0) {
                     strBuff.append(",");
+                    statsSet.confirmDltStats.getValue(strBuff, false);
                 }
-                strBuff.append("\"").append(entry.getKey()).append("\":{\"")
-                        .append(entry.getValue().msgCnt.getShortName()).append("\":")
-                        .append(entry.getValue().msgCnt.getValue()).append(",\"")
-                        .append(entry.getValue().msgSize.getShortName()).append("\":")
-                        .append(entry.getValue().msgCnt.getValue()).append("}");
-            }
-            strBuff.append("}");
-            if (outputLevel != StatsOutputLevel.SIMPLEST) {
-                strBuff.append(",");
+            } else {
                 statsSet.msgCallDltStats.getValue(strBuff, false);
                 if (!isProducer) {
                     strBuff.append(",");
                     statsSet.csmLatencyStats.getValue(strBuff, false);
+                    strBuff.append(",");
+                    statsSet.confirmDltStats.getValue(strBuff, false);
                 }
-                strBuff.append(",\"rsp_details\":{");
-                totalCnt = 0;
-                for (LongStatsCounter statsCounter : statsSet.errRspStatsMap.values()) {
-                    if (statsCounter == null) {
-                        continue;
-                    }
-                    if (totalCnt++ > 0) {
-                        strBuff.append(",");
-                    }
-                    strBuff.append("\"").append(statsCounter.getFullName()).append("\":")
-                            .append(statsCounter.getValue());
-                }
-                strBuff.append("}");
             }
-            if (outputLevel == StatsOutputLevel.FULL) {
-                strBuff.append(",\"").append(statsSet.regMasterCnt.getFullName())
-                        .append("\":").append(statsSet.regMasterCnt.getValue())
-                        .append(",\"").append(statsSet.regMasterFailCnt.getFullName())
-                        .append("\":").append(statsSet.regMasterFailCnt.getValue())
-                        .append(",\"").append(statsSet.regMasterTimoutCnt.getFullName())
-                        .append("\":").append(statsSet.regMasterTimoutCnt.getValue())
-                        .append(",\"").append(statsSet.hbMasterExcCnt.getFullName())
-                        .append("\":").append(statsSet.hbMasterExcCnt.getValue())
-                        .append(",\"").append(statsSet.regBrokerCnt.getFullName())
-                        .append("\":").append(statsSet.regBrokerCnt.getValue())
-                        .append(",\"").append(statsSet.regBrokerFailCnt.getFullName())
-                        .append("\":").append(statsSet.regBrokerFailCnt.getValue())
-                        .append(",\"").append(statsSet.regBrokerTimoutCnt.getFullName())
-                        .append("\":").append(statsSet.regBrokerTimoutCnt.getValue())
-                        .append(",\"").append(statsSet.hbBrokerExcCnt.getFullName())
-                        .append("\":").append(statsSet.hbBrokerExcCnt.getValue());
-            }
-            strBuff.append("}");
-        }
-    }
-
-    /**
-     * Accumulate sent or received message information
-     *
-     * @param topic     the topic name
-     * @param dltTime   the latency
-     * @param msgCnt    the message count
-     * @param msgSize   the message size
-     */
-    private void sendOrRecvMsg(String topic, long dltTime,
-                               int msgCnt, int msgSize) {
-        ClientStatsItemSet curItemSet = switchableSets[getIndex()];
-        curItemSet.msgCallDltStats.update(dltTime);
-        curItemSet.totalTrafficStats.addMsgCntAndSize(msgCnt, msgSize);
-        // accumulate traffic information by topic
-        TrafficStatsUnit curStatsUnit = curItemSet.topicTrafficMap.get(topic);
-        if (curStatsUnit == null) {
-            TrafficStatsUnit tmpUnit =
-                    new TrafficStatsUnit("msg_cnt", "msg_size", topic);
-            curStatsUnit = curItemSet.topicTrafficMap.putIfAbsent(topic, tmpUnit);
-            if (curStatsUnit == null) {
-                curStatsUnit = tmpUnit;
+            strBuff.append(",");
+            statsSet.getErrorRspInfo(strBuff, resetValue);
+            strBuff.append(",");
+            statsSet.getStatusInfo(strBuff, resetValue);
+            if (this.statsConfig.getStatsLevel() == StatsLevel.FULL) {
+                strBuff.append(",");
+                statsSet.getPartDetailsInfo(strBuff, isProducer, resetValue);
             }
         }
-        curStatsUnit.addMsgCntAndSize(msgCnt, msgSize);
+        strBuff.append("}");
     }
 
     /**
@@ -411,16 +292,22 @@ public class ClientStatsInfo {
                 new SinceTime("reset_time", null);
         // received or sent message traffic statistic
         protected final TrafficStatsUnit totalTrafficStats =
-                new TrafficStatsUnit("msg_cnt", "msg_size", "total");
+                new TrafficStatsUnit("msg_cnt", "msg_size", "total_traffic");
         // topic-based traffic statistics
         protected final ConcurrentHashMap<String, TrafficStatsUnit> topicTrafficMap =
                 new ConcurrentHashMap<>();
+        // partition-based traffic and latency statistics
+        protected final ConcurrentHashMap<String, PartitionStatsItemSet> partDltStatsMap =
+                new ConcurrentHashMap<>();
         // time consumption statistics for sending or receiving messages
         protected final ESTHistogram msgCallDltStats =
                 new ESTHistogram("msg_call_dlt", "");
         // statistics on consumption transaction time
         protected final ESTHistogram csmLatencyStats =
                 new ESTHistogram("csm_latency_dlt", "");
+        // time consumption statistics for confirm requests
+        protected final ESTHistogram confirmDltStats =
+                new ESTHistogram("msg_confirm_dlt", "");
         // error response distribution statistics
         protected final ConcurrentHashMap<Integer, LongStatsCounter> errRspStatsMap =
                 new ConcurrentHashMap<>();
@@ -444,12 +331,288 @@ public class ClientStatsInfo {
                 new LongStatsCounter("hb_broker_exception", null);
 
         public ClientStatsItemSet() {
-            resetSinceTime();
+            resetStartTime();
         }
 
-        public void resetSinceTime() {
+        public void resetStartTime() {
             this.resetTime.reset();
         }
+
+        /**
+         * Accumulate sent or received message information
+         *
+         * @param topic     the topic name
+         * @param dltTime   the latency
+         * @param msgCnt    the message count
+         * @param msgSize   the message size
+         */
+        public void sendOrRecvMsg(String topic, long dltTime,
+                                  int msgCnt, int msgSize) {
+            msgCallDltStats.update(dltTime);
+            totalTrafficStats.addMsgCntAndSize(msgCnt, msgSize);
+            // accumulate traffic information by topic
+            TrafficStatsUnit curStatsUnit = topicTrafficMap.get(topic);
+            if (curStatsUnit == null) {
+                TrafficStatsUnit tmpUnit =
+                        new TrafficStatsUnit("msg_cnt", "msg_size", topic);
+                curStatsUnit = topicTrafficMap.putIfAbsent(topic, tmpUnit);
+                if (curStatsUnit == null) {
+                    curStatsUnit = tmpUnit;
+                }
+            }
+            curStatsUnit.addMsgCntAndSize(msgCnt, msgSize);
+        }
+
+        /**
+         * Increase the counter of the given error code by one
+         *
+         * @param errCode  the error code
+         */
+        public void bookFailRpcCall(int errCode) {
+            LongStatsCounter curItemCounter = errRspStatsMap.get(errCode);
+            if (curItemCounter == null) {
+                LongStatsCounter tmpCounter = new LongStatsCounter(
+                        "err_" + errCode, "");
+                curItemCounter = errRspStatsMap.putIfAbsent(errCode, tmpCounter);
+                if (curItemCounter == null) {
+                    curItemCounter = tmpCounter;
+                }
+            }
+            curItemCounter.incValue();
+        }
+
+        /**
+         * Accumulate traffic and call latency statistics under the given partitionKey
+         *
+         * @param dltTime   the latency
+         * @param msgCnt    the message count
+         * @param msgSize   the message size
+         */
+        public void addTrafficInfoByPartKey(String partitionKey,
+                                            long dltTime, int msgCnt, int msgSize) {
+            PartitionStatsItemSet partStatsUnit =
+                    partDltStatsMap.get(partitionKey);
+            if (partStatsUnit == null) {
+                PartitionStatsItemSet tmpUnit = new PartitionStatsItemSet(partitionKey);
+                partStatsUnit = partDltStatsMap.putIfAbsent(partitionKey, tmpUnit);
+                if (partStatsUnit == null) {
+                    partStatsUnit = tmpUnit;
+                }
+            }
+            partStatsUnit.trafficStatsUnit.addMsgCntAndSize(msgCnt, msgSize);
+            partStatsUnit.msgCallDltStats.update(dltTime);
+        }
+
+        /**
+         * Accumulate consumption latency statistics under the given partitionKey
+         *
+         * @param dltTime   the latency
+         */
+        public void addCsmLatencyByPartKey(String partitionKey, long dltTime) {
+            PartitionStatsItemSet partStatsUnit = partDltStatsMap.get(partitionKey);
+            if (partStatsUnit == null) {
+                PartitionStatsItemSet tmpUnit = new PartitionStatsItemSet(partitionKey);
+                partStatsUnit = partDltStatsMap.putIfAbsent(partitionKey, tmpUnit);
+                if (partStatsUnit == null) {
+                    partStatsUnit = tmpUnit;
+                }
+            }
+            partStatsUnit.csmLatencyStats.update(dltTime);
+        }
+
+        /**
+         * Accumulate confirm delta time statistics under the given partitionKey
+         *
+         * @param dltTime   the latency
+         */
+        public void addConfirmDltByPartKey(String partitionKey, long dltTime) {
+            PartitionStatsItemSet partStatsUnit = partDltStatsMap.get(partitionKey);
+            if (partStatsUnit == null) {
+                PartitionStatsItemSet tmpUnit = new PartitionStatsItemSet(partitionKey);
+                partStatsUnit = partDltStatsMap.putIfAbsent(partitionKey, tmpUnit);
+                if (partStatsUnit == null) {
+                    partStatsUnit = tmpUnit;
+                }
+            }
+            partStatsUnit.confirmDltStats.update(dltTime);
+        }
+
+        /**
+         * Get traffic statistics details by topic
+         *
+         * @param strBuff      string buffer
+         * @param resetValue   whether to reset the current data
+         */
+        public void getTopicDetailInfo(StringBuilder strBuff, boolean resetValue) {
+            int totalCnt = 0;
+            strBuff.append("\"topic_details\":{");
+            for (Map.Entry<String, TrafficStatsUnit> entry : topicTrafficMap.entrySet()) {
+                if (entry == null) {
+                    continue;
+                }
+                if (totalCnt++ > 0) {
+                    strBuff.append(",");
+                }
+                entry.getValue().getValue(strBuff, true);
+            }
+            strBuff.append("}");
+            if (resetValue) {
+                topicTrafficMap.clear();
+            }
+        }
+
+        /**
+         * Get error response statistics
+         *
+         * @param strBuff      string buffer
+         * @param resetValue   whether to reset the current data
+         */
+        public void getErrorRspInfo(StringBuilder strBuff, boolean resetValue) {
+            int totalCnt = 0;
+            strBuff.append("\"rsp_details\":{");
+            for (LongStatsCounter statsCounter : errRspStatsMap.values()) {
+                if (statsCounter == null) {
+                    continue;
+                }
+                if (totalCnt++ > 0) {
+                    strBuff.append(",");
+                }
+                strBuff.append("\"").append(statsCounter.getFullName()).append("\":")
+                        .append(statsCounter.getValue());
+            }
+            strBuff.append("}");
+            if (resetValue) {
+                errRspStatsMap.clear();
+            }
+        }
+
+        /**
+         * Get running status statistics
+         *
+         * @param strBuff      string buffer
+         * @param resetValue   whether to reset the current data
+         */
+        public void getStatusInfo(StringBuilder strBuff, boolean resetValue) {
+            strBuff.append("\"status_details\":{\"");
+            if (resetValue) {
+                strBuff.append(regMasterCnt.getFullName()).append("\":")
+                        .append(regMasterCnt.getAndResetValue())
+                        .append(",\"").append(regMasterFailCnt.getFullName()).append("\":")
+                        .append(regMasterFailCnt.getAndResetValue())
+                        .append(",\"").append(regMasterTimoutCnt.getFullName()).append("\":")
+                        .append(regMasterTimoutCnt.getAndResetValue())
+                        .append(",\"").append(hbMasterExcCnt.getFullName()).append("\":")
+                        .append(hbMasterExcCnt.getAndResetValue())
+                        .append(",\"").append(regBrokerCnt.getFullName()).append("\":")
+                        .append(regBrokerCnt.getAndResetValue())
+                        .append(",\"").append(regBrokerFailCnt.getFullName()).append("\":")
+                        .append(regBrokerFailCnt.getAndResetValue())
+                        .append(",\"").append(regBrokerTimoutCnt.getFullName()).append("\":")
+                        .append(regBrokerTimoutCnt.getAndResetValue())
+                        .append(",\"").append(hbBrokerExcCnt.getFullName()).append("\":")
+                        .append(hbBrokerExcCnt.getAndResetValue())
+                        .append("}");
+            } else {
+                strBuff.append(regMasterCnt.getFullName()).append("\":")
+                        .append(regMasterCnt.getValue())
+                        .append(",\"").append(regMasterFailCnt.getFullName()).append("\":")
+                        .append(regMasterFailCnt.getValue())
+                        .append(",\"").append(regMasterTimoutCnt.getFullName()).append("\":")
+                        .append(regMasterTimoutCnt.getValue())
+                        .append(",\"").append(hbMasterExcCnt.getFullName()).append("\":")
+                        .append(hbMasterExcCnt.getValue())
+                        .append(",\"").append(regBrokerCnt.getFullName()).append("\":")
+                        .append(regBrokerCnt.getValue())
+                        .append(",\"").append(regBrokerFailCnt.getFullName()).append("\":")
+                        .append(regBrokerFailCnt.getValue())
+                        .append(",\"").append(regBrokerTimoutCnt.getFullName()).append("\":")
+                        .append(regBrokerTimoutCnt.getValue())
+                        .append(",\"").append(hbBrokerExcCnt.getFullName()).append("\":")
+                        .append(hbBrokerExcCnt.getValue())
+                        .append("}");
+            }
+        }
+
+        /**
+         * Get traffic statistics by partition key
+         *
+         * @param strBuff      string buffer
+         * @param isProducer   whether producer role
+         * @param resetValue   whether to reset the current data
+         */
+        public void getPartDetailsInfo(StringBuilder strBuff,
+                                       boolean isProducer,
+                                       boolean resetValue) {
+            int totalCnt = 0;
+            strBuff.append("\"part_details\":{");
+            for (PartitionStatsItemSet partStatsSet : partDltStatsMap.values()) {
+                if (partStatsSet == null) {
+                    continue;
+                }
+                if (totalCnt++ > 0) {
+                    strBuff.append(",");
+                }
+                partStatsSet.getValue(strBuff, isProducer, false);
+            }
+            strBuff.append("}");
+            if (resetValue) {
+                partDltStatsMap.clear();
+            }
+        }
+    }
+
+    /**
+     * PartitionStatsItemSet, partition related statistics set
+     *
+     */
+    private static class PartitionStatsItemSet {
+        private final String partKey;
+        protected  final TrafficStatsUnit trafficStatsUnit =
+                new TrafficStatsUnit("msg_cnt", "msg_size", "traffic");
+        // time consumption statistics for sending or receiving messages
+        protected final ESTHistogram msgCallDltStats =
+                new ESTHistogram("msg_call_dlt", "");
+        // statistics on consumption transaction time
+        protected final ESTHistogram csmLatencyStats =
+                new ESTHistogram("csm_latency_dlt", "");
+        // time consumption statistics for confirm request
+        protected final ESTHistogram confirmDltStats =
+                new ESTHistogram("msg_confirm_dlt", "");
+
+        public PartitionStatsItemSet(String partKey) {
+            this.partKey = partKey;
+        }
+
+        /**
+         * Get partition's traffic statistics
+         *
+         * @param strBuff      string buffer
+         * @param isProducer   whether producer role
+         * @param resetValue   whether to reset the current data
+         */
+        public void getValue(StringBuilder strBuff, boolean isProducer, boolean resetValue) {
+            strBuff.append("\"").append(partKey).append("\":{");
+            trafficStatsUnit.getValue(strBuff, resetValue);
+            strBuff.append(",");
+            if (resetValue) {
+                msgCallDltStats.snapShort(strBuff, false);
+                if (!isProducer) {
+                    strBuff.append(",");
+                    csmLatencyStats.snapShort(strBuff, false);
+                    strBuff.append(",");
+                    confirmDltStats.snapShort(strBuff, false);
+                }
+            } else {
+                msgCallDltStats.getValue(strBuff, false);
+                if (!isProducer) {
+                    strBuff.append(",");
+                    csmLatencyStats.getValue(strBuff, false);
+                    strBuff.append(",");
+                    confirmDltStats.getValue(strBuff, false);
+                }
+            }
+            strBuff.append("}");
+        }
     }
 }
 
diff --git a/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/common/StatsConfig.java b/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/common/StatsConfig.java
new file mode 100644
index 0000000..6eed718
--- /dev/null
+++ b/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/common/StatsConfig.java
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.tubemq.client.common;
+
+import java.util.Objects;
+import org.apache.inlong.tubemq.corebase.utils.MixedUtils;
+
+/**
+ * StatsConfig, configuration settings related to client statistics
+ *
+ */
+public class StatsConfig {
+    // client statistics information print period
+    private static final long STATS_SELF_PRINT_DEFAULT_PERIOD_MS = 6 * 1000 * 60L;
+    private static final long STATS_SELF_PRINT_MIN_PERIOD_MS = 1000 * 60L;
+    private static final long STATS_SELF_PRINT_MAX_PERIOD_MS = 60 * 1000 * 60L;
+    // client statistics information forced reset period
+    private static final long STATS_AUTO_RESET_DEFAULT_PERIOD_MS = 30 * 60 * 1000L;
+    private static final long STATS_AUTO_RESET_MIN_PERIOD_MS = 30 * 1000L;
+    private static final long STATS_AUTO_RESET_MAX_PERIOD_MS = 24 * 3600 * 1000L;
+    // data statistics level
+    private StatsLevel statsLevel = StatsLevel.MEDIUM;
+    // Enable metric information print
+    private boolean enableSelfPrint = true;
+    // Metric print period in ms.
+    private long selfPrintPeriodMs = STATS_SELF_PRINT_DEFAULT_PERIOD_MS;
+    // Metric reset value period in ms.
+    private long forcedResetPeriodMs = STATS_AUTO_RESET_DEFAULT_PERIOD_MS;
+
+    /**
+     * Initialize instance with the default value
+     *
+     */
+    public StatsConfig() {
+        //
+    }
+
+    /**
+     * Update the current configuration settings according to the specified configuration
+     *
+     * @param that   the specified configuration
+     */
+    public void updateStatsConfig(StatsConfig that) {
+        if (that == null) {
+            return;
+        }
+        updateStatsConfig(that.statsLevel, that.enableSelfPrint,
+                that.selfPrintPeriodMs, that.forcedResetPeriodMs);
+    }
+
+    /**
+     * Update the current configuration settings
+     * Attention, if statsLevel is ZERO, then printing will be automatically turned off
+     * regardless of the value of enableSelfPrint
+     *
+     * @param statsLevel          the statistics level
+     * @param enableSelfPrint     whether to allow the SDK to print by itself
+     * @param selfPrintPeriodMs   the time interval that the SDK prints itself
+     * @param forcedResetPeriodMs the interval at which the collected data is reset
+     */
+    public void updateStatsConfig(StatsLevel statsLevel, boolean enableSelfPrint,
+                                  long selfPrintPeriodMs, long forcedResetPeriodMs) {
+        updateStatsControl(statsLevel, enableSelfPrint);
+        setStatsPeriodInfo(selfPrintPeriodMs, forcedResetPeriodMs);
+    }
+
+    /**
+     * Update current statistics control settings
+     * Attention, if statsLevel is ZERO, then printing will be automatically turned off
+     * regardless of the value of enableSelfPrint
+     *
+     * @param statsLevel          the statistics level
+     * @param enableSelfPrint     whether to allow the SDK to print by itself
+     */
+    public void updateStatsControl(StatsLevel statsLevel, boolean enableSelfPrint) {
+        this.statsLevel = statsLevel;
+        this.enableSelfPrint = enableSelfPrint;
+    }
+
+    /**
+     * Update current period settings
+     *
+     * @param selfPrintPeriodMs   the time interval that the SDK prints itself
+     * @param forcedResetPeriodMs the interval at which the collected data is reset
+     */
+    public void setStatsPeriodInfo(long selfPrintPeriodMs,
+                                   long forcedResetPeriodMs) {
+        this.selfPrintPeriodMs =
+                MixedUtils.mid(selfPrintPeriodMs,
+                        STATS_SELF_PRINT_MIN_PERIOD_MS, STATS_SELF_PRINT_MAX_PERIOD_MS);
+        this.forcedResetPeriodMs =
+                MixedUtils.mid(forcedResetPeriodMs,
+                        STATS_AUTO_RESET_MIN_PERIOD_MS, STATS_AUTO_RESET_MAX_PERIOD_MS);
+    }
+
+    public void setStatsLevel(StatsLevel statsLevel) {
+        this.statsLevel = statsLevel;
+    }
+
+    public StatsLevel getStatsLevel() {
+        return statsLevel;
+    }
+
+    public boolean isEnableSelfPrint() {
+        return enableSelfPrint;
+    }
+
+    public long getSelfPrintPeriodMs() {
+        return selfPrintPeriodMs;
+    }
+
+    public long getForcedResetPeriodMs() {
+        return forcedResetPeriodMs;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (!(o instanceof StatsConfig)) {
+            return false;
+        }
+        StatsConfig that = (StatsConfig) o;
+        return ((enableSelfPrint == that.enableSelfPrint)
+                && (selfPrintPeriodMs == that.selfPrintPeriodMs)
+                && (forcedResetPeriodMs == that.forcedResetPeriodMs)
+                && (statsLevel == that.statsLevel));
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(statsLevel, enableSelfPrint,
+                selfPrintPeriodMs, forcedResetPeriodMs);
+    }
+
+    @Override
+    public String toString() {
+        return new StringBuilder(512)
+                .append("\"StatsConfig\":{\"statsLevel\":\"").append(statsLevel.getName())
+                .append("\",\"enableSelfPrint\":").append(enableSelfPrint)
+                .append(",\"selfPrintPeriodMs\":").append(selfPrintPeriodMs)
+                .append(",\"forcedResetPeriodMs\":").append(forcedResetPeriodMs)
+                .append("}").toString();
+    }
+}
diff --git a/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/common/StatsOutputLevel.java b/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/common/StatsLevel.java
similarity index 75%
rename from inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/common/StatsOutputLevel.java
rename to inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/common/StatsLevel.java
index e45f7f4..5f3e759 100644
--- a/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/common/StatsOutputLevel.java
+++ b/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/common/StatsLevel.java
@@ -17,12 +17,13 @@
 
 package org.apache.inlong.tubemq.client.common;
 
-public enum StatsOutputLevel {
-    SIMPLEST(0, "simplest", "Simplest statistics output"),
-    MEDIUM(1, "medium", "Medium statistics output"),
-    FULL(2, "full", "Full statistics output");
+public enum StatsLevel {
+    ZERO(0, "closed", "Statistics are turned off"),
+    SIMPLEST(1, "simplest", "Simplest statistics"),
+    MEDIUM(2, "medium", "Medium statistics"),
+    FULL(3, "full", "Full statistics");
 
-    StatsOutputLevel(int id, String name, String desc) {
+    StatsLevel(int id, String name, String desc) {
         this.id = id;
         this.name = name;
         this.desc = desc;
@@ -40,8 +41,8 @@ public enum StatsOutputLevel {
         return desc;
     }
 
-    public static StatsOutputLevel valueOf(int value) {
-        for (StatsOutputLevel outputLevel : StatsOutputLevel.values()) {
+    public static StatsLevel valueOf(int value) {
+        for (StatsLevel outputLevel : StatsLevel.values()) {
             if (outputLevel.getId() == value) {
                 return outputLevel;
             }
diff --git a/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/common/TClientConstants.java b/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/common/TClientConstants.java
index 0269af2..b7a8d70 100644
--- a/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/common/TClientConstants.java
+++ b/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/common/TClientConstants.java
@@ -25,7 +25,7 @@ public class TClientConstants {
     public static final long CFG_DEFAULT_REGFAIL_WAIT_PERIOD_MS = 1000;
     public static final long CFG_DEFAULT_MSG_NOTFOUND_WAIT_PERIOD_MS = 400L;
     public static final long CFG_DEFAULT_CONSUME_READ_WAIT_PERIOD_MS = 90000L;
-    public static final long CFG_DEFAULT_CONSUME_READ_CHECK_SLICE_MS = 50L;
+    public static final long CFG_DEFAULT_CONSUME_READ_CHECK_SLICE_MS = 3L;
     public static final long CFG_DEFAULT_PUSH_LISTENER_WAIT_PERIOD_MS = 3000L;
     public static final long CFG_DEFAULT_PULL_REB_CONFIRM_WAIT_PERIOD_MS = 3000L;
     public static final long CFG_DEFAULT_PULL_PROTECT_CONFIRM_WAIT_PERIOD_MS = 60000L;
@@ -39,14 +39,4 @@ public class TClientConstants {
 
     public static final long CFG_DEFAULT_META_QUERY_WAIT_PERIOD_MS = 10000L;
     public static final long CFG_MIN_META_QUERY_WAIT_PERIOD_MS = 5000L;
-
-    // client statistics information print period
-    public static final long STATS_SELF_PRINT_DEFAULT_PERIOD_MS = 3 * 1000 * 60;
-    public static final long STATS_SELF_PRINT_MIN_PERIOD_MS = 1000 * 60;
-    public static final long STATS_SELF_PRINT_MAX_PERIOD_MS = 60 * 1000 * 60;
-
-    // client statistics information print period
-    public static final long STATS_AUTO_RESET_DEFAULT_PERIOD_MS = 30 * 60 * 1000;
-    public static final long STATS_AUTO_RESET_MIN_PERIOD_MS = 60 * 1000;
-    public static final long STATS_AUTO_RESET_MAX_PERIOD_MS = 24 * 3600 * 1000;
 }
diff --git a/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/config/TubeClientConfig.java b/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/config/TubeClientConfig.java
index f06a4c0..ad8b920 100644
--- a/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/config/TubeClientConfig.java
+++ b/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/config/TubeClientConfig.java
@@ -17,11 +17,12 @@
 
 package org.apache.inlong.tubemq.client.config;
 
+import org.apache.inlong.tubemq.client.common.StatsConfig;
+import org.apache.inlong.tubemq.client.common.StatsLevel;
 import org.apache.inlong.tubemq.client.common.TClientConstants;
 import org.apache.inlong.tubemq.corebase.cluster.MasterInfo;
 import org.apache.inlong.tubemq.corebase.config.TLSConfig;
 import org.apache.inlong.tubemq.corebase.utils.AddressUtils;
-import org.apache.inlong.tubemq.corebase.utils.MixedUtils;
 import org.apache.inlong.tubemq.corebase.utils.TStringUtils;
 import org.apache.inlong.tubemq.corerpc.RpcConstants;
 
@@ -55,12 +56,8 @@ public class TubeClientConfig {
     private long heartbeatPeriodAfterFail = TClientConstants.CFG_DEFAULT_HEARTBEAT_PERIOD_AFTER_RETRY_FAIL;
     // Link statistic check duration in ms.
     private long linkStatsDurationMs = RpcConstants.CFG_LQ_STATS_DURATION_MS;
-    // Enable metric information print
-    private boolean enableStatsSelfPrint = true;
-    // Metric print period in ms.
-    private long statsSelfPrintPeriodMs = TClientConstants.STATS_SELF_PRINT_DEFAULT_PERIOD_MS;
-    // Metric reset value period in ms.
-    private long statsForcedResetPeriodMs = TClientConstants.STATS_AUTO_RESET_DEFAULT_PERIOD_MS;
+    // statistics setting
+    private final StatsConfig statsConfig = new StatsConfig();
 
     // The following 5 configuration parameters are used in broker exception process.
     //
@@ -469,34 +466,14 @@ public class TubeClientConfig {
         return usrPassWord;
     }
 
-    public boolean enableStatsSelfPrint() {
-        return enableStatsSelfPrint;
+    public StatsConfig getStatsConfig() {
+        return this.statsConfig;
     }
 
-    public void setStatsSelfPrint(boolean enableStatsSelfPrint) {
-        this.enableStatsSelfPrint = enableStatsSelfPrint;
-    }
-
-    public long getStatsSelfPrintPeriodMs() {
-        return statsSelfPrintPeriodMs;
-    }
-
-    public void setStatsSelfPrintPeriodMs(long statsSelfPrintPeriodMs) {
-        this.statsSelfPrintPeriodMs =
-                MixedUtils.mid(statsSelfPrintPeriodMs,
-                        TClientConstants.STATS_SELF_PRINT_MIN_PERIOD_MS,
-                        TClientConstants.STATS_SELF_PRINT_MAX_PERIOD_MS);
-    }
-
-    public long getStatsForcedResetPeriodMs() {
-        return statsForcedResetPeriodMs;
-    }
-
-    public void setStatsForcedResetPeriodMs(long statsForcedResetPeriodMs) {
-        this.statsForcedResetPeriodMs =
-                MixedUtils.mid(statsForcedResetPeriodMs,
-                        TClientConstants.STATS_AUTO_RESET_MIN_PERIOD_MS,
-                        TClientConstants.STATS_AUTO_RESET_MAX_PERIOD_MS);
+    public void setStatsConfig(StatsLevel statsLevel, boolean enableSelfPrint,
+                               long selfPrintPeriodMs, long forcedResetPeriodMs) {
+        this.statsConfig.updateStatsConfig(statsLevel,
+                enableSelfPrint, selfPrintPeriodMs, forcedResetPeriodMs);
     }
 
     @Override
@@ -591,13 +568,7 @@ public class TubeClientConfig {
         if (!this.tlsConfig.equals(that.tlsConfig)) {
             return false;
         }
-        if (this.enableStatsSelfPrint != that.enableStatsSelfPrint) {
-            return false;
-        }
-        if (this.statsSelfPrintPeriodMs != that.statsSelfPrintPeriodMs) {
-            return false;
-        }
-        if (this.statsForcedResetPeriodMs != that.statsForcedResetPeriodMs) {
+        if (!this.statsConfig.equals(that.statsConfig)) {
             return false;
         }
         return masterInfo.equals(that.masterInfo);
@@ -625,36 +596,34 @@ public class TubeClientConfig {
             sBuilder.append("\"").append(item).append("\"");
         }
         return sBuilder.append("],\"rpcReadTimeoutMs\":").append(this.rpcReadTimeoutMs)
-            .append(",\"rpcConnProcessorCnt\":").append(this.rpcConnProcessorCnt)
-            .append(",\"rpcNettyWorkMemorySize\":").append(this.rpcNettyWorkMemorySize)
-            .append(",\"rpcRspCallBackThreadCnt\":").append(this.rpcRspCallBackThreadCnt)
-            .append(",\"nettyWriteBufferHighWaterMark\":").append(this.nettyWriteBufferHighWaterMark)
-            .append(",\"nettyWriteBufferLowWaterMark\":").append(this.nettyWriteBufferLowWaterMark)
-            .append(",\"maxRegisterRetryTimes\":").append(this.maxRegisterRetryTimes)
-            .append(",\"regFailWaitPeriodMs\":").append(this.regFailWaitPeriodMs)
-            .append(",\"maxHeartBeatRetryTimes\":").append(this.maxHeartBeatRetryTimes)
-            .append(",\"heartbeatPeriodMs\":").append(this.heartbeatPeriodMs)
-            .append(",\"heartbeatPeriodAfterFail\":").append(this.heartbeatPeriodAfterFail)
-            .append(",\"linkStatsDurationMs\":").append(this.linkStatsDurationMs)
-            .append(",\"linkStatsForbiddenDurationMs\":").append(this.linkStatsForbiddenDurationMs)
-            .append(",\"linkStatsMaxAllowedFailTimes\":").append(this.linkStatsMaxAllowedFailTimes)
-            .append(",\"linkStatsMaxForbiddenRate\":").append(this.linkStatsMaxForbiddenRate)
-            .append(",\"maxSentForbiddenRate\":").append(this.maxSentForbiddenRate)
-            .append(",\"maxForbiddenCheckDuration\":").append(this.maxForbiddenCheckDuration)
-            .append(",\"sessionStatisticCheckDuration\":").append(this.sessionStatisticCheckDuration)
-            .append(",\"sessionWarnForbiddenRate\":").append(this.sessionWarnForbiddenRate)
-            .append(",\"sessionWarnDelayedMsgCount\":").append(this.sessionWarnDelayedMsgCount)
-            .append(",\"linkMaxAllowedDelayedMsgCount\":").append(this.linkMaxAllowedDelayedMsgCount)
-            .append(",\"sessionMaxAllowedDelayedMsgCount\":").append(this.sessionMaxAllowedDelayedMsgCount)
-            .append(",\"unAvailableFbdDurationMs\":").append(this.unAvailableFbdDurationMs)
-            .append(",\"enableUserAuthentic\":").append(this.enableUserAuthentic)
-            .append(",\"enableStatsSelfPrint\":").append(this.enableStatsSelfPrint)
-            .append(",\"statsSelfPrintPeriodMs\":").append(this.statsSelfPrintPeriodMs)
-            .append(",\"statsForcedResetPeriodMs\":").append(this.statsForcedResetPeriodMs)
-            .append(",\"usrName\":\"").append(this.usrName)
-            .append("\",\"usrPassWord\":\"").append(this.usrPassWord)
-            .append("\",\"localAddress\":\"").append(localAddress)
-            .append("\",").append(this.tlsConfig.toString())
-            .append("}").toString();
+                .append(",\"rpcConnProcessorCnt\":").append(this.rpcConnProcessorCnt)
+                .append(",\"rpcNettyWorkMemorySize\":").append(this.rpcNettyWorkMemorySize)
+                .append(",\"rpcRspCallBackThreadCnt\":").append(this.rpcRspCallBackThreadCnt)
+                .append(",\"nettyWriteBufferHighWaterMark\":").append(this.nettyWriteBufferHighWaterMark)
+                .append(",\"nettyWriteBufferLowWaterMark\":").append(this.nettyWriteBufferLowWaterMark)
+                .append(",\"maxRegisterRetryTimes\":").append(this.maxRegisterRetryTimes)
+                .append(",\"regFailWaitPeriodMs\":").append(this.regFailWaitPeriodMs)
+                .append(",\"maxHeartBeatRetryTimes\":").append(this.maxHeartBeatRetryTimes)
+                .append(",\"heartbeatPeriodMs\":").append(this.heartbeatPeriodMs)
+                .append(",\"heartbeatPeriodAfterFail\":").append(this.heartbeatPeriodAfterFail)
+                .append(",\"linkStatsDurationMs\":").append(this.linkStatsDurationMs)
+                .append(",\"linkStatsForbiddenDurationMs\":").append(this.linkStatsForbiddenDurationMs)
+                .append(",\"linkStatsMaxAllowedFailTimes\":").append(this.linkStatsMaxAllowedFailTimes)
+                .append(",\"linkStatsMaxForbiddenRate\":").append(this.linkStatsMaxForbiddenRate)
+                .append(",\"maxSentForbiddenRate\":").append(this.maxSentForbiddenRate)
+                .append(",\"maxForbiddenCheckDuration\":").append(this.maxForbiddenCheckDuration)
+                .append(",\"sessionStatisticCheckDuration\":").append(this.sessionStatisticCheckDuration)
+                .append(",\"sessionWarnForbiddenRate\":").append(this.sessionWarnForbiddenRate)
+                .append(",\"sessionWarnDelayedMsgCount\":").append(this.sessionWarnDelayedMsgCount)
+                .append(",\"linkMaxAllowedDelayedMsgCount\":").append(this.linkMaxAllowedDelayedMsgCount)
+                .append(",\"sessionMaxAllowedDelayedMsgCount\":").append(this.sessionMaxAllowedDelayedMsgCount)
+                .append(",\"unAvailableFbdDurationMs\":").append(this.unAvailableFbdDurationMs)
+                .append(",\"enableUserAuthentic\":").append(this.enableUserAuthentic)
+                .append(",").append(this.statsConfig.toString())
+                .append(",\"usrName\":\"").append(this.usrName)
+                .append("\",\"usrPassWord\":\"").append(this.usrPassWord)
+                .append("\",\"localAddress\":\"").append(localAddress)
+                .append("\",").append(this.tlsConfig.toString())
+                .append("}").toString();
     }
 }
diff --git a/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/consumer/BaseMessageConsumer.java b/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/consumer/BaseMessageConsumer.java
index 5571507..7263a9d 100644
--- a/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/consumer/BaseMessageConsumer.java
+++ b/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/consumer/BaseMessageConsumer.java
@@ -145,10 +145,8 @@ public class BaseMessageConsumer implements MessageConsumer {
             throw new TubeClientException("Get consumer id failed!", e);
         }
         this.clientStatsInfo =
-                new ClientStatsInfo(false, this.consumerId,
-                        this.consumerConfig.enableStatsSelfPrint(),
-                        this.consumerConfig.getStatsSelfPrintPeriodMs(),
-                        this.consumerConfig.getStatsForcedResetPeriodMs());
+                new ClientStatsInfo(false,
+                        this.consumerId, this.consumerConfig.getStatsConfig());
         this.rmtDataCache =
                 new RmtDataCache(this.consumerConfig, null);
         this.rpcServiceFactory =
@@ -458,7 +456,7 @@ public class BaseMessageConsumer implements MessageConsumer {
                 //
             }
         }
-        clientStatsInfo.selfPrintStatsInfo(true, strBuffer);
+        clientStatsInfo.selfPrintStatsInfo(true, true, strBuffer);
         logger.info(strBuffer
                 .append("[SHUTDOWN_CONSUMER] Partitions unregistered,  consumer :")
                 .append(this.consumerId).toString());
@@ -1280,7 +1278,8 @@ public class BaseMessageConsumer implements MessageConsumer {
                         strBuffer.append(partitionKey).append(TokenConstants.ATTR_SEP)
                             .append(taskContext.getUsedToken()).toString(), messageList, maxOffset);
                     strBuffer.delete(0, strBuffer.length());
-                    clientStatsInfo.bookSuccGetMsg(dltTime, topic, msgCount, msgSize);
+                    clientStatsInfo.bookSuccGetMsg(dltTime,
+                            topic, partitionKey, msgCount, msgSize);
                     break;
                 }
                 case TErrCodeConstants.HB_NO_NODE:
@@ -1422,7 +1421,7 @@ public class BaseMessageConsumer implements MessageConsumer {
                 rmtDataCache.resumeTimeoutConsumePartitions(isPullConsume,
                         consumerConfig.getPullProtectConfirmTimeoutMs());
                 // print metric information
-                clientStatsInfo.selfPrintStatsInfo(false, strBuffer);
+                clientStatsInfo.selfPrintStatsInfo(false, true, strBuffer);
                 // Fetch the rebalance result, construct message adn return it.
                 ConsumerEvent event = rebalanceResults.poll();
                 List<SubscribeInfo> subInfoList = null;
diff --git a/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/consumer/FetchContext.java b/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/consumer/FetchContext.java
index c4dca03..03749ba 100644
--- a/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/consumer/FetchContext.java
+++ b/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/consumer/FetchContext.java
@@ -70,6 +70,10 @@ public class FetchContext {
         return partition;
     }
 
+    public String getPartitionKey() {
+        return partition.getPartitionKey();
+    }
+
     public long getUsedToken() {
         return usedToken;
     }
diff --git a/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/consumer/SimpleClientBalanceConsumer.java b/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/consumer/SimpleClientBalanceConsumer.java
index eaf379d..d1a1a68 100644
--- a/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/consumer/SimpleClientBalanceConsumer.java
+++ b/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/consumer/SimpleClientBalanceConsumer.java
@@ -141,9 +141,7 @@ public class SimpleClientBalanceConsumer implements ClientBalanceConsumer {
                 new RmtDataCache(this.consumerConfig, null);
         this.clientStatsInfo =
                 new ClientStatsInfo(false, this.consumerId,
-                        this.consumerConfig.enableStatsSelfPrint(),
-                        this.consumerConfig.getStatsSelfPrintPeriodMs(),
-                        this.consumerConfig.getStatsForcedResetPeriodMs());
+                        this.consumerConfig.getStatsConfig());
         this.rpcServiceFactory =
                 this.sessionFactory.getRpcServiceFactory();
         this.rpcConfig.put(RpcConstants.CONNECT_TIMEOUT, 3000);
@@ -327,7 +325,7 @@ public class SimpleClientBalanceConsumer implements ClientBalanceConsumer {
             }
         }
         // print metric information
-        clientStatsInfo.selfPrintStatsInfo(true, strBuffer);
+        clientStatsInfo.selfPrintStatsInfo(true, true, strBuffer);
         logger.info(strBuffer
                 .append("[SHUTDOWN_CONSUMER] Partitions unregistered,  consumer :")
                 .append(this.consumerId).toString());
@@ -531,13 +529,13 @@ public class SimpleClientBalanceConsumer implements ClientBalanceConsumer {
             if (selectResult.isSuccess()) {
                 break;
             }
-            if ((consumerConfig.getPullConsumeReadyWaitPeriodMs() >= 0)
+            if ((consumerConfig.getPullConsumeReadyWaitPeriodMs() >= 0L)
                     && ((System.currentTimeMillis() - startTime)
                     >= consumerConfig.getPullConsumeReadyWaitPeriodMs())) {
                 result.setFailResult(selectResult.getErrCode(), selectResult.getErrMsg());
                 return result.isSuccess();
             }
-            if (consumerConfig.getPullConsumeReadyChkSliceMs() > 10) {
+            if (consumerConfig.getPullConsumeReadyChkSliceMs() > 0L) {
                 ThreadUtils.sleep(consumerConfig.getPullConsumeReadyChkSliceMs());
             }
         }
@@ -595,6 +593,9 @@ public class SimpleClientBalanceConsumer implements ClientBalanceConsumer {
         sBuilder.delete(0, sBuilder.length());
         String topicName = strConfirmContextItems[1].trim();
         long timeStamp = Long.parseLong(strConfirmContextItems[3]);
+        long midTime = System.currentTimeMillis();
+        // book statistics information
+        clientStatsInfo.bookReturnDuration(keyId, midTime - timeStamp);
         if (!clientRmtDataCache.isPartitionInUse(keyId, timeStamp)) {
             result.setFailResult(TErrCodeConstants.BAD_REQUEST,
                     "The confirmContext's value invalid!");
@@ -649,6 +650,8 @@ public class SimpleClientBalanceConsumer implements ClientBalanceConsumer {
             } finally {
                 clientRmtDataCache.succRspRelease(keyId, topicName, timeStamp,
                         isConsumed, isFilterConsume(topicName), currOffset, maxOffset);
+                clientStatsInfo.bookConfirmDuration(keyId,
+                        System.currentTimeMillis() - midTime);
             }
         }
     }
@@ -776,7 +779,8 @@ public class SimpleClientBalanceConsumer implements ClientBalanceConsumer {
                             sBuffer.append(partitionKey).append(TokenConstants.ATTR_SEP)
                                     .append(taskContext.getUsedToken()).toString(), messageList, maxOffset);
                     sBuffer.delete(0, sBuffer.length());
-                    clientStatsInfo.bookSuccGetMsg(dltTime, topic, msgCount, msgSize);
+                    clientStatsInfo.bookSuccGetMsg(dltTime,
+                            topic, partitionKey, msgCount, msgSize);
                     break;
                 }
                 case TErrCodeConstants.HB_NO_NODE:
@@ -981,7 +985,7 @@ public class SimpleClientBalanceConsumer implements ClientBalanceConsumer {
                 clientRmtDataCache.resumeTimeoutConsumePartitions(false,
                         consumerConfig.getPullProtectConfirmTimeoutMs());
                 // print metric information
-                clientStatsInfo.selfPrintStatsInfo(false, strBuffer);
+                clientStatsInfo.selfPrintStatsInfo(false, true, strBuffer);
                 // Send heartbeat request to master
                 ClientMaster.HeartResponseM2CV2 response =
                         masterService.consumerHeartbeatC2MV2(createMasterHeartBeatRequest(),
diff --git a/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/consumer/SimplePullMessageConsumer.java b/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/consumer/SimplePullMessageConsumer.java
index b430781..c7142c9 100644
--- a/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/consumer/SimplePullMessageConsumer.java
+++ b/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/consumer/SimplePullMessageConsumer.java
@@ -142,12 +142,12 @@ public class SimplePullMessageConsumer implements PullMessageConsumer {
             if (selectResult.isSuccess()) {
                 break;
             }
-            if ((baseConsumer.getConsumerConfig().getPullConsumeReadyWaitPeriodMs() >= 0)
+            if ((baseConsumer.getConsumerConfig().getPullConsumeReadyWaitPeriodMs() >= 0L)
                 && (System.currentTimeMillis() - startTime
                     >= baseConsumer.getConsumerConfig().getPullConsumeReadyWaitPeriodMs())) {
                 return new ConsumerResult(selectResult.getErrCode(), selectResult.getErrMsg());
             }
-            if (baseConsumer.getConsumerConfig().getPullConsumeReadyChkSliceMs() > 10) {
+            if (baseConsumer.getConsumerConfig().getPullConsumeReadyChkSliceMs() > 0L) {
                 ThreadUtils.sleep(baseConsumer.getConsumerConfig().getPullConsumeReadyChkSliceMs());
             }
         }
@@ -205,8 +205,8 @@ public class SimplePullMessageConsumer implements PullMessageConsumer {
                     .append("Not found the partition by confirmContext:")
                     .append(confirmContext).toString());
         }
-        baseConsumer.clientStatsInfo.bookConfirmDuration(
-                System.currentTimeMillis() - timeStamp);
+        long midTime = System.currentTimeMillis();
+        baseConsumer.clientStatsInfo.bookReturnDuration(keyId, midTime - timeStamp);
         if (this.baseConsumer.consumerConfig.isPullConfirmInLocal()) {
             baseConsumer.rmtDataCache.succRspRelease(keyId, topicName,
                 timeStamp, isConsumed, isFilterConsume(topicName), currOffset, maxOffset);
@@ -240,6 +240,8 @@ public class SimplePullMessageConsumer implements PullMessageConsumer {
             } finally {
                 baseConsumer.rmtDataCache.succRspRelease(keyId, topicName,
                     timeStamp, isConsumed, isFilterConsume(topicName), currOffset, maxOffset);
+                baseConsumer.clientStatsInfo.bookConfirmDuration(keyId,
+                        System.currentTimeMillis() - midTime);
             }
         }
     }
diff --git a/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/consumer/SimplePushMessageConsumer.java b/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/consumer/SimplePushMessageConsumer.java
index 45f5b86..f535588 100644
--- a/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/consumer/SimplePushMessageConsumer.java
+++ b/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/consumer/SimplePushMessageConsumer.java
@@ -249,7 +249,7 @@ public class SimplePushMessageConsumer implements PushMessageConsumer {
         } else {
             this.receiveMessages(request, topicProcessor);
         }
-        baseConsumer.clientStatsInfo.bookConfirmDuration(
+        baseConsumer.clientStatsInfo.bookReturnDuration(request.getPartitionKey(),
                 System.currentTimeMillis() - request.getUsedToken());
         return true;
     }
diff --git a/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/producer/ProducerManager.java b/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/producer/ProducerManager.java
index 312ada1..afe929f 100644
--- a/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/producer/ProducerManager.java
+++ b/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/producer/ProducerManager.java
@@ -143,9 +143,7 @@ public class ProducerManager {
         // initial client statistics configure
         this.clientStatsInfo =
                 new ClientStatsInfo(true, this.producerId,
-                        this.tubeClientConfig.enableStatsSelfPrint(),
-                        this.tubeClientConfig.getStatsSelfPrintPeriodMs(),
-                        this.tubeClientConfig.getStatsForcedResetPeriodMs());
+                        this.tubeClientConfig.getStatsConfig());
         heartBeatStatus.set(0);
         this.masterService =
                 this.rpcServiceFactory.getFailoverService(MasterService.class,
@@ -303,7 +301,7 @@ public class ProducerManager {
             }
             return;
         }
-        clientStatsInfo.selfPrintStatsInfo(true, strBuff);
+        clientStatsInfo.selfPrintStatsInfo(true, true, strBuff);
         if (this.nodeStatus.compareAndSet(0, 1)) {
             this.heartbeatService.shutdownNow();
             this.topicPartitionMap.clear();
@@ -676,7 +674,7 @@ public class ProducerManager {
                 ThreadUtils.sleep(100);
             }
             // print metrics information
-            clientStatsInfo.selfPrintStatsInfo(false, sBuilder);
+            clientStatsInfo.selfPrintStatsInfo(false, true, sBuilder);
             // check whether public topics
             if (publishTopics.isEmpty()) {
                 return;
diff --git a/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/producer/SimpleMessageProducer.java b/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/producer/SimpleMessageProducer.java
index 6f71b44..291c56e 100644
--- a/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/producer/SimpleMessageProducer.java
+++ b/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/producer/SimpleMessageProducer.java
@@ -371,7 +371,7 @@ public class SimpleMessageProducer implements MessageProducer {
         final String resultStr = response.getErrMsg();
         if (response.getErrCode() == TErrCodeConstants.SUCCESS) {
             producerManager.getClientMetrics().bookSuccSendMsg(dltTime,
-                    message.getTopic(), message.getData().length);
+                    message.getTopic(), partition.getPartitionKey(), message.getData().length);
             if (response.hasMessageId()) {
                 return new MessageSentResult(true,
                         response.getErrCode(), "Ok!",
diff --git a/inlong-tubemq/tubemq-client/src/test/java/org/apache/inlong/tubemq/client/consumer/StatsConfigTest.java b/inlong-tubemq/tubemq-client/src/test/java/org/apache/inlong/tubemq/client/consumer/StatsConfigTest.java
new file mode 100644
index 0000000..9f6569a
--- /dev/null
+++ b/inlong-tubemq/tubemq-client/src/test/java/org/apache/inlong/tubemq/client/consumer/StatsConfigTest.java
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.tubemq.client.consumer;
+
+import org.apache.inlong.tubemq.client.common.StatsConfig;
+import org.apache.inlong.tubemq.client.common.StatsLevel;
+import org.junit.Assert;
+import org.junit.Test;
+
+/** Checks the StatsConfig defaults, its update APIs, and copying one config into another. */
+public class StatsConfigTest {
+    @Test
+    public void testStatsConfig() {
+        // a freshly built instance must carry the default settings
+        StatsConfig statsConfig = new StatsConfig();
+        Assert.assertEquals(StatsLevel.MEDIUM, statsConfig.getStatsLevel());
+        Assert.assertTrue(statsConfig.isEnableSelfPrint());
+        Assert.assertEquals(6 * 1000 * 60L, statsConfig.getSelfPrintPeriodMs());
+        Assert.assertEquals(30 * 60 * 1000L, statsConfig.getForcedResetPeriodMs());
+        // update level and print switch, then set periods of 3000 / 5000 (60s / 30s expected back)
+        statsConfig.updateStatsControl(StatsLevel.FULL, false);
+        Assert.assertEquals(StatsLevel.FULL, statsConfig.getStatsLevel());
+        Assert.assertFalse(statsConfig.isEnableSelfPrint());
+        statsConfig.setStatsPeriodInfo(3000, 5000);
+        Assert.assertEquals(1000 * 60L, statsConfig.getSelfPrintPeriodMs());
+        Assert.assertEquals(30 * 1000L, statsConfig.getForcedResetPeriodMs());
+        // copy a fully updated config into a second instance and compare field by field
+        statsConfig.updateStatsConfig(StatsLevel.ZERO, true, 300000L, 50000L);
+        StatsConfig statsConfig2 = new StatsConfig();
+        statsConfig2.updateStatsConfig(statsConfig);
+        Assert.assertEquals(StatsLevel.ZERO, statsConfig2.getStatsLevel());
+        Assert.assertEquals(statsConfig.isEnableSelfPrint(), statsConfig2.isEnableSelfPrint());
+        Assert.assertEquals(statsConfig.getSelfPrintPeriodMs(), statsConfig2.getSelfPrintPeriodMs());
+        Assert.assertEquals(statsConfig.getForcedResetPeriodMs(), statsConfig2.getForcedResetPeriodMs());
+    }
+}
diff --git a/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corebase/metric/TrafficStatsUnit.java b/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corebase/metric/TrafficStatsUnit.java
index 996592a..69443d2 100644
--- a/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corebase/metric/TrafficStatsUnit.java
+++ b/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corebase/metric/TrafficStatsUnit.java
@@ -18,6 +18,7 @@
 package org.apache.inlong.tubemq.corebase.metric;
 
 import org.apache.inlong.tubemq.corebase.metric.impl.LongStatsCounter;
+import org.apache.inlong.tubemq.corebase.utils.TStringUtils;
 
 /**
  * TrafficStatsUnit, Metric Statistics item Unit
@@ -26,6 +27,8 @@ import org.apache.inlong.tubemq.corebase.metric.impl.LongStatsCounter;
  * according to the statistics dimension, which can be expanded later as needed
  */
 public class TrafficStatsUnit {
+    // the traffic name
+    private String trafficName;
     // the message count
     public LongStatsCounter msgCnt;
     // the message size
@@ -39,6 +42,7 @@ public class TrafficStatsUnit {
      * @param prefix        the prefix of statistics items
      */
     public TrafficStatsUnit(String msgCntName, String msgSizeName, String prefix) {
+        this.trafficName = prefix;
         this.msgCnt = new LongStatsCounter(msgCntName, prefix);
         this.msgSize = new LongStatsCounter(msgSizeName, prefix);
     }
@@ -53,4 +57,23 @@ public class TrafficStatsUnit {
         this.msgCnt.addValue(msgCount);
         this.msgSize.addValue(msgSize);
     }
+
+    /**
+     * Appends this unit's counters to strBuff as a JSON-style object; when a
+     * traffic name was given, the object is preceded by that name as its key.
+     *
+     * @param strBuff     the buffer that receives the output
+     * @param resetValue  true reads the counters via getAndResetValue(), false via getValue()
+     */
+    public void getValue(StringBuilder strBuff, boolean resetValue) {
+        if (!TStringUtils.isEmpty(this.trafficName)) {
+            strBuff.append("\"").append(this.trafficName).append("\":");
+        }
+        // one append chain serves both modes; only the way each counter is read differs
+        strBuff.append("{\"").append(msgCnt.getShortName()).append("\":")
+                .append(resetValue ? msgCnt.getAndResetValue() : msgCnt.getValue())
+                .append(",\"").append(msgSize.getShortName()).append("\":")
+                .append(resetValue ? msgSize.getAndResetValue() : msgSize.getValue())
+                .append("}");
+    }
 }
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/stats/BrokerSrvStatsHolder.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/stats/BrokerSrvStatsHolder.java
index 07817aa..b2d8511 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/stats/BrokerSrvStatsHolder.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/stats/BrokerSrvStatsHolder.java
@@ -65,7 +65,7 @@ public class BrokerSrvStatsHolder {
         if (switchWritingStatsUnit()) {
             getStatsValue(switchableSets[getIndex(writableIndex.get() - 1)], true, statsMap);
         } else {
-            getValue(statsMap);
+            getStatsValue(switchableSets[getIndex()], false, statsMap);
         }
     }
 
@@ -73,7 +73,7 @@ public class BrokerSrvStatsHolder {
         if (switchWritingStatsUnit()) {
             getStatsValue(switchableSets[getIndex(writableIndex.get() - 1)], true, strBuff);
         } else {
-            getValue(strBuff);
+            getStatsValue(switchableSets[getIndex()], false, strBuff);
         }
     }
 
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/stats/MsgStoreStatsHolder.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/stats/MsgStoreStatsHolder.java
index c2d3ad3..373c872 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/stats/MsgStoreStatsHolder.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/stats/MsgStoreStatsHolder.java
@@ -118,14 +118,15 @@ public class MsgStoreStatsHolder {
         if (isClosed) {
             return;
         }
+        MsgStoreStatsItemSet tmStatsSet = msgStoreStatsSets[getIndex()];
         if (isDataSizeFull) {
-            msgStoreStatsSets[getIndex()].cacheDataSizeFullCnt.incValue();
+            tmStatsSet.cacheDataSizeFullCnt.incValue();
         }
         if (isIndexSizeFull) {
-            msgStoreStatsSets[getIndex()].cacheIndexSizeFullCnt.incValue();
+            tmStatsSet.cacheIndexSizeFullCnt.incValue();
         }
         if (isMsgCntFull) {
-            msgStoreStatsSets[getIndex()].cacheMsgCountFullCnt.incValue();
+            tmStatsSet.cacheMsgCountFullCnt.incValue();
         }
     }
 
@@ -139,9 +140,10 @@ public class MsgStoreStatsHolder {
         if (isClosed) {
             return;
         }
-        msgStoreStatsSets[getIndex()].cacheSyncStats.update(flushTime);
+        MsgStoreStatsItemSet tmStatsSet = msgStoreStatsSets[getIndex()];
+        tmStatsSet.cacheSyncStats.update(flushTime);
         if (isTimeoutFlush) {
-            msgStoreStatsSets[getIndex()].cacheTimeFullCnt.incValue();
+            tmStatsSet.cacheTimeFullCnt.incValue();
         }
     }
 
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/stats/TrafficStatsService.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/stats/TrafficStatsService.java
index f27147c..d7b43aa 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/stats/TrafficStatsService.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/stats/TrafficStatsService.java
@@ -99,11 +99,12 @@ public class TrafficStatsService extends AbstractDaemonService implements Traffi
         TrafficStatsUnit tmpStatsSet;
         TrafficStatsUnit trafficStatsSet;
         // Increment write reference count
-        switchableUnits[getIndex()].refCnt.incValue();
+        WritableUnit selectedUnit = switchableUnits[getIndex()];
+        selectedUnit.refCnt.incValue();
         try {
             // Accumulate statistics information
             ConcurrentHashMap<String, TrafficStatsUnit> tmpStatsSetMap =
-                    switchableUnits[getIndex()].statsUnitMap;
+                    selectedUnit.statsUnitMap;
             for (Entry<String, TrafficInfo> entry : trafficInfos.entrySet()) {
                 trafficStatsSet = tmpStatsSetMap.get(entry.getKey());
                 if (trafficStatsSet == null) {
@@ -118,18 +119,19 @@ public class TrafficStatsService extends AbstractDaemonService implements Traffi
             }
         } finally {
             // Decrement write reference count
-            switchableUnits[getIndex()].refCnt.decValue();
+            selectedUnit.refCnt.decValue();
         }
     }
 
     @Override
     public void add(String statsKey, long msgCnt, long msgSize) {
         // Increment write reference count
-        switchableUnits[getIndex()].refCnt.incValue();
+        WritableUnit selectedUnit = switchableUnits[getIndex()];
+        selectedUnit.refCnt.incValue();
         try {
             // Accumulate statistics information
             ConcurrentHashMap<String, TrafficStatsUnit> tmpStatsSetMap =
-                    switchableUnits[getIndex()].statsUnitMap;
+                    selectedUnit.statsUnitMap;
             TrafficStatsUnit trafficStatsSet = tmpStatsSetMap.get(statsKey);
             if (trafficStatsSet == null) {
                 TrafficStatsUnit tmpStatsSet = new TrafficStatsUnit("msg_cnt", "msg_size", null);
@@ -141,7 +143,7 @@ public class TrafficStatsService extends AbstractDaemonService implements Traffi
             trafficStatsSet.addMsgCntAndSize(msgCnt, msgSize);
         } finally {
             // Decrement write reference count
-            switchableUnits[getIndex()].refCnt.decValue();
+            selectedUnit.refCnt.decValue();
         }
     }
 
@@ -163,7 +165,7 @@ public class TrafficStatsService extends AbstractDaemonService implements Traffi
                 break;
             }
             try {
-                Thread.sleep(20);
+                Thread.sleep(2);
             } catch (InterruptedException e) {
                 break;
             }
@@ -172,8 +174,8 @@ public class TrafficStatsService extends AbstractDaemonService implements Traffi
         Map<String, TrafficStatsUnit> statsMap = selectedUnit.statsUnitMap;
         for (Entry<String, TrafficStatsUnit> entry : statsMap.entrySet()) {
             logger.info("{}#{}#{}#{}", statsCat, entry.getKey(),
-                    entry.getValue().msgCnt.getAndResetValue(),
-                    entry.getValue().msgSize.getAndResetValue());
+                    entry.getValue().msgCnt.getValue(),
+                    entry.getValue().msgSize.getValue());
         }
         statsMap.clear();
     }
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/common/webbase/WebCallStatsHolder.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/common/webbase/WebCallStatsHolder.java
index 9360f40..b20ddb0 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/common/webbase/WebCallStatsHolder.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/common/webbase/WebCallStatsHolder.java
@@ -62,7 +62,7 @@ public class WebCallStatsHolder {
         if (switchWritingStatsUnit()) {
             getStatsValue(switchableSets[getIndex(writableIndex.get() - 1)], true, statsMap);
         } else {
-            getValue(statsMap);
+            getStatsValue(switchableSets[getIndex()], false, statsMap);
         }
     }
 
@@ -70,7 +70,7 @@ public class WebCallStatsHolder {
         if (switchWritingStatsUnit()) {
             getStatsValue(switchableSets[getIndex(writableIndex.get() - 1)], true, strBuff);
         } else {
-            getValue(strBuff);
+            getStatsValue(switchableSets[getIndex()], false, strBuff);
         }
     }
 
@@ -135,15 +135,10 @@ public class WebCallStatsHolder {
         statsMap.put("isClosed", (isManualClosed ? 1L : 0L));
         if (resetValue) {
             statsSet.totalCallStats.snapShort(statsMap, false);
-            for (SimpleHistogram itemStats : statsSet.methodStatsMap.values()) {
-                itemStats.snapShort(statsMap, false);
-            }
         } else {
             statsSet.totalCallStats.getValue(statsMap, false);
-            for (SimpleHistogram itemStats : statsSet.methodStatsMap.values()) {
-                itemStats.getValue(statsMap, false);
-            }
         }
+        statsSet.getMethodStatsInfo(statsMap, resetValue);
     }
 
     private static void getStatsValue(WebCallStatsItemSet statsSet,
@@ -152,28 +147,14 @@ public class WebCallStatsHolder {
         strBuff.append("{\"").append(statsSet.lstResetTime.getFullName())
                 .append("\":\"").append(statsSet.lstResetTime.getStrSinceTime())
                 .append("\",\"isClosed\":").append(isManualClosed).append(",");
-        int totalcnt = 0;
         if (resetValue) {
             statsSet.totalCallStats.snapShort(strBuff, false);
-            strBuff.append(",\"").append("methods\":{");
-            for (SimpleHistogram itemStats : statsSet.methodStatsMap.values()) {
-                if (totalcnt++ > 0) {
-                    strBuff.append(",");
-                }
-                itemStats.snapShort(strBuff, false);
-            }
-            strBuff.append("}}");
         } else {
             statsSet.totalCallStats.getValue(strBuff, false);
-            strBuff.append(",\"").append("methods\":{");
-            for (SimpleHistogram itemStats : statsSet.methodStatsMap.values()) {
-                if (totalcnt++ > 0) {
-                    strBuff.append(",");
-                }
-                itemStats.getValue(strBuff, false);
-            }
-            strBuff.append("}}");
         }
+        strBuff.append(",");
+        statsSet.getMethodStatsInfo(strBuff, resetValue);
+        strBuff.append("}");
     }
 
     /**
@@ -217,6 +198,45 @@ public class WebCallStatsHolder {
         public void resetSinceTime() {
             this.lstResetTime.reset();
         }
+
+        /**
+         * Passes statsMap to every SimpleHistogram in methodStatsMap, then optionally clears the map.
+         *
+         * @param statsMap    the map that receives the statistics
+         * @param resetValue  whether to clear methodStatsMap after reading
+         */
+        public void getMethodStatsInfo(Map<String, Long> statsMap, boolean resetValue) {
+            for (SimpleHistogram itemStats : methodStatsMap.values()) {
+                itemStats.getValue(statsMap, false);
+            }
+            if (resetValue) {
+                methodStatsMap.clear();
+            }
+        }
+
+        /**
+         * Appends a "methods":{...} object with one comma-separated entry per SimpleHistogram.
+         *
+         * @param strBuff     the buffer that receives the statistics
+         * @param resetValue  whether to clear methodStatsMap after reading
+         */
+        public void getMethodStatsInfo(StringBuilder strBuff, boolean resetValue) {
+            int totalCnt = 0;
+            strBuff.append("\"").append("methods\":{");
+            for (SimpleHistogram itemStats : methodStatsMap.values()) {
+                if (itemStats == null) {
+                    continue;
+                }
+                if (totalCnt++ > 0) {
+                    strBuff.append(",");
+                }
+                itemStats.getValue(strBuff, false);
+            }
+            strBuff.append("}");
+            if (resetValue) {
+                methodStatsMap.clear();
+            }
+        }
     }
 }
 
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/stats/MasterSrvStatsHolder.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/stats/MasterSrvStatsHolder.java
index a60483e..f6870fe 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/stats/MasterSrvStatsHolder.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/stats/MasterSrvStatsHolder.java
@@ -92,7 +92,7 @@ public class MasterSrvStatsHolder {
         if (switchWritingStatsUnit()) {
             getStatsValue(switchableSets[getIndex(writableIndex.get() - 1)], true, statsMap);
         } else {
-            getValue(statsMap);
+            getStatsValue(switchableSets[getIndex()], false, statsMap);
         }
     }
 
@@ -100,7 +100,7 @@ public class MasterSrvStatsHolder {
         if (switchWritingStatsUnit()) {
             getStatsValue(switchableSets[getIndex(writableIndex.get() - 1)], true, strBuff);
         } else {
-            getValue(strBuff);
+            getStatsValue(switchableSets[getIndex()], false, strBuff);
         }
     }
     // metric set operate APIs end